Dask: Scale your scikit.learn pipelines¶
Dask is a flexible library for parallel computing in Python. The purpose of this guide is not to describe how dask works. For that, go to its documentation. Moreover, there are plenty of tutorials online. For instance, this official one; a nice overview was presented in AnacondaCon 2018 and there’s even one crafted for Idiap.
The purpose of this guide is to describe:
The integration of dask with scikit learn pipelines and samples
The specificities of Dask under the Idiap SGE
From Scikit Learn pipelines to Dask Task Graphs¶
The purpose of scikit learn pipelines is to assemble several scikit estimators in one final one. Then, it is possible to use the methods fit and transform to create models and transform your data respectivelly.
Any pipeline can be transformed in a Dask Graph to be further executed by any Dask Client.
This is carried out via the wrap function when used like wrap(["dask"], estimator) (see Convenience wrapper function).
Such function does two things:
Edit the current
sklearn.pipeline.Pipelineby adding a new first step, where input samples are transformed in Dask Bag. This allows the usage ofdask.bag.mapfor further transformations.Wrap all estimators in the pipeline with
DaskWrapper. This wrapper is responsible for the creation of the task graph for the methods .fit and .transform.
The code snippet below enables such feature for an arbitrary pipeline.
>>> import bob.pipelines as mario
>>> from sklearn.pipeline import make_pipeline
>>> pipeline = make_pipeline(...)
>>> dask_pipeline = mario.wrap(["dask"], pipeline) # Create a dask graph
>>> dask_pipeline.fit_transform(....).compute() # Run the task graph using the default client
The code below is an example. Especially lines 59-63 where we convert such pipeline in a Dask Graph and runs it in a local computer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | import os
import shutil
import numpy
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
import bob.pipelines
from bob.pipelines.sample import Sample
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
return X
def fit(self, X, y=None):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self):
self._fit_model = None
def transform(self, X, metadata=None):
# Transform `X`
return [x @ self._fit_model for x in X]
def fit(self, X):
self._fit_model = numpy.array([[1, 2], [3, 4]])
return self
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
# Building an arbitrary pipeline
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())
# Wrapping with sample, checkpoint and dask
pipeline = bob.pipelines.wrap(
["sample", "checkpoint", "dask"],
pipeline,
model_path=os.path.join(model_path, "model.pickle"),
features_dir=model_path,
transform_extra_arguments=(("metadata", "metadata"),),
)
# Create a dask graph from a pipeline
# Run the task graph in the local computer in a single tread
X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")
shutil.rmtree(model_path)
|
Such code generates the following graph.
This graph can be seem by running http://localhost:8787 during its execution.¶
Dask + Idiap SGE¶
Dask, allows the deployment and parallelization of graphs either locally or in complex job queuing systems, such as PBS, SGE…. This is achieved via Dask-Jobqueue. Below follow a nice video explaining what is the Dask-Jobqueue, some of its features and how to use it to run dask graphs.
Warning
To submit jobs at Idiap’s SGE it’s mandatory to set a project name. Run the code below to set it:
$ bob config set sge.project <YOUR_PROJECT_NAME>
The snippet below shows how to deploy the exact same pipeline from the previous section in the Idiap SGE cluster
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day
>>> client = Client(cluster) # Creates the scheduler and attaching it to the SGE job queue system
>>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE
That’s it, you just run a scikit pipeline in the Idiap SGE grid :-)
Dask provides generic deployment mechanism for SGE systems, but it contains the following limitations:
It assumes that a dask graph runs in an homogeneous grid setup. For instance, if parts your graph needs a specific resource that it’s avaible in other SGE queues (e.g q_gpu, q_long_gpu, IO_BIG), the scheduler is not able to request those resources on the fly.
As a result of 1., the mechanism of adaptive deployment is not able to handle job submissions of two or more queues.
For this reason the generic SGE laucher was extended to this one bob.pipelines.distributed.sge.SGEMultipleQueuesCluster. Next subsections presents some code samples using this launcher in the most common cases you will probably find in your daily job.
Launching jobs in different SGE queues¶
SGE queue specs are defined in python dictionary as in the example below, where, the root keys are the labels of the SGE queue and the other inner keys represents:
queue: The real name of the SGE queue
memory: The amount of memory required for the job
io_big: Submit jobs with IO_BIG=TRUE
resource_spec: Whatever other key using in qsub -l
resources: Reference label used to tag dask delayed so it will run in a specific queue. This is a very important feature the will be discussed in the next section.
>>> Q_1DAY_GPU_SPEC = {
... "default": {
... "queue": "q_1day",
... "memory": "8GB",
... "io_big": True,
... "resource_spec": "",
... "max_jobs": 48,
... "resources": "",
... },
... "q_short_gpu": {
... "queue": "q_short_gpu",
... "memory": "12GB",
... "io_big": False,
... "resource_spec": "",
... "max_jobs": 48,
... "resources": {"q_short_gpu":1},
... },
... }
Now that the queue specifications are set, let’s trigger some jobs.
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> client = Client(cluster) # Creating the scheduler
Note
To check if the jobs were actually submitted always do qstat:
$ qstat
Running estimator operations in specific SGE queues¶
Sometimes it’s necessary to run parts of a pipeline in specific SGE queues (e.g. q_1day IO_BIG or q_gpu). The example below shows how this is approached (lines 52 to 57). In this example, the fit method of MyBoostedFitTransformer runs on q_short_gpu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | import os
import shutil
import numpy
from dask.distributed import Client
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
import bob.pipelines
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.pipelines.distributed.sge import get_resource_requirements
from bob.pipelines.sample import Sample
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
return X
def fit(self, X, y=None):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self):
self._fit_model = None
def transform(self, X, metadata=None):
# Transform `X`
return [x @ self._fit_model for x in X]
def fit(self, X):
self._fit_model = numpy.array([[1, 2], [3, 4]])
return self
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
# Building an arbitrary pipeline
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())
# Wrapping with sample, checkpoint and dask
# NOTE that pipeline.fit will run in `q_short_gpu`
pipeline = bob.pipelines.wrap(
["sample", "checkpoint", "dask"],
pipeline,
model_path=model_path,
transform_extra_arguments=(("metadata", "metadata"),),
fit_tag="q_short_gpu",
)
# Creating my cluster obj.
cluster = SGEMultipleQueuesCluster()
client = Client(cluster) # Creating the scheduler
resources = get_resource_requirements(pipeline)
# Run the task graph in the local computer in a single tread
# NOTE THAT resources is set in .compute
X_transformed = pipeline.fit_transform(X_as_sample).compute(
scheduler=client, resources=resources
)
shutil.rmtree(model_path)
|