Checkpointing¶
Very often, when processing a sklearn.pipeline.Pipeline with big chunks of data, it is useful to checkpoint some steps of the pipeline to disk. This is useful for several purposes, such as:
Reusing samples that are expensive to re-compute.
Inspecting the intermediate outputs of algorithms.
Scikit-learn has a caching mechanism for sklearn.pipeline.Pipeline (the memory argument) that can be used for this purpose. Although useful, that mechanism is not very user friendly.
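For comparison, here is a minimal sketch of that built-in mechanism, which caches whole fitted transformers through the memory argument of a pipeline (the cache directory below is purely illustrative):
>>> # Sketch only: scikit-learn caches fitted transformers keyed on their
>>> # parameters and input data, not individual samples.
>>> from sklearn.decomposition import PCA
>>> from sklearn.pipeline import make_pipeline
>>> cached_pipeline = make_pipeline(PCA(n_components=2), memory="/tmp/sklearn_cache")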
As we detailed in Samples, a way to enhance scikit pipelines with metadata, sklearn estimators can be extended to handle samples carrying metadata. One such metadata can be a unique identifier for each sample. We will refer to this unique identifier as sample.key. If our samples carry it, we can use that identifier to save and load samples from disk. This is what we call checkpointing, and to do it, all you need is to wrap your estimator with CheckpointWrapper and make sure your samples have the .key metadata.
Checkpointing samples¶
Below is an example of how checkpointing works. First, let’s make a transformer.
>>> # by convention, we import bob.pipelines as mario, because mario works with pipes ;)
>>> import bob.pipelines as mario
>>> import numpy as np
>>> from sklearn.base import TransformerMixin, BaseEstimator
>>>
>>> class MyTransformer(TransformerMixin, BaseEstimator):
...     def transform(self, X, sample_specific_offsets):
...         print(f"Transforming {len(X)} samples ...")
...         return np.array(X) + np.array(sample_specific_offsets)
...
...     def fit(self, X):
...         print("Fit was called!")
...         return self
All checkpointing transformers must be able to handle Sample objects. For that, we can use SampleWrapper:
>>> transform_extra_arguments=[("sample_specific_offsets", "offset")]
>>> sample_transformer = mario.SampleWrapper(MyTransformer(), transform_extra_arguments)
Then, we wrap it with CheckpointWrapper:
>>> # create some samples with ``key`` metadata
>>> # Creating X: 3 samples, 2 features
>>> X = np.zeros((3, 2))
>>> # 3 offsets: one for each sample
>>> offsets = np.arange(3).reshape((3, 1))
>>> # key values must be strings because they are used to create file names.
>>> samples = [mario.Sample(x, offset=o, key=str(i)) for i, (x, o) in enumerate(zip(X, offsets))]
>>> samples[0]
Sample(data=array([0., 0.]), offset=array([0]), key='0')
>>> import tempfile
>>> import os
>>> # create a temporary directory to save checkpoints
>>> with tempfile.TemporaryDirectory() as dir_name:
...     checkpointing_transformer = mario.CheckpointWrapper(
...         sample_transformer, features_dir=dir_name)
...
...     # transform samples
...     transformed_samples = checkpointing_transformer.transform(samples)
...
...     # Let's check the features directory
...     list(sorted(os.listdir(dir_name)))
Transforming 3 samples ...
['0.h5', '1.h5', '2.h5']
Note
By default, CheckpointWrapper saves samples into HDF5 files, but you can change that. Refer to its documentation to see how.
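For example, one could store plain NumPy files instead; the following is only a sketch, assuming CheckpointWrapper exposes save_func, load_func, and extension parameters (check the API reference for the exact names and argument order):
>>> # Hedged sketch: the parameter names and the (data, path) argument order of
>>> # save_func are assumptions; verify them against the CheckpointWrapper API.
>>> npy_checkpointing_transformer = mario.CheckpointWrapper(
...     sample_transformer,
...     features_dir="features_npy",
...     extension=".npy",
...     save_func=lambda data, path: np.save(path, data),
...     load_func=np.load,
... )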
If a checkpoint for a sample already exists, that sample will not be recomputed but loaded from disk instead:
>>> # create a temporary directory to save checkpoints
>>> with tempfile.TemporaryDirectory() as dir_name:
...     checkpointing_transformer = mario.CheckpointWrapper(
...         sample_transformer, features_dir=dir_name)
...
...     # transform samples for the first time; it should print "Transforming 3 samples ..."
...     transformed_samples1 = checkpointing_transformer.transform(samples)
...
...     # transform samples again; this time it should not print anything
...     transformed_samples2 = checkpointing_transformer.transform(samples)
...
...     # It should print True
...     print(np.allclose(transformed_samples1[1].data, transformed_samples2[1].data))
Transforming 3 samples ...
True
Note
SampleSet objects can be checkpointed as well. The samples inside them should have the .key metadata.
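For instance, here is a minimal sketch of grouping the keyed samples from above into a SampleSet; as with Sample, we assume that extra keyword arguments become metadata of the set:
>>> # The inner samples keep their own ``key``, so each one can still be
>>> # checkpointed to its own file.
>>> sample_set = mario.SampleSet(samples, key="set_0")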
Checkpointing estimators¶
We can also checkpoint estimators after their training (estimator.fit). This allows us to load the estimator from disk instead of re-training it when .fit is called and a checkpoint already exists.
>>> # create a temporary file to hold the model checkpoint
>>> with tempfile.NamedTemporaryFile(prefix="model", suffix=".pkl") as f:
...     f.close()
...     checkpointing_transformer = mario.CheckpointWrapper(
...         sample_transformer, model_path=f.name)
...
...     # call .fit for the first time; it should print "Fit was called!"
...     __ = checkpointing_transformer.fit(samples)
...
...     # call .fit again; this time it should not print anything
...     __ = checkpointing_transformer.fit(samples)
Fit was called!
Convenience wrapper function¶
We provide a wrap function to easily wrap estimators in several layers. So far, we have learned that we need to wrap our estimators with SampleWrapper and CheckpointWrapper. There is also a Dask wrapper, DaskWrapper, which you will learn about in Dask: Scale your scikit.learn pipelines. Below is an example of how to use it.
Instead of:
>>> transformer = MyTransformer()
>>> transform_extra_arguments=[("sample_specific_offsets", "offset")]
>>> transformer = mario.SampleWrapper(transformer, transform_extra_arguments)
>>> transformer = mario.CheckpointWrapper(
... transformer, features_dir="features", model_path="model.pkl")
>>> transformer = mario.DaskWrapper(transformer)
You can write:
>>> transformer = mario.wrap(
... [MyTransformer, "sample", "checkpoint", "dask"],
... transform_extra_arguments=transform_extra_arguments,
... features_dir="features",
... model_path="model.pkl",
... )
>>> # or if your estimator is already created.
>>> transformer = mario.wrap(
... ["sample", "checkpoint", "dask"],
... MyTransformer(),
... transform_extra_arguments=transform_extra_arguments,
... features_dir="features",
... model_path="model.pkl",
... )
Much simpler, no? Internally, the "sample" string is replaced by SampleWrapper. You provide the list of wrappers to apply as the first argument and, optionally, an estimator to be wrapped as the second argument. If the second argument is missing, the first element of the list must be a class, and it will be used to create the estimator. Then, you provide the __init__ parameters of all the classes as kwargs; internally, wrap passes each kwarg only to the classes that accept it.
Note
wrap is a convenience function, but it might be limited in what it can do. You can always use the wrapper classes directly.
wrap recognizes sklearn.pipeline.Pipeline objects and, when pipelines are passed, it wraps the steps inside them instead. For example, instead of:
>>> transformer1 = mario.wrap(
... [MyTransformer, "sample"],
... transform_extra_arguments=transform_extra_arguments,
... )
>>> transformer2 = mario.wrap(
... [MyTransformer, "sample"],
... transform_extra_arguments=transform_extra_arguments,
... )
>>> from sklearn.pipeline import make_pipeline
>>> pipeline = make_pipeline(transformer1, transformer2)
you can write:
>>> pipeline = make_pipeline(MyTransformer(), MyTransformer())
>>> pipeline = mario.wrap(["sample"], pipeline, transform_extra_arguments=transform_extra_arguments)
It will pass transform_extra_arguments to all steps when wrapping them with SampleWrapper; you cannot pass specific arguments to just one of the steps. Wrapping pipelines with wrap, while limited, becomes useful when wrapping them with Dask, as we will see in Dask: Scale your scikit.learn pipelines.
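As a small preview of that chapter, here is a minimal sketch of adding the Dask layer on top of the sample-wrapped pipeline from above (how to actually execute it is covered there):
>>> # Sketch only: every step of the already sample-wrapped pipeline gets the
>>> # extra "dask" layer, so fit/transform work lazily on dask collections.
>>> pipeline = mario.wrap(["dask"], pipeline)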