Coverage for src/bob/pipelines/config/distributed/sge_default.py: 0%
9 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-12 21:32 +0200
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-12 21:32 +0200
1from dask.distributed import Client
3from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs
4from bob.pipelines.distributed.sge_queues import QUEUE_DEFAULT
6min_jobs = 1
7max_jobs = get_max_jobs(QUEUE_DEFAULT)
8cluster = SGEMultipleQueuesCluster(
9 min_jobs=min_jobs, sge_job_spec=QUEUE_DEFAULT
10)
11cluster.scale(max_jobs)
13# Adapting to minimim 1 job to maximum 48 jobs
14# interval: Milliseconds between checks from the scheduler
15# wait_count: Number of consecutive times that a worker should be suggested for
16# removal before we remove it.
17cluster.adapt(
18 minimum=min_jobs,
19 maximum=max_jobs,
20 wait_count=5,
21 interval=10,
22 target_duration="10s",
23)
25dask_client = Client(cluster)