Coverage for src/bob/pipelines/distributed/__init__.py: 50% (24 statements)


from pkgutil import extend_path

from .sge import SchedulerResourceRestriction

# see https://docs.python.org/3/library/pkgutil.html
__path__ = extend_path(__path__, __name__)


# DASK-click VALID_DASK_CLIENT_STRINGS
# TO BE USED IN:
# @click.option(
#     "--dask-client",
#     "-l",
#     entry_point_group="dask.client",
#     string_exceptions=VALID_DASK_CLIENT_STRINGS,
#     default="single-threaded",
#     help="Dask client for the execution of the pipeline.",
#     cls=ResourceOption,
# )

try:
    import dask

    VALID_DASK_CLIENT_STRINGS = dask.base.named_schedulers
except (ModuleNotFoundError, ImportError):
    VALID_DASK_CLIENT_STRINGS = (
        "sync",
        "synchronous",
        "single-threaded",
        "threads",
        "threading",
        "processes",
        "multiprocessing",
    )
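
# Minimal usage sketch (illustrative only, kept as a comment so nothing runs at
# import time). ``scheduler_name`` is a hypothetical value, e.g. obtained from
# the --dask-client command-line option sketched above:
#
#   scheduler_name = "single-threaded"
#   if scheduler_name not in VALID_DASK_CLIENT_STRINGS:
#       raise ValueError(f"Invalid dask scheduler: {scheduler_name!r}")
#   import dask
#   with dask.config.set(scheduler=scheduler_name):
#       ...  # run the pipeline with the chosen scheduler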

def dask_get_partition_size(cluster, n_objects, lower_bound=200):
    """
    Heuristic that suggests a value for ``dask.partition_size``.

    The heuristic is simple: given the maximum number of workers that can run
    in a queue (not the number of workers currently running) and the total
    number of objects to be processed, it returns ``n_objects / n_max_workers``,
    bounded from below by ``lower_bound``.

    See https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
    for best practices.

    Parameters
    ----------
    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
        Cluster of the type :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`

    n_objects: int
        Number of objects to be processed.

    lower_bound: int
        Minimum partition size.

    """

    from .sge import SGEMultipleQueuesCluster

    if not isinstance(cluster, SGEMultipleQueuesCluster):
        return None

    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]

    # Set a lower bound on the partition size when there are more objects
    # than the maximum number of jobs
    return (
        max(n_objects // max_jobs, lower_bound)
        if n_objects > max_jobs
        else n_objects
    )
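
# Minimal usage sketch (illustrative only, kept as a comment so nothing runs at
# import time). ``cluster`` and ``samples`` are hypothetical names for an
# SGEMultipleQueuesCluster instance and a list of objects to process:
#
#   import dask.bag
#   partition_size = dask_get_partition_size(cluster, n_objects=len(samples))
#   bag = dask.bag.from_sequence(samples, partition_size=partition_size)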

def get_local_parallel_client(parallel=None, processes=True):
    """Returns a local Dask client with the given parameters.

    See the Dask documentation for details:
    https://docs.dask.org/en/latest/how-to/deploy-dask/single-distributed.html?highlight=localcluster#localcluster

    Parameters
    ----------
    parallel: int or None
        The number of workers (processes or threads) to use; if ``None``, use
        as many workers as there are processors on the system.

    processes: bool
        Whether the Dask client should start processes (``True``, recommended)
        or threads (``False``). Note that threads in pure Python do not run in
        parallel, see:
        https://www.quantstart.com/articles/Parallelising-Python-with-Threading-and-Multiprocessing/
    """

    from multiprocessing import cpu_count

    from dask.distributed import Client, LocalCluster

    parallel = parallel or cpu_count()

    cluster = LocalCluster(
        processes=processes,
        n_workers=parallel if processes else 1,
        threads_per_worker=1 if processes else parallel,
    )
    return Client(cluster)
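
# Minimal usage sketch (illustrative only, kept as a comment so nothing runs at
# import time):
#
#   client = get_local_parallel_client(parallel=4, processes=True)
#   ...  # submit work through the client, e.g. client.compute(...)
#   client.close()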

def __appropriate__(*args):
    """Says that an object was actually declared here, and not in the imported
    module. This fixes Sphinx warnings about not being able to find classes
    when the path is shortened.

    Parameters
    ----------
    *args
        The objects that you want Sphinx to believe are defined here.

    Resolves `Sphinx referencing issues
    <https://github.com/sphinx-doc/sphinx/issues/3048>`_
    """

    for obj in args:
        obj.__module__ = __name__


__appropriate__(
    SchedulerResourceRestriction,
)