Splits a stereo WAV file into two mono channels, performs voice activity detection on each channel with webrtcvad, and exports the resulting speaker labels as an RTTM format file.
# pip install webrtcvad
# pip install noisereduce
from scipy.io import wavfile
import os
import librosa
import numpy as np

def splitStereo(stereoWav):
    "Returns (left channel, right channel, sample rate); column 0 is left, column 1 is right."
    sr, samples = wavfile.read(stereoWav)
    return samples[:, 0], samples[:, 1], sr

def splitStereoSave(stereoWav):
    "Writes each channel of a stereo WAV file to its own mono file and returns both paths."
    leftChannel, rightChannel, sr = splitStereo(stereoWav)
    base = os.path.splitext(stereoWav)[0]
    leftChannelPath = base + "_left_channel.wav"
    rightChannelPath = base + "_right_channel.wav"
    wavfile.write(leftChannelPath, sr, leftChannel)
    wavfile.write(rightChannelPath, sr, rightChannel)
    return leftChannelPath, rightChannelPath

def noiseReduction(wav):
    "Applies spectral noise reduction and writes the result next to the input file."
    import noisereduce as nr
    rate, data = wavfile.read(wav)
    reducedNoise = nr.reduce_noise(y=data, sr=rate)
    reducedNoisePath = os.path.splitext(wav)[0] + "_reduced_noise.wav"
    wavfile.write(reducedNoisePath, rate, reducedNoise)
    return reducedNoisePath

class SpeakerSeparation():
    "Input: mono WAV file and a sample rate webrtcvad supports (8000, 16000, 32000 or 48000)."
    def __init__(self, audio=None, sr=None):
        if audio:
            self.mono_wav = audio
            self.samples, self.sample_rate = librosa.load(audio, sr=sr)
            self.samples = librosa.util.normalize(self.samples)

    def samplesFloatToInt(self, samples):
        "Converts librosa's float representation to 16-bit int samples (32767 = max int16)."
        return [int(s * 32767) for s in samples]

    def voiceActivityDetection(self, aggressiveness=3):
        "Voice activity detection with webrtcvad; aggressiveness ranges from 0 (least) to 3 (most)."
        import webrtcvad
        import struct
        vad = webrtcvad.Vad()
        vad.set_mode(aggressiveness)
        self.samplesInt = self.samplesFloatToInt(self.samples)
        raw_samples = struct.pack("%dh" % len(self.samplesInt), *self.samplesInt)
        window_duration = 0.03  # webrtcvad only accepts 10, 20 or 30 ms frames
        samples_per_window = int(window_duration * self.sample_rate + 0.5)
        bytes_per_sample = 2  # 16-bit PCM
        segments = []
        for start in np.arange(0, len(self.samplesInt), samples_per_window):
            stop = min(start + samples_per_window, len(self.samplesInt))
            if stop - start < samples_per_window:
                break  # webrtcvad rejects partial frames; drop the trailing remainder
            is_speech = vad.is_speech(raw_samples[start * bytes_per_sample: stop * bytes_per_sample],
                                      sample_rate=self.sample_rate)
            segments.append(dict(
                start=start,
                stop=stop,
                is_speech=is_speech))
        return segments

    def concatenateVAD(self, segments):
        "Concatenates the samples of all segments flagged as speech."
        return np.concatenate([self.samplesInt[segment['start']:segment['stop']] for segment in segments if segment['is_speech']])

    def plotSamples(self):
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 7))
        plt.plot(self.samples)

    def plotVAD(self, segments):
        "Plots the waveform with an orange marker over every segment identified as speech."
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 7))
        plt.plot(self.samplesInt)
        ymax = max(self.samplesInt)
        for segment in segments:
            if segment['is_speech']:
                plt.plot([segment['start'], segment['stop'] - 1], [ymax * 1.1, ymax * 1.1], color='orange')
        plt.xlabel('sample')
        plt.grid()

def filterOnlySpeaker(segments, speakerName):
    "Merges consecutive speech windows into labelled segments for a single speaker."
    segmentLength = len(segments)
    cursor = 0
    onlySpeaker = []
    beginning = None
    while cursor < segmentLength:
        # Open a segment at the first speech window.
        if beginning is None and segments[cursor]['is_speech']:
            beginning = segments[cursor]['start']
        # Close it at the first non-speech window that follows.
        if beginning is not None and not segments[cursor]['is_speech']:
            onlySpeaker.append({'start': beginning, 'stop': segments[cursor - 1]['stop'], 'name': speakerName})
            beginning = None
        # Close a segment still open at the end of the recording.
        if beginning is not None and cursor == segmentLength - 1:
            onlySpeaker.append({'start': beginning, 'stop': segments[cursor]['stop'], 'name': speakerName})
        cursor += 1
    return onlySpeaker

def concatenateSpeakers(leftSpeaker, rightSpeaker):
    "Merges both speakers' segments into one list ordered by start sample."
    return sorted(leftSpeaker + rightSpeaker, key=lambda x: x['start'])

def rttmSave(speakers, fileName, sr):
    "Converts sample indices to seconds and writes one RTTM line per segment."
    # get process_rttm file from https://gist.github.com/pthavarasa/f873ad3cdd3a6c9fef7122eb1ae12dd4
    from process_rttm import LabelRTTM
    with open(fileName, 'w') as f:
        f.write(''.join([LabelRTTM(fileName, i['start'] / sr, (i['stop'] - i['start']) / sr, i['name']).format_rttm() for i in speakers]))
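
# Assumption: LabelRTTM is defined in the process_rttm gist linked above, and
# its format_rttm() presumably emits one standard RTTM SPEAKER record per
# segment, i.e. lines shaped like:
# SPEAKER <file> 1 <onset seconds> <duration seconds> <NA> <NA> <speaker name> <NA> <NA>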

def rttmPipeline(wav):
    "Full pipeline: split stereo, denoise, run VAD per channel, write the merged RTTM file."
    leftChannelPath, rightChannelPath = splitStereoSave(wav)
    ss = SpeakerSeparation(noiseReduction(leftChannelPath), sr=8000)
    leftSegments = ss.voiceActivityDetection()
    ss = SpeakerSeparation(noiseReduction(rightChannelPath), sr=8000)
    rightSegments = ss.voiceActivityDetection()
    leftSpeaker = filterOnlySpeaker(leftSegments, "leftSpeaker")
    rightSpeaker = filterOnlySpeaker(rightSegments, "rightSpeaker")
    rttmSave(concatenateSpeakers(leftSpeaker, rightSpeaker), os.path.splitext(wav)[0] + ".rttm", 8000)
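
# Minimal usage sketch. "call.wav" is a hypothetical stereo recording with one
# speaker per channel; replace it with your own file. The pipeline resamples
# each channel to 8 kHz before running webrtcvad.
if __name__ == "__main__":
    rttmPipeline("call.wav")
    # Produces call_left_channel.wav, call_right_channel.wav, their
    # *_reduced_noise.wav variants, and call.rttm with interleaved
    # leftSpeaker / rightSpeaker labels.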