Splitting stereo WAV audio into two mono channels and performing voice activity detection on each with webrtcvad; speaker labels are exported as an RTTM file.
# pip install webrtcvad
# pip install noisereduce
from scipy.io import wavfile
import librosa
import numpy as np

def splitStereo(stereoWav):
    "0 : left channel, 1 : right channel"
    sr, samples = wavfile.read(stereoWav)
    return samples[:, 0], samples[:, 1], sr
def splitStereoSave(stereoWav):
    leftChannel, rightChannel, sr = splitStereo(stereoWav)
    leftChannelPath = "".join(stereoWav.split(".")[:-1] + ["_left_channel.wav"])
    rightChannelPath = "".join(stereoWav.split(".")[:-1] + ["_right_channel.wav"])
    wavfile.write(leftChannelPath, sr, leftChannel)
    wavfile.write(rightChannelPath, sr, rightChannel)
    return leftChannelPath, rightChannelPath
def noiseReduction(wav):
    import noisereduce as nr
    rate, data = wavfile.read(wav)
    reducedNoise = nr.reduce_noise(y=data, sr=rate)
    reducedNoisePath = "".join(wav.split(".")[:-1] + ["_reduced_noise.wav"])
    wavfile.write(reducedNoisePath, rate, reducedNoise)
    return reducedNoisePath
class SpeakerSeparation():
    "input : mono audio file in WAV format and a sample rate in [8000, 16000, 32000, 48000] (the rates webrtcvad supports)"
    def __init__(self, audio=None, sr=None):
        if audio:
            self.mono_wav = audio
            self.samples, self.sample_rate = librosa.load(audio, sr=sr)
            self.samples = librosa.util.normalize(self.samples)

    def samplesFloatToInt(self, samples):
        "Converts librosa's float-based representation to 16-bit int, given a numpy array of samples"
        return [int(s * 32767) for s in samples]
    def voiceActivityDetection(self, aggressiveness=3):
        "voice activity detection with webrtcvad"
        import webrtcvad
        import struct
        vad = webrtcvad.Vad()
        vad.set_mode(aggressiveness)
        self.samplesInt = self.samplesFloatToInt(self.samples)
        # pack the 16-bit samples into raw PCM bytes, as webrtcvad expects
        raw_samples = struct.pack("%dh" % (len(self.samplesInt)), *self.samplesInt)
        window_duration = 0.03  # duration in seconds; webrtcvad accepts 10, 20 or 30 ms frames
        samples_per_window = int(window_duration * self.sample_rate + 0.5)
        bytes_per_sample = 2
        segments = []
        try:
            for start in np.arange(0, len(self.samplesInt), samples_per_window):
                stop = min(start + samples_per_window, len(self.samplesInt))
                is_speech = vad.is_speech(raw_samples[start * bytes_per_sample: stop * bytes_per_sample],
                                          sample_rate=self.sample_rate)
                segments.append(dict(
                    start=start,
                    stop=stop,
                    is_speech=is_speech))
        except Exception as e:
            # the trailing window may be shorter than a valid frame, which webrtcvad rejects
            print("An exception occurred:", e)
        return segments
    def concatenateVAD(self, segments):
        "concatenate all samples belonging to segments flagged as speech"
        return np.concatenate([self.samplesInt[segment['start']:segment['stop']]
                               for segment in segments if segment['is_speech']])

    def plotSamples(self):
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 7))
        plt.plot(self.samples)

    def plotVAD(self, segments):
        "plot segments identified as speech"
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 7))
        plt.plot(self.samplesInt)
        ymax = max(self.samplesInt)
        for segment in segments:
            if segment['is_speech']:
                plt.plot([segment['start'], segment['stop'] - 1], [ymax * 1.1, ymax * 1.1], color='orange')
        plt.xlabel('sample')
        plt.grid()
def filterOnlySpeaker(segments, speakerName):
    "merge consecutive speech windows into labelled segments for a single speaker"
    segmentLength = len(segments)
    cursor = 0
    onlySpeaker = []
    beginning = None
    while cursor < segmentLength:
        if beginning is None and segments[cursor]['is_speech']:
            beginning = segments[cursor]['start']
        if beginning is not None and not segments[cursor]['is_speech']:
            onlySpeaker.append({'start': beginning, 'stop': segments[cursor - 1]['stop'], 'name': speakerName})
            beginning = None
        if beginning is not None and cursor == segmentLength - 1:
            onlySpeaker.append({'start': beginning, 'stop': segments[cursor]['stop'], 'name': speakerName})
        cursor += 1
    return onlySpeaker
def concatenatespeakers(leftSpeaker, rightSpeaker):
    "merge the two speakers' segment lists, ordered by start sample"
    return sorted(leftSpeaker + rightSpeaker, key=lambda x: x['start'])

def rttmSave(speakers, fileName, sr):
    # get the process_rttm file from https://gist.github.com/pthavarasa/f873ad3cdd3a6c9fef7122eb1ae12dd4
    from process_rttm import LabelRTTM
    with open(fileName, 'w') as f:
        f.write(''.join([LabelRTTM(fileName, i['start'] / sr, (i['stop'] - i['start']) / sr, i['name']).format_rttm()
                         for i in speakers]))
def rttmPipeline(wav):
    "split a stereo WAV, denoise each channel, run VAD per channel, and save merged speaker labels as RTTM"
    leftChannelPath, rightChannelPath = splitStereoSave(wav)
    ss = SpeakerSeparation(noiseReduction(leftChannelPath), sr=8000)
    leftSegments = ss.voiceActivityDetection()
    ss = SpeakerSeparation(noiseReduction(rightChannelPath), sr=8000)
    rightSegments = ss.voiceActivityDetection()
    leftSpeaker = filterOnlySpeaker(leftSegments, "leftSpeaker")
    rightSpeaker = filterOnlySpeaker(rightSegments, "rightSpeaker")
    rttmSave(concatenatespeakers(leftSpeaker, rightSpeaker), wav.split(".")[0] + ".rttm", 8000)
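
Note: rttmSave imports LabelRTTM from the process_rttm gist linked in the comment above. For reference, a minimal stand-in sketch, assuming the helper emits standard RTTM SPEAKER lines; the constructor arguments are inferred from the call in rttmSave, and the actual implementation lives in the linked gist:

# Hypothetical minimal stand-in for process_rttm.LabelRTTM (assumption:
# it emits one standard RTTM SPEAKER record per speech segment)
class LabelRTTM:
    def __init__(self, fileName, startTime, duration, speakerName):
        self.fileName = fileName        # file/URI field of the RTTM record
        self.startTime = startTime      # segment onset in seconds
        self.duration = duration       # segment duration in seconds
        self.speakerName = speakerName  # speaker label

    def format_rttm(self):
        # RTTM SPEAKER line: type, file, channel, onset, duration,
        # placeholder (<NA>) fields, and the speaker name
        return "SPEAKER {} 1 {:.3f} {:.3f} <NA> <NA> {} <NA> <NA>\n".format(
            self.fileName, self.startTime, self.duration, self.speakerName)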

pthavarasa commented Sep 1, 2022

Example

Split stereo audio into two separate mono channels and save:

leftChannelPath, rightChannelPath = splitStereoSave("main5.wav")

Visualize voice activity detection on a mono-channel audio file:

ss = SpeakerSeparation(leftChannelPath, sr=8000)
# ss = SpeakerSeparation(noiseReduction(leftChannelPath), sr=8000) # apply noise reduction
segments = ss.voiceActivityDetection()
ss.plotVAD(segments)

Filter voice-active segments only and play:

import IPython.display as ipd
ipd.Audio(ss.concatenateVAD(segments), rate=ss.sample_rate)

Generate labels (RTTM format):

ss = SpeakerSeparation(noiseReduction(leftChannelPath), sr=8000)
leftSegments = ss.voiceActivityDetection()
ss = SpeakerSeparation(noiseReduction(rightChannelPath), sr=8000)
rightSegments = ss.voiceActivityDetection()
leftSpeaker = filterOnlySpeaker(leftSegments, "leftSpeaker")
rightSpeaker = filterOnlySpeaker(rightSegments, "rightSpeaker")
rttmSave(concatenatespeakers(leftSpeaker, rightSpeaker), "Speaker.rttm", 8000)

Speaker separation pipeline for stereo audio

  • works only if the stereo audio contains exactly two speakers, one speaking in each channel
rttmPipeline("main5.wav")


pthavarasa commented Sep 2, 2022

  • the orange lines mark the segments detected as speech (VAD output)

before noise reduction

[VAD plot before noise reduction]

after noise reduction

[VAD plot after noise reduction]


pthavarasa commented Sep 2, 2022

RTTM visualization

from pyannote.database.util import load_rttm
REFERENCE = f"Speaker.rttm"
reference = load_rttm(REFERENCE)["Speaker.cha"]
reference

[RTTM timeline visualization]

pthavarasa commented

requirements

# for speechbrain
!pip install -qq torch==1.11.0 torchvision==0.12.0 torchaudio==0.11.0 torchtext==0.12.0
!pip install -qq speechbrain==0.5.12

# pyannote.audio
!pip install -qq pyannote.audio

# for visualization purposes
!pip install -qq moviepy ipython==7.34.0

%pip install pyannote.audio
%pip install pyannote.database

code

from pyannote.database.util import load_rttm
from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

name = "main4_0"
# apply the pipeline to an audio file
diarization = pipeline(name + ".wav", min_speakers=2, max_speakers=3)

# dump the diarization output to disk using RTTM format
with open(name + ".rttm", "w") as rttm:
    diarization.write_rttm(rttm)

reference = load_rttm(name + ".rttm")[name]
reference
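
A possible follow-up (not shown in the original gist) is to score the channel-based RTTM produced earlier against this pyannote output. A minimal sketch with pyannote.metrics, assuming both RTTM files exist and that the URI keys match the file names used above:

# Hypothetical evaluation sketch (not in the original gist): compare the
# channel-based VAD labels against pyannote's diarization output.
from pyannote.database.util import load_rttm
from pyannote.metrics.diarization import DiarizationErrorRate

# Assumed file names and URI keys; adjust to your own RTTM files.
reference = load_rttm("main4_0.rttm")["main4_0"]       # pyannote diarization
hypothesis = load_rttm("Speaker.rttm")["Speaker.cha"]  # channel-based labels

metric = DiarizationErrorRate()
print("DER:", metric(reference, hypothesis))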
