Efficient pipelines with dask and xarray¶
Note
This section of the API is not used by bob.bio
and bob.pad
packages.
If you are only interested in learning those packages, you can skip this page.
In this guide, we will see an alternative method to what was discussed before about sample-based processing, checkpointing, dask integration. We will be doing the same concepts as discussed before but try to do it in a more efficient way.
In this guide we are interested in several things:
Sample-based processing and carrying the sample metadata over the pipeline.
Checkpointing: we may want to save intermediate steps.
Lazy operations and graph optimizations. We’ll define all operations using dask and we will benefit from lazy operations and graph optimizations.
Failed sample handling: we may want to drop some samples in the pipeline if we fail to process them.
Scalability: we want to use dask-ml estimators to handle larger than memory datasets for training.
This guide builds upon scikit-learn, dask, and xarray. If you are not familiar with those libraries, you may want to get familiar with those libraries first.
First, let’s run our example classification problem without using the tools here to get familiar with our examples. We are going to do an Scaler+PCA+LDA example on the iris dataset:
>>> from sklearn import datasets
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.decomposition import PCA
>>> from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
>>> from sklearn.pipeline import make_pipeline
>>> iris = datasets.load_iris()
>>> scaler, pca, lda = StandardScaler(), PCA(n_components=3, random_state=0), LinearDiscriminantAnalysis()
>>> pipeline = make_pipeline(scaler, pca, lda)
>>> _ = pipeline.fit(iris.data, iris.target)
>>> # scaler.fit was called
>>> # scaler.transform was called
>>> # pca.fit was called
>>> # pca.transform was called
>>> # lda.fit was called
>>> data = pipeline.decision_function(iris.data)
>>> # scaler.transform was called
>>> # pca.transform was called
>>> # lda.decision_function was called
As you can see here, the example ran fine. The iris.data
was transformed
twice using scaler.transform
and pca.transform
but that’s ok and we
could have avoided that at the cost of complexity and more memory usage.
Let’s go through through this example again and increase its complexity
as we progress.
Sample-based processing¶
First, let’s look at how we can turn this into a sample-based pipeline. We need to convert our dataset to a list of samples first:
>>> import bob.pipelines
>>> from functools import partial
>>> def load(i):
... return iris.data[i]
>>> samples = [
... bob.pipelines.DelayedSample(partial(load, i), target=y)
... for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(target=0)
You may be already familiar with our sample concept. If not, please read more on
Samples, a way to enhance scikit pipelines with metadata. Now, to optimize our process, we will represent our
samples in an xarray.Dataset
using dask.array.Array
’s:
>>> dataset = bob.pipelines.xr.samples_to_dataset(samples, npartitions=3)
>>> dataset
<xarray.Dataset> Size: 6kB
Dimensions: (sample: 150, dim_0: 4)
Dimensions without coordinates: sample, dim_0
Data variables:
target (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
data (sample, dim_0) float64 5kB dask.array<chunksize=(50, 4), meta=np.ndarray>
You can see here that our samples
were converted to a dataset of dask
arrays. The dataset is made of two dimensions: sample
and dim_0
. We
have 150 samples in our dataset and the data is made of 4 dimensional features.
We also partitioned our dataset to 3 partitions (chunks of 50 samples) to ensure efficient and parallel processing of our data. Read up on dask arrays to become more familiar with the idea of chunks.
If you want to give a name to dim_0
, you can provide a meta
parameter which should a xarray.DataArray
providing information
about data
in our samples:
>>> import xarray as xr
>>> # construct the meta from one sample
>>> meta = xr.DataArray(samples[0].data, dims=("feature"))
>>> dataset = bob.pipelines.xr.samples_to_dataset(samples, npartitions=3, meta=meta)
>>> dataset
<xarray.Dataset> Size: 6kB
Dimensions: (sample: 150, feature: 4)
Dimensions without coordinates: sample, feature
Data variables:
target (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
data (sample, feature) float64 5kB dask.array<chunksize=(50, 4), meta=np.ndarray>
Now, we want to build a pipeline that instead of numpy arrays, processes this
dataset instead. We can do that with our DatasetPipeline
. A dataset
pipeline is made of scikit-learn estimators but instead of working on numpy
arrays, it works on xarray datasets with dask arrays inside them. We will build
our pipeline using again PCA and LDA. To build a dataset pipeline, we need to
tell DatasetPipeline
which variables to pass to our estimators. By default,
DatasetPipeline
will pass the data
variable to our transformer. This
will work for PCA since it only needs data
in its .fit
and
.transform
methods. However, for LDA, we also need to provide target
when fitting the estimator. DatasetPipeline
as input takes a list of
estimators. If you have to give more information about an estimator, you pass a
dictionary instead.
>>> pipeline = bob.pipelines.xr.DatasetPipeline(
... [
... scaler,
... pca,
... dict(estimator=lda, fit_input=["data", "target"]),
... ]
... )
>>> pipeline
DatasetPipeline(...)
The dictionaries are used to construct Block
’s. You can checkout
that class to see what options are possible.
Now let’s fit our pipeline with our xarray dataset. Ideally, we want
this fit step be postponed until the we call dask.compute
on our
results. But this does not happen here which we will explain later.
>>> _ = pipeline.fit(dataset)
Now let’s call decision_function
on our pipeline. What will be
returned is a new dataset with the data
variable changed to the
output of lda.decision_function
.
>>> ds = pipeline.decision_function(dataset)
>>> ds
<xarray.Dataset> Size: 5kB
Dimensions: (sample: 150, c: 3)
Dimensions without coordinates: sample, c
Data variables:
target (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
data (sample, c) float64 4kB dask.array<chunksize=(50, 3), meta=np.ndarray>
To get the results as numpy arrays you can call .compute()
on xarray
or dask objects:
>>> ds.compute()
<xarray.Dataset> Size: 5kB
Dimensions: (sample: 150, c: 3)
Dimensions without coordinates: sample, c
Data variables:
target (sample) int64 1kB 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2
data (sample, c) float64 4kB 28.42 -15.84 -59.68 ... -57.81 3.79 6.92
Our operations were not lazy here (you can’t see in the docs that it was not
lazy but if you increase logging’s verbosity in your code, you will see) because
we had unknown dimensions. Whenever DatasetPipeline
is faced with an
unknown dimension, it will compute all the computations till then (using
dask.persist
) and then continues with valid size dimensions. A workaround to
this is to provide the feature size of each estimator to DatasetPipeline
.
You have to provide a list of size 2 tuples of (dim_name, dim_size)
. The
dim_name
must not overlap with what is already in the dataset unless it has
the same size. If your estimator does not change the dimension sizes, you can
provide None
as dimension size and this will be inferred from the dataset.
For new and unknown dimension sizes use np.nan.
>>> pipeline = bob.pipelines.xr.DatasetPipeline(
... [
... # scaler output is the same size as input `feature`
... dict(estimator=scaler, output_dims=[("feature", None)]),
... # pca feature size is 3. You can't call this dimension `feature`
... # because it's already in the input dataset with a different size
... dict(estimator=pca, output_dims=[("pca_feature", 3)]),
... # size of the output of lda.decision_function is 3
... # because we have 3 classes in our problem
... dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)]),
... ]
... )
>>> pipeline
DatasetPipeline(...)
>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds
<xarray.Dataset> Size: 5kB
Dimensions: (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
target (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
data (sample, class) float64 4kB dask.array<chunksize=(50, 3), meta=np.ndarray>
This time nothing was computed. We can get the results by calling
ds.compute()
:
>>> ds.compute()
<xarray.Dataset> Size: 5kB
Dimensions: (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
target (sample) int64 1kB 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2
data (sample, class) float64 4kB 28.42 -15.84 -59.68 ... 3.79 6.92
>>> ds.data.data.visualize(format="svg")
In the visualization of the dask graph below, you can see that dask is only
executing scaler.transform
and pca.transform
once.
Checkpointing¶
We may want to checkpoint features or save our fitted estimators for
later use. Also, we might want to do this checkpointing to inspect our
features. Let’s add the key
metadata to our dataset first:
>>> def load(i):
... return iris.data[i]
>>> samples = [
... bob.pipelines.DelayedSample(partial(load, i), target=y, key=i)
... for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(target=0, key=0)
>>> # construct the meta from one sample
>>> meta = xr.DataArray(samples[0].data, dims=("feature"))
>>> dataset = bob.pipelines.xr.samples_to_dataset(samples, npartitions=3, meta=meta)
>>> dataset
<xarray.Dataset> Size: 7kB
Dimensions: (sample: 150, feature: 4)
Dimensions without coordinates: sample, feature
Data variables:
target (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
key (sample) int64 1kB dask.array<chunksize=(50,), meta=np.ndarray>
data (sample, feature) float64 5kB dask.array<chunksize=(50, 4), meta=np.ndarray>
To checkpoint estimators, all we need is to provide a model_path
or
features_dir
in our pipeline and it will checkpont the model or the
features:
>>> import os
>>> scaler_model = os.path.join(tempdir, "scaler.pkl")
>>> pca_model = os.path.join(tempdir, "pca.pkl")
>>> pca_features = os.path.join(tempdir, "pca_features")
>>> lda_model = os.path.join(tempdir, "lda.pkl")
>>> pipeline = bob.pipelines.xr.DatasetPipeline(
... [
... dict(estimator=scaler, output_dims=[("feature", None)],
... model_path=scaler_model),
... dict(estimator=pca, output_dims=[("pca_feature", 3)],
... model_path=pca_model, features_dir=pca_features),
... dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)],
... model_path=lda_model),
... ]
... )
>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()
<xarray.Dataset> Size: 6kB
Dimensions: (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
target (sample) int64 1kB 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2
key (sample) int64 1kB 0 1 2 3 4 5 6 7 ... 143 144 145 146 147 148 149
data (sample, class) float64 4kB 28.42 -15.84 -59.68 ... 3.79 6.92
Now if you repeat the operations, the checkpoints will be used:
>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()
<xarray.Dataset> Size: 6kB
Dimensions: (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
target (sample) int64 1kB 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2
key (sample) int64 1kB 0 1 2 3 4 5 6 7 ... 143 144 145 146 147 148 149
data (sample, class) float64 4kB 28.42 -15.84 -59.68 ... 3.79 6.92
>>> ds.data.data.visualize(format="svg")
In the visualization of the dask graph below, you can see that dask is only
executing lda.decision_function
and loading pca features and lda model from
disk.
Handling failed processing of Samples¶
Imagine that in your pipeline, you may want to drop some samples because you
were unable to process them or they were low quality samples.
DatasetPipeline
provides a generic option for applying a function on the
whole dataset. Instead of providing an estimator, you just provide a function
that takes as input a dataset and returns a modified dataset. This mechanism can
be used for example to drop samples. To signal that we have failed processing a
sample, we will put np.nan
in the data of our failed samples.
Imagine, the following PCA class that fails to process every other sample:
>>> import numpy as np
>>> class FailingPCA(PCA):
... def transform(self, X):
... Xt = super().transform(X)
... Xt[::2] = np.nan
... return Xt
Now in our pipeline, we want to drop nan
samples after PCA transformations:
>>> failing_pca = FailingPCA(n_components=3, random_state=0)
>>> pipeline = bob.pipelines.xr.DatasetPipeline(
... [
... dict(estimator=scaler, output_dims=[("feature", None)]),
... dict(estimator=failing_pca, output_dims=[("pca_feature", 3)]),
... dict(dataset_map=lambda x: x.persist().dropna("sample")),
... dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)]),
... ]
... )
>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()
<xarray.Dataset> Size: 3kB
Dimensions: (sample: 75, class: 3)
Dimensions without coordinates: sample, class
Data variables:
target (sample) int64 600B 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2
key (sample) int64 600B 1 3 5 7 9 11 13 ... 137 139 141 143 145 147 149
data (sample, class) float64 2kB 21.74 -13.45 -54.81 ... 4.178 8.07
You can see that we have 75 samples now instead of 150 samples. The
dataset_map
option is generic. You can apply any operation in this function.
Training on datasets larger than memory¶
Sometimes your dataset is larger than your machine’s memory and this prevents
you to train (.fit
) your estimator(s). Luckily since our data are already
dask arrays, we can use dask-ml estimators to overcome this. When you
provide dask-ml estimators, set input_dask_array
as True
.
>>> # It's always a good idea to shuffle the samples if you are doing
>>> # partial_fit.
>>> dataset = bob.pipelines.xr.samples_to_dataset(
... samples, npartitions=3, meta=meta, shuffle=True)
>>> from sklearn.linear_model import SGDClassifier
>>> import dask_ml.preprocessing, dask_ml.decomposition, dask_ml.wrappers
>>> # construct the estimators
>>> scaler = dask_ml.preprocessing.StandardScaler()
>>> pca = dask_ml.decomposition.PCA(n_components=3, random_state=0)
>>> clf = SGDClassifier(random_state=0, loss='log_loss', penalty='l2', tol=1e-3)
>>> clf = dask_ml.wrappers.Incremental(clf, scoring="accuracy")
>>> pipeline = bob.pipelines.xr.DatasetPipeline(
... [
... dict(
... estimator=scaler,
... output_dims=[("feature", None)],
... input_dask_array=True,
... ),
... dict(
... estimator=pca,
... output_dims=[("pca_features", 3)],
... input_dask_array=True,
... ),
... dict(
... estimator=clf,
... fit_input=["data", "target"],
... output_dims=[], # since we are going to call `.predict`
... input_dask_array=True,
... fit_kwargs=dict(classes=range(3)),
... ),
... ]
... )
>>> ds = pipeline.fit(dataset).predict(dataset)
>>> ds = ds.compute()
>>> correct_classification = np.array(ds.data == ds.target).sum()
>>> correct_classification > 85
True
>>> ds.dims == {"sample": 150}
True