Coverage for src/bob/pipelines/distributed/sge.py: 22%
143 statements
coverage.py v7.0.5, created at 2023-06-16 14:21 +0200
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>

import logging
import sys

import dask

from clapper.rc import UserDefaults
from dask_jobqueue.core import Job, JobQueueCluster
from distributed.deploy import Adaptive
from distributed.scheduler import Scheduler

from .sge_queues import QUEUE_DEFAULT

logger = logging.getLogger(__name__)
rc = UserDefaults("bobrc.toml")


class SGEIdiapJob(Job):
    """Launches an SGE job in the Idiap cluster. This class basically encodes
    the CLI command that bootstraps the worker in an SGE job. See
    https://distributed.dask.org/en/latest/resources.html#worker-resources
    for more information.

    .. note:: This class is temporary. It is basically a copy of ``SGEJob`` from
       dask_jobqueue. The difference is that here we also handle the dask job
       resources tag (which is not handled anywhere else). This has to be patched
       in the ``Job`` class. Please follow
       https://github.com/dask/dask-jobqueue/issues/378 for news about this patch.
    """

    submit_command = "qsub"
    cancel_command = "qdel"
    config_name = "SGEIdiapJob"

    def __init__(
        self,
        *args,
        queue=None,
        project=rc.get("sge.project"),
        resource_spec=None,
        job_extra_directives=None,
        config_name="sge",
        **kwargs,
    ):
        if queue is None:
            queue = dask.config.get("jobqueue.%s.queue" % config_name)
        if project is None:
            project = dask.config.get("jobqueue.%s.project" % config_name)
        if resource_spec is None:
            resource_spec = dask.config.get(
                "jobqueue.%s.resource-spec" % config_name
            )
        if job_extra_directives is None:
            job_extra_directives = dask.config.get(
                "jobqueue.%s.job-extra-directives" % config_name
            )

        # Resources
        resources = kwargs.pop("resources", None)

        super().__init__(
            *args, config_name=config_name, death_timeout=10000, **kwargs
        )

        # Amending the --resources option of the `distributed.cli.dask_worker` CLI command
        if resources:
            # Preparing the comma-separated KEY=VALUE string expected by the
            # `dask-worker --resources` option
            resources_str = ",".join(f"{k}={v}" for k, v in resources.items())

            self._command_template += f" --resources {resources_str}"

        header_lines = []
        if self.job_name is not None:
            header_lines.append("#$ -N %(job-name)s")
        if queue is not None:
            header_lines.append("#$ -q %(queue)s")
        if project is not None:
            header_lines.append("#$ -P %(project)s")
        if resource_spec is not None:
            header_lines.append("#$ -l %(resource_spec)s")

        if self.log_directory is not None:
            header_lines.append("#$ -e %(log_directory)s/")
            header_lines.append("#$ -o %(log_directory)s/")
        header_lines.extend(["#$ -cwd", "#$ -j y"])
        header_lines.extend(["#$ %s" % arg for arg in job_extra_directives])
        header_template = "\n".join(header_lines)

        config = {
            "job-name": self.job_name,
            "queue": queue,
            "project": project,
            "processes": self.worker_processes,
            "resource_spec": resource_spec,
            "log_directory": self.log_directory,
        }
        self.job_header = header_template % config
        logger.debug("Job script: \n %s" % self.job_script())
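

# For illustration only: with hypothetical values such as job_name="dask-worker",
# queue="q_1day", project="myproject", resource_spec="io_big=TRUE,mem_free=8G" and
# log_directory="./logs", the header built in SGEIdiapJob.__init__ above would
# resemble the following SGE directives (a sketch, not verified on a real cluster):
#
#   #$ -N dask-worker
#   #$ -q q_1day
#   #$ -P myproject
#   #$ -l io_big=TRUE,mem_free=8G
#   #$ -e ./logs/
#   #$ -o ./logs/
#   #$ -cwd
#   #$ -j y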


def get_max_jobs(queue_dict):
    """Given a queue specification dictionary, get the maximum number of possible jobs."""
    return max(
        [
            queue_dict[r]["max_jobs"]
            for r in queue_dict
            if "max_jobs" in queue_dict[r]
        ]
    )
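

# A minimal sketch of how `get_max_jobs` behaves, assuming a hypothetical queue
# dictionary shaped like the sge_job_spec entries used elsewhere in this module:
#
#   >>> spec = {
#   ...     "default": {"queue": "q_1day", "max_jobs": 48},
#   ...     "gpu": {"queue": "q_gpu"},  # no "max_jobs" key, so it is ignored
#   ... }
#   >>> get_max_jobs(spec)
#   48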


def get_resource_requirements(pipeline):
    """
    Get the resource requirements needed to execute a graph.

    This is useful when it is necessary to get the dictionary mapping the dask
    delayed keys to specific resource restrictions.
    Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.

    Parameters
    ----------
    pipeline: :any:`sklearn.pipeline.Pipeline`
        A :any:`sklearn.pipeline.Pipeline` wrapper with :any:`bob.pipelines.DaskWrapper`

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> from bob.pipelines.sge import get_resource_requirements  # doctest: +SKIP
    >>> resources = get_resource_requirements(pipeline)  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """
    resources = dict()
    for s in pipeline:
        if hasattr(s, "resource_tags"):
            resources.update(s.resource_tags)
    return resources


class SGEMultipleQueuesCluster(JobQueueCluster):
    """Launch Dask jobs in the SGE cluster, allowing the request of multiple
    queues.

    Parameters
    ----------
    log_directory: str
        Default directory for the SGE logs

    protocol: str
        Scheduler communication protocol

    dashboard_address: str
        Default port for the dask dashboard

    job_script_prologue: str
        Extra environment variables to send to the workers

    sge_job_spec: dict
        Dictionary containing a minimum specification for the qsub command.
        It consists of:

            queue: SGE queue
            memory: Memory requirement in GB (e.g. 4GB)
            io_big: set the io_big flag
            resource_spec: Any extra argument to be sent to qsub (qsub -l)
            tag: Mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html)
            max_jobs: Maximum number of jobs in the queue

    min_jobs: int
        Lower bound for the number of jobs for `self.adapt`


    Example
    -------

    Below follows a vanilla example that will create a set of jobs on all.q:

    >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  # doctest: +SKIP
    >>> from dask.distributed import Client  # doctest: +SKIP
    >>> cluster = SGEMultipleQueuesCluster()  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP

    It is possible to provide a resource specification yourself:

    >>> Q_1DAY_IO_BIG_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     }
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP


    More than one job spec can be set:

    >>> Q_1DAY_GPU_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     },
    ...     "gpu": {
    ...         "queue": "q_gpu",
    ...         "memory": "12GB",
    ...         "io_big": False,
    ...         "resource_spec": "",
    ...         "resources": {"GPU": 1},
    ...     },
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> cluster.scale_up(1, sge_job_spec_key="gpu")  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP


    Adaptive job allocation can also be used via the `AdaptiveMultipleQueue` extension:

    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.adapt(Adaptive=AdaptiveMultipleQueue, minimum=2, maximum=10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    """

    def __init__(
        self,
        log_directory="./logs",
        protocol="tcp://",
        dashboard_address=":8787",
        job_script_prologue=None,
        sge_job_spec=QUEUE_DEFAULT,
        min_jobs=1,
        project=rc.get("sge.project"),
        **kwargs,
    ):
        # Defining the job launcher
        self.job_cls = SGEIdiapJob
        self.sge_job_spec = sge_job_spec

        self.protocol = protocol
        self.log_directory = log_directory
        self.project = project

        silence_logs = "error"
        interface = None
        host = None
        security = None

        if job_script_prologue is None:
            job_script_prologue = []
        elif not isinstance(job_script_prologue, list):
            job_script_prologue = [job_script_prologue]
        self.job_script_prologue = job_script_prologue + [
            "export PYTHONPATH=" + ":".join(sys.path)
        ]

        scheduler = {
            "cls": SchedulerResourceRestriction,  # Use local scheduler for now
            "options": {
                "protocol": self.protocol,
                "interface": interface,
                "host": host,
                "dashboard_address": dashboard_address,
                "security": security,
            },
        }

        # Spec cluster parameters
        loop = None
        asynchronous = False
        name = None

        # Starting the SpecCluster constructor
        super(JobQueueCluster, self).__init__(
            scheduler=scheduler,
            worker={},
            loop=loop,
            silence_logs=silence_logs,
            asynchronous=asynchronous,
            name=name,
        )

    def _get_worker_spec_options(self, job_spec):
        """Craft a dask worker_spec to be used in the qsub command."""

        new_resource_spec = job_spec.get("resource_spec", "")

        # IO_BIG
        new_resource_spec += (
            "io_big=TRUE,"
            if "io_big" in job_spec and job_spec["io_big"]
            else ""
        )

        memory = job_spec.get("memory", "")[:-1]
        new_resource_spec += f"mem_free={memory},"

        queue = job_spec.get("queue", "")
        if queue != "all.q":
            new_resource_spec += f"{queue}=TRUE"

        new_resource_spec = (
            None if new_resource_spec == "" else new_resource_spec
        )

        return {
            "queue": queue,
            "project": self.project,
            "memory": job_spec.get("memory", ""),
            "job_extra_directives": job_spec.get("job_extra_directives", None),
            "cores": 1,
            "processes": 1,
            "log_directory": self.log_directory,
            "local_directory": self.log_directory,
            "resource_spec": new_resource_spec,
            "interface": None,
            "protocol": self.protocol,
            "security": None,
            "resources": job_spec.get("resources", ""),
            "job_script_prologue": self.job_script_prologue,
        }
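
    # For illustration only (a sketch, assuming a hypothetical job spec): an entry
    # such as {"queue": "q_1day", "memory": "8GB", "io_big": True, "resource_spec": ""}
    # would be turned by `_get_worker_spec_options` into the qsub resource string
    # "io_big=TRUE,mem_free=8G,q_1day=TRUE", which is then passed to `qsub -l`.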

    def scale(self, n_jobs, sge_job_spec_key="default"):
        """Launch SGE jobs in the Idiap SGE cluster.

        Parameters
        ----------
        n_jobs: int
            Number of jobs to scale to

        sge_job_spec_key: str
            One of the specs in `SGEMultipleQueuesCluster.sge_job_spec`
        """
        if n_jobs == 0:
            # Shutting down all workers
            return super(JobQueueCluster, self).scale(0, memory=None, cores=0)

        job_spec = self.sge_job_spec[sge_job_spec_key]
        worker_spec_options = self._get_worker_spec_options(job_spec)
        n_cores = 1
        worker_spec = {"cls": self.job_cls, "options": worker_spec_options}

        # Defining a new worker_spec with some SGE characteristics
        self.new_spec = worker_spec

        return super(JobQueueCluster, self).scale(
            n_jobs, memory=None, cores=n_cores
        )
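
    # For illustration only (a sketch, assuming the hypothetical Q_1DAY_GPU_SPEC from
    # the class docstring): `cluster.scale(2, sge_job_spec_key="gpu")` would build the
    # worker spec from the "gpu" entry (queue q_gpu, 12GB, resources={"GPU": 1}) and
    # ask the underlying SpecCluster for two such SGE jobs.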

    def scale_up(self, n_jobs, sge_job_spec_key="default"):
        """Scale the cluster up.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        return self.scale(n_jobs, sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key=None):
        """Scale the cluster down.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        await super().scale_down(workers)

    def adapt(self, *args, **kwargs):
        super().adapt(*args, Adaptive=AdaptiveMultipleQueue, **kwargs)


class AdaptiveMultipleQueue(Adaptive):
    """Custom mechanism to adaptively allocate workers based on scheduler load.

    This custom implementation extends `Adaptive.recommendations` by looking
    at the `distributed.scheduler.TaskState.resource_restrictions`.

    The heuristic is:

    .. note::
        If a certain task has the status `no-worker` and it has resource_restrictions,
        the scheduler should request a job matching those resource restrictions.
    """

    async def recommendations(self, target: int) -> dict:
        """Make scale up/down recommendations based on the current state and
        the target."""

        plan = self.plan

        # Get tasks with no worker associated due to
        # resource restrictions
        resource_restrictions = (
            await self.scheduler.get_no_worker_tasks_resource_restrictions()
        )

        # If the amount of resources requested is bigger
        # than what is available and those jobs have restrictions
        if target > len(plan):
            self.close_counts.clear()
            if len(resource_restrictions) > 0:
                return {
                    "status": "up",
                    "n": target,
                    "sge_job_spec_key": list(resource_restrictions[0].keys())[
                        0
                    ],
                }
            else:
                return {"status": "up", "n": target}

        # If the amount of resources requested is lower
        # than what is available, it is time to downscale
        elif target < len(plan):
            to_close = set()

            # Get the workers that can be closed
            if target < len(plan) - len(to_close):
                L = await self.workers_to_close(target=target)
                to_close.update(L)

            firmly_close = set()
            # Counting the number of scheduler cycles that we should keep
            # this worker before destroying it
            for w in to_close:
                self.close_counts[w] += 1
                if self.close_counts[w] >= self.wait_count:
                    firmly_close.add(w)

            for k in list(self.close_counts):  # clear out unseen keys
                if k in firmly_close or k not in to_close:
                    del self.close_counts[k]

            # Send message to destroy workers
            if firmly_close:
                return {"status": "down", "workers": list(firmly_close)}

        # If the amount of available workers is ok
        # for the current demand, BUT
        # there are tasks that need some special worker:
        # scale everything up
        if target == len(plan) and len(resource_restrictions) > 0:
            return {
                "status": "up",
                "n": target + 1,
                "sge_job_spec_key": list(resource_restrictions[0].keys())[0],
            }
        else:
            return {"status": "same"}
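
    # For illustration only (a sketch of the possible return values of
    # `recommendations`, with hypothetical numbers and worker addresses):
    #
    #   {"status": "up", "n": 4, "sge_job_spec_key": "gpu"}   # scale up on a specific spec
    #   {"status": "down", "workers": ["tcp://..."]}          # close these workers
    #   {"status": "same"}                                    # nothing to do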

    async def scale_up(self, n, sge_job_spec_key="default"):
        await self.cluster.scale(n, sge_job_spec_key=sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key="default"):
        await super().scale_down(workers)


class SchedulerResourceRestriction(Scheduler):
    """Idiap-extended distributed scheduler.

    This scheduler extends `Scheduler` by adding a handler that fetches,
    at every scheduler cycle, the resource restrictions of tasks that
    have the status `no-worker`.
    """

    def __init__(self, *args, **kwargs):
        super(SchedulerResourceRestriction, self).__init__(
            idle_timeout=rc.get("bob.pipelines.sge.idle_timeout", 3600),
            allowed_failures=rc.get("bob.pipelines.sge.allowed_failures", 100),
            worker_ttl=rc.get("bob.pipelines.sge.worker_ttl", 120),
            synchronize_worker_interval="10s",
            *args,
            **kwargs,
        )
        self.handlers[
            "get_no_worker_tasks_resource_restrictions"
        ] = self.get_no_worker_tasks_resource_restrictions

    def get_no_worker_tasks_resource_restrictions(self, comm=None):
        """Get the resource restrictions of tasks that have the status
        'no-worker'."""

        resource_restrictions = []
        for k in self.tasks:
            if (
                self.tasks[k].state == "no-worker"
                and self.tasks[k].resource_restrictions is not None
            ):
                resource_restrictions.append(
                    self.tasks[k].resource_restrictions
                )

        return resource_restrictions
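

# For illustration only (hypothetical values): if two tasks are waiting in the
# 'no-worker' state, the handler above would return something like
#
#   [{"GPU": 1}, {"q_1day": 1}]
#
# i.e. one resource-restriction dictionary per pending task, which
# `AdaptiveMultipleQueue.recommendations` then uses to pick the sge_job_spec_key.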