Coverage for src/bob/pad/base/script/run_pipeline.py: 95%
94 statements
coverage.py v7.6.0, created at 2024-07-12 23:40 +0200
1"""Executes PAD pipeline"""
4import logging
6import click
8from clapper.click import (
9 ConfigCommand,
10 ResourceOption,
11 log_parameters,
12 verbosity_option,
13)
15from bob.pipelines.distributed import (
16 VALID_DASK_CLIENT_STRINGS,
17 dask_get_partition_size,
18)
20logger = logging.getLogger(__name__)
23@click.command(
24 entry_point_group="bob.pad.config",
25 cls=ConfigCommand,
26 epilog="""\b
27 Command line examples\n
28 -----------------------
31 $ bob pad run-pipeline my_experiment.py -vv
32""",
33)
@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="bob.pad.pipeline",
    help="The PAD pipeline to run",
    cls=ResourceOption,
)
@click.option(
    "--decision_function",
    "-f",
    show_default=True,
    default="decision_function",
    help="Name of the pipeline step to call for results, e.g. ``predict_proba``",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    required=True,
    entry_point_group="bob.pad.database",
    help="PAD Database connector (class that implements the methods: `fit_samples`, `predict_samples`)",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["train", "dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, limits the experiment to the given group(s)",
    cls=ResourceOption,
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
    cls=ResourceOption,
)
@click.option(
    "--checkpoint/--memory",
    "checkpoint",
    default=True,
    help="If --checkpoint (which is the default), all steps of the pipeline will be saved. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition. "
    "Use this option if the current heuristic that sets this value doesn't suit your experiment "
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers to start your experiment with. "
    "Dask automatically scales the number of workers up and down based on the current load of tasks to be solved. "
    "Use this option if the number of workers chosen to start an experiment doesn't suit you.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--no-dask",
    is_flag=True,
    help="If set, it will not use Dask for the execution of the pipeline.",
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption, logger=logger)
def run_pipeline(
    pipeline,
    decision_function,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    no_dask,
    **kwargs,
):
    """Runs the simplest PAD pipeline."""

    log_parameters(logger)

    execute_pipeline(
        pipeline=pipeline,
        database=database,
        decision_function=decision_function,
        output=output,
        groups=groups,
        checkpoint=checkpoint,
        dask_client=dask_client,
        dask_partition_size=dask_partition_size,
        dask_n_workers=dask_n_workers,
        no_dask=no_dask,
    )
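

# Hedged usage sketch (not part of the original module): a fuller command-line
# invocation of the command defined above. "my-pad-pipeline" and "my-pad-database"
# are hypothetical entry-point names used only for illustration; the flags are the
# ones declared in the decorators above.
#
#   $ bob pad run-pipeline \
#       --pipeline my-pad-pipeline \
#       --database my-pad-database \
#       --group dev --group eval \
#       --output results \
#       --dask-client single-threaded \
#       -vv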


def execute_pipeline(
    pipeline,
    database,
    decision_function="decision_function",
    output="results",
    groups=("dev", "eval"),
    checkpoint=False,
    dask_client="single-threaded",
    dask_partition_size=None,
    dask_n_workers=None,
    no_dask=False,
):
    import os
    import sys

    import dask.bag

    import bob.pipelines as mario
    from bob.pipelines import DaskWrapper, is_pipeline_wrapped
    from bob.pipelines.distributed.sge import get_resource_requirements

    if no_dask:
        dask_client = None

    os.makedirs(output, exist_ok=True)

    if checkpoint:
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetching samples
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])
    # Checking if the pipeline is dask-wrapped
    if (
        not any(is_pipeline_wrapped(pipeline, DaskWrapper))
    ) and dask_client is not None:
        # Scaling up if necessary
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size
        partition_size = None
        if not isinstance(dask_client, str):
            # Lower bound of 1 video per chunk, since videos are usually already big
            lower_bound = 1
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)
    # create an experiment info file
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # train the pipeline
    pipeline.fit(fit_samples)

    for group in groups:
        logger.info(f"Running PAD pipeline for group {group}")
        result = getattr(pipeline, decision_function)(predict_samples[group])

        resources = None
        if isinstance(result, dask.bag.core.Bag):
            resources = get_resource_requirements(pipeline)

        save_sample_scores(
            result=result,
            output=output,
            group=group,
            dask_client=dask_client,
            resources=resources,
        )

    logger.info("PAD experiment finished!")
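

# Hedged sketch of calling execute_pipeline programmatically (not part of the
# original module): "my_pipeline" and "my_database" are hypothetical objects,
# a pipeline of estimators and a PAD database connector implementing
# fit_samples() and predict_samples(group=...), as required above.
#
#   from bob.pad.base.script.run_pipeline import execute_pipeline
#
#   execute_pipeline(
#       pipeline=my_pipeline,
#       database=my_database,
#       output="results",
#       groups=("dev", "eval"),
#       checkpoint=True,
#       no_dask=True,  # run in the local process, without Dask
#   )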


def _get_csv_columns(sample):
    """Returns a dict of {csv_column_name: sample_attr_name} given a sample."""
    # Mandatory columns and their corresponding fields
    columns_attr = {
        "claimed_id": "subject",
        "test_label": "key",
        "is_bonafide": "is_bonafide",
        "attack_type": "attack_type",
        "score": "data",
    }
    # Preventing duplicates and unwanted data
    ignored_fields = list(columns_attr.values()) + ["annotations"]
    # Retrieving custom metadata attribute names
    metadata_fields = [
        k
        for k in sample.__dict__.keys()
        if not k.startswith("_") and k not in ignored_fields
    ]
    for field in metadata_fields:
        columns_attr[field] = field
    return columns_attr
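

# Illustrative sketch (not part of the original module): for a hypothetical
# sample carrying one extra metadata attribute named "capture_device",
# _get_csv_columns would return
#
#   {
#       "claimed_id": "subject",
#       "test_label": "key",
#       "is_bonafide": "is_bonafide",
#       "attack_type": "attack_type",
#       "score": "data",
#       "capture_device": "capture_device",
#   }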


def sample_to_dict_row(sample, columns_fields):
    row_values = {
        col: getattr(sample, attr, None) for col, attr in columns_fields.items()
    }
    return row_values


def score_samples_to_dataframe(samples):
    import pandas as pd

    rows, column_fields = [], None
    for sample in samples:
        if column_fields is None:
            column_fields = _get_csv_columns(sample)
        row_values = sample_to_dict_row(sample, column_fields)
        rows.append(row_values)
    df = pd.DataFrame(rows)
    return df


def save_sample_scores(
    result,
    output,
    group,
    dask_client,
    resources=None,
):
    import os

    import dask.bag
    import dask.dataframe as dd

    scores_path = os.path.join(output, f"scores-{group}.csv")

    if isinstance(result, dask.bag.core.Bag):
        # convert score samples to dataframes
        result = result.map_partitions(score_samples_to_dataframe)
        result = dd.from_delayed(result.to_delayed())
        result.to_csv(
            scores_path,
            single_file=True,
            compute_kwargs=dict(scheduler=dask_client, resources=resources),
            index=False,
        )
    else:
        # convert score samples to dataframes
        result = score_samples_to_dataframe(result)
        result.to_csv(scores_path, index=False)
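

# Illustrative sketch of the resulting score files (not part of the original
# module): save_sample_scores writes one CSV per group, e.g. results/scores-dev.csv,
# containing the mandatory columns from _get_csv_columns plus any extra sample
# metadata. The rows below are hypothetical, for illustration only:
#
#   claimed_id,test_label,is_bonafide,attack_type,score,capture_device
#   client001,client001_session01,True,,0.87,webcam
#   client002,attack_print_003,False,print,0.12,webcam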