Coverage for src/bob/pipelines/distributed/__init__.py: 50% (24 statements)
from pkgutil import extend_path

from .sge import SchedulerResourceRestriction

# see https://docs.python.org/3/library/pkgutil.html
__path__ = extend_path(__path__, __name__)

# DASK-click VALID_DASK_CLIENT_STRINGS
# TO BE USED IN:
# @click.option(
#     "--dask-client",
#     "-l",
#     entry_point_group="dask.client",
#     string_exceptions=VALID_DASK_CLIENT_STRINGS,
#     default="single-threaded",
#     help="Dask client for the execution of the pipeline.",
#     cls=ResourceOption,
# )

try:
    import dask

    VALID_DASK_CLIENT_STRINGS = dask.base.named_schedulers
except (ModuleNotFoundError, ImportError):
    VALID_DASK_CLIENT_STRINGS = (
        "sync",
        "synchronous",
        "single-threaded",
        "threads",
        "threading",
        "processes",
        "multiprocessing",
    )


def dask_get_partition_size(cluster, n_objects, lower_bound=200):
    """
    Heuristic that suggests a value for ``dask.partition_size``.

    The heuristic is simple: given the maximum number of workers that can run
    in a queue (not the number of workers currently running) and the total
    number of objects to be processed, compute ``n_objects / n_max_workers``.

    Check https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
    for best practices.

    Parameters
    ----------

    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
        Cluster of the type :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`

    n_objects: int
        Number of objects to be processed

    lower_bound: int
        Minimum partition size.

    """
    from .sge import SGEMultipleQueuesCluster

    if not isinstance(cluster, SGEMultipleQueuesCluster):
        return None

    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]

    # Enforce a minimum partition size (lower_bound) once there are more
    # objects than the maximum number of jobs; small workloads keep a
    # partition per object.
    return (
        max(n_objects // max_jobs, lower_bound)
        if n_objects > max_jobs
        else n_objects
    )
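

# Illustrative sketch (not part of the original module): assuming a cluster
# whose "default" queue allows at most 48 jobs, the heuristic above would
# partition 10_000 objects as max(10_000 // 48, 200) == 208, while a small
# workload of 30 objects (fewer than max_jobs) keeps a partition size of 30.
# The helper below is hypothetical and only mirrors the arithmetic of
# `dask_get_partition_size` without requiring an SGEMultipleQueuesCluster.
#
# def _example_partition_size(n_objects, max_jobs=48, lower_bound=200):
#     return (
#         max(n_objects // max_jobs, lower_bound)
#         if n_objects > max_jobs
#         else n_objects
#     )
#
# assert _example_partition_size(10_000) == 208
# assert _example_partition_size(30) == 30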


def get_local_parallel_client(parallel=None, processes=True):
    """Returns a local Dask client with the given parameters.

    See the Dask documentation for details:
    https://docs.dask.org/en/latest/how-to/deploy-dask/single-distributed.html?highlight=localcluster#localcluster

    Parameters
    ----------

    parallel: int or None
        The number of workers (processes or threads) to use; if ``None``, use
        as many workers as there are processors on the system.

    processes: boolean
        Whether the Dask client should start processes (True, recommended) or
        threads (False). Note that pure Python threads do not run in parallel,
        see: https://www.quantstart.com/articles/Parallelising-Python-with-Threading-and-Multiprocessing/
    """
    from multiprocessing import cpu_count

    from dask.distributed import Client, LocalCluster

    parallel = parallel or cpu_count()

    cluster = LocalCluster(
        processes=processes,
        n_workers=parallel if processes else 1,
        threads_per_worker=1 if processes else parallel,
    )
    return Client(cluster)
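

# Illustrative sketch (not part of the original module): a hypothetical usage
# of the local client above, relying only on the standard
# dask.distributed.Client API (client.map / client.gather); nothing here is
# specific to bob.pipelines.
#
# client = get_local_parallel_client(parallel=4, processes=True)
# futures = client.map(lambda x: x * x, range(10))
# print(client.gather(futures))  # -> [0, 1, 4, 9, ...]
# client.close()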


def __appropriate__(*args):
    """Says object was actually declared here, and not in the import module.
    Fixes Sphinx warnings about not being able to find classes when the path
    is shortened.

    Parameters
    ----------

    *args
        The objects that you want Sphinx to believe are defined here.

    Resolves `Sphinx referencing issues
    <https://github.com/sphinx-doc/sphinx/issues/3048>`_
    """
    for obj in args:
        obj.__module__ = __name__


__appropriate__(
    SchedulerResourceRestriction,
)