Dask: Scale your scikit-learn pipelines

Dask is a flexible library for parallel computing in Python. This guide does not describe how Dask works; for that, see its documentation. There are also plenty of tutorials online: for instance, the official one, a nice overview presented at AnacondaCon 2018, and even one crafted for Idiap.

The purpose of this guide is to describe:

  1. The integration of Dask with scikit-learn pipelines and samples

  2. The specifics of running Dask on the Idiap SGE

From Scikit Learn pipelines to Dask Task Graphs

The purpose of scikit-learn pipelines is to assemble several scikit-learn estimators into a single one. The methods fit and transform can then be used to create models and to transform your data, respectively.
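
As a minimal sketch of this idea, the snippet below chains two stock scikit-learn estimators and calls fit and transform on the resulting pipeline. The estimators (StandardScaler and PCA) and the toy data are chosen here only for illustration; they are not part of bob.pipelines.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Toy data: 4 samples with 2 features each (illustrative values only)
X = np.array([[0.0, 1.0], [1.0, 3.0], [2.0, 5.0], [3.0, 7.0]])

# Assemble two estimators into a single one
pipeline = make_pipeline(StandardScaler(), PCA(n_components=1))

# `fit` creates the model; `transform` applies every step in order
pipeline.fit(X)
X_out = pipeline.transform(X)

print(X_out.shape)  # (4, 1)
```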

Any pipeline can be transformed into a Dask graph and then executed by any Dask client. This is carried out via the wrap function when used like wrap(["dask"], estimator) (see Convenience wrapper function). This function does two things:

  1. Edits the current sklearn.pipeline.Pipeline by adding a new first step, where the input samples are converted into a Dask Bag. This allows the use of dask.bag.map for further transformations.

  2. Wraps all estimators in the pipeline with DaskWrapper. This wrapper is responsible for creating the task graph for the methods .fit and .transform.

The code snippet below enables this feature for an arbitrary pipeline.

>>> import bob.pipelines as mario
>>> from sklearn.pipeline import make_pipeline
>>> pipeline = make_pipeline(...)
>>> dask_pipeline = mario.wrap(["dask"], pipeline) # Create a dask graph
>>> dask_pipeline.fit_transform(....).compute() # Run the task graph using the default client
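
To make the role of the Dask Bag more concrete, here is a small sketch that uses dask.bag directly, without bob.pipelines. It only illustrates the general mechanism (a sequence turned into a bag, then a lazy map over it); the function `double` and the partition count are arbitrary choices for this example, and this is not the code that wrap runs internally.

```python
import dask.bag as db


def double(x):
    # Stand-in for a per-sample transformation
    return 2 * x


# Turn a plain sequence into a bag split into two partitions
bag = db.from_sequence(range(10), npartitions=2)

# `map` is lazy: it only extends the task graph
doubled = bag.map(double)

# `compute` executes the graph, here in the current thread
result = doubled.compute(scheduler="single-threaded")
print(result)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Nothing is executed until compute is called, which is why the wrapped pipeline above also ends with .compute().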

The code below is a complete example. Note in particular the final call to fit_transform(...).compute(...), where the pipeline is converted into a Dask graph and run on the local computer.

import os
import shutil

import numpy

from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline

import bob.pipelines

from bob.pipelines.sample import Sample


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        return X

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn
        return self

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTranformer(TransformerMixin, BaseEstimator):
    def __init__(self):
        self._fit_model = None

    def transform(self, X, metadata=None):
        # Transform `X`
        return [x @ self._fit_model for x in X]

    def fit(self, X, y=None):
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

# Building an arbitrary pipeline
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

# Wrapping with sample, checkpoint and dask
pipeline = bob.pipelines.wrap(
    ["sample", "checkpoint", "dask"],
    pipeline,
    model_path=os.path.join(model_path, "model.pickle"),
    features_dir=model_path,
    transform_extra_arguments=(("metadata", "metadata"),),
)

# Create a dask graph from the pipeline and
# run it on the local computer in a single thread
X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")


shutil.rmtree(model_path)

Such code generates the following graph.

[Figure: Dask task graph generated by the code above (dask_graph.png)]

This graph can be seen by opening http://localhost:8787 (the default address of the Dask dashboard) in a browser while the graph is executing.
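
To see how a fit step and several transform steps end up as nodes of one task graph, the toy sketch below builds such a graph by hand with dask.delayed. The functions `fit` and `transform` here are simplified stand-ins written for this example; they are not the DaskWrapper implementation.

```python
import dask
from dask import delayed


@delayed
def fit(samples):
    # Toy "model": the mean of the training samples
    return sum(samples) / len(samples)


@delayed
def transform(model, sample):
    # Toy transform: subtract the fitted mean from one sample
    return sample - model


samples = [1.0, 2.0, 3.0]

# One `fit` node, then one `transform` node per sample that depends on it
model = fit(samples)
transformed = [transform(model, s) for s in samples]

# Nothing has run so far; `compute` walks the graph in a single thread
results = dask.compute(*transformed, scheduler="single-threaded")
print(results)  # (-1.0, 0.0, 1.0)
```

Each transform node depends on the single fit node, so the scheduler runs fit once and only then the transforms.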

Dask + Idiap SGE

Dask allows the deployment and parallelization of graphs either locally or on complex job queuing systems such as PBS or SGE. This is achieved via Dask-Jobqueue.

Warning

To submit jobs to Idiap’s SGE, it is mandatory to set a project name. Run the command below to set it:

$ bob config set sge.project <YOUR_PROJECT_NAME>

The snippet below shows how to deploy the exact same pipeline from the previous section on the Idiap SGE cluster:

>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day queue
>>> client = Client(cluster) # Creates the scheduler and attaches it to the SGE job queue system
>>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE

That’s it, you just ran a scikit-learn pipeline on the Idiap SGE grid :-)

Dask provides a generic deployment mechanism for SGE systems, but it has the following limitations:

  1. It assumes that a Dask graph runs in a homogeneous grid setup. For instance, if parts of your graph need a specific resource that is only available in other SGE queues (e.g. q_gpu, q_long_gpu, IO_BIG), the scheduler is not able to request those resources on the fly.

  2. As a result of 1., the adaptive deployment mechanism is not able to handle job submissions to two or more queues.

For this reason, the generic SGE launcher was extended into bob.pipelines.distributed.sge.SGEMultipleQueuesCluster. The next subsections present code samples that use this launcher in the most common cases you will probably encounter in your daily work.

Launching jobs in different SGE queues

SGE queue specs are defined in a Python dictionary, as in the example below. The root keys are the labels of the SGE queues, and the inner keys represent:

  1. queue: The real name of the SGE queue

  2. memory: The amount of memory required for the job

  3. io_big: Submit jobs with IO_BIG=TRUE

  4. resource_spec: Any other resource specification passed to qsub -l

  5. resources: Reference label used to tag Dask delayed objects so that they run in a specific queue. This is a very important feature that will be discussed in the next section.

>>> Q_1DAY_GPU_SPEC = {
...         "default": {
...             "queue": "q_1day",
...             "memory": "8GB",
...             "io_big": True,
...             "resource_spec": "",
...             "max_jobs": 48,
...             "resources": "",
...         },
...         "q_short_gpu": {
...             "queue": "q_short_gpu",
...             "memory": "12GB",
...             "io_big": False,
...             "resource_spec": "",
...             "max_jobs": 48,
...             "resources": {"q_short_gpu":1},
...         },
...     }
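
Because the specification is a plain nested dictionary, it is easy to sanity-check before handing it to the cluster. The sketch below is a hypothetical helper, not part of bob.pipelines: `check_queue_spec` and `EXPECTED_KEYS` are names invented for this example, and the assumption that every entry should carry all six keys is taken only from the example dictionary above.

```python
# Keys described in this guide, plus "max_jobs" as used in the example spec.
# ASSUMPTION: every queue entry is expected to define all of them.
EXPECTED_KEYS = {
    "queue", "memory", "io_big", "resource_spec", "max_jobs", "resources",
}


def check_queue_spec(spec):
    """Hypothetical helper: map each queue label to its sorted missing keys."""
    problems = {}
    for label, entry in spec.items():
        missing = EXPECTED_KEYS - set(entry)
        if missing:
            problems[label] = sorted(missing)
    return problems


spec = {
    "default": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "max_jobs": 48,
        "resources": "",
    },
    "q_short_gpu": {
        "queue": "q_short_gpu",
        "memory": "12GB",
        "io_big": False,
        "resource_spec": "",
        "max_jobs": 48,
        "resources": {"q_short_gpu": 1},
    },
}

problems = check_queue_spec(spec)
print(problems)  # {}
```

An empty result means that every queue label defines all the expected keys.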

Now that the queue specifications are set, let’s trigger some jobs.

>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> client = Client(cluster) # Creating the scheduler

Note

To check whether the jobs were actually submitted, always run qstat:

$ qstat

Running estimator operations in specific SGE queues

Sometimes it is necessary to run parts of a pipeline in specific SGE queues (e.g. q_1day with IO_BIG, or q_gpu). The example below shows how this is done: see the fit_tag argument passed to wrap and the resources argument passed to .compute. In this example, the fit method of MyFitTranformer runs on q_short_gpu.

import os
import shutil

import numpy

from dask.distributed import Client
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline

import bob.pipelines

from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.pipelines.distributed.sge import get_resource_requirements
from bob.pipelines.sample import Sample


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        return X

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn
        return self

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTranformer(TransformerMixin, BaseEstimator):
    def __init__(self):
        self._fit_model = None

    def transform(self, X, metadata=None):
        # Transform `X`
        return [x @ self._fit_model for x in X]

    def fit(self, X, y=None):
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

# Building an arbitrary pipeline
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

# Wrapping with sample, checkpoint and dask
# NOTE that pipeline.fit will run in `q_short_gpu`
pipeline = bob.pipelines.wrap(
    ["sample", "checkpoint", "dask"],
    pipeline,
    model_path=model_path,
    transform_extra_arguments=(("metadata", "metadata"),),
    fit_tag="q_short_gpu",
)

# Creating my cluster obj.
cluster = SGEMultipleQueuesCluster()
client = Client(cluster)  # Creating the scheduler
resources = get_resource_requirements(pipeline)

# Run the task graph on the SGE cluster through the client
# NOTE that `resources` is passed to .compute
X_transformed = pipeline.fit_transform(X_as_sample).compute(
    scheduler=client, resources=resources
)

shutil.rmtree(model_path)