Skip to content

Instantly share code, notes, and snippets.

@neo22s
Last active March 3, 2025 14:53
Show Gist options
  • Save neo22s/e601c2e19ee2e401845b2aca9001719b to your computer and use it in GitHub Desktop.
Save neo22s/e601c2e19ee2e401845b2aca9001719b to your computer and use it in GitHub Desktop.
Change Bitrate of videos in bulk
#!/usr/bin/env python3
import os
import sys
import subprocess
import json
from pathlib import Path
from tqdm import tqdm
class VideoProcessor:
    """Re-encode high-bitrate videos from one folder tree into another.

    Every MP4/MKV/AVI file found under ``input_folder`` is probed with
    ``ffprobe``. Files whose bitrate exceeds ``threshold_bitrate`` and whose
    estimated size reduction reaches ``savings_threshold`` percent are
    re-encoded with ``ffmpeg`` to H.264/AAC MP4 under ``output_folder``,
    mirroring the input's relative directory layout.
    """

    # Lower-case suffixes treated as video input; matching is case-insensitive.
    VIDEO_EXTENSIONS = ('.mp4', '.mkv', '.avi')

    def __init__(self, input_folder, output_folder):
        """Store the folders and the fixed processing settings.

        Args:
            input_folder: Root folder searched recursively for videos.
            output_folder: Root folder that receives the re-encoded files.
        """
        self.input_folder = Path(input_folder)
        self.output_folder = Path(output_folder)
        self.threshold_bitrate = 3000  # kbps; at or below this a file is skipped
        self.target_bitrate = "3000k"  # value passed to ffmpeg's -b:v
        self.savings_threshold = 20  # minimum expected savings, in percent
        self.output_format = "mp4"  # output container / file extension

    def get_video_info(self, video_path):
        """Return the duration and bitrate of a video, probed with ffprobe.

        The bitrate is taken from the container's ``bit_rate`` field. If that
        is missing, zero, or not a number, it is derived from file size and
        duration. If that is impossible too, a rough figure is guessed from
        the resolution and frame rate of the first video stream.

        Args:
            video_path: Path of the file to analyse.

        Returns:
            ``{'duration': seconds, 'bitrate': kbps}``, or ``None`` when the
            file cannot be analysed (the reason is printed).
        """
        try:
            video_path = Path(video_path)
            # One ffprobe call fetches both container-level values.
            cmd = [
                'ffprobe',
                '-v', 'error',
                '-show_entries', 'format=duration,bit_rate',
                '-of', 'json',
                str(video_path)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            fmt = json.loads(result.stdout).get('format', {})
            duration = float(fmt.get('duration', 0))

            try:
                bitrate = int(fmt.get('bit_rate', 0)) // 1000  # bps -> kbps
            except (ValueError, TypeError):
                bitrate = 0  # e.g. "N/A"

            # A missing or zero bit_rate must fall back to the size-based
            # figure as well, not only an unparsable one.
            if bitrate <= 0 and duration > 0:
                file_size = video_path.stat().st_size  # bytes
                bitrate = (file_size * 8) / (duration * 1000)  # kbps

            # Last resort: guess from the video stream's geometry.
            if bitrate <= 0:
                bitrate = self._estimate_bitrate_from_stream(video_path)

            return {
                'duration': duration,
                'bitrate': bitrate
            }
        except Exception as e:
            # Deliberately broad: one unreadable file (or a failing ffprobe)
            # must not abort the whole batch; the caller skips it instead.
            print(f"Error analyzing {video_path}: {e}")
            return None

    def _estimate_bitrate_from_stream(self, video_path):
        """Guess a bitrate (kbps) from the first video stream, or return 0."""
        cmd = [
            'ffprobe',
            '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height,r_frame_rate',
            '-of', 'json',
            str(video_path)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        streams = json.loads(result.stdout).get('streams') or []
        if not streams:
            return 0

        stream = streams[0]
        width = int(stream.get('width', 0))
        height = int(stream.get('height', 0))
        frame_rate = self._parse_frame_rate(stream.get('r_frame_rate', '0/1'))
        if not (width > 0 and height > 0 and frame_rate > 0):
            return 0

        # Very rough per-resolution guesses.
        pixels = width * height
        if pixels >= 1920 * 1080:  # 1080p or higher
            bitrate = 5000
        elif pixels >= 1280 * 720:  # 720p
            bitrate = 3000
        elif pixels >= 854 * 480:  # 480p
            bitrate = 1500
        else:
            bitrate = 800

        # Scale the guess up for frame rates above 30 fps.
        if frame_rate > 30:
            bitrate = bitrate * (frame_rate / 30)
        return bitrate

    @staticmethod
    def _parse_frame_rate(frame_rate_str):
        """Parse ffprobe's frame rate ("30000/1001" or "25") into a number; 0 on failure."""
        try:
            if '/' in frame_rate_str:
                num, den = map(int, frame_rate_str.split('/'))
                return num / den if den != 0 else 0
            return float(frame_rate_str)
        except (ValueError, ZeroDivisionError):
            return 0

    def calculate_savings(self, original_size, duration):
        """Estimate the size reduction, in percent, of re-encoding a file.

        The output size is estimated as ``threshold_bitrate`` kbps sustained
        for ``duration`` seconds.

        Args:
            original_size: Size of the source file in bytes.
            duration: Length of the video in seconds.

        Returns:
            The expected savings as a percentage, never negative. An empty
            source file yields 0 instead of dividing by zero.
        """
        if original_size <= 0:
            return 0
        estimated_size = (self.threshold_bitrate * 1000 * duration) / 8
        savings_percent = ((original_size - estimated_size) / original_size) * 100
        return max(0, savings_percent)  # never report negative savings

    def process_video(self, input_path):
        """Re-encode a single video if doing so is expected to be worthwhile.

        Args:
            input_path: Path of the source video, located under
                ``input_folder``.

        Returns:
            ``True`` when a smaller output file was produced, ``False`` when
            the file was skipped or the conversion failed.
        """
        # Mirror the input's relative location, switching to the output extension.
        rel_path = input_path.relative_to(self.input_folder)
        output_path = self.output_folder / rel_path.with_suffix(f'.{self.output_format}')
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # An input that already carries the output extension always gets a
        # "_processed" suffix (whether or not the folders coincide), so the
        # output can never share the input's file name.
        if input_path.suffix.lower() == f'.{self.output_format}' and input_path.name == output_path.name:
            output_path = output_path.with_stem(f"{output_path.stem}_processed")

        if output_path.exists():
            print(f"Skipping: {output_path} (already exists)")
            return False

        original_size = input_path.stat().st_size
        original_mb = original_size / (1024 * 1024)

        info = self.get_video_info(input_path)
        if not info:
            print(f"Skipping: {input_path} (cannot analyze)")
            return False

        savings = self.calculate_savings(original_size, info['duration'])

        # Debug output
        print(f"\nAnalyzing: {input_path}")
        print(f"File format: {input_path.suffix[1:].upper()}")  # suffix without the dot
        print(f"Original size: {original_mb:.2f}MB")
        print(f"Detected bitrate: {info['bitrate']:.1f}kbps")
        print(f"Duration: {info['duration']:.2f} seconds")
        print(f"Expected savings: {savings:.1f}%")

        # Check if worth processing
        if info['bitrate'] <= self.threshold_bitrate:
            print(f"Skipping: {input_path} (bitrate too low: {info['bitrate']:.1f}kbps)")
            return False
        if savings < self.savings_threshold:
            print(f"Skipping: {input_path} (insufficient savings: {savings:.1f}%)")
            return False

        print(f"Processing: {input_path}")
        print(f"Converting to: {self.output_format.upper()}")
        print(f"Output path: {output_path}")

        cmd = [
            'ffmpeg',
            '-nostdin',  # keep ffmpeg from reading the terminal's stdin
            '-i', str(input_path),
            '-b:v', self.target_bitrate,
            '-c:v', 'h264',  # H.264 video
            '-c:a', 'aac',  # AAC audio
            '-pix_fmt', 'yuv420p',  # widely compatible pixel format
            '-movflags', '+faststart',  # optimise for web streaming
            '-threads', '0',
            str(output_path)
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f"Error processing {input_path}: {e}")
            # Show the end of ffmpeg's own diagnostics, otherwise lost.
            stderr_text = (e.stderr or b'').decode(errors='replace').strip()
            for line in stderr_text.splitlines()[-5:]:
                print(f"  ffmpeg: {line}")
            output_path.unlink(missing_ok=True)
            return False
        except KeyboardInterrupt:
            # Remove the partial file so a later run does not skip it as
            # "already exists".
            output_path.unlink(missing_ok=True)
            raise

        if not output_path.exists():
            print(f"Error processing {input_path}: no output file was produced")
            return False

        new_size = output_path.stat().st_size
        new_mb = new_size / (1024 * 1024)
        if new_size >= original_size:
            print(f"Output larger than input, removing: {output_path}")
            output_path.unlink()
            return False

        saved_mb = original_mb - new_mb
        print(f"Success! Saved: {saved_mb:.2f}MB ({(saved_mb/original_mb*100):.1f}%)")
        return True

    def find_video_files(self):
        """Find all MP4, MKV and AVI files under the input folder.

        The suffix is compared case-insensitively, so every capitalisation
        (``.MKv``, ``.avI`` ...) is found in a single directory walk, and
        directories that merely have a video-like name are ignored.

        Returns:
            A tuple ``(files, mp4_count, mkv_count, avi_count)`` where
            ``files`` is a list of paths without duplicates.
        """
        wanted = set(self.VIDEO_EXTENSIONS)
        unique_files = [
            path for path in self.input_folder.rglob('*')
            if path.suffix.lower() in wanted and path.is_file()
        ]
        # Count by extension
        mp4_count = sum(1 for f in unique_files if f.suffix.lower() == '.mp4')
        mkv_count = sum(1 for f in unique_files if f.suffix.lower() == '.mkv')
        avi_count = sum(1 for f in unique_files if f.suffix.lower() == '.avi')
        return unique_files, mp4_count, mkv_count, avi_count

    def process_all_videos(self):
        """Process all MP4, MKV, and AVI files in the input folder."""
        video_files, mp4_count, mkv_count, avi_count = self.find_video_files()
        if not video_files:
            print("No video files found!")
            return
        print(f"Found {len(video_files)} video files ({mp4_count} MP4, {mkv_count} MKV, {avi_count} AVI)")

        ordered = sorted(video_files)  # sort once; reused for sample and run

        # Print the first few files for debugging
        print("\nSample of files found:")
        for i, sample in enumerate(ordered[:5]):
            print(f" {i+1}. {sample} ({sample.suffix})")
        if len(ordered) > 5:  # only hint at more files when there are more
            print(" ...")

        processed = 0
        for video in tqdm(ordered, desc="Processing videos"):
            if self.process_video(video):
                processed += 1
        print(f"\nProcessing complete! {processed}/{len(video_files)} files processed")
def main():
    """Command-line entry point: ``video_processor.py input_folder output_folder``.

    Exits with status 1 when the arguments are wrong or the input folder is
    not an existing directory; otherwise processes every video found.
    """
    if len(sys.argv) != 3:
        print("Usage: python video_processor.py input_folder output_folder")
        sys.exit(1)
    input_folder = sys.argv[1]
    output_folder = sys.argv[2]
    # isdir (not exists): a regular file is not a usable input folder.
    if not os.path.isdir(input_folder):
        print(f"Input folder does not exist: {input_folder}")
        sys.exit(1)
    processor = VideoProcessor(input_folder, output_folder)
    processor.process_all_videos()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment