Skip to content

Instantly share code, notes, and snippets.

@nrbnlulu
Created June 26, 2025 11:22
Show Gist options
  • Save nrbnlulu/687753e6e0743c638d981be2b3c532e1 to your computer and use it in GitHub Desktop.
Save nrbnlulu/687753e6e0743c638d981be2b3c532e1 to your computer and use it in GitHub Desktop.
ffplay playground for nvrs
#!/usr/bin/env python3
import os
import signal
import subprocess
import sys
import time
# Milesight NVR Configuration
NVR_IP = "shut"
RTSP_PORT = "554"
USERNAME = "your"
PASSWORD = "face"
class RTSPStreamManager:
def __init__(self):
self.processes = []
self.running = True
# Setup signal handlers for clean shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
print(f"\nReceived signal {signum}. Shutting down all streams...")
self.cleanup()
sys.exit(0)
def open_stream(self, channel, stream_type, title, window_size=(320, 240)):
"""Open a single RTSP stream using ffplay"""
# Construct RTSP URL
url = f"rtsp://{USERNAME}:{PASSWORD}@{NVR_IP}:{RTSP_PORT}/ch_{stream_type}{channel:02d}"
# FFplay command optimized for stability and low latency with auto-restart capability
cmd = [
"ffplay",
"-tune",
"zerolatency",
"-strict",
"experimental",
"-an",
"-fflags",
"discardcorrupt",
"-fflags",
"nobuffer",
"-flags",
"low_delay",
"-framedrop",
"-rtsp_transport",
"tcp",
"-window_title",
title,
"-x",
str(window_size[0]),
"-y",
str(window_size[1]),
"-left",
str((channel % 4) * (window_size[0] + 10)),
"-top",
str((channel // 4) * (window_size[1] + 30)),
url,
]
try:
# Start ffplay process
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if os.name != "nt" else None,
)
self.processes.append(
{
"process": process,
"title": title,
"channel": channel + 1,
"stream_type": "Main" if stream_type == "1" else "Sub",
"url": url,
"cmd": cmd,
"start_time": time.time(),
"restart_count": 0,
}
)
print(f"✓ Opened {title} (PID: {process.pid})")
return True
except FileNotFoundError:
print("✗ Error: ffplay not found. Please install FFmpeg.")
return False
except Exception as e:
print(f"✗ Error opening {title}: {e}")
return False
def open_all_main_streams(self):
"""Open all 12 main streams (high quality)"""
print("Opening Main Streams (High Quality)...")
for i in range(12):
title = f"Channel {i + 1} - Main"
self.open_stream(i, "1", title)
time.sleep(0.3) # Small delay to prevent overwhelming the system
print(
f"\n✓ Opened {len([p for p in self.processes if 'Main' in p['title']])} main streams"
)
def open_all_sub_streams(self):
"""Open all 12 sub streams (lower quality)"""
print("Opening Sub Streams (Lower Quality)...")
for i in range(12):
title = f"Channel {i + 1} - Sub"
self.open_stream(
i, "4", title, window_size=(240, 180)
) # Smaller for sub streams
time.sleep(0.3)
sub_count = len([p for p in self.processes if "Sub" in p["title"]])
print(f"\n✓ Opened {sub_count} sub streams")
def monitor_streams(self):
"""Monitor running streams, restart unresponsive ones, and remove dead processes"""
active_processes = []
current_time = time.time()
for proc_info in self.processes:
process = proc_info["process"]
# Check if process is still running
if process.poll() is None:
# Check if process has been running too long without restart (potential hang)
runtime = current_time - proc_info["start_time"]
if runtime > 300: # 5 minutes without restart
print(f"⚠ Restarting potentially hung stream: {proc_info['title']}")
self._restart_stream(proc_info)
else:
active_processes.append(proc_info)
else:
# Process died, attempt restart if not too many failures
if proc_info["restart_count"] < 3:
print(
f"🔄 Auto-restarting stream: {proc_info['title']} (attempt {proc_info['restart_count'] + 1})"
)
restarted_proc = self._restart_stream(proc_info)
if restarted_proc:
active_processes.append(restarted_proc)
else:
print(f"✗ Stream failed too many times: {proc_info['title']}")
self.processes = active_processes
return len(self.processes)
def _restart_stream(self, proc_info):
"""Restart a single stream process"""
try:
# Kill the old process if still running
if proc_info["process"].poll() is None:
if os.name == "nt":
proc_info["process"].terminate()
else:
os.killpg(os.getpgid(proc_info["process"].pid), signal.SIGTERM)
# Wait for termination
try:
proc_info["process"].wait(timeout=2)
except subprocess.TimeoutExpired:
if os.name == "nt":
proc_info["process"].kill()
else:
os.killpg(os.getpgid(proc_info["process"].pid), signal.SIGKILL)
# Start new process
new_process = subprocess.Popen(
proc_info["cmd"],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if os.name != "nt" else None,
)
# Update process info
proc_info["process"] = new_process
proc_info["start_time"] = time.time()
proc_info["restart_count"] += 1
print(f"✓ Restarted {proc_info['title']} (PID: {new_process.pid})")
return proc_info
except Exception as e:
print(f"✗ Failed to restart {proc_info['title']}: {e}")
return None
def list_active_streams(self):
"""List all currently active streams"""
if not self.processes:
print("No active streams")
return
print(f"\nActive Streams ({len(self.processes)}):")
print("-" * 70)
for proc_info in self.processes:
status = "Running" if proc_info["process"].poll() is None else "Stopped"
runtime = int(time.time() - proc_info["start_time"])
restarts = proc_info["restart_count"]
print(
f"Channel {proc_info['channel']:2d} - {proc_info['stream_type']:4s} - {status:7s} - Runtime: {runtime:3d}s - Restarts: {restarts} (PID: {proc_info['process'].pid})"
)
def cleanup(self):
"""Clean shutdown of all processes"""
print(f"\nShutting down {len(self.processes)} streams...")
for proc_info in self.processes:
try:
process = proc_info["process"]
if process.poll() is None: # Process still running
if os.name == "nt": # Windows
process.terminate()
else: # Unix/Linux
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
# Wait a moment for graceful shutdown
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
# Force kill if still running
if os.name == "nt":
process.kill()
else:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
print(f"✓ Closed {proc_info['title']}")
except Exception as e:
print(f"✗ Error closing {proc_info['title']}: {e}")
self.processes.clear()
print("All streams closed.")
def interactive_menu(self):
"""Interactive menu for stream management"""
while self.running:
print("\n" + "=" * 60)
print("RTSP Stream Manager - Milesight NVR")
print("=" * 60)
print("1. Open all main streams (12 channels)")
print("2. Open all sub streams (12 channels)")
print("3. Open specific channel")
print("4. List active streams")
print("5. Monitor stream health")
print("6. Close all streams")
print("7. Exit")
print("-" * 60)
try:
choice = input("Enter your choice (1-7): ").strip()
if choice == "1":
self.open_all_main_streams()
elif choice == "2":
self.open_all_sub_streams()
elif choice == "3":
try:
channel = int(input("Enter channel number (1-12): ")) - 1
if 0 <= channel <= 11:
stream_type = input("Stream type (m=main, s=sub): ").lower()
if stream_type.startswith("m"):
title = f"Channel {channel + 1} - Main"
self.open_stream(channel, "1", title)
elif stream_type.startswith("s"):
title = f"Channel {channel + 1} - Sub"
self.open_stream(channel, "4", title)
else:
print("Invalid stream type")
else:
print("Invalid channel number")
except ValueError:
print("Invalid input")
elif choice == "4":
self.list_active_streams()
elif choice == "5":
print(
"Monitoring streams (this will auto-restart hung/dead streams)..."
)
active_count = self.monitor_streams()
print(f"Health check complete. {active_count} streams active.")
elif choice == "6":
self.cleanup()
elif choice == "7":
self.cleanup()
break
else:
print("Invalid choice")
except KeyboardInterrupt:
print("\nInterrupted by user")
self.cleanup()
break
except EOFError:
print("\nEOF received")
self.cleanup()
break
def main():
print("Milesight NVR RTSP Stream Manager")
print(f"NVR IP: {NVR_IP}:{RTSP_PORT}")
print(f"Username: {USERNAME}")
print("-" * 50)
manager = RTSPStreamManager()
if len(sys.argv) > 1:
# Command line arguments
if sys.argv[1] == "--all-main":
manager.open_all_main_streams()
print("\nPress Ctrl+C to close all streams...")
print(
"Auto-monitoring enabled - streams will restart automatically if they hang or die..."
)
try:
while True:
time.sleep(10) # Check every 10 seconds
active_count = manager.monitor_streams()
if active_count == 0:
print("All streams closed.")
break
except KeyboardInterrupt:
pass
elif sys.argv[1] == "--all-sub":
manager.open_all_sub_streams()
print("\nPress Ctrl+C to close all streams...")
print(
"Auto-monitoring enabled - streams will restart automatically if they hang or die..."
)
try:
while True:
time.sleep(10) # Check every 10 seconds
active_count = manager.monitor_streams()
if active_count == 0:
print("All streams closed.")
break
except KeyboardInterrupt:
pass
elif sys.argv[1] == "--all":
manager.open_all_main_streams()
response = input("\nOpen sub streams too? (y/n): ")
if response.lower().startswith("y"):
manager.open_all_sub_streams()
print("\nPress Ctrl+C to close all streams...")
print(
"Auto-monitoring enabled - streams will restart automatically if they hang or die..."
)
try:
while True:
time.sleep(10) # Check every 10 seconds
active_count = manager.monitor_streams()
if active_count == 0:
print("All streams closed.")
break
except KeyboardInterrupt:
pass
else:
print("Usage:")
print(" python3 rtsp_manager.py # Interactive mode")
print(" python3 rtsp_manager.py --all-main # Open all main streams")
print(" python3 rtsp_manager.py --all-sub # Open all sub streams")
print(" python3 rtsp_manager.py --all # Open all streams")
else:
# Interactive mode
manager.interactive_menu()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment