Last active
March 3, 2025 14:53
-
-
Save neo22s/e601c2e19ee2e401845b2aca9001719b to your computer and use it in GitHub Desktop.
Change Bitrate of videos in bulk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import subprocess | |
import json | |
from pathlib import Path | |
from tqdm import tqdm | |
class VideoProcessor:
    """Re-encode videos whose bitrate exceeds a threshold to save disk space.

    Videos are discovered recursively under ``input_folder`` and written as
    H.264/AAC MP4 files to the mirrored relative path under ``output_folder``.
    A video is only re-encoded when its detected bitrate is above
    ``threshold_bitrate`` and the estimated size reduction reaches
    ``savings_threshold`` percent.
    """

    # Lower-case suffixes treated as video files when scanning the input folder.
    VIDEO_EXTENSIONS = ('.mp4', '.mkv', '.avi')

    def __init__(self, input_folder, output_folder):
        """Store the folders and the fixed encoding/selection settings.

        Args:
            input_folder: Root folder scanned recursively for videos.
            output_folder: Root folder that receives the re-encoded files.
        """
        self.input_folder = Path(input_folder)
        self.output_folder = Path(output_folder)
        self.threshold_bitrate = 3000  # kbps
        self.target_bitrate = "3000k"
        self.savings_threshold = 20  # percentage
        self.output_format = "mp4"  # Output format is MP4

    @staticmethod
    def _probe(video_path, entries, select_streams=None):
        """Run ffprobe for the given ``-show_entries`` spec and return parsed JSON."""
        cmd = ['ffprobe', '-v', 'error']
        if select_streams is not None:
            cmd += ['-select_streams', select_streams]
        cmd += ['-show_entries', entries, '-of', 'json', str(video_path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return json.loads(result.stdout)

    @staticmethod
    def _parse_bitrate_kbps(raw):
        """Convert ffprobe's ``bit_rate`` value (bits/s) to kbps; 0 if unusable."""
        try:
            return int(raw) // 1000
        except (ValueError, TypeError):
            # Missing (None) or non-numeric value: treat as unknown.
            return 0

    @staticmethod
    def _parse_frame_rate(raw):
        """Parse a frame rate such as ``"30000/1001"`` or ``"25"``; 0 if unusable."""
        try:
            if '/' in raw:
                num, den = map(int, raw.split('/'))
                return num / den if den != 0 else 0
            return float(raw)
        except (ValueError, ZeroDivisionError):
            return 0

    def _estimate_bitrate_from_stream(self, video_path):
        """Roughly estimate a bitrate (kbps) from resolution and frame rate.

        Returns 0 when the first video stream cannot be inspected.
        """
        stream_data = self._probe(
            video_path,
            'stream=width,height,r_frame_rate',
            select_streams='v:0',
        )
        streams = stream_data.get('streams')
        if not streams:
            return 0

        stream = streams[0]
        width = int(stream.get('width', 0))
        height = int(stream.get('height', 0))
        frame_rate = self._parse_frame_rate(stream.get('r_frame_rate', '0/1'))
        if width <= 0 or height <= 0 or frame_rate <= 0:
            return 0

        # This is a very rough estimation based on the pixel count only.
        pixels = width * height
        if pixels >= 1920 * 1080:  # HD or higher
            bitrate = 5000
        elif pixels >= 1280 * 720:  # 720p
            bitrate = 3000
        elif pixels >= 854 * 480:  # 480p
            bitrate = 1500
        else:  # Lower resolution
            bitrate = 800

        # Scale up proportionally for frame rates above 30 fps.
        if frame_rate > 30:
            bitrate = bitrate * (frame_rate / 30)
        return bitrate

    def get_video_info(self, video_path):
        """Get video duration and bitrate using ffprobe.

        The bitrate is taken from the container's ``bit_rate`` field; when that
        is missing or unusable it is derived from file size and duration, and
        as a last resort estimated from the first video stream's resolution
        and frame rate.

        Args:
            video_path: Path of the video file to analyze.

        Returns:
            A dict with ``'duration'`` (seconds) and ``'bitrate'`` (kbps), or
            ``None`` when the file cannot be analyzed.
        """
        try:
            # Duration and bitrate live in the same section: one ffprobe call.
            fmt = self._probe(video_path, 'format=duration,bit_rate').get('format', {})
            duration = float(fmt.get('duration', 0))
            bitrate = self._parse_bitrate_kbps(fmt.get('bit_rate'))

            # No usable container bitrate: derive it from file size.
            if bitrate == 0 and duration > 0:
                file_size = Path(video_path).stat().st_size  # bytes
                bitrate = (file_size * 8) / (duration * 1000)  # kbps

            # Still unknown: fall back to a resolution-based guess.
            if bitrate == 0:
                bitrate = self._estimate_bitrate_from_stream(video_path)

            return {
                'duration': duration,
                'bitrate': bitrate
            }
        except Exception as e:
            # Deliberate best-effort: one unreadable file must not abort the batch.
            print(f"Error analyzing {video_path}: {e}")
            return None

    def calculate_savings(self, original_size, duration):
        """Calculate the potential size savings as a percentage.

        Args:
            original_size: Size of the original file in bytes.
            duration: Duration of the video in seconds.

        Returns:
            Estimated savings in percent, never negative. 0 for an empty file.
        """
        if original_size <= 0:
            # An empty file has nothing to save; avoids a division by zero.
            return 0
        estimated_size = (self.threshold_bitrate * 1000 * duration) / 8
        savings_percent = ((original_size - estimated_size) / original_size) * 100
        return max(0, savings_percent)  # Ensure we don't get negative savings

    def process_video(self, input_path):
        """Process a single video file.

        Args:
            input_path: Path of the video; must be located under ``input_folder``.

        Returns:
            ``True`` when a smaller output file was produced, ``False`` when the
            video was skipped or the conversion failed.
        """
        input_path = Path(input_path)

        # Mirror the relative path under the output folder with the output extension.
        rel_path = input_path.relative_to(self.input_folder)
        output_path = self.output_folder / rel_path.with_suffix(f'.{self.output_format}')

        # An input already in the output format would keep its exact file name;
        # add a suffix so the result can never collide with the source file.
        if input_path.suffix.lower() == f'.{self.output_format}' and input_path.name == output_path.name:
            output_path = output_path.with_stem(f"{output_path.stem}_processed")

        # Skip if output exists
        if output_path.exists():
            print(f"Skipping: {output_path} (already exists)")
            return False

        # Get original file size
        original_size = input_path.stat().st_size
        original_mb = original_size / (1024 * 1024)

        # Get video information
        info = self.get_video_info(input_path)
        if not info:
            print(f"Skipping: {input_path} (cannot analyze)")
            return False

        # Calculate potential savings
        savings = self.calculate_savings(original_size, info['duration'])

        # Debug output
        print(f"\nAnalyzing: {input_path}")
        print(f"File format: {input_path.suffix[1:].upper()}")  # Show file format without the dot
        print(f"Original size: {original_mb:.2f}MB")
        print(f"Detected bitrate: {info['bitrate']:.1f}kbps")
        print(f"Duration: {info['duration']:.2f} seconds")
        print(f"Expected savings: {savings:.1f}%")

        # Check if worth processing
        if info['bitrate'] <= self.threshold_bitrate:
            print(f"Skipping: {input_path} (bitrate too low: {info['bitrate']:.1f}kbps)")
            return False
        if savings < self.savings_threshold:
            print(f"Skipping: {input_path} (insufficient savings: {savings:.1f}%)")
            return False

        print(f"Processing: {input_path}")
        print(f"Converting to: {self.output_format.upper()}")
        print(f"Output path: {output_path}")

        # Only create the directory once we know a file will be written into it.
        output_path.parent.mkdir(parents=True, exist_ok=True)

        cmd = [
            'ffmpeg',
            '-nostdin',  # Never let ffmpeg read keystrokes from the terminal
            '-i', str(input_path),
            '-b:v', self.target_bitrate,
            '-c:v', 'h264',  # Using H.264 codec for MP4
            '-c:a', 'aac',  # Using AAC for audio in MP4
            '-pix_fmt', 'yuv420p',  # Standard pixel format for compatibility
            '-movflags', '+faststart',  # Optimize for web streaming
            '-threads', '0',
            str(output_path)
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f"Error processing {input_path}: {e}")
            # Show the last line ffmpeg printed, which normally names the cause.
            stderr_lines = (e.stderr or b"").decode(errors="replace").strip().splitlines()
            if stderr_lines:
                print(f"ffmpeg: {stderr_lines[-1]}")
            output_path.unlink(missing_ok=True)
            return False
        except OSError as e:
            # ffmpeg could not be started at all (e.g. it is not installed).
            print(f"Error processing {input_path}: {e}")
            output_path.unlink(missing_ok=True)
            return False
        except KeyboardInterrupt:
            # A partial file would be skipped as "already exists" on the next run.
            output_path.unlink(missing_ok=True)
            raise

        # Verify output
        if not output_path.exists():
            print(f"Error processing {input_path}: no output file was produced")
            return False

        new_size = output_path.stat().st_size
        new_mb = new_size / (1024 * 1024)
        if new_size >= original_size:
            print(f"Output larger than input, removing: {output_path}")
            output_path.unlink()
            return False

        saved_mb = original_mb - new_mb
        print(f"Success! Saved: {saved_mb:.2f}MB ({(saved_mb/original_mb*100):.1f}%)")
        return True

    def find_video_files(self):
        """Find all video files under the input folder, ignoring extension case.

        Returns:
            A tuple ``(files, mp4_count, mkv_count, avi_count)`` where ``files``
            is a sorted list of paths to regular files.
        """
        wanted = set(self.VIDEO_EXTENSIONS)
        # Compare the lower-cased suffix so every case variant is matched, and
        # keep regular files only (a directory may be named like a video).
        files = sorted(
            path
            for path in self.input_folder.rglob('*')
            if path.suffix.lower() in wanted and path.is_file()
        )

        # Count by extension
        mp4_count = sum(1 for f in files if f.suffix.lower() == '.mp4')
        mkv_count = sum(1 for f in files if f.suffix.lower() == '.mkv')
        avi_count = sum(1 for f in files if f.suffix.lower() == '.avi')
        return files, mp4_count, mkv_count, avi_count

    def process_all_videos(self):
        """Process all MP4, MKV, and AVI files in the input folder."""
        video_files, mp4_count, mkv_count, avi_count = self.find_video_files()
        if not video_files:
            print("No video files found!")
            return

        video_files = sorted(video_files)
        print(f"Found {len(video_files)} video files ({mp4_count} MP4, {mkv_count} MKV, {avi_count} AVI)")

        # Print first few files for debugging
        sample_size = 5
        print("\nSample of files found:")
        for i, file in enumerate(video_files[:sample_size], start=1):
            print(f" {i}. {file} ({file.suffix})")
        if len(video_files) > sample_size:
            print(" ...")

        processed = 0
        for video in tqdm(video_files, desc="Processing videos"):
            if self.process_video(video):
                processed += 1
        print(f"\nProcessing complete! {processed}/{len(video_files)} files processed")
def main():
    """Command-line entry point: ``video_processor.py input_folder output_folder``.

    Validates the two positional arguments, then processes every video found
    under the input folder. Exits with status 1 on a usage error or when the
    input folder is missing or is not a directory.
    """
    if len(sys.argv) != 3:
        print("Usage: python video_processor.py input_folder output_folder")
        sys.exit(1)

    input_folder, output_folder = sys.argv[1], sys.argv[2]

    if not os.path.exists(input_folder):
        print(f"Input folder does not exist: {input_folder}")
        sys.exit(1)
    if not os.path.isdir(input_folder):
        # A regular file would otherwise be scanned as an empty folder.
        print(f"Input path is not a directory: {input_folder}")
        sys.exit(1)

    processor = VideoProcessor(input_folder, output_folder)
    processor.process_all_videos()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment