Coverage for src/bob/pipelines/config/distributed/sge_io_big.py: 0%
8 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-12 21:32 +0200
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-12 21:32 +0200
1from dask.distributed import Client
3from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs
4from bob.pipelines.distributed.sge_queues import QUEUE_IOBIG
6min_jobs = max_jobs = get_max_jobs(QUEUE_IOBIG)
7cluster = SGEMultipleQueuesCluster(min_jobs=min_jobs, sge_job_spec=QUEUE_IOBIG)
8cluster.scale(max_jobs)
9# Adapting to minimim 1 job to maximum 48 jobs
10# interval: Milliseconds between checks from the scheduler
11# wait_count: Number of consecutive times that a worker should be suggested for
12# removal before we remove it.
13cluster.adapt(
14 minimum=min_jobs,
15 maximum=max_jobs,
16 wait_count=5,
17 interval=10,
18 target_duration="10s",
19)
20dask_client = Client(cluster)