Algorithms have at least one input and one output. All algorithm endpoints are organized in groups, which the platform uses to indicate which inputs and outputs are synchronized together. The first group is automatically synchronized with the channel defined by the block in which the algorithm is deployed. This algorithm uses two groups, described by the two tables below; a short access sketch follows the tables.
First group (synchronized with the block's channel):

Endpoint Name | Data Format | Nature
---|---|---
features | system/array_2d_floats/1 | Input
processor_uem | anthony_larcher/uemranges/1 | Input
processor_file_info | anthony_larcher/file_info_sd/1 | Input
adapted_speakers | anthony_larcher/speakers/1 | Output

Second group (training data, accessed through data loaders):

Endpoint Name | Data Format | Nature
---|---|---
model | anthony_larcher/array_1d_uint8/1 | Input
train_features | system/array_2d_floats/1 | Input
train_uem | anthony_larcher/uemranges/1 | Input
train_file_info | anthony_larcher/file_info_sd/1 | Input
train_speakers | anthony_larcher/speakers/1 | Input
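The sketch below illustrates the difference between the two groups. It is a minimal sketch, not the full algorithm: it only uses API calls that appear in the complete listing further down (`inputs`, `data_loaders.loaderOf`, `outputs[...].write`), and the written hypothesis is an empty placeholder in the ALLIES format.

    # Minimal sketch: endpoints of the first group arrive synchronized in
    # process(), while the second (training) group is pulled on demand
    # through data loaders.
    class Algorithm:

        def process(self, inputs, data_loaders, outputs, loop_channel):
            # First group: read directly from `inputs`
            features = inputs["features"].data.value
            file_id = inputs["processor_file_info"].data.file_id

            # Second group: read through a data loader, by index
            train_loader = data_loaders.loaderOf("train_file_info")
            first_train_file = train_loader[0][0]["train_file_info"].file_id

            # Results are written on the output endpoint of the first group
            outputs["adapted_speakers"].write(
                {"speaker": [], "start_time": [], "end_time": []}
            )
            return True

The code for this algorithm, in Python: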
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import copy
import pickle
import struct

import numpy as np
import s4d
import sidekit


class H5Dict(dict):
    """
    A dictionary sub-class used to exchange features between
    algorithms.
    """

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def __getitem__(self, key):
        return super().__getitem__(key)

    def __iter__(self):
        return iter(["cep"])
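# A minimal sketch of why H5Dict exists, assuming only the behaviour defined
# above: SIDEKIT normally reads features from an HDF5 file handle, and H5Dict
# mimics the small subset of that interface used here (key/value access plus
# an iterator exposing a single "cep" dataset). The values are illustrative:
#
#   h5f = H5Dict()
#   h5f["show/cep"] = np.zeros((100, 13))
#   list(iter(h5f))        # -> ["cep"], regardless of the stored keys
#   h5f["show/cep"].shape  # -> (100, 13)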
class AlliesExtractor(sidekit.FeaturesExtractor):
    """
    A FeaturesExtractor used to access cepstral coefficients in SIDEKIT style.
    """

    def __init__(self, dataloader, name_dict, key="features"):
        super().__init__()
        self.dataloader = dataloader
        self.name_dict = name_dict
        self.audio_filename_structure = "{}"
        self.key = key

    def extract(self, show, channel, input_audio_filename):
        entry, _, _ = self.dataloader[self.name_dict[show]]
        cep = entry[self.key].value
        label = np.ones((cep.shape[0]), dtype=bool)
        h5f = H5Dict()
        h5f[show] = iter(["cep"])
        h5f["compression"] = 0
        h5f[show + "/energy"] = cep[:, 0]
        h5f[show + "/energy_mean"] = cep[:, 0].mean()
        h5f[show + "/energy_std"] = cep[:, 0].std()
        h5f[show + "/cep"] = cep[:, 1:]
        # Per-coefficient statistics, computed over frames (axis 0)
        h5f[show + "/cep_mean"] = cep[:, 1:].mean(0)
        h5f[show + "/cep_std"] = cep[:, 1:].std(0)
        h5f[show + "/vad"] = label.astype("int8")
        return h5f
def get_s4d_model_from_text(serialized_model):
    """
    Fill a s4d.ModelIV object from the deserialized model dictionary
    so that it can be used with S4D.
    """
    model = s4d.ModelIV()
    model.ubm = serialized_model['ubm']
    model.tv = serialized_model['tv_fa'].F
    model.tv_mean = serialized_model['tv_fa'].mean
    model.tv_sigma = serialized_model['tv_fa'].Sigma
    model.sn_mean = serialized_model['norm_mean']
    model.sn_cov = serialized_model['norm_cov']
    model.plda_mean = serialized_model['plda_fa'].mean
    model.plda_f = serialized_model['plda_fa'].F
    model.plda_g = None
    model.plda_sigma = serialized_model['plda_fa'].Sigma
    model.ivectors = serialized_model['iv_stat']
    model.scores = None
    model.nb_thread = 1
    return model
def s4d_to_allies(s4d_segmentation):
    """
    Convert a s4d.Diar object into the proper ALLIES format for segmentation.

    :param s4d_segmentation: a tuple whose first element is a s4d.Diar object
    """
    spk = []
    st = []
    en = []
    for segment in s4d_segmentation[0]:
        spk.append(segment["cluster"])
        st.append(float(segment["start"]) / 100.)
        en.append(float(segment["stop"]) / 100.)
    hyp = {"speaker": spk, "start_time": st, "end_time": en}
    return hyp
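# A small sketch of the resulting ALLIES hypothesis, assuming a Diar object
# with two segments (the cluster names and times below are made up):
#
#   hyp = s4d_to_allies((diar,))
#   # -> {"speaker": ["S0", "S1"],
#   #     "start_time": [0.0, 2.5],
#   #     "end_time": [2.5, 7.1]}
#
# Note that s4d segment boundaries are in centiseconds, hence the /100.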
def extract_iv_on_segments(model, features, seg):
    # Create a StatServer to accumulate the statistics
    tmp_ss = sidekit.StatServer(statserver_file_name=None,
                                distrib_nb=len(model.ubm.w),
                                feature_size=features.shape[1],
                                index=None,
                                ubm=model.ubm)
    tmp_ss.modelset = np.empty(len(seg), dtype="|O")
    tmp_ss.segset = np.empty(len(seg), dtype="|O")
    tmp_ss.start = np.zeros((len(seg),), dtype=np.int32)
    tmp_ss.stop = np.zeros((len(seg),), dtype=np.int32)
    tmp_ss.stat0 = np.zeros((len(seg), len(model.ubm.w)), dtype=np.float32)
    tmp_ss.stat1 = np.zeros((len(seg), model.ubm.sv_size()), dtype=np.float32)
    for idx, segment in enumerate(seg):
        start = segment['start']
        stop = segment['stop']
        data = features[start:stop, :]
        lp = model.ubm.compute_log_posterior_probabilities(data)
        pp, foo = sidekit.mixture.sum_log_probabilities(lp)
        tmp_ss.modelset[idx] = segment['cluster']
        tmp_ss.segset[idx] = segment['cluster']
        tmp_ss.start[idx] = start
        tmp_ss.stop[idx] = stop
        # Compute 0th-order statistics
        tmp_ss.stat0[idx, :] = pp.sum(0)
        # Compute 1st-order statistics
        tmp_ss.stat1[idx, :] = np.reshape(np.transpose(np.dot(data.transpose(), pp)),
                                          model.ubm.sv_size()).astype(np.float32)
    fa = sidekit.FactorAnalyser(mean=model.tv_mean, Sigma=model.tv_sigma, F=model.tv)
    ivectors = fa.extract_ivectors_single(model.ubm, tmp_ss)
    return ivectors
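# A sketch of the expected shapes, assuming (illustratively only) a UBM with
# 512 distributions and 36-dimensional normalized feature frames:
#
#   tmp_ss.stat0:   (n_segments, 512)       zero-order occupancies
#   tmp_ss.stat1:   (n_segments, 512 * 36)  first-order stats (model.ubm.sv_size())
#   ivectors.stat1: (n_segments, tv_rank)   one i-vector per segment, where
#                                           tv_rank is the rank of model.tv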
def run_s4d_segmentation(model,
                         global_diar,
                         features,
                         file_id):
    """
    Run the two-pass within-show diarization of the S4D baseline system.
    """
    ################################################################################
    # First pass of within-show diarization
    ################################################################################
    thr_l = 2
    thr_h = 3
    thr_vit = -250

    # Initialize diarization
    init_diar = s4d.segmentation.init_seg(features, file_id)
    # Step 1: Gaussian divergence segmentation
    seg_diar = s4d.segmentation.segmentation(features, init_diar)
    # Step 2: linear BIC segmentation (fusion)
    bicl_diar = s4d.segmentation.bic_linear(features, seg_diar, thr_l, sr=False)
    # Step 3: BIC HAC (intra-show clustering)
    hac = s4d.clustering.hac_bic.HAC_BIC(features, bicl_diar, thr_h, sr=False)
    bich_diar = hac.perform()
    # Step 4: Viterbi decoding
    vit_diar = s4d.viterbi.viterbi_decoding(features, bich_diar, thr_vit)

    ################################################################################
    # Second pass of within-show diarization
    ################################################################################
    idmap_in = vit_diar.id_map()

    # Add delta, double delta and normalize the features
    label, threshold = sidekit.frontend.vad.vad_percentil(features[:, 0], 10)
    delta_filter = np.array([.25, .5, .25, 0, -.25, -.5, -.25])
    delta = sidekit.frontend.features.compute_delta(features, filt=delta_filter)
    features = np.column_stack((features, delta))
    double_delta = sidekit.frontend.features.compute_delta(delta, filt=delta_filter)
    features = np.column_stack((features, double_delta))
    label = sidekit.frontend.vad.label_fusion(label)
    sidekit.frontend.normfeat.cep_sliding_norm(features, label=label, win=301, center=True, reduce=True)

    # Extract i-vectors
    within_iv = extract_iv_on_segments(model, features, vit_diar)
    within_iv_backup = copy.deepcopy(within_iv)

    # Normalize i-vectors from the new file and compute within-show scores
    within_iv_mean = within_iv_backup.mean_stat_per_model()
    within_iv_mean.spectral_norm_stat1(model.sn_mean[:1], model.sn_cov[:1])

    if within_iv_mean.modelset.shape[0] > 1:
        ndx = sidekit.Ndx(models=within_iv_mean.modelset, testsegs=within_iv_mean.modelset)
        scores = sidekit.iv_scoring.fast_PLDA_scoring(within_iv_mean,
                                                      within_iv_mean,
                                                      ndx,
                                                      model.plda_mean,
                                                      model.plda_f,
                                                      model.plda_sigma,
                                                      p_known=0.0,
                                                      scaling_factor=1.0,
                                                      check_missing=False)
        scores.scoremat = 0.5 * (scores.scoremat + scores.scoremat.transpose())
        print("Models in scores: {}".format(scores.modelset))
        print("Clusters in vit_diar: {}".format(vit_diar.unique('cluster')))

        # Within-show clustering
        within_diar, cluster_dict, merge = s4d.clustering.hac_iv.hac_iv(
            vit_diar, scores, threshold=5
        )
        print("Number of clusters in vit_diar {} and in within_diar {}".format(
            len(vit_diar.unique('cluster')), len(within_diar.unique('cluster'))))
        print("Scores min {} and max {}".format(scores.scoremat.min(), scores.scoremat.max()))
        print("Merge: {}".format(merge))
    else:
        # A single cluster was found: no within-show clustering is possible
        within_diar, cluster_dict, merge = vit_diar, {}, []

    return (within_diar, cluster_dict, merge)
def unsupervised_model_adaptation(features,
                                  file_id,
                                  data_loaders,
                                  model,
                                  global_diar,
                                  current_s4d_segmentation):
    ########################################################
    # ACCESS TRAINING DATA TO ADAPT IF WANTED
    ########################################################
    # You are allowed to re-process training data all along the
    # lifelong adaptation.
    # The code below shows how to access data from the training set.

    # Create the data loaders to access all training data (not only the file_info)
    # See below how to get all training data
    train_loader = data_loaders.loaderOf("train_file_info")

    # Get the list of training files.
    # We create a dictionary with file_id as keys and the index of the file in
    # the training set as value. This dictionary will be used later to access
    # the training files by file_id.
    name_dict = {}
    for i in range(train_loader.count()):
        (data, _, end) = train_loader[i]
        train_file_id = data["train_file_info"].file_id
        name_dict[train_file_id] = int(i)

    # If you use an AlliesExtractor to access the features, you need to
    # instantiate it like this
    fe = AlliesExtractor(train_loader, name_dict, "train_features")
    fs_seg = sidekit.FeaturesServer(
        features_extractor=fe,
        dataset_list=["cep"],
        keep_all_features=True,
        delta=False,
        double_delta=False,
    )
    fs_acoustic = sidekit.FeaturesServer(
        features_extractor=fe,
        dataset_list=["cep"],
        keep_all_features=True,
        delta=True,
        double_delta=True,
        feat_norm="cmvn_sliding",
    )

    ########################################################
    # Shows how to access training data per INDEX
    #
    # Shows how to access features, uem and file_info from
    # a training file by using its index in the training set
    ########################################################
    file_index = 2  # index of the file we want to access
    file_id_per_index = train_loader[file_index][0]['train_file_info'].file_id
    time_stamp_per_index = train_loader[file_index][0]['train_file_info'].time_stamp
    supervision_per_index = train_loader[file_index][0]['train_file_info'].supervision
    uem_per_index = train_loader[file_index][0]['train_uem']
    features_per_index = train_loader[file_index][0]["train_features"].value

    # Access features using an AlliesExtractor
    # BEWARE: the access here is done using the file_id that you must retrieve first
    tmp = fe.extract(file_id_per_index, 0, "input_audio_filename")
    features_per_index_extractor = tmp[file_id_per_index + "/cep"]

    ########################################################
    # Shows how to access training data per file_id
    #
    # Shows how to access features, uem and file_info from
    # a training file by using its file_id
    ########################################################
    # Assume that we know the file_id; note that we stored them earlier in a dictionary
    train_file_id = list(name_dict.keys())[3]
    time_stamp_per_ID = train_loader[name_dict[train_file_id]][0]["train_file_info"].time_stamp
    supervision_per_ID = train_loader[name_dict[train_file_id]][0]["train_file_info"].supervision
    uem_per_ID = train_loader[name_dict[train_file_id]][0]["train_uem"]
    features_per_ID = train_loader[name_dict[train_file_id]][0]["train_features"].value

    # Access features using an AlliesExtractor
    # BEWARE: the access here is done using the file_id that you must retrieve first
    tmp = fe.extract(train_file_id, 0, "input_audio_filename")
    features_per_ID_extractor = tmp[train_file_id + "/cep"]

    ################################################################################
    # Cross-show diarization
    ################################################################################
    # Group speakers across shows to improve the model if possible
    return model, global_diar
def generate_system_request_to_user(model, global_diar, current_s4d_segmentation):
    """
    Include here your code to ask "questions" to the human in the loop.

    The prototype of this function can be adapted to your needs, as it is
    internal to this block, but it must return a request as a dictionary
    object as shown below.

    request_type can be either "same" or "boundary":
      * "same": the question asked to the user is "Is the same speaker
        speaking at time time_1 and time_2?";
        the cost of this question is 6s / total_file_duration
      * "boundary": the question asked to the user is "What are the
        boundaries of the segment including time_1?";
        note that the data time_2 is not used in this request;
        the cost of this question is (|time_2 - time_1| + 6s) / total_file_duration

    time_1 must be a numpy.float32
    time_2 must be a numpy.float32
    """
    request = {
        "request_type": "same",
        "time_1": np.float32(0.5),
        "time_2": np.float32(1.5),
    }
    return request
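# A sketch of the alternative "boundary" request, following the cost model
# described in the docstring above (the times below are illustrative only):
#
#   request = {
#       "request_type": "boundary",
#       "time_1": np.float32(12.0),  # ask for the segment containing t = 12s
#       "time_2": np.float32(0.0),   # not used for "boundary" requests
#   }
#
# With |time_2 - time_1| = 12s, the cost would be (12 + 6) / total_file_duration.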
def online_adaptation(model, global_diar, features, current_s4d_segmentation, request, user_answer):
    """
    Include here your code to adapt the model according to the answer given
    by the human domain expert to the request.

    The prototype of this function can be adapted to your needs, as it is
    internal to this block.
    """
    # Case of interactive learning (the request is a fake one)
    # Case of active learning
    new_s4d_segmentation = current_s4d_segmentation
    return model, global_diar, new_s4d_segmentation
def rename_ivectors(iv_stat_server, cluster_dict):
    """
    Function used by the baseline system to rename clusters of i-vectors
    after segmentation.
    """
    # Rename i-vectors to match cluster IDs from the new segmentation
    iv = copy.deepcopy(iv_stat_server)
    reverse_dict = {}
    for k in cluster_dict.keys():
        for c in cluster_dict[k]:
            reverse_dict[c] = k
    for idx, c in enumerate(iv.modelset):
        if c in reverse_dict.keys():
            iv.modelset[idx] = reverse_dict[c]
    return iv
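# A small sketch of the renaming, with an illustrative cluster_dict mapping a
# new cluster name to the clusters it absorbed (all names are made up):
#
#   cluster_dict = {"S0": ["S0", "S3"], "S1": ["S1"]}
#   renamed = rename_ivectors(within_iv, cluster_dict)
#   # i-vectors whose modelset entry was "S3" are now labelled "S0"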
class Algorithm:
    """
    Main class of the lifelong_block
    """

    def __init__(self):
        self.model = None
        self.global_diar = None

    def process(self, inputs, data_loaders, outputs, loop_channel):
        """
        Process one incoming file of the lifelong learning sequence,
        possibly interacting with the human in the loop.
        """
        ########################################################
        # RECEIVE INCOMING INFORMATION FROM THE FILE TO PROCESS
        ########################################################

        # Get the model after initial training
        if self.model is None:
            model_loader = data_loaders.loaderOf("model")
            tmp_m = model_loader[0][0]['model'].value
            pkl_after = struct.pack('{}B'.format(len(tmp_m)), *list(tmp_m))
            serialized_model = pickle.loads(pkl_after)
            #serialized_model = pickle.loads(model_loader[0][0]['model'])
            # Return a model using s4d
            self.model = get_s4d_model_from_text(serialized_model)
            self.global_diar = serialized_model["global_diar"]

        # Access features of the current file to process
        # features is a numpy.ndarray with dimension nb_frames x 14
        # See the documentation for details of the MFCC computation
        features = inputs["features"].data.value

        # Access incoming file information
        # See the documentation for a detailed description of the file_info
        file_info = inputs["processor_file_info"].data
        file_id = file_info.file_id
        supervision = file_info.supervision
        time_stamp = file_info.time_stamp

        # Access UEM information for the incoming file
        # See the documentation for a description of the UEM format
        uem = inputs["processor_uem"].data

        # Run diarization on the new input using the s4d baseline system
        current_s4d_segmentation = run_s4d_segmentation(self.model,
                                                        self.global_diar,
                                                        features,
                                                        file_id)

        # Convert the output from s4d into the ALLIES format
        current_hypothesis = s4d_to_allies(current_s4d_segmentation)

        ###############################################################################
        # Interact with the human if necessary.
        # This section exchanges information with the user simulation and ends up
        # with a new hypothesis.
        ###############################################################################
        # Human assisted learning is ON for active and interactive supervision
        human_assisted_learning = supervision in ["active", "interactive"]

        if not human_assisted_learning:
            # In this method, see how to access initial training data to adapt
            # the model for the new incoming data
            self.model, self.global_diar = unsupervised_model_adaptation(
                features,
                file_id,
                data_loaders,
                self.model,
                self.global_diar,
                current_s4d_segmentation
            )

        # If human assisted learning mode is on (active or interactive learning)
        while human_assisted_learning:
            # Create an empty request that is used to initiate interactive learning.
            # For the case of active learning, this request is overwritten by the
            # system itself.
            request = {"request_type": "toto", "time_1": np.float32(0.0), "time_2": np.float32(0.0)}

            if supervision == "active":
                # The system can send a question to the human in the loop by
                # using an object of type request; the request is the question
                # asked to the human
                request = generate_system_request_to_user(self.model, self.global_diar, current_s4d_segmentation)

            # Send the request to the user and wait for the answer
            message_to_user = {
                "file_id": file_id,  # ID of the file the question is related to
                "hypothesis": current_hypothesis,  # the current hypothesis
                "system_request": request,  # the question for the human in the loop
            }
            human_assisted_learning, user_answer = loop_channel.validate(message_to_user)

            # Take the user answer into account to generate a new hypothesis
            # and possibly update the model
            self.model, self.global_diar, current_s4d_segmentation = online_adaptation(
                self.model, self.global_diar, features, current_s4d_segmentation, request, user_answer
            )

            # Convert the output from s4d into the ALLIES format
            current_hypothesis = s4d_to_allies(current_s4d_segmentation)

        # End of human assisted learning
        # Send the current hypothesis
        end_data_index = 1
        outputs["adapted_speakers"].write(current_hypothesis)

        if not inputs.hasMoreData():
            pass

        # always return True, it signals BEAT to continue processing
        return True