Performing diarization on an audio file and comparing embeddings of known speakers.
!pip install -qq pyannote.audio
from datetime import datetime
from subprocess import CalledProcessError, run

import numpy as np
import torch
from pyannote.audio import Inference, Model, Pipeline
from pyannote.core import Segment
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
# Create a timestamp string to keep track of when the process was executed.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Mount Google Drive to load the audio files we need.
from google.colab import drive

print("Mounting Google Drive. Please follow the instructions to authenticate.")
drive.mount('/content/drive')
# Read the Hugging Face access token from Colab secrets (stored under the key 'HUGGING_FACE').
from google.colab import userdata

HUGGING_FACE = userdata.get('HUGGING_FACE')

# Load the diarization pipeline (you must first accept its terms on Hugging Face).
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HUGGING_FACE,
)
# Load the embedding model and wrap it in Inference. window="whole" returns a
# single embedding for each chunk of audio it is given.
embedding_model = Model.from_pretrained("pyannote/embedding", use_auth_token=HUGGING_FACE)
embedding_model = Inference(embedding_model, window="whole")

# Move the pipeline and the embedding model to the GPU if one is available.
if torch.cuda.is_available():
    pipeline.to(torch.device("cuda"))
    embedding_model.to(torch.device("cuda"))
    print("Pipeline and embedding model moved to GPU")
else:
    print("GPU not available, running on CPU")
# Use the path to your file in Google Drive
input_file = '/content/drive/path_to_audio_file.wav'
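# Optional: the pyannote models work on 16 kHz mono audio (the pipeline will
# resample on the fly). If your recording is in another format, one approach is
# to convert it with ffmpeg first - a sketch, where 'source_file' is a
# hypothetical path to your original recording.
source_file = '/content/drive/path_to_source_audio.m4a'
try:
    run(["ffmpeg", "-y", "-i", source_file, "-ac", "1", "-ar", "16000", input_file],
        check=True, capture_output=True)
except CalledProcessError as e:
    print(f"ffmpeg conversion failed: {e}")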
# Run the diarization. num_speakers is optional; omit it to let the pipeline
# estimate the number of speakers itself.
diarization = pipeline(input_file, num_speakers=4)
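# Optionally, save the diarization as an RTTM file so runs can be inspected
# later, using the timestamp created above to keep filenames distinct.
with open(f"/content/diarization_{timestamp}.rttm", "w") as rttm:
    diarization.write_rttm(rttm)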
# Let's get our reference audio clips.
# Define the dictionary of speaker names and sample audio clips.
speaker_audio_clips = {
    "speaker_one": "/content/drive/path_to_speaker_one_sample.wav",
    "speaker_two": "/content/drive/path_to_speaker_two_sample.wav",
}
# Embed each reference clip. The model returns a 1-D vector, so reshape it to
# shape (1, n_dimensions) and L2-normalize it for the cosine comparison below.
reference_embeddings = {}
for name, path in speaker_audio_clips.items():
    emb = embedding_model(path).reshape(1, -1)
    reference_embeddings[name] = normalize(emb)
# Dictionary to collect embeddings per speaker
speaker_embeddings = {}
# Once the diarization is complete, loop through the speaker turns.
for turn, _, speaker in diarization.itertracks(yield_label=True):
    # print(f"start={turn.start:.2f}s stop={turn.end:.2f}s speaker_{speaker}")
    # Ignore segments shorter than 2 seconds; very short turns tend to give
    # unreliable embeddings.
    if turn.duration < 2:
        continue
    # Extract an embedding for the current segment
    try:
        segment = Segment(turn.start, turn.end)
        emb = embedding_model.crop(input_file, segment).reshape(1, -1)
        emb = normalize(emb)  # L2 normalization
        # Collect embeddings per speaker
        if speaker not in speaker_embeddings:
            speaker_embeddings[speaker] = []
        speaker_embeddings[speaker].append(emb)
    except Exception as e:
        print(f"Error processing segment {turn}: {e}")
# Average each diarized speaker's embeddings and compare them to the references.
for speaker, embeddings in speaker_embeddings.items():
    stacked = np.vstack(embeddings)
    avg_embedding = np.mean(stacked, axis=0, keepdims=True)
    print(f"\nResults for diarized speaker_{speaker}:")
    for ref_name, ref_embedding in reference_embeddings.items():
        score = cosine_similarity(avg_embedding, ref_embedding)[0][0]
        print(f"  Similarity to reference {ref_name}: {score:.3f}")