
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>

import logging
import sys

import dask

from clapper.rc import UserDefaults
from dask_jobqueue.core import Job, JobQueueCluster
from distributed.deploy import Adaptive
from distributed.scheduler import Scheduler

from .sge_queues import QUEUE_DEFAULT

logger = logging.getLogger(__name__)
rc = UserDefaults("bobrc.toml")



class SGEIdiapJob(Job):
    """Launches an SGE job in the Idiap cluster. This class basically encodes
    the CLI command that bootstraps the worker in an SGE job. Check
    https://distributed.dask.org/en/latest/resources.html#worker-resources
    for more information.

    .. note:: This class is temporary. It is basically a copy of SGEJob from
       dask_jobqueue. The difference is that here we also handle the dask job
       resources tag (which is not handled anywhere else). This has to be
       patched in the Job class. Please follow
       https://github.com/dask/dask-jobqueue/issues/378 to get news about this
       patch.
    """


    submit_command = "qsub"
    cancel_command = "qdel"
    config_name = "SGEIdiapJob"


    def __init__(
        self,
        *args,
        queue=None,
        project=rc.get("sge.project"),
        resource_spec=None,
        job_extra_directives=None,
        config_name="sge",
        **kwargs,
    ):
        if queue is None:
            queue = dask.config.get("jobqueue.%s.queue" % config_name)
        if project is None:
            project = dask.config.get("jobqueue.%s.project" % config_name)
        if resource_spec is None:
            resource_spec = dask.config.get(
                "jobqueue.%s.resource-spec" % config_name
            )
        if job_extra_directives is None:
            job_extra_directives = dask.config.get(
                "jobqueue.%s.job-extra-directives" % config_name
            )

        # Resources
        resources = kwargs.pop("resources", None)

        super().__init__(
            *args, config_name=config_name, death_timeout=10000, **kwargs
        )

        # Amending the --resources in the `distributed.cli.dask_worker` CLI command
        if resources:
            # Preparing the comma-separated KEY=VALUE string to be sent to
            # the `dask-worker` command
            resources_str = ",".join(f"{k}={v}" for k, v in resources.items())
            self._command_template += f" --resources {resources_str}"


        header_lines = []
        if self.job_name is not None:
            header_lines.append("#$ -N %(job-name)s")
        if queue is not None:
            header_lines.append("#$ -q %(queue)s")
        if project is not None:
            header_lines.append("#$ -P %(project)s")
        if resource_spec is not None:
            header_lines.append("#$ -l %(resource_spec)s")

        if self.log_directory is not None:
            header_lines.append("#$ -e %(log_directory)s/")
            header_lines.append("#$ -o %(log_directory)s/")
        header_lines.extend(["#$ -cwd", "#$ -j y"])
        header_lines.extend(["#$ %s" % arg for arg in job_extra_directives])
        header_template = "\n".join(header_lines)

        config = {
            "job-name": self.job_name,
            "queue": queue,
            "project": project,
            "processes": self.worker_processes,
            "resource_spec": resource_spec,
            "log_directory": self.log_directory,
        }
        self.job_header = header_template % config
        logger.debug("Job script: \n %s" % self.job_script())
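

# A minimal sketch (hypothetical values) of how the `resources` keyword above
# reaches the worker command line:
#
#   job = SGEIdiapJob(
#       cores=1,
#       memory="12GB",
#       queue="q_gpu",
#       resources={"GPU": 1},
#   )
#   print(job.job_script())  # the dask-worker call now includes `--resources GPU=1`
#
# With that flag, the dask scheduler will only place tasks annotated with a
# matching `resources={"GPU": 1}` restriction on this worker.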



def get_max_jobs(queue_dict):
    """Given a queue specification dictionary, get the maximum number of possible jobs."""

    return max(
        [
            queue_dict[r]["max_jobs"]
            for r in queue_dict
            if "max_jobs" in queue_dict[r]
        ]
    )
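

# For illustration (hypothetical spec), `get_max_jobs` returns the largest
# per-queue bound found in the specification:
#
#   _spec = {
#       "default": {"queue": "q_1day", "max_jobs": 48},
#       "gpu": {"queue": "q_gpu", "max_jobs": 4},
#   }
#   assert get_max_jobs(_spec) == 48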



def get_resource_requirements(pipeline):
    """
    Get the resource requirements to execute a graph.
    This is useful when it's necessary to get the dictionary mapping the
    dask delayed keys to specific resource restrictions.
    Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information

    Parameters
    ----------
    pipeline: :any:`sklearn.pipeline.Pipeline`
        A :any:`sklearn.pipeline.Pipeline` wrapper with :any:`bob.pipelines.DaskWrapper`

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> from bob.pipelines.sge import get_resource_requirements  # doctest: +SKIP
    >>> resources = get_resource_requirements(pipeline)  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """

    resources = dict()
    for s in pipeline:
        if hasattr(s, "resource_tags"):
            resources.update(s.resource_tags)
    return resources
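

# Sketch of typical use with the cluster defined below (`pipeline` and
# `dask_graph` are hypothetical): the merged resource tags are handed to dask
# so that tagged tasks only land on workers started with matching --resources.
#
#   resources = get_resource_requirements(pipeline)
#   result = dask_graph.compute(scheduler=client, resources=resources)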



class SGEMultipleQueuesCluster(JobQueueCluster):
    """Launch Dask jobs in the SGE cluster allowing the request of multiple
    queues.

    Parameters
    ----------
    log_directory: str
        Default directory for the SGE logs

    protocol: str
        Scheduler communication protocol

    dashboard_address: str
        Default port for the dask dashboard

    job_script_prologue: str
        Extra environment variables to send to the workers

    sge_job_spec: dict
        Dictionary containing a minimum specification for the qsub command.
        It consists of:

        queue: SGE queue
        memory: Memory requirement in GB (e.g. 4GB)
        io_big: set the io_big flag
        resource_spec: Extra arguments to be sent to qsub (qsub -l)
        tag: Mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html)
        max_jobs: Maximum number of jobs in the queue

    min_jobs: int
        Lower bound for the number of jobs for `self.adapt`


    Example
    -------

    Below follows a vanilla example that will create a set of jobs on all.q:

    >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  # doctest: +SKIP
    >>> from dask.distributed import Client  # doctest: +SKIP
    >>> cluster = SGEMultipleQueuesCluster()  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP

    It's possible to provide a resource specification yourself:

    >>> Q_1DAY_IO_BIG_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     }
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP


    More than one job spec can be set:

    >>> Q_1DAY_GPU_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     },
    ...     "gpu": {
    ...         "queue": "q_gpu",
    ...         "memory": "12GB",
    ...         "io_big": False,
    ...         "resource_spec": "",
    ...         "resources": {"GPU": 1},
    ...     },
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> cluster.scale_up(1, sge_job_spec_key="gpu")  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP


    Adaptive job allocation can also be used via the `AdaptiveMultipleQueue` extension:

    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.adapt(Adaptive=AdaptiveMultipleQueue, minimum=2, maximum=10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    """


    def __init__(
        self,
        log_directory="./logs",
        protocol="tcp://",
        dashboard_address=":8787",
        job_script_prologue=None,
        sge_job_spec=QUEUE_DEFAULT,
        min_jobs=1,
        project=rc.get("sge.project"),
        **kwargs,
    ):
        # Defining the job launcher
        self.job_cls = SGEIdiapJob
        self.sge_job_spec = sge_job_spec

        self.protocol = protocol
        self.log_directory = log_directory
        self.project = project

        silence_logs = "error"
        interface = None
        host = None
        security = None

        if job_script_prologue is None:
            job_script_prologue = []
        elif not isinstance(job_script_prologue, list):
            job_script_prologue = [job_script_prologue]
        self.job_script_prologue = job_script_prologue + [
            "export PYTHONPATH=" + ":".join(sys.path)
        ]

        scheduler = {
            "cls": SchedulerResourceRestriction,  # Use local scheduler for now
            "options": {
                "protocol": self.protocol,
                "interface": interface,
                "host": host,
                "dashboard_address": dashboard_address,
                "security": security,
            },
        }

        # Spec cluster parameters
        loop = None
        asynchronous = False
        name = None

        # Starting the SpecCluster constructor
        super(JobQueueCluster, self).__init__(
            scheduler=scheduler,
            worker={},
            loop=loop,
            silence_logs=silence_logs,
            asynchronous=asynchronous,
            name=name,
        )


    def _get_worker_spec_options(self, job_spec):
        """Craft a dask worker_spec to be used in the qsub command."""

        new_resource_spec = job_spec.get("resource_spec", "")

        # IO_BIG
        new_resource_spec += (
            "io_big=TRUE,"
            if "io_big" in job_spec and job_spec["io_big"]
            else ""
        )

        # SGE's mem_free expects a value such as "8G", so drop the trailing
        # "B" from the "8GB"-style memory string
        memory = job_spec.get("memory", "")[:-1]
        new_resource_spec += f"mem_free={memory},"

        # Queues other than all.q are requested via a boolean flag named
        # after the queue
        queue = job_spec.get("queue", "")
        if queue != "all.q":
            new_resource_spec += f"{queue}=TRUE"

        new_resource_spec = (
            None if new_resource_spec == "" else new_resource_spec
        )

        return {
            "queue": queue,
            "project": self.project,
            "memory": job_spec.get("memory", ""),
            "job_extra_directives": job_spec.get("job_extra_directives", None),
            "cores": 1,
            "processes": 1,
            "log_directory": self.log_directory,
            "local_directory": self.log_directory,
            "resource_spec": new_resource_spec,
            "interface": None,
            "protocol": self.protocol,
            "security": None,
            "resources": job_spec.get("resources", ""),
            "job_script_prologue": self.job_script_prologue,
        }
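
    # For illustration (hypothetical job spec), the resource-spec assembly
    # above maps
    #
    #   {"queue": "q_gpu", "memory": "12GB", "io_big": False, "resource_spec": ""}
    #
    # to the qsub argument `-l mem_free=12G,q_gpu=TRUE`: the trailing "B" of
    # the memory string is dropped for SGE's mem_free, and any queue other
    # than all.q is additionally requested as a boolean flag.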


    def scale(self, n_jobs, sge_job_spec_key="default"):
        """Launch SGE jobs in the Idiap SGE cluster.

        Parameters
        ----------

        n_jobs: int
            Number of jobs to scale to

        sge_job_spec_key: str
            One of the keys of `SGEMultipleQueuesCluster.sge_job_spec`
        """

        if n_jobs == 0:
            # Shutting down all workers
            return super(JobQueueCluster, self).scale(0, memory=None, cores=0)

        job_spec = self.sge_job_spec[sge_job_spec_key]
        worker_spec_options = self._get_worker_spec_options(job_spec)
        n_cores = 1
        worker_spec = {"cls": self.job_cls, "options": worker_spec_options}

        # Defining a new worker_spec with some SGE characteristics
        self.new_spec = worker_spec

        return super(JobQueueCluster, self).scale(
            n_jobs, memory=None, cores=n_cores
        )
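
    # Typical interactive use, mirroring the class docstring (a sketch):
    #
    #   cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
    #   cluster.scale(10)                         # jobs on the "default" spec
    #   cluster.scale(1, sge_job_spec_key="gpu")  # job on the "gpu" spec
    #   cluster.scale(0)                          # shut down all workers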


    def scale_up(self, n_jobs, sge_job_spec_key="default"):
        """Scale cluster up.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        return self.scale(n_jobs, sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key=None):
        """Scale cluster down.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        await super().scale_down(workers)

    def adapt(self, *args, **kwargs):
        super().adapt(*args, Adaptive=AdaptiveMultipleQueue, **kwargs)



class AdaptiveMultipleQueue(Adaptive):
    """Custom mechanism to adaptively allocate workers based on scheduler load.

    This custom implementation extends `Adaptive.recommendations` by looking
    at the `distributed.scheduler.TaskState.resource_restrictions`.

    The heuristic is:

    .. note::
        If a certain task has the status `no-worker` and it has
        resource_restrictions, the scheduler should request a job matching
        those resource restrictions.
    """


    async def recommendations(self, target: int) -> dict:
        """Make scale up/down recommendations based on current state and
        target."""

        plan = self.plan

        # Get the tasks with no worker associated due to
        # resource restrictions
        resource_restrictions = (
            await self.scheduler.get_no_worker_tasks_resource_restrictions()
        )

        # If more resources are requested than are available
        # and those jobs have restrictions
        if target > len(plan):
            self.close_counts.clear()
            if len(resource_restrictions) > 0:
                return {
                    "status": "up",
                    "n": target,
                    "sge_job_spec_key": list(resource_restrictions[0].keys())[
                        0
                    ],
                }
            else:
                return {"status": "up", "n": target}

        # If fewer resources are requested than are available,
        # it is time to scale down
        elif target < len(plan):
            to_close = set()

            # Get the workers that can be closed
            if target < len(plan) - len(to_close):
                L = await self.workers_to_close(target=target)
                to_close.update(L)

            firmly_close = set()
            # Count the number of scheduler cycles to keep this worker
            # alive before destroying it
            for w in to_close:
                self.close_counts[w] += 1
                if self.close_counts[w] >= self.wait_count:
                    firmly_close.add(w)

            for k in list(self.close_counts):  # clear out unseen keys
                if k in firmly_close or k not in to_close:
                    del self.close_counts[k]

            # Send message to destroy workers
            if firmly_close:
                return {"status": "down", "workers": list(firmly_close)}

        # If the amount of available workers is fine for the current
        # demand, BUT there are tasks that need some special worker:
        # scale everything up
        if target == len(plan) and len(resource_restrictions) > 0:
            return {
                "status": "up",
                "n": target + 1,
                "sge_job_spec_key": list(resource_restrictions[0].keys())[0],
            }
        else:
            return {"status": "same"}
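
    # For reference, the recommendation payloads produced above have these
    # shapes (a sketch, values hypothetical):
    #
    #   {"status": "up", "n": 5}                             # plain scale-up
    #   {"status": "up", "n": 5, "sge_job_spec_key": "gpu"}  # scale-up on a tagged spec
    #   {"status": "down", "workers": ["SGEIdiapJob-0"]}     # retire these workers
    #   {"status": "same"}                                   # no change needed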


    async def scale_up(self, n, sge_job_spec_key="default"):
        await self.cluster.scale(n, sge_job_spec_key=sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key="default"):
        await super().scale_down(workers)



class SchedulerResourceRestriction(Scheduler):
    """Idiap-extended distributed scheduler.

    This scheduler extends `Scheduler` by adding a handler that
    fetches, at every scheduler cycle, the resource restrictions of
    the tasks that have the status `no-worker`.
    """

    def __init__(self, *args, **kwargs):
        super(SchedulerResourceRestriction, self).__init__(
            idle_timeout=rc.get("bob.pipelines.sge.idle_timeout", 3600),
            allowed_failures=rc.get("bob.pipelines.sge.allowed_failures", 100),
            worker_ttl=rc.get("bob.pipelines.sge.worker_ttl", 120),
            synchronize_worker_interval="10s",
            *args,
            **kwargs,
        )
        self.handlers[
            "get_no_worker_tasks_resource_restrictions"
        ] = self.get_no_worker_tasks_resource_restrictions


    def get_no_worker_tasks_resource_restrictions(self, comm=None):
        """Get the resource restrictions of the tasks that have the status
        'no-worker'."""

        resource_restrictions = []
        for k in self.tasks:
            if (
                self.tasks[k].state == "no-worker"
                and self.tasks[k].resource_restrictions is not None
            ):
                resource_restrictions.append(
                    self.tasks[k].resource_restrictions
                )

        return resource_restrictions

502 

503 return resource_restrictions