Created
November 27, 2023 01:31
-
-
Save dreness/df6b6828f95575a3b983c69c2bf73964 to your computer and use it in GitHub Desktop.
Batch audio transcription with whisper.cpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python
"""
Configure the arguments to the process_directory call at the bottom.
Start additional instances of this script until your hardware is fully
utilized. If you have multiple GPUs, you're responsible for setting
CUDA_VISIBLE_DEVICES.
Using the 'medium' model, a V100 is mostly utilized with two instances.
This still has some bugs probably...
"""
import os | |
import subprocess | |
import sys | |
import glob | |
import json | |
def transcribe_with_whisper_cpp(wav_file, whisper=None, model=None, threads="1", p_output=None):
    """Transcribe one WAV file by running the whisper.cpp executable.

    The command asks for plain-text, WebVTT, SubRip, extended-JSON and CSV
    outputs. If ``<wav_file>.json`` already exists the file is treated as
    done and nothing is run.

    Args:
        wav_file: Path of the WAV file to transcribe.
        whisper: Path to the whisper.cpp executable (required).
        model: Path to the model file, passed with ``-m`` (required).
        threads: Thread count passed with ``-t``; converted with ``str()``.
        p_output: Open, writable file object that receives log lines and the
            subprocess's stdout/stderr (required).

    Exits the process with status 1 when a required option is missing.
    The WAV file is removed only after whisper exits with status 0, so a
    failed transcription keeps its input and can be retried.
    """
    # if required options are missing, bail
    if whisper is None or model is None or p_output is None:
        print("Missing required options for transcribe_with_whisper_cpp")
        sys.exit(1)

    # look for a file with the same name as the WAV file but with a .json
    # extension and skip if it exists
    json_file = wav_file + ".json"
    if os.path.exists(json_file):
        print(f"Skipping {wav_file} because {json_file} already exists", file=p_output)
        return

    # Command and arguments
    command = [
        whisper,              # Path to the whisper.cpp executable
        "-m", model,          # Model
        "-t", str(threads),   # Number of threads (argv entries must be strings)
        "-otxt",              # Output format: plain text
        "-ovtt",              # Output format: WebVTT
        "-osrt",              # Output format: SubRip
        "-ojf",               # Output format: JSON (extended)
        "-ocsv",              # Output format: CSV
        "--split-on-word",    # Option to split on word
        "-f", wav_file,       # WAV file to transcribe
    ]

    # Execute the command and stream the stdout / stderr to the log file.
    print(f"Transcribing {wav_file}", file=p_output)
    # The child writes straight to the file descriptor, so push our buffered
    # text out first to keep the log in chronological order.
    p_output.flush()
    result = subprocess.run(command, stdout=p_output, stderr=p_output)

    if result.returncode != 0:
        # Keep the wav file: deleting it here would throw away the input of
        # a transcription that never completed.
        print(
            f"Transcription of {wav_file} failed with exit code "
            f"{result.returncode}; keeping the wav file",
            file=p_output)
        p_output.flush()
        return

    # delete the wav file
    os.remove(wav_file)
def get_duration(path):
    """Return the ``format.duration`` value that ffprobe reports for a file.

    Runs ``ffprobe -show_format`` with JSON output and returns the
    ``duration`` entry of the ``format`` object exactly as decoded from the
    JSON (no numeric conversion is applied here).

    Args:
        path: Path of the media file to inspect.

    Returns:
        The decoded ``duration`` value, or ``None`` when the output has no
        ``format`` object or no ``duration`` entry.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
        json.JSONDecodeError: ffprobe's stdout was not valid JSON.
    """
    # use ffprobe / subprocess to get the duration of the file
    cmd = ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', path]
    out = subprocess.run(cmd, capture_output=True, check=True)
    info = json.loads(out.stdout)
    # "or {}" covers both a missing key and an explicit null, so a sparse
    # answer yields None instead of an AttributeError.
    return (info.get('format') or {}).get('duration')
def _durations_match(first, second, tolerance):
    """Return True when both duration values are numeric and within tolerance seconds."""
    try:
        return abs(float(first) - float(second)) <= tolerance
    except (TypeError, ValueError):
        # A missing or non-numeric duration cannot prove the wav is complete.
        return False


def convert_to_wav(input_path, output_path, p_output, duration_tolerance=0.5):
    """Convert an audio file to a mono 16 kHz WAV by shelling out to ffmpeg.

    When ``output_path`` already exists and its duration is within
    ``duration_tolerance`` seconds of the input's duration, the conversion
    is skipped. Otherwise ffmpeg is run with ``-y`` and overwrites it.

    Args:
        input_path: Source audio file (the caller passes an mp3).
        output_path: Destination WAV path.
        p_output: Open, writable file object that receives log lines and
            ffmpeg's stdout/stderr.
        duration_tolerance: Largest duration difference, in seconds, that
            still counts as "same duration". Comparing the raw ffprobe values
            for exact equality would treat tiny differences between the two
            containers as a mismatch and reconvert every time.

    Returns:
        None. A non-zero ffmpeg exit status is reported in the log.
    """
    # skip if the file already exists
    if os.path.exists(output_path):
        try:
            wav_duration = get_duration(output_path)
            mp3_duration = get_duration(input_path)
        except (subprocess.CalledProcessError, ValueError):
            # ffprobe could not read one of the files (for example a wav left
            # truncated by an interrupted run): fall through and reconvert
            # instead of aborting the whole batch.
            wav_duration = mp3_duration = None

        # compare the duration of the wav file to the duration of the mp3 file
        if _durations_match(wav_duration, mp3_duration, duration_tolerance):
            print(
                f"Skipping conversion of {output_path} to wav because a wav file of the same duration already exists.",
                file=p_output)
            p_output.flush()
            return

    # shell out to ffmpeg to convert the mp3 to mono 16 kHz wav
    print(f"Converting {input_path} to {output_path}", file=p_output)
    p_output.flush()
    cmd = [
        'ffmpeg',
        '-i', input_path,   # Input file
        '-ar', '16000',     # Sample rate
        '-ac', '1',         # Channels
        '-y',               # Overwrite output file if it exists
        output_path,
    ]
    result = subprocess.run(cmd, stdout=p_output, stderr=p_output)
    if result.returncode != 0:
        # Surface the failure in the log rather than letting it pass silently.
        print(
            f"ffmpeg failed to convert {input_path} (exit code {result.returncode})",
            file=p_output)
        p_output.flush()
def process_directory(directory=None, whisper=None, model=None, p_output=None, threads="1"):
    """Convert and transcribe every ``.mp3`` file found directly in a directory.

    For each mp3 that has no ``<name>.wav.json`` yet and no
    ``<filename>-*.lock`` file next to it, a lock file named
    ``<filename>-<pid>.lock`` is created, the mp3 is converted to wav and
    transcribed, and the lock file is removed again.

    Args:
        directory: Directory to scan (required).
        whisper: Path to the whisper.cpp executable (required).
        model: Path to the model file (required).
        p_output: Open, writable file object used for logging and passed on
            to the conversion and transcription steps (required).
        threads: Thread count forwarded to the transcription step.

    Exits the process with status 1 when a required option is missing.
    """
    if directory is None or whisper is None or model is None or p_output is None:
        print("Missing required options for process_directory")
        sys.exit(1)

    for filename in os.listdir(directory):
        if not filename.endswith(".mp3"):
            continue

        input_path = os.path.join(directory, filename)
        # Swap only the trailing extension; str.replace would also rewrite a
        # ".mp3" that appears elsewhere in the path (e.g. in a directory name).
        wav_path = os.path.splitext(input_path)[0] + ".wav"

        # The transcription step skips files whose .json output exists, so
        # check first: otherwise every finished file would be converted to
        # wav again on each run and the fresh wav would be left behind.
        json_path = wav_path + ".json"
        if os.path.exists(json_path):
            print(f"Skipping {filename} because {json_path} already exists", file=p_output)
            p_output.flush()
            continue

        # Look for a lock file that matches the glob filename-*.lock
        # if it exists, skip this file. The literal part is escaped so names
        # containing glob metacharacters ("[", "*", "?") still match.
        lock_pattern = glob.escape(input_path) + "-*.lock"
        if glob.glob(lock_pattern):
            print(f"Skipping {filename} because a lock file exists", file=p_output)
            p_output.flush()
            continue

        lockfile = os.path.join(directory, f"{filename}-{os.getpid()}.lock")
        print(f"Touching {lockfile}", file=p_output)
        p_output.flush()
        open(lockfile, 'a').close()

        try:
            convert_to_wav(input_path, wav_path, p_output)
            transcribe_with_whisper_cpp(
                wav_path,
                whisper=whisper,
                model=model,
                threads=threads,
                p_output=p_output)
        finally:
            # delete the lock file even when a step raised, so the file is
            # not skipped forever by later runs
            print(f"Deleting {lockfile}", file=p_output)
            p_output.flush()
            os.remove(lockfile)
def find_needs_processing(directory):
    """List the ``.mp3`` files in a directory that have no transcript yet.

    Not strictly part of this script; just a helper to find files that need
    processing, e.g. to decide what to upload to batch processors. An mp3
    counts as processed when ``<name>.wav.json`` exists next to it.

    Prints the queued paths one per line followed by their count.

    Args:
        directory: Directory to scan (not recursive).

    Returns:
        The list of mp3 paths still to be processed, sorted by file name.
    """
    queue = []
    # sorted() makes the printed listing reproducible between runs
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".mp3"):
            continue
        input_path = os.path.join(directory, filename)
        # Swap only the trailing extension; str.replace would also rewrite a
        # ".mp3" that appears elsewhere in the path.
        json_path = os.path.splitext(input_path)[0] + ".wav.json"
        if not os.path.exists(json_path):
            queue.append(input_path)
    print("\n".join(queue))
    print(len(queue))
    return queue
# Example: find_needs_processing('/Users/andre/Downloads/cbb')
if __name__ == "__main__":
    # for intel / cuda - although really the only difference is thread count
    # for Apple Silicon, use one thread.
    # The log file name includes the pid of the current python process so
    # that several instances started side by side each get their own log.
    pid = os.getpid()
    # "with" closes the log file when processing finishes or raises.
    with open(f"/root/stdout-{pid}.log", 'a') as p_output:
        process_directory(
            directory='/root/cbb',
            whisper='/root/whisper.cpp/main',
            model='/root/whisper.cpp/models/ggml-medium.bin',
            threads="4",
            p_output=p_output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment