Train a diarization system and output a model (the model must include the speaker database)
Algorithms have at least one input and one output, and all endpoints are organized in groups. The platform uses groups to indicate which inputs and outputs are synchronized together; the first group is automatically synchronized with the channel defined by the block in which the algorithm is deployed.
| Endpoint Name | Data Format | Nature |
| --- | --- | --- |
| features | system/array_2d_floats/1 | Input |
| uem | anthony_larcher/uemranges/1 | Input |
| file_info | anthony_larcher/file_info_sd/1 | Input |
| speakers | anthony_larcher/speakers/1 | Input |
| model | anthony_larcher/array_1d_uint8/1 | Output |
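As a minimal sketch of how these grouped endpoints are consumed (it reuses the same dataloader calls as the algorithm below and adds nothing beyond them), the synchronized inputs can be read index by index:

loader = data_loaders.loaderOf("features")
for i in range(loader.count()):
    (data, _, end_index) = loader[i]
    features = data["features"].value         # system/array_2d_floats/1
    speech_segments = data["speakers"]        # anthony_larcher/speakers/1
    file_id = data["file_info"].file_id       # anthony_larcher/file_info_sd/1

The full algorithm follows.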
import copy
import h5py
import logging
import numpy
import pickle
import random
import struct

import s4d
import sidekit
class H5Dict(dict):
"""
A dictionary sub-class used to exchange features between
algorithms.
"""
def __setitem__(self, key, value):
super().__setitem__(key, value)
def __getitem__(self, key):
return super().__getitem__(key)
def __iter__(self):
return iter(["cep",])
class AlliesExtractor(sidekit.FeaturesExtractor):
"""
A FeaturesExtractor is used to access cepstral coefficients
"""
def __init__(self, dataloader, name_dict, key="features"):
super().__init__()
self.dataloader = dataloader
self.name_dict = name_dict
self.audio_filename_structure = "{}"
self.key = key
def extract(self, show, channel, input_audio_filename):
        entry, _, _ = self.dataloader[self.name_dict[show]]
        cep = entry[self.key].value
        label = numpy.ones((cep.shape[0]), dtype=bool)
        h5f = H5Dict()
        h5f[show] = iter(["cep"])
        h5f["compression"] = 0
        # Column 0 of the feature matrix holds the frame energy,
        # the remaining columns hold the cepstral coefficients
        h5f[show + "/energy"] = cep[:, 0]
        h5f[show + "/energy_mean"] = cep[:, 0].mean()
        h5f[show + "/energy_std"] = cep[:, 0].std()
        h5f[show + "/cep"] = cep[:, 1:]
        h5f[show + "/cep_mean"] = cep[:, 1:].mean(axis=0)
        h5f[show + "/cep_std"] = cep[:, 1:].std(axis=0)
        h5f[show + "/vad"] = label.astype("int8")
return h5f
def setup_for_s4d(data_loaders):
    """
    Build the s4d data structures and the feature server needed to train
    the system from the inputs of this block.
    """
    # The loader is the interface used to access the inputs of this algorithmic block
    loader = data_loaders.loaderOf("features")
    # The s4d.Diar object stores information about each speech segment
    # from the MDTM:
    #   file_id
    #   cluster, used to store the speaker index
    #   start_time
    #   stop_time
global_diar = s4d.diar.Diar()
global_diar.add_attribut(new_attribut="gender", default="U")
    # Fill the s4d.Diar object and a dictionary "name_dict"
    # that maps each file_id to the index of its audio file
name_dict = {}
for i in range(loader.count()):
(data, _, end_index) = loader[i]
speech_segments = data["speakers"]
file_id = data["file_info"].file_id
supervision = data["file_info"].supervision
time_stamp = data["file_info"].time_stamp
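        # Segment boundaries arrive in seconds and are converted below to
        # centisecond frame indices (100 frames per second assumed)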
for idx in range(speech_segments.speaker.size):
global_diar.append(
show=file_id,
cluster=speech_segments.speaker[idx],
start=int(round(speech_segments.start_time[idx] * 100, 0)),
stop=int(round(speech_segments.end_time[idx] * 100, 0)),
)
name_dict[file_id] = int(i)
    # Create two objects to access the acoustic features
fe_train = AlliesExtractor(loader, name_dict)
fs_acoustic = sidekit.FeaturesServer(
features_extractor=fe_train,
dataset_list=["energy", "cep"],
keep_all_features=True,
delta=True,
double_delta=True,
feat_norm="cmvn_sliding"
)
    # Get the feature size from the first file: cepstra plus energy, each
    # with delta and double-delta appended, hence (cep_dim * 3) + 3
    first_show = list(name_dict.keys())[0]
    first_cep = fe_train.extract(first_show, 0, first_show)[first_show + "/cep"]
    feature_size = first_cep.shape[1] * 3 + 3
return global_diar, fs_acoustic, feature_size, end_index
def aggregate_models(ubm, tv_fa, norm_mean, norm_cov, plda_fa, iv_stat, global_diar):
    """
    Aggregate the components of the acoustic model to exchange between
    algorithms. The model dictionary is pickled and converted into a
    uint8 array.
    """
model = dict()
model["ubm"] = ubm
model["tv_fa"] = tv_fa
model["norm_mean"] = norm_mean
model["norm_cov"] = norm_cov
model["plda_fa"] = plda_fa
model["iv_stat"] = iv_stat
model["global_diar"] = global_diar
pkl = pickle.dumps(model)
u8 = numpy.array(struct.unpack("{}B".format(len(pkl)), pkl), dtype=numpy.uint8)
return u8
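# A downstream block receiving this output would invert the packing; a
# minimal sketch (not part of this algorithm):
#     model = pickle.loads(u8.tobytes())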
def keep_recurring_speakers(iv_ss, rank_F, occ_number=2, filter_by_name=False):
    """
    Filter a StatServer of i-vectors to keep only speakers that occur in
    at least `occ_number` sessions. When `filter_by_name` is True,
    generic or anonymous labels (e.g. "speaker", "locuteur", "inconnu")
    are discarded as well.
    """
    unique, counts = numpy.unique(iv_ss.modelset, return_counts=True)

    generic_prefixes = ("speaker", "presentat", "sup+", "voix",
                        "publicitaire", "locuteur", "inconnu", "traduct")

    def keep(name, count):
        if count < occ_number:
            return False
        if not filter_by_name:
            return True
        return (not name.startswith(generic_prefixes)
                and "#" not in name
                and "_" in name)

    tot_sessions = 0
    kept_idx = []
    for model in range(unique.shape[0]):
        if keep(unique[model], counts[model]):
            kept_idx = numpy.append(
                kept_idx, numpy.where(iv_ss.modelset == unique[model])
            )
            tot_sessions += counts[model]

    flt_iv = sidekit.StatServer()
    flt_iv.stat0 = numpy.ones((tot_sessions, 1))
    flt_iv.stat1 = numpy.ones((tot_sessions, rank_F))
    flt_iv.modelset = numpy.empty(tot_sessions, dtype="|O")
    flt_iv.segset = numpy.empty(tot_sessions, dtype="|O")
    flt_iv.start = numpy.empty(tot_sessions, dtype="|O")
    flt_iv.stop = numpy.empty(tot_sessions, dtype="|O")

    i = 0
    for model in range(unique.shape[0]):
        if keep(unique[model], counts[model]):
            for ivector in numpy.where(iv_ss.modelset == unique[model])[0]:
                flt_iv.stat1[i, :] = iv_ss.stat1[ivector, :]
                flt_iv.modelset[i] = iv_ss.modelset[ivector]
                flt_iv.segset[i] = iv_ss.segset[ivector]
                flt_iv.start[i] = iv_ss.start[ivector]
                flt_iv.stop[i] = iv_ss.stop[ivector]
                i += 1
    unique = numpy.unique(flt_iv.modelset)
    return flt_iv, unique.shape[0], kept_idx
class Algorithm:
# initialise fields to store cross-input data (e.g. machines, aggregations, etc.)
def __init__(self):
"""
Set the meta-parameters of the model
"""
        self.distrib_nb = 16   # full-size value: 256
        self.iv_size = 50      # full-size value: 150
        self.plda_rank = 10    # full-size value: 50
self.feature_size = None
self.tv_iteration_nb = 5
# this will be called each time the sync'd input has more data available to be processed
    def process(self, data_loaders, outputs):
        """
        Use the training data provided through the data loaders to train
        the acoustic model and return it.

        :param data_loaders: input parameter used to access all incoming data
        :param outputs: output parameter used to send the model to the next block
        """
        global_diar, fs_acoustic, feature_size, end_index = setup_for_s4d(data_loaders)
self.feature_size = feature_size
        ############################################################################
        # Start training the acoustic system.
        # TODO: add a link to the description of the system
        # Train the UBM on the first 100 speech segments
        ubm_diar = copy.deepcopy(global_diar)
        ubm_diar.segments = ubm_diar.segments[:100]
ubm_idmap = ubm_diar.id_map()
ubm = sidekit.Mixture()
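        # EM_split grows the GMM by successive binary splits of the components
        # (1 -> 2 -> 4 ... -> distrib_nb), running EM after each split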
ubm.EM_split(fs_acoustic, ubm_idmap, self.distrib_nb, num_thread=1, save_partial=False)
        # Select at most 1000 speech segments of at least 1 s (100 frames)
        # to train the Total Variability matrix
        tv_diar = global_diar.copy_structure()
        tv_diar.segments = random.sample(
            global_diar.filter("duration", ">=", 100).segments,
            k=min(len(global_diar.filter("duration", ">=", 100)), 1000),
        )
tv_idmap = tv_diar.id_map()
        # Collect the segments of at least 1 s that were not used for TV training
        long_seg_diar = global_diar.copy_structure()
        for seg in global_diar.filter("duration", ">=", 100).segments:
            if seg not in tv_diar:
                long_seg_diar.append_seg(seg)
long_seg_idmap = long_seg_diar.id_map()
        # Shorter segments are set aside for clustering purposes
short_seg_diar = global_diar.filter("duration", "<", 100)
short_seg_idmap = short_seg_diar.id_map()
# Accumulate sufficient statistics for the training data
tv_stat = sidekit.StatServer(
tv_idmap,
distrib_nb=ubm.get_distrib_nb(),
feature_size=self.feature_size
)
long_seg_stat = sidekit.StatServer(
long_seg_idmap,
distrib_nb=ubm.get_distrib_nb(),
feature_size=self.feature_size
)
short_seg_stat = sidekit.StatServer(
short_seg_idmap,
distrib_nb=ubm.get_distrib_nb(),
feature_size=self.feature_size
)
tv_stat.accumulate_stat(
ubm=ubm,
feature_server=fs_acoustic,
seg_indices=range(tv_stat.segset.shape[0]),
num_thread=1
)
long_seg_stat.accumulate_stat(
ubm=ubm,
feature_server=fs_acoustic,
seg_indices=range(long_seg_stat.segset.shape[0]),
num_thread=1
)
short_seg_stat.accumulate_stat(
ubm=ubm,
feature_server=fs_acoustic,
seg_indices=range(short_seg_stat.segset.shape[0]),
num_thread=1
)
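        # The accumulated zero- and first-order statistics against the UBM
        # are the sufficient statistics consumed by the i-vector extractor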
        # The sufficient statistics selected for training are passed to the
        # FactorAnalyser through an in-memory HDF5 file handle
tv_stat_h5f = h5py.File("tv.h5", "a", backing_store=False, driver="core")
tv_stat.to_hdf5(tv_stat_h5f)
tv_fa = sidekit.FactorAnalyser()
tv_fa.total_variability(
tv_stat_h5f,
ubm,
tv_rank=self.iv_size,
nb_iter=self.tv_iteration_nb,
min_div=True,
tv_init=None,
batch_size=300,
save_init=False,
output_file_name=None,
num_thread=1,
)
        # Extract i-vectors for PLDA training
tv_iv = tv_fa.extract_ivectors_single(
ubm, tv_stat, uncertainty=False
)
tmp_long_seg_iv = tv_fa.extract_ivectors_single(
ubm, long_seg_stat, uncertainty=False
)
short_seg_iv = tv_fa.extract_ivectors_single(
ubm, short_seg_stat, uncertainty=False
)
long_seg_iv = tv_iv.merge(tmp_long_seg_iv)
global_iv = long_seg_iv.merge(short_seg_iv)
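        # global_iv gathers the i-vectors of all segments; together with
        # global_diar it provides the speaker database that the model
        # must include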
        # Train the Probabilistic Linear Discriminant Analyser.
        # The i-vectors of the long segments are first spherically normalized
        norm_mean, norm_cov = long_seg_iv.estimate_spectral_norm_stat1(1, "sphNorm")
        # Filter the i-vector set to keep only recurring speakers for PLDA training
norm_iv, n_recc, _ = keep_recurring_speakers(
long_seg_iv, rank_F=self.iv_size, occ_number=2, filter_by_name=True
)
norm_iv.spectral_norm_stat1(norm_mean, norm_cov)
# EM algorithm to train the PLDA model
plda_fa = sidekit.FactorAnalyser()
plda_fa.plda(
norm_iv,
rank_f=self.plda_rank,
nb_iter=10,
scaling_factor=1.0,
output_file_name=None,
save_partial=False,
save_final=False
)
        # Pack the model into a uint8 array that can be written on the output
model = aggregate_models(
ubm, tv_fa, norm_mean, norm_cov, plda_fa, global_iv, global_diar
)
outputs["model"].write({"value": model}, end_index)
return True