Bob 2.0 implementation of voice activity detection (VAD) based on the signal's energy.
Algorithms have at least one input and one output. All algorithm endpoints are organized in groups. Groups are used by the platform to indicate which inputs and outputs are synchronized together. The first group is automatically synchronized with the channel defined by the block in which the algorithm is deployed.
Endpoint Name | Data Format | Nature |
---|---|---|
speech | system/array_1d_floats/1 | Input |
vad | system/array_1d_integers/1 | Output |
Parameters allow users to change the configuration of an algorithm when scheduling an experiment.
Name | Description | Type | Default | Range/Choices |
---|---|---|---|---|
rate | sampling rate | float64 | 16000.0 |
xxxxxxxxxx
import numpy
import bob.core
import bob.ap
import bob.learn.em
import logging
logger = logging.getLogger("bob.c++")
def normalize_std_array(vector):
    """Normalize a sequence of scalar samples to zero mean and unit variance.

    Parameters
    ----------
    vector : array-like
        One scalar value per sample: either a 1-D sequence of length
        ``n_samples`` or an ``(n_samples, 1)`` array.

    Returns
    -------
    numpy.ndarray
        ``float64`` array of shape ``(n_samples, 1)`` holding
        ``(vector[i] - mean) / std``, where ``std`` is the population
        standard deviation (divided by ``n_samples``).  An empty input
        yields an empty ``(0, 1)`` array.

    Notes
    -----
    The variance is computed from the centered samples rather than as
    ``E[x^2] - E[x]^2``.  The latter loses precision, and can even turn
    negative, when the samples share a large common offset.

    A zero standard deviation is not special-cased: the division then
    produces non-finite entries, exactly as dividing by zero does in numpy.
    """
    n_samples = len(vector)
    # Lay the samples out as a single column, matching the returned shape.
    samples = numpy.asarray(vector, dtype='float64').reshape(n_samples, 1)
    if n_samples == 0:
        # Nothing to normalize; avoid taking the mean of an empty array.
        return samples

    mean = samples.mean(axis=0)
    centered = samples - mean
    std = numpy.sqrt(numpy.mean(centered ** 2, axis=0))
    return centered / std
def smoothing(labels, smoothing_window):
    """Smooth a sequence of 0/1 VAD labels.

    ``labels`` is modified in place and also returned.

    Processing steps:

    1. If the sum of ``labels`` is below ``smoothing_window`` the labels are
       returned untouched.
    2. A single ``0`` frame surrounded by ``1`` frames becomes ``1``; after
       that pass, a single ``1`` frame surrounded by ``0`` frames becomes
       ``0``.
    3. The labels are cut into runs of identical values.  A run shorter than
       ``smoothing_window`` whose existing neighbouring runs are all longer
       than ``smoothing_window`` has its labels inverted.  Run lengths are
       those measured before any inversion takes place.
    """
    if numpy.sum(labels) < smoothing_window:
        return labels

    n_frames = len(labels)

    # First fill one-frame holes, then remove one-frame spikes.
    for isolated, surrounding in ((0, 1), (1, 0)):
        for idx in range(1, n_frames - 1):
            if (labels[idx] == isolated
                    and labels[idx - 1] == surrounding
                    and labels[idx + 1] == surrounding):
                labels[idx] = surrounding

    # Each run is stored as [first index, last index, label value].
    starts = [0] + [idx for idx in range(1, n_frames)
                    if labels[idx] != labels[idx - 1]]
    ends = [begin - 1 for begin in starts[1:]] + [n_frames - 1]
    segments = [numpy.array([begin, end, labels[begin]])
                for begin, end in zip(starts, ends)]

    if len(segments) < 2:
        return labels

    def _span(segment):
        # Number of frames covered by the run.
        return segment[1] - segment[0] + 1

    def _invert(segment):
        # Overwrite the run's labels with the opposite value.
        begin, stop = segment[0], segment[1] + 1
        if segment[2] == 1:
            labels[begin:stop] = numpy.zeros(stop - begin)
            segment[2] = 0
        else:
            labels[begin:stop] = numpy.ones(stop - begin)
            segment[2] = 1

    for pos, segment in enumerate(segments):
        # The first and last runs only have one neighbour to compare with.
        neighbours = segments[max(pos - 1, 0):pos] + segments[pos + 1:pos + 2]
        if _span(segment) < smoothing_window and all(
                _span(other) > smoothing_window for other in neighbours):
            _invert(segment)

    return labels
class Algorithm:
    """Energy-based voice activity detection (VAD) block.

    Reads a 1-D float signal from the ``speech`` input, computes its energy
    with ``bob.ap.Energy``, classifies every energy value with a
    two-component GMM (initialised by k-means), smooths the decisions and
    writes the resulting integer labels to the ``vad`` output.
    """

    def __init__(self):
        # Cepstral parameters
        self.win_length_ms = 20
        self.win_shift_ms = 10
        # VAD parameters
        self.max_iterations = 10
        self.smoothing_window = 10  # This corresponds to 100ms
        self.rate = 16000

    def _voice_activity_detection(self, energy_array):
        """Label each energy value as speech (1) or non-speech (0).

        The energies are normalized, clustered into two groups with k-means,
        and the clusters seed a two-Gaussian GMM trained by maximum
        likelihood.  A value is labelled 0 when the Gaussian with the lower
        mean gives it a strictly higher log-likelihood than the Gaussian
        with the higher mean, and 1 otherwise.

        Returns a ``numpy.int16`` array with one label per energy value.  If
        k-means produces a NaN mean, all labels are 0.
        """
        n_samples = len(energy_array)
        normalized_energy = normalize_std_array(energy_array)
        convergence_threshold = 0.0005
        gmm_max_iterations = 25

        kmeans = bob.learn.em.KMeansMachine(2, 1)

        # Mute logger propagation while training.  The previous state is
        # restored in ``finally`` so that the early return below (and any
        # exception raised by the trainers) cannot leave the logger muted.
        logger_propagate = logger.propagate
        if logger_propagate:
            logger.propagate = False
        try:
            m_ubm = bob.learn.em.GMMMachine(2, 1)
            kmeans_trainer = bob.learn.em.KMeansTrainer('RANDOM_NO_DUPLICATE')

            # Trains using the KMeansTrainer
            bob.learn.em.train(kmeans_trainer, kmeans, normalized_energy,
                               self.max_iterations, convergence_threshold)
            [variances, weights] = \
                kmeans.get_variances_and_weights_for_each_cluster(normalized_energy)
            means = kmeans.means
            if numpy.isnan(means[0]) or numpy.isnan(means[1]):
                # Clustering failed: report no speech at all for this input.
                return numpy.zeros(n_samples, dtype=numpy.int16)

            # Initializes the GMM from the k-means statistics
            m_ubm.means = means
            m_ubm.variances = variances
            m_ubm.weights = weights
            m_ubm.set_variance_thresholds(0.0005)

            trainer = bob.learn.em.ML_GMMTrainer(True, True, True)
            bob.learn.em.train(trainer, m_ubm, normalized_energy,
                               gmm_max_iterations, convergence_threshold)
            means = m_ubm.means
        finally:
            # Enable logger propagation again
            if logger_propagate:
                logger.propagate = True

        # The Gaussian with the larger mean models the higher-energy frames.
        if means[0] < means[1]:
            higher, lower = 1, 0
        else:
            higher, lower = 0, 1
        higher_mean_gauss = m_ubm.get_gaussian(higher)
        lower_mean_gauss = m_ubm.get_gaussian(lower)

        label = numpy.ones(n_samples, dtype=numpy.int16)
        for i, frame in enumerate(normalized_energy):
            if higher_mean_gauss.log_likelihood(frame) < lower_mean_gauss.log_likelihood(frame):
                label[i] = 0
        return label

    def setup(self, parameters):
        """Read the optional ``rate`` parameter and build the energy extractor.

        ``parameters`` is a mapping; when it has no ``rate`` entry the
        current ``self.rate`` is kept.  Always returns True.
        """
        self.rate = parameters.get('rate', self.rate)
        self.preprocessor = bob.ap.Energy(self.rate, self.win_length_ms,
                                          self.win_shift_ms)
        return True

    def process(self, inputs, outputs):
        """Run VAD on the ``speech`` input and write labels to ``vad``.

        Always returns True.
        """
        float_wav = inputs["speech"].data.value
        energy_array = self.preprocessor(float_wav)
        labels = self._voice_activity_detection(energy_array)
        # Discard isolated segments shorter than the smoothing window
        # (10 frames, i.e. 100ms, by default).
        vad_labels = smoothing(labels, self.smoothing_window)
        outputs["vad"].write({
            'value': vad_labels
        })
        return True
The code for this algorithm in Python
The ruler at 80 columns indicates suggested POSIX line breaks (for readability).
The editor will automatically enlarge to accommodate the entirety of your input.
Use keyboard shortcuts for search/replace and faster editing. For example, use Ctrl-F (PC) or Cmd-F (Mac) to search through this box
Raw content
Updated | Name | Databases/Protocols | Analyzers | |||
---|---|---|---|---|---|---|
pkorshunov/pkorshunov/isv-asv-pad-fusion-complete/1/asv_isv-pad_lbp_hist_ratios_lr-fusion_lr-pa_aligned | avspoof/2@physicalaccess_verification,avspoof/2@physicalaccess_verify_train,avspoof/2@physicalaccess_verify_train_spoof,avspoof/2@physicalaccess_antispoofing,avspoof/2@physicalaccess_verification_spoof | pkorshunov/spoof-score-fusion-roc_hist/1 | ||||
pkorshunov/pkorshunov/isv-asv-pad-fusion-complete/1/asv_isv-pad_gmm-fusion_lr-pa | avspoof/2@physicalaccess_verification,avspoof/2@physicalaccess_verify_train,avspoof/2@physicalaccess_verify_train_spoof,avspoof/2@physicalaccess_antispoofing,avspoof/2@physicalaccess_verification_spoof | pkorshunov/spoof-score-fusion-roc_hist/1 | ||||
pkorshunov/pkorshunov/isv-speaker-verification-spoof/1/isv-speaker-verification-spoof-pa | avspoof/2@physicalaccess_verification,avspoof/2@physicalaccess_verification_spoof | pkorshunov/eerhter_postperf_iso_spoof/1 | ||||
pkorshunov/pkorshunov/isv-speaker-verification/1/isv-speaker-verification-licit | avspoof/2@physicalaccess_verification | pkorshunov/eerhter_postperf_iso/1 |
This table shows the number of times this algorithm has been successfully run using the given environment. Note this does not provide sufficient information to evaluate if the algorithm will run when submitted to different conditions.