Prepare a SlurmConfig
configuration object
The argument which_type
determines whether we use wftask.meta_parallel
or wftask.meta_non_parallel
. In the following description, let us assume
that which_type="parallel"
.
The sources for SlurmConfig
attributes, in increasing priority order, are
- The general content of the Fractal SLURM configuration file.
- The GPU-specific content of the Fractal SLURM configuration file, if
appropriate.
- Properties in
wftask.meta_parallel
(which typically include those in
wftask.task.meta_parallel
). Note that wftask.meta_parallel
may be
None
.
Parameters:
Name |
Type |
Description |
Default |
wftask |
WorkflowTaskV2
|
WorkflowTask for which the SLURM configuration is to be
prepared.
|
required
|
config_path |
Optional[Path]
|
Path of a Fractal SLURM configuration file; if None , use
FRACTAL_SLURM_CONFIG_FILE variable from settings.
|
None
|
which_type |
Literal['non_parallel', 'parallel']
|
Determines whether to use meta_parallel or meta_non_parallel .
|
required
|
Returns:
Source code in fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_slurm_config(
    wftask: WorkflowTaskV2,
    which_type: Literal["non_parallel", "parallel"],
    config_path: Optional[Path] = None,
) -> SlurmConfig:
    """
    Prepare a `SlurmConfig` configuration object.

    The argument `which_type` determines whether we use `wftask.meta_parallel`
    or `wftask.meta_non_parallel`. In the following description, let us assume
    that `which_type="parallel"`.

    The sources for `SlurmConfig` attributes, in increasing priority order,
    are:

    1. The general content of the Fractal SLURM configuration file.
    2. The GPU-specific content of the Fractal SLURM configuration file, if
       appropriate.
    3. Properties in `wftask.meta_parallel` (which typically include those in
       `wftask.task.meta_parallel`). Note that `wftask.meta_parallel` may be
       `None`.

    Arguments:
        wftask:
            WorkflowTask for which the SLURM configuration is to be prepared.
        which_type:
            Determines whether to use `meta_parallel` or `meta_non_parallel`.
        config_path:
            Path of a Fractal SLURM configuration file; if `None`, use
            `FRACTAL_SLURM_CONFIG_FILE` variable from settings.

    Returns:
        slurm_config:
            The `SlurmConfig` object.

    Raises:
        ValueError: If `which_type` is neither "non_parallel" nor "parallel".
        SlurmConfigError: If a SLURM account is set via the WorkflowTask
            `meta` attribute (accounts must be set elsewhere).
    """
    # Select the relevant meta attribute (may be None)
    if which_type == "non_parallel":
        wftask_meta = wftask.meta_non_parallel
    elif which_type == "parallel":
        wftask_meta = wftask.meta_parallel
    else:
        raise ValueError(
            f"get_slurm_config received invalid argument {which_type=}."
        )
    logger.debug(
        f"[get_slurm_config] WorkflowTask meta attribute: {wftask_meta=}"
    )

    # Incorporate slurm_env.default_slurm_config; `mem` is excluded here and
    # remapped below to the internal `mem_per_task_MB` key
    slurm_env = load_slurm_config_file(config_path=config_path)
    slurm_dict = slurm_env.default_slurm_config.dict(
        exclude_unset=True, exclude={"mem"}
    )
    if slurm_env.default_slurm_config.mem:
        slurm_dict["mem_per_task_MB"] = slurm_env.default_slurm_config.mem

    # Incorporate slurm_env.batching_config
    for key, value in slurm_env.batching_config.dict().items():
        slurm_dict[key] = value

    # Incorporate slurm_env.user_local_exports
    slurm_dict["user_local_exports"] = slurm_env.user_local_exports

    logger.debug(
        "[get_slurm_config] Fractal SLURM configuration file: "
        f"{slurm_env.dict()=}"
    )

    # GPU-related options
    # Notes about priority:
    # 1. This block of definitions takes priority over other definitions from
    #    slurm_env which are not under the `needs_gpu` subgroup
    # 2. This block of definitions has lower priority than whatever comes next
    #    (i.e. from WorkflowTask.meta_parallel).
    if wftask_meta is not None:
        needs_gpu = wftask_meta.get("needs_gpu", False)
    else:
        needs_gpu = False
    logger.debug(f"[get_slurm_config] {needs_gpu=}")
    if needs_gpu:
        for key, value in slurm_env.gpu_slurm_config.dict(
            exclude_unset=True, exclude={"mem"}
        ).items():
            slurm_dict[key] = value
        if slurm_env.gpu_slurm_config.mem:
            slurm_dict["mem_per_task_MB"] = slurm_env.gpu_slurm_config.mem

    # Number of CPUs per task, for multithreading
    if wftask_meta is not None and "cpus_per_task" in wftask_meta:
        slurm_dict["cpus_per_task"] = int(wftask_meta["cpus_per_task"])

    # Required memory per task, in MB (raw value may be e.g. "8G")
    if wftask_meta is not None and "mem" in wftask_meta:
        slurm_dict["mem_per_task_MB"] = _parse_mem_value(wftask_meta["mem"])

    # Job name (spaces are not valid in SLURM job names)
    slurm_dict["job_name"] = wftask.task.name.replace(" ", "_")

    # Optional SLURM arguments and extra lines
    if wftask_meta is not None:
        # A SLURM account must NOT come from the task meta attribute; it is
        # set via the apply-workflow request body or the user properties
        account = wftask_meta.get("account", None)
        if account is not None:
            error_msg = (
                f"Invalid {account=} property in WorkflowTask `meta` "
                "attribute.\n"
                "SLURM account must be set in the request body of the "
                "apply-workflow endpoint, or by modifying the user properties."
            )
            logger.error(error_msg)
            raise SlurmConfigError(error_msg)
        for key in ["time", "gres", "gpus", "constraint"]:
            value = wftask_meta.get(key, None)
            if value is not None:
                slurm_dict[key] = value

    # Merge extra script lines from the config file and the meta attribute,
    # then deduplicate. NOTE: use dict.fromkeys (not set) so that the original
    # order is preserved -- these lines are emitted verbatim into the
    # submission script, where ordering can matter
    if wftask_meta is not None:
        extra_lines = wftask_meta.get("extra_lines", [])
    else:
        extra_lines = []
    extra_lines = slurm_dict.get("extra_lines", []) + extra_lines
    if len(set(extra_lines)) != len(extra_lines):
        logger.debug(
            "[get_slurm_config] Removing repeated elements "
            f"from {extra_lines=}."
        )
        extra_lines = list(dict.fromkeys(extra_lines))
    slurm_dict["extra_lines"] = extra_lines

    # Job-batching parameters (if None, they will be determined heuristically)
    if wftask_meta is not None:
        tasks_per_job = wftask_meta.get("tasks_per_job", None)
        parallel_tasks_per_job = wftask_meta.get(
            "parallel_tasks_per_job", None
        )
    else:
        tasks_per_job = None
        parallel_tasks_per_job = None
    slurm_dict["tasks_per_job"] = tasks_per_job
    slurm_dict["parallel_tasks_per_job"] = parallel_tasks_per_job

    # Put everything together
    logger.debug(
        "[get_slurm_config] Now create a SlurmConfig object based "
        f"on {slurm_dict=}"
    )
    return SlurmConfig(**slurm_dict)
|