Python API for bob.pipelines
Summary
Sample’s API
- Sample: Representation of sample.
- DelayedSample: Representation of sample that can be loaded via a callable.
- SampleSet: A set of samples with extra attributes
- DelayedSampleSet: A set of samples with extra attributes
- DelayedSampleSetCached: A cached version of DelayedSampleSet
- SampleBatch: A batch of samples that looks like [s.data for s in samples]
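Wrapper’s API
- wrap: Wraps several estimators inside each other.
- BaseWrapper: The base class for all wrappers.
- SampleWrapper: Wraps scikit-learn estimators to work with Sample-based pipelines.
- CheckpointWrapper: Wraps Sample-based estimators so the results are saved on disk.
- DaskWrapper: Wraps Scikit estimators to handle Dask Bags as input.
- ToDaskBag: Transform an arbitrary iterator into a dask.bag.Bag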
Database’s API
- FileListDatabase: A generic database interface. Use this class to convert csv files to a database that outputs samples. The files must be inside a folder (or a compressed tarball) laid out as dataset_protocols_path/<protocol>/<group>.csv.
- FileListToSamples: Converts a list of files to a list of samples.
- CSVToSamples: Converts a csv file to a list of samples
Transformers’ API
- Str_To_Types: Converts str fields in samples to a different type
Xarray’s API
- samples_to_dataset: Converts a list of samples to a dataset.
- DatasetPipeline: A dataset-based scikit-learn pipeline.
- Block: A block representation in a graph.
Utilities
- Test if an object is picklable or not.
- check_parameter_for_validity: Checks the given parameter for validity
- check_parameters_for_validity: Checks the given parameters for validity.
- dask_tags: Recursively collects resource_tags in dasked estimators.
- flatten_samplesets: Takes a list of SampleSets (with one or multiple samples in each SampleSet) and returns a list of SampleSets (with one sample in each SampleSet)
- get_bob_tags: Returns the default tags of a Transformer unless forced or specified.
- hash_string: Generates a hash code given a string.
- is_instance_nested: Check if an object or one of its nested objects is an instance of a class.
- is_pipeline_wrapped: Iterates over the transformers of sklearn.pipeline.Pipeline and checks if they were wrapped with the wrapper class.
Main module
- class bob.pipelines.BaseWrapper
Bases: MetaEstimatorMixin, BaseEstimator
The base class for all wrappers.
- class bob.pipelines.CSVToSamples(list_file, transformer=None, dict_reader_kwargs=None, **kwargs)
Bases: FileListToSamples
Converts a csv file to a list of samples
- property rows
- class bob.pipelines.CheckpointWrapper(estimator, model_path=None, features_dir=None, extension=None, save_func=None, load_func=None, sample_attribute=None, hash_fn=None, attempts=10, force=False, **kwargs)
Bases: BaseWrapper, TransformerMixin
Wraps Sample-based estimators so the results are saved on disk.
- Parameters
estimator – The scikit-learn estimator to be wrapped.
model_path (str) – Saves the estimator state in this directory if the estimator is stateful.
features_dir (str) – Saves the transformed data in this directory.
extension (str) – Default extension of the transformed features. If None, will use the bob_checkpoint_extension tag in the estimator, or default to .h5.
save_func – Pointer to a customized function that saves transformed features to disk. If None, will use the bob_features_save_fn tag in the estimator, or default to bob.io.base.save.
load_func – Pointer to a customized function that loads transformed features from disk. If None, will use the bob_features_load_fn tag in the estimator, or default to bob.io.base.load.
sample_attribute (str) – Defines the payload attribute of the sample. If None, will use the bob_output tag in the estimator, or default to data.
hash_fn – Pointer to a hash function. This hash function maps sample.key to a hash code, and this hash code corresponds to a relative directory where a single sample will be checkpointed. This is useful when it is desirable to keep the number of files per directory below a certain limit.
attempts – Number of checkpoint attempts. Sometimes, because of network/disk issues, files can’t be saved. This argument sets the maximum number of attempts to checkpoint a sample.
force (bool) – If True, will recompute the checkpoints even if they exist.
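To make the role of hash_fn more concrete, the sketch below builds a checkpoint path with and without a hash function. The helper names (first_character, checkpoint_path) and the directory layout are assumptions made for this illustration; the wrapper's real path logic may differ.

```python
import os


def first_character(key):
    # Hypothetical hash function: bucket samples by the first character of the key.
    return key[0]


def checkpoint_path(features_dir, key, extension=".h5", hash_fn=None):
    # Assumed layout: <features_dir>/[<hash>/]<key><extension>
    parts = [features_dir]
    if hash_fn is not None:
        # The hash code becomes an intermediate directory.
        parts.append(hash_fn(key))
    parts.append(key + extension)
    return os.path.join(*parts)


flat_path = checkpoint_path("features", "subject1")
hashed_path = checkpoint_path("features", "subject1", hash_fn=first_character)
```

With a hash function, files are spread over subdirectories instead of accumulating in a single one.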
- class bob.pipelines.DaskWrapper(estimator, fit_tag=None, transform_tag=None, fit_supports_dask_array=None, fit_supports_dask_bag=None, **kwargs)
Bases: BaseWrapper, TransformerMixin
Wraps Scikit estimators to handle Dask Bags as input.
- Parameters
fit_tag (str) – Mark the delayed(self.fit) with this value. This can be used in a future delayed(self.fit).compute(resources=resource_tape) so the dask scheduler can place this task in a particular resource (e.g., a GPU).
transform_tag (str) – Mark the delayed(self.transform) with this value. This can be used in a future delayed(self.transform).compute(resources=resource_tape) so the dask scheduler can place this task in a particular resource (e.g., a GPU).
- class bob.pipelines.DelayedSample(load, parent=None, delayed_attributes=None, **kwargs)
Bases: Sample
Representation of sample that can be loaded via a callable.
The optional **kwargs argument allows you to attach more attributes to this sample instance.
- Parameters
load – A Python function that can be called without arguments to load the sample in question from whatever medium.
parent (DelayedSample, Sample, None) – If passed, consider this as a parent of this sample, to copy information.
delayed_attributes (dict or None) – A dictionary of name : load_fn pairs that will be used to create attributes of name : load_fn() in this class. Use this option to create more delayed attributes than just sample.data.
kwargs (dict) – Further attributes of this sample, to be stored and eventually transmitted to transformed versions of the sample.
- property data
Loads the data from the disk file.
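The idea of loading data through a callable can be illustrated with a small stand-in class. LazySample and load_from_disk below are hypothetical names used only for this sketch; they are not part of bob.pipelines and do not reproduce its implementation.

```python
class LazySample:
    """Hypothetical stand-in for a delayed sample (not the library class)."""

    def __init__(self, load, **kwargs):
        self._load = load
        # Extra keyword arguments become plain attributes (metadata).
        for name, value in kwargs.items():
            setattr(self, name, value)

    @property
    def data(self):
        # The callable runs only when .data is accessed.
        return self._load()


calls = []


def load_from_disk():
    # Record each call so the laziness is visible.
    calls.append("loaded")
    return [1, 2, 3]


sample = LazySample(load_from_disk, key="subject-1")
loaded_before_access = list(calls)  # still empty: nothing was loaded yet
values = sample.data                # triggers the load
```

Constructing the sample is cheap; the cost of loading is paid only when the data is requested.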
- class bob.pipelines.DelayedSampleSet(load, parent=None, **kwargs)
Bases: SampleSet
A set of samples with extra attributes
- property samples
- class bob.pipelines.DelayedSampleSetCached(load, parent=None, **kwargs)
Bases: DelayedSampleSet
A cached version of DelayedSampleSet
- property samples
- class bob.pipelines.DelayedSamplesCall(func, func_name, samples, sample_attribute='data', **kwargs)
Bases: object
- class bob.pipelines.FileListDatabase(dataset_protocols_path, protocol, reader_cls=<class 'bob.pipelines.CSVToSamples'>, transformer=None, **kwargs)
Bases: object
A generic database interface. Use this class to convert csv files to a database that outputs samples. The format is simple: the files must be inside a folder (or a compressed tarball) with the following layout:
dataset_protocols_path/<protocol>/<group>.csv
The top folders are the names of the protocols (if you only have one, you may name it default). Inside each protocol folder, there are <group>.csv files where the name of the file specifies the name of the group. We recommend using the names train, dev, eval for your typical training, development, and test sets.
- property protocol
- property transformer
- class bob.pipelines.FileListToSamples(list_file, transformer=None, **kwargs)
Bases: Iterable
Converts a list of files to a list of samples.
- class bob.pipelines.Sample(data, parent=None, **kwargs)
Bases: _ReprMixin
Representation of sample. A Sample is a simple container that wraps a data-point (see Samples, a way to enhance scikit pipelines with metadata).
Each sample must have the following attributes:
attribute data: Contains the data for this sample
- class bob.pipelines.SampleBatch(samples, sample_attribute='data')
Bases: Sequence, _ReprMixin
A batch of samples that looks like [s.data for s in samples]
However, when you call np.array(SampleBatch), it will construct a numpy array from sample.data attributes in a memory-efficient way.
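A sequence that "looks like [s.data for s in samples]" can be sketched as a lazy view over one attribute. AttributeView is a hypothetical name for this illustration; it only mimics the sequence behaviour and says nothing about how the library builds numpy arrays.

```python
from collections.abc import Sequence
from types import SimpleNamespace


class AttributeView(Sequence):
    """Hypothetical lazy view over one attribute of a list of samples."""

    def __init__(self, samples, sample_attribute="data"):
        self.samples = samples
        self.sample_attribute = sample_attribute

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, item):
        if isinstance(item, slice):
            # Slicing returns another view instead of copying the data.
            return AttributeView(self.samples[item], self.sample_attribute)
        return getattr(self.samples[item], self.sample_attribute)


# SimpleNamespace objects stand in for samples here.
samples = [SimpleNamespace(data=i * 10) for i in range(3)]
batch = AttributeView(samples)
```

The view reads the attribute only when an element is requested, so no intermediate list of data is built up front.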
- class bob.pipelines.SampleSet(samples, parent=None, **kwargs)
Bases: MutableSequence, _ReprMixin
A set of samples with extra attributes
- class bob.pipelines.SampleWrapper(estimator, transform_extra_arguments=None, fit_extra_arguments=None, output_attribute=None, input_attribute=None, delayed_output=True, **kwargs)
Bases: BaseWrapper, TransformerMixin
Wraps scikit-learn estimators to work with Sample-based pipelines.
Do not use this class except for scikit-learn estimators.
- estimator
The scikit-learn estimator that is wrapped.
- fit_extra_arguments
Use this option if you want to pass extra arguments to the fit method of the mixed instance. The format is a list of two-value tuples. The first value in each tuple is the name of the argument that fit accepts, like y, and the second value is the name of the attribute that samples carry. For example, if you are passing samples to the fit method and want to pass the subject attributes of samples as the y argument to the fit method, you can provide [("y", "subject")] as the value for this attribute.
- Type
[tuple]
- output_attribute
The name of a Sample attribute where the output of the estimator will be saved to [Default is data]. For example, if output_attribute is "annotations", then sample.annotations will contain the output of the estimator.
- transform_extra_arguments
Similar to fit_extra_arguments but for the transform and other similar methods.
- Type
[tuple]
- delayed_output
If True, the output will be an instance of DelayedSample, otherwise it will be an instance of Sample.
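The (argument name, sample attribute) pairs used by fit_extra_arguments and transform_extra_arguments can be read as a recipe for building keyword arguments. The helper below, collect_extra_arguments, is a hypothetical sketch of that mapping and is not taken from the library.

```python
from types import SimpleNamespace


def collect_extra_arguments(samples, extra_arguments):
    # For each (argument_name, attribute_name) pair, gather that attribute
    # from every sample and file it under the argument name.
    return {
        argument: [getattr(sample, attribute) for sample in samples]
        for argument, attribute in extra_arguments
    }


# SimpleNamespace objects stand in for samples here.
samples = [
    SimpleNamespace(data=[0.1], subject="a"),
    SimpleNamespace(data=[0.2], subject="b"),
]
X = [sample.data for sample in samples]
kwargs = collect_extra_arguments(samples, [("y", "subject")])
# A wrapped estimator would then be called roughly as estimator.fit(X, **kwargs).
```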
- class bob.pipelines.ToDaskBag(npartitions=None, partition_size=None, **kwargs)
Bases: TransformerMixin, BaseEstimator
Transform an arbitrary iterator into a dask.bag.Bag
Example
>>> import bob.pipelines as mario
>>> transformer = mario.ToDaskBag()
>>> dask_bag = transformer.transform([1,2,3])
>>> # dask_bag.map_partitions(...)
- npartitions
Number of partitions used in dask.bag.from_sequence
- bob.pipelines.check_parameter_for_validity(parameter, parameter_description, valid_parameters, default_parameter=None)
Checks the given parameter for validity
Ensures a given parameter is in the set of valid parameters. If the parameter is None or empty, the value in default_parameter will be returned, in case it is specified, otherwise a ValueError will be raised.
This function will return the parameter after the check, or raise a ValueError.
- Parameters
parameter (str or None) – The single parameter to be checked. Might be a string or None.
parameter_description (str) – A short description of the parameter. This will be used to raise an exception in case the parameter is not valid.
valid_parameters (list of str) – A list/tuple of valid values for the parameters.
default_parameter (list of str, optional) – The default parameter that will be returned in case parameter is None or empty. If omitted and parameter is empty, a ValueError is raised.
- Returns
The validated parameter.
- Raises
ValueError – If the specified parameter is invalid.
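The behaviour described above can be summarised in a few lines. validate_choice is a hypothetical re-implementation written from the description only; the error messages and the exact notion of "empty" are assumptions.

```python
def validate_choice(parameter, description, valid, default=None):
    # Fall back to the default when the parameter is None or empty.
    if not parameter:
        if default is None:
            raise ValueError(f"The {description} has to be one of {valid}")
        parameter = default
    # Reject anything outside the set of valid values.
    if parameter not in valid:
        raise ValueError(f"The {description} '{parameter}' is not one of {valid}")
    return parameter


groups = ("train", "dev", "eval")
chosen = validate_choice("dev", "group", groups)
fallback = validate_choice(None, "group", groups, default="train")
```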
- bob.pipelines.check_parameters_for_validity(parameters, parameter_description, valid_parameters, default_parameters=None)
Checks the given parameters for validity.
Checks that the given parameters are in the set of valid parameters. It also ensures that the parameters form a tuple or a list. If parameters is None or empty, the default_parameters will be returned (if default_parameters is omitted, all valid_parameters are returned).
This function will return a tuple or list of parameters, or raise a ValueError.
- Parameters
parameters (str or list of str or None) – The parameters to be checked. Might be a string, a list/tuple of strings, or None.
parameter_description (str) – A short description of the parameter. This will be used to raise an exception in case the parameter is not valid.
valid_parameters (list of str) – A list/tuple of valid values for the parameters.
default_parameters (list of str or None) – The list/tuple of default parameters that will be returned in case parameters is None or empty. If omitted, all valid_parameters are used.
- Returns
A list or tuple containing the valid parameters.
- Raises
ValueError – If some of the parameters are not valid.
- bob.pipelines.dask_tags(estimator)
Recursively collects resource_tags in dasked estimators.
- bob.pipelines.flatten_samplesets(samplesets)
Takes a list of SampleSets (with one or multiple samples in each SampleSet) and returns a list of SampleSets (with one sample in each SampleSet).
- Parameters
samplesets (list of bob.pipelines.SampleSet) – Input list of SampleSets (with one or multiple samples in each SampleSet).
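The flattening can be pictured with plain containers. In the sketch below, SimpleNamespace objects stand in for SampleSets, and the function name flatten as well as the choice to copy the key attribute onto each new set are assumptions for illustration.

```python
from types import SimpleNamespace


def flatten(samplesets):
    # Emit one single-sample set per sample, in the original order.
    flat = []
    for sampleset in samplesets:
        for sample in sampleset.samples:
            # Assumption: the new set keeps the metadata of its source set.
            flat.append(SimpleNamespace(samples=[sample], key=sampleset.key))
    return flat


sets = [
    SimpleNamespace(samples=["a", "b"], key="s1"),
    SimpleNamespace(samples=["c"], key="s2"),
]
flat = flatten(sets)
```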
- bob.pipelines.get_bob_tags(estimator=None, force_tags=None)
Returns the default tags of a Transformer unless forced or specified.
Relies on the tags API of sklearn to set and retrieve the tags.
Specify an estimator's tag values with estimator._more_tags:
    class My_annotator_transformer(sklearn.base.BaseEstimator):
        def _more_tags(self):
            return {"bob_output": "annotations"}
The returned tags will take their value with the following priority:
key:value in force_tags, if it is present;
key:value in estimator tags (set with estimator._more_tags()) if it exists;
the default value for that tag if none of the previous exist.
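The priority order above amounts to layering three dictionaries. The sketch below uses a hypothetical resolve_tags helper and only a subset of the defaults listed in this section; it does not call the sklearn tags API.

```python
# A subset of the default tag values documented in this section.
DEFAULT_TAGS = {
    "bob_input": "data",
    "bob_output": "data",
    "bob_checkpoint_extension": ".h5",
}


def resolve_tags(estimator_tags=None, force_tags=None):
    # Later updates win: defaults < estimator tags < forced tags.
    tags = dict(DEFAULT_TAGS)
    tags.update(estimator_tags or {})
    tags.update(force_tags or {})
    return tags


tags = resolve_tags(
    estimator_tags={"bob_output": "annotations"},
    force_tags={"bob_checkpoint_extension": ".npy"},
)
```

Here bob_input keeps its default, bob_output comes from the estimator, and bob_checkpoint_extension is overridden by the forced value.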
Examples
- bob_input: str
The Sample attribute passed to the first argument of the fit or transform method. Default value is data. Example: {"bob_input": ("annotations")} will result in:
estimator.transform(sample.annotations)
- bob_transform_extra_input: tuple of str
Each element of the tuple is a pair of str: the name of a keyword argument and the Sample attribute passed under that name to the transform method, in that order. Default value is an empty tuple (). Example: {"bob_transform_extra_input": (("kwarg_1","annotations"), ("kwarg_2","gender"))} will result in:
estimator.transform(sample.data, kwarg_1=sample.annotations, kwarg_2=sample.gender)
- bob_fit_extra_input: tuple of str
Each element of the tuple is a pair of str: the name of a keyword argument and the Sample attribute passed under that name to the fit method, in that order. Default value is an empty tuple (). Example: {"bob_fit_extra_input": (("y", "annotations"), ("extra", "metadata"))} will result in:
estimator.fit(sample.data, y=sample.annotations, extra=sample.metadata)
- bob_output: str
The Sample attribute in which the output of the transform is stored. Default value is data.
- bob_checkpoint_extension: str
The extension of each checkpoint file. Default value is .h5.
- bob_features_save_fn: func
The function used to save each checkpoint file. Default value is bob.io.base.save.
- bob_features_load_fn: func
The function used to load each checkpoint file. Default value is bob.io.base.load.
- bob_fit_supports_dask_array: bool
Indicates that the fit method of that estimator accepts dask arrays as input. You may only use this tag if you accept X (N, M) and optionally y (N) as input. The fit function may not accept any other input. Default value is False.
- bob_fit_supports_dask_bag: bool
Indicates that the fit method of that estimator accepts dask bags as input. If true, each input parameter of the fit will be a dask bag. You still can (and normally you should) wrap your estimator with the SampleWrapper so the same code runs with and without dask. Default value is False.
- bob_checkpoint_features: bool
If False, the features of the estimator will never be saved. Default value is True.
- Returns
The resulting tags with a value (either specified in the estimator, forced by the arguments, or default)
- bob.pipelines.hash_string(key, bucket_size=1000)
Generates a hash code given a string. The hash is given by the sum of ord() over the characters of the string, mod bucket_size.
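As a worked instance of that formula, the sketch below sums the character codes of a key and reduces the result modulo the bucket size. bucket_hash is a hypothetical name, and returning an int is an assumption; the return type of the library function is not stated here.

```python
def bucket_hash(key, bucket_size=1000):
    # Sum the code points of all characters, then wrap into the bucket range.
    return sum(ord(character) for character in key) % bucket_size


# ord("a") + ord("b") + ord("c") = 97 + 98 + 99 = 294
code = bucket_hash("abc")
small_bucket_code = bucket_hash("abc", bucket_size=100)
```

Because only the sum of the character codes matters, keys that are permutations of each other (such as "abc" and "cba") land in the same bucket.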
- bob.pipelines.is_instance_nested(instance, attribute, isinstance_of)
Check if an object or one of its nested objects is an instance of a class.
This is useful when using aggregation and it is necessary to check whether some functionality was aggregated.
- Parameters
instance – Object to be searched
attribute – Attribute name to be recursively searched
isinstance_of – Instance class to be searched
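A nested search of this kind can be sketched by following one attribute from object to object. nested_isinstance and the toy classes below are hypothetical and written from the parameter description alone; edge cases may be handled differently by the library function.

```python
def nested_isinstance(instance, attribute, isinstance_of):
    # Walk the chain instance -> instance.<attribute> -> ... until a match
    # is found or the chain ends.
    while True:
        if isinstance(instance, isinstance_of):
            return True
        if not hasattr(instance, attribute):
            return False
        instance = getattr(instance, attribute)


class Inner:
    pass


class Other:
    pass


class Wrapper:
    def __init__(self, estimator):
        self.estimator = estimator


nested = Wrapper(Wrapper(Inner()))
found = nested_isinstance(nested, "estimator", Inner)
missing = nested_isinstance(nested, "estimator", Other)
```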
- bob.pipelines.is_pipeline_wrapped(estimator, wrapper)
Iterates over the transformers of sklearn.pipeline.Pipeline and checks if they were wrapped with the wrapper class.
- Parameters
estimator (sklearn.pipeline.Pipeline) – Pipeline to be checked
wrapper (type) – The Wrapper class or a tuple of classes to be checked
- Returns
Returns a list of boolean values, where each value indicates if the corresponding estimator is wrapped or not
- bob.pipelines.wrap(bases, estimator=None, **kwargs)
Wraps several estimators inside each other.
If estimator is a pipeline, the estimators in that pipeline are wrapped.
The default behavior of wrappers can be customized through the tags; see bob.pipelines.get_bob_tags for more information.
- Returns
The wrapped estimator
- Raises
ValueError – If not all kwargs are consumed.
Heterogeneous SGE
- class bob.pipelines.distributed.sge.SGEIdiapJob(*args, queue=None, project=None, resource_spec=None, job_extra=None, config_name='sge', **kwargs)
Bases: Job
Launches an SGE job in the IDIAP cluster. This class basically encodes the CLI command that bootstraps the worker in an SGE job. See https://distributed.dask.org/en/latest/resources.html#worker-resources for more information.
Note
This class is temporary. It is basically a copy of SGEJob from dask_jobqueue. The difference is that it also handles the dask job resources tag (which is not handled anywhere else). This has to be patched in the Job class. Follow https://github.com/dask/dask-jobqueue/issues/378 for news about this patch.
- submit_command = 'qsub'
- cancel_command = 'qdel'
- config_name = 'SGEIdiapJob'
- bob.pipelines.distributed.sge.get_max_jobs(queue_dict)
Given a queue list, get the max number of possible jobs.
- bob.pipelines.distributed.sge.get_resource_requirements(pipeline)
Get the resource requirements to execute a graph. This is useful when it is necessary to get the dictionary mapping the dask delayed keys to specific resource restrictions. See https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.
- Parameters
pipeline (sklearn.pipeline.Pipeline) – A sklearn.pipeline.Pipeline wrapped with bob.pipelines.DaskWrapper
Example
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> client = Client(cluster)
>>> from bob.pipelines.distributed.sge import get_resource_requirements
>>> resources = get_resource_requirements(pipeline)
>>> my_delayed_task.compute(scheduler=client, resources=resources)
- class bob.pipelines.distributed.sge.SGEMultipleQueuesCluster(log_directory='./logs', protocol='tcp://', dashboard_address=':8787', env_extra=None, sge_job_spec={'default': {'io_big': False, 'max_jobs': 128, 'memory': '8GB', 'queue': 'q_1day', 'resource_spec': '', 'resources': {'default': 1}}, 'q_1week': {'io_big': True, 'max_jobs': 24, 'memory': '4GB', 'queue': 'q_1week', 'resource_spec': '', 'resources': {'q_1week': 1}}, 'q_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_gpu', 'resource_spec': '', 'resources': {'q_gpu': 1}}, 'q_long_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_long_gpu', 'resource_spec': '', 'resources': {'q_long_gpu': 1}}, 'q_short_gpu': {'io_big': False, 'max_jobs': 45, 'memory': '30GB', 'queue': 'q_short_gpu', 'resource_spec': '', 'resources': {'q_short_gpu': 1}}}, min_jobs=1, project=None, **kwargs)
Bases: JobQueueCluster
Launch Dask jobs in the SGE cluster allowing the request of multiple queues.
- Parameters
log_directory (str) – Default directory for the SGE logs
protocol (str) – Scheduler communication protocol
dashboard_address (str) – Default port for the dask dashboard
env_extra (str) – Extra environment variables to send to the workers
sge_job_spec (dict) – Dictionary containing a minimum specification for the qsub command. It consists of:
    queue: SGE queue
    memory: Memory requirement in GB (e.g. 4GB)
    io_big: set the io_big flag
    resource_spec: Whatever extra argument to be sent to qsub (qsub -l)
    tag: Mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html)
    max_jobs: Maximum number of jobs in the queue
min_jobs (int) – Lower bound for the number of jobs for self.adapt
Example
Below follows a vanilla example that will create a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster()
>>> cluster.scale_up(10)
>>> client = Client(cluster)
It’s possible to demand a resource specification yourself:
>>> Q_1DAY_IO_BIG_SPEC = {
...     "default": {
...         "queue": "q_1day",
...         "memory": "8GB",
...         "io_big": True,
...         "resource_spec": "",
...         "resources": "",
...     }
... }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
>>> cluster.scale_up(10)
>>> client = Client(cluster)
More than one job spec can be set:
>>> Q_1DAY_GPU_SPEC = {
...     "default": {
...         "queue": "q_1day",
...         "memory": "8GB",
...         "io_big": True,
...         "resource_spec": "",
...         "resources": "",
...     },
...     "gpu": {
...         "queue": "q_gpu",
...         "memory": "12GB",
...         "io_big": False,
...         "resource_spec": "",
...         "resources": {"GPU":1},
...     },
... }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.scale_up(10)
>>> cluster.scale_up(1, sge_job_spec_key="gpu")
>>> client = Client(cluster)
Adaptive job allocation can also be used via AdaptiveIdiap extension:
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10)
>>> client = Client(cluster)
- scale_up(n_jobs, sge_job_spec_key=None)
Scale cluster up.
This is supposed to be used by the scheduler while dynamically allocating resources
- async scale_down(workers, sge_job_spec_key=None)
Scale cluster down.
This is supposed to be used by the scheduler while dynamically allocating resources
- adapt(*args, **kwargs)
Scale Dask cluster automatically based on scheduler activity.
- Parameters
minimum (int) – Minimum number of workers to keep around for the cluster
maximum (int) – Maximum number of workers to keep around for the cluster
minimum_memory (str) – Minimum amount of memory for the cluster
maximum_memory (str) – Maximum amount of memory for the cluster
minimum_jobs (int) – Minimum number of jobs
maximum_jobs (int) – Maximum number of jobs
**kwargs – Extra parameters to pass to dask.distributed.Adaptive
See also
dask.distributed.Adaptive for more keyword arguments
- class bob.pipelines.distributed.sge.AdaptiveMultipleQueue(cluster=None, interval=None, minimum=None, maximum=None, wait_count=None, target_duration=None, worker_key=None, **kwargs)
Bases: Adaptive
Custom mechanism to adaptively allocate workers based on scheduler load.
This custom implementation extends Adaptive.recommendations by looking at distributed.scheduler.TaskState.resource_restrictions.
The heuristic is:
Note
If a certain task has the status no-worker and it has resource_restrictions, the scheduler should request a job matching those resource restrictions.
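The heuristic can be illustrated without a scheduler. In the sketch below, Task is a hypothetical stand-in for a scheduler task state and jobs_to_request is an invented helper; neither reflects the actual dask.distributed data structures beyond the two fields named above.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    # Hypothetical stand-in for a scheduler task state.
    state: str
    resource_restrictions: dict = field(default_factory=dict)


def jobs_to_request(tasks):
    # Collect the resource names of tasks that no current worker can run.
    wanted = set()
    for task in tasks:
        if task.state == "no-worker" and task.resource_restrictions:
            wanted.update(task.resource_restrictions)
    return sorted(wanted)


tasks = [
    Task("processing"),
    Task("no-worker", {"q_gpu": 1}),
    Task("no-worker"),
]
requests = jobs_to_request(tasks)
```

Only the stalled task that carries a resource restriction produces a request; tasks that are running or unrestricted are ignored.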
Transformers
- bob.pipelines.transformers.Str_To_Types(fieldtypes)
Converts str fields in samples to a different type
- Parameters
fieldtypes (dict) – A dict that specifies the functions to be used to convert strings to other types.
- Returns
A scikit-learn transformer that does the conversion.
Example
>>> from bob.pipelines import Sample
>>> from bob.pipelines.transformers import Str_To_Types, str_to_bool
>>> samples = [Sample(None, id="1", flag="True"), Sample(None, id="2", flag="False")]
>>> transformer = Str_To_Types(fieldtypes=dict(id=int, flag=str_to_bool))
>>> transformer.transform(samples)
[Sample(data=None, id=1, flag=True), Sample(data=None, id=2, flag=False)]
xarray Wrapper
- bob.pipelines.xarray.samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False)
Converts a list of samples to a dataset.
See Efficient pipelines with dask and xarray.
- Parameters
samples (list) – A list of Sample or DelayedSample objects.
meta (xarray.DataArray, optional) – An xarray.DataArray to be used as a template for data inside samples.
npartitions (int, optional) – The number of partitions to partition the samples.
shuffle (bool, optional) – If True, shuffles the samples (in-place) before constructing the dataset.
- Returns
The constructed dataset with at least a data variable.
- Return type
xarray.Dataset
- class bob.pipelines.xarray.Block(estimator=None, output_dtype=<class 'float'>, output_dims=((None, nan), ), fit_input='data', transform_input='data', estimator_name=None, model_path=None, features_dir=None, extension='.hdf5', save_func=None, load_func=None, dataset_map=None, input_dask_array=False, fit_kwargs=None, **kwargs)
Bases: _ReprMixin
A block representation in a graph. This class is meant to be used with DatasetPipeline.
- dataset_map
A callable that transforms the input dataset into another dataset.
- Type
callable
- fit_input
A str or list of str of column names of the dataset to be given to the .fit method.
- fit_kwargs
A dict of fit_kwargs to be passed to the .fit method of the estimator.
- Type
None or dict
- load_func
A function to load the features. Defaults to np.load.
- Type
callable
- output_dims
A list of (dim_name, dim_size) tuples. If dim_name is None, a new name is automatically generated, otherwise it should be a string. dim_size should be a positive integer or nan for new dimensions or None for existing dimensions.
- save_func
A function to save the features. Defaults to np.save with allow_pickle set to False.
- Type
callable
- transform_input
A str or list of str of column names of the dataset to be given to the .transform method.
- property output_ndim
- class bob.pipelines.xarray.DatasetPipeline(graph, **kwargs)
Bases: _BaseComposition
A dataset-based scikit-learn pipeline. See Efficient pipelines with dask and xarray.