Algorithms have at least one input and one output. All algorithm endpoints are organized in groups. Groups are used by the platform to indicate which inputs and outputs are synchronized together. The first group is automatically synchronized with the channel defined by the block in which the algorithm is deployed.
Endpoint Name | Data Format | Nature |
---|---|---|
category | system/integer/1 | Input |
scores | system/integer/1 | Input |
Analyzers may produce any number of results. Once experiments using this analyzer are done, you may display the results or filter experiments using criteria based on them.
Name | Type |
---|---|
numberNotProcessedT1Samples | int32 |
numberNotProcessedT5Samples | int32 |
totalNumberT1Samples | int32 |
ACER | float32 |
rocT1T5 | plot/isoroc/1 |
rocT1T3 | plot/isoroc/1 |
numberCorrectT5Samples | int32 |
rocT1T6 | plot/isoroc/1 |
rocT1T2 | plot/isoroc/1 |
numberCorrectT3Samples | int32 |
totalNumberT5Samples | int32 |
numberNotProcessedT6Samples | int32 |
numberNotProcessedT4Samples | int32 |
numberCorrectT4Samples | int32 |
numberCorrectT1Samples | int32 |
totalNumberT2Samples | int32 |
numberNotProcessedT2Samples | int32 |
numberCorrectT2Samples | int32 |
rocAll | plot/isoroc/1 |
totalNumberT4Samples | int32 |
totalNumberT6Samples | int32 |
numberNotProcessedT3Samples | int32 |
numberCorrectT6Samples | int32 |
totalNumberT3Samples | int32 |
rocT1T4 | plot/isoroc/1 |
xxxxxxxxxx
import numpy as np
import bob.measure
class Algorithm:
    """Analyzer that accumulates per-category scores and reports error rates.

    Each call to :meth:`process` reads one ``scores`` value and one
    ``category`` value.  For categories 1 to 6 the score is filed into one
    of three per-category lists:

    * ``notprocessed_T<n>`` -- score outside ``[0, 100]``;
    * ``correct_T<n>`` -- category 1 with a score strictly above
      ``self.threshold``, or categories 2-6 with a score at or below it;
    * ``incorrect_T<n>`` -- every other in-range score.

    Samples carrying any other category value are ignored.

    Once the inputs report no more data, the in-range scores of every
    category are stored as NumPy arrays in ``self.T1`` ... ``self.T6``,
    categories 2-6 are pooled in ``self.allspoofs``, and the per-category
    counts, the ACER and six ROC plots are written to ``outputs``.

    NOTE(review): category 1 presumably denotes bona fide samples and
    categories 2-6 attack types -- confirm against the database protocol.
    """

    # Category scored as correct *above* the threshold; all the others are
    # scored as correct *at or below* it.
    _POSITIVE_CATEGORY = 1
    _CATEGORIES = (1, 2, 3, 4, 5, 6)
    _OUTCOMES = ('correct', 'incorrect', 'notprocessed')

    # Scores outside this closed interval are counted as "not processed".
    _MIN_SCORE = 0
    _MAX_SCORE = 100

    # Number of points requested from bob.measure.roc.
    _ROC_POINTS = 100

    def __init__(self):
        """Create the empty per-category accumulators and set the defaults."""
        for category in self._CATEGORIES:
            # T<n> holds the in-range scores as an array; filled at the end.
            setattr(self, 'T%d' % category, None)
            for outcome in self._OUTCOMES:
                setattr(self, '%s_T%d' % (outcome, category), [])
        self.allspoofs = None
        self.acer = None
        self.n_bins = 50
        self.threshold = 50

    def _bucket(self, outcome, category):
        """Return the list stored in ``self.<outcome>_T<category>``."""
        return getattr(self, '%s_T%d' % (outcome, category))

    def _accumulate(self, category, score):
        """File one score into the matching list of its category."""
        if category not in self._CATEGORIES:
            # Unknown categories are silently skipped.
            return
        category = int(category)

        if score < self._MIN_SCORE or score > self._MAX_SCORE:
            outcome = 'notprocessed'
        else:
            if category == self._POSITIVE_CATEGORY:
                is_correct = score > self.threshold
            else:
                is_correct = score <= self.threshold
            outcome = 'correct' if is_correct else 'incorrect'
        self._bucket(outcome, category).append(score)

    def _roc_plot(self, label, other_scores):
        """Build one ``plot/isoroc/1`` payload of T1 against ``other_scores``.

        NOTE(review): T1 is passed as the first argument of
        ``bob.measure.roc`` and rows 1 / 0 of the result are reported as
        false positives / false negatives -- confirm this matches the
        argument and row conventions of ``bob.measure.roc``.
        """
        curve = bob.measure.roc(
            self.T1.astype(np.double),
            other_scores.astype(np.double),
            self._ROC_POINTS,
        )
        return {
            "data":
            [
                {
                    "label": label,
                    "false_positives": curve[1],
                    "false_negatives": curve[0],
                    "number_of_positives": np.uint64(len(self.T1)),
                    "number_of_negatives": np.uint64(len(other_scores)),
                }
            ]
        }

    def _finalize(self, outputs):
        """Compute the final statistics and write them to ``outputs``.

        Raises:
            ZeroDivisionError: if category 1, or categories 2-6 taken
                together, have no in-range sample.
        """
        pooled_scores = []
        pooled_errors = 0
        for category in self._CATEGORIES:
            in_range = (self._bucket('correct', category)
                        + self._bucket('incorrect', category))
            setattr(self, 'T%d' % category, np.array(in_range))
            if category != self._POSITIVE_CATEGORY:
                pooled_scores += in_range
                pooled_errors += len(self._bucket('incorrect', category))
        self.allspoofs = np.array(pooled_scores)

        # float() forces true division: under Python 2, int / int would
        # truncate both rates to 0 or 1.
        apcer = np.float32(float(pooled_errors) / len(self.allspoofs))
        bpcer = np.float32(float(len(self.incorrect_T1)) / len(self.T1))
        self.acer = np.float32((apcer + bpcer) / 2)

        results = {'ACER': np.float32(self.acer)}
        for category in self._CATEGORIES:
            # "total" counts in-range samples only (correct + incorrect).
            results['totalNumberT%dSamples' % category] = np.int32(
                len(getattr(self, 'T%d' % category)))
            results['numberCorrectT%dSamples' % category] = np.int32(
                len(self._bucket('correct', category)))
            results['numberNotProcessedT%dSamples' % category] = np.int32(
                len(self._bucket('notprocessed', category)))

        results['rocAll'] = self._roc_plot('rocAll', self.allspoofs)
        for category in self._CATEGORIES:
            if category == self._POSITIVE_CATEGORY:
                continue
            name = 'rocT1T%d' % category
            results[name] = self._roc_plot(
                name, getattr(self, 'T%d' % category))

        # writes the output back to the platform
        outputs.write(results)

    def process(self, inputs, outputs):
        """Consume one (score, category) pair; emit results on the last one.

        Args:
            inputs: object providing ``inputs['scores'].data.value``,
                ``inputs['category'].data.value`` and ``hasMoreData()``.
            outputs: object whose ``write`` method receives the result
                dictionary once ``inputs.hasMoreData()`` is false.

        Returns:
            Always ``True``.
        """
        # accumulate the test scores
        score = inputs['scores'].data.value
        category = inputs['category'].data.value
        self._accumulate(category, score)

        # once all values are received, evaluate the scores
        if not inputs.hasMoreData():
            self._finalize(outputs)
        return True
The code for this algorithm in Python
The ruler at 80 columns indicates suggested POSIX line breaks (for readability).
The editor will automatically enlarge to accommodate the entirety of your input.
Use keyboard shortcuts for search/replace and faster editing. For example, use Ctrl-F (PC) or Cmd-F (Mac) to search through this box.