_slurm_config

Submodule to handle the SLURM configuration for a WorkflowTask

SlurmConfig

Bases: BaseModel

Abstraction for SLURM parameters

NOTE: SlurmConfig objects are created internally in fractal-server and are not meant to be initialized by the user; the same holds for SlurmConfig attributes (e.g. mem_per_task_MB), which are not meant to appear in the FRACTAL_SLURM_CONFIG_FILE JSON file (the expected file content is defined in SlurmConfigFile).

Some of the attributes map directly to SLURM options (see https://slurm.schedmd.com/sbatch.html), e.g. partition. Other attributes are metaparameters needed by fractal-server to combine multiple tasks in the same SLURM job (e.g. parallel_tasks_per_job or max_num_jobs).

Attributes:

    partition (str): Corresponds to the --partition SLURM option.
    cpus_per_task (int): Corresponds to the --cpus-per-task SLURM option.
    mem_per_task_MB (int): Corresponds to the --mem SLURM option (per-task value, in MB).
    job_name (Optional[str]): Corresponds to the --job-name SLURM option.
    constraint (Optional[str]): Corresponds to the --constraint SLURM option.
    gres (Optional[str]): Corresponds to the --gres SLURM option.
    account (Optional[str]): Corresponds to the --account SLURM option.
    gpus (Optional[str]): Corresponds to the --gpus SLURM option.
    time (Optional[str]): Corresponds to the --time SLURM option (WARNING: not fully supported).
    prefix (str): Prefix of configuration lines in SLURM submission scripts.
    shebang_line (str): Shebang line for SLURM submission scripts.
    extra_lines (Optional[list[str]]): Additional lines to include in SLURM submission scripts.
    tasks_per_job (Optional[int]): Number of tasks for each SLURM job.
    parallel_tasks_per_job (Optional[int]): Number of tasks to run in parallel for each SLURM job.
    target_cpus_per_job (int): Optimal number of CPUs to be requested in each SLURM job.
    max_cpus_per_job (int): Maximum number of CPUs that can be requested in each SLURM job.
    target_mem_per_job (int): Optimal amount of memory (in MB) to be requested in each SLURM job.
    max_mem_per_job (int): Maximum amount of memory (in MB) that can be requested in each SLURM job.
    target_num_jobs (int): Optimal number of SLURM jobs for a given WorkflowTask.
    max_num_jobs (int): Maximum number of SLURM jobs for a given WorkflowTask.
    user_local_exports (Optional[dict[str, str]]): Key-value pairs to be included as exported variables in the SLURM submission script, after prepending values with the user's cache directory.
    pre_submission_commands (list[str]): List of commands to be prepended to the sbatch command.

Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
class SlurmConfig(BaseModel, extra=Extra.forbid):
    """
    Abstraction for SLURM parameters

    **NOTE**: `SlurmConfig` objects are created internally in `fractal-server`,
    and they are not meant to be initialized by the user; the same holds for
    `SlurmConfig` attributes (e.g. `mem_per_task_MB`), which are not meant to
    be part of the `FRACTAL_SLURM_CONFIG_FILE` JSON file (details on the
    expected file content are defined in
    [`SlurmConfigFile`](./#fractal_server.app.runner._slurm._slurm_config.SlurmConfigFile)).

    Part of the attributes map directly to some of the SLURM attributes (see
    https://slurm.schedmd.com/sbatch.html), e.g. `partition`. Other attributes
    are metaparameters which are needed in fractal-server to combine multiple
    tasks in the same SLURM job (e.g. `parallel_tasks_per_job` or
    `max_num_jobs`).

    Attributes:
        partition: Corresponds to the `--partition` SLURM option.
        cpus_per_task: Corresponds to the `--cpus-per-task` SLURM option.
        mem_per_task_MB: Corresponds to the `--mem` SLURM option (per-task value, in MB).
        job_name: Corresponds to the `--job-name` SLURM option.
        constraint: Corresponds to the `--constraint` SLURM option.
        gres: Corresponds to the `--gres` SLURM option.
        account: Corresponds to the `--account` SLURM option.
        gpus: Corresponds to the `--gpus` SLURM option.
        time: Corresponds to the `--time` SLURM option (WARNING: not fully supported).
        prefix: Prefix of configuration lines in SLURM submission scripts.
        shebang_line: Shebang line for SLURM submission scripts.
        extra_lines: Additional lines to include in SLURM submission scripts.
        tasks_per_job: Number of tasks for each SLURM job.
        parallel_tasks_per_job: Number of tasks to run in parallel for
                                each SLURM job.
        target_cpus_per_job: Optimal number of CPUs to be requested in each
                             SLURM job.
        max_cpus_per_job: Maximum number of CPUs that can be requested in each
                          SLURM job.
        target_mem_per_job: Optimal amount of memory (in MB) to be requested in
                            each SLURM job.
        max_mem_per_job: Maximum amount of memory (in MB) that can be requested
                         in each SLURM job.
        target_num_jobs: Optimal number of SLURM jobs for a given WorkflowTask.
        max_num_jobs: Maximum number of SLURM jobs for a given WorkflowTask.
        user_local_exports:
            Key-value pairs to be included as `export`-ed variables in SLURM
            submission script, after prepending values with the user's cache
            directory.
        pre_submission_commands: List of commands to be prepended to the sbatch
            command.
    """

    # Required SLURM parameters (note that the integer attributes are those
    # that will need to scale up with the number of parallel tasks per job)
    partition: str
    cpus_per_task: int
    mem_per_task_MB: int
    prefix: str = "#SBATCH"
    shebang_line: str = "#!/bin/sh"

    # Optional SLURM parameters
    job_name: Optional[str] = None
    constraint: Optional[str] = None
    gres: Optional[str] = None
    gpus: Optional[str] = None
    time: Optional[str] = None
    account: Optional[str] = None

    # Free-field attribute for extra lines to be added to the SLURM job
    # preamble
    extra_lines: Optional[list[str]] = Field(default_factory=list)

    # Variables that will be `export`ed in the SLURM submission script
    user_local_exports: Optional[dict[str, str]] = None

    # Metaparameters needed to combine multiple tasks in each SLURM job
    tasks_per_job: Optional[int] = None
    parallel_tasks_per_job: Optional[int] = None
    target_cpus_per_job: int
    max_cpus_per_job: int
    target_mem_per_job: int
    max_mem_per_job: int
    target_num_jobs: int
    max_num_jobs: int

    pre_submission_commands: list[str] = Field(default_factory=list)

    def _sorted_extra_lines(self) -> list[str]:
        """
        Return a copy of `self.extra_lines`, where lines starting with
        `self.prefix` are listed first.
        """

        def _no_prefix(_line):
            if _line.startswith(self.prefix):
                return 0
            else:
                return 1

        return sorted(self.extra_lines, key=_no_prefix)

    def sort_script_lines(self, script_lines: list[str]) -> list[str]:
        """
        Return a copy of `script_lines`, where lines are sorted as in:

        1. `self.shebang_line` (if present);
        2. Lines starting with `self.prefix`;
        3. Other lines.

        Arguments:
            script_lines:
        """

        def _sorting_function(_line):
            if _line == self.shebang_line:
                return 0
            elif _line.startswith(self.prefix):
                return 1
            else:
                return 2

        return sorted(script_lines, key=_sorting_function)

    def to_sbatch_preamble(
        self,
        remote_export_dir: Optional[str] = None,
    ) -> list[str]:
        """
        Compile `SlurmConfig` object into the preamble of a SLURM submission
        script.

        Arguments:
            remote_export_dir:
                Base directory for exports defined in
                `self.user_local_exports`.
        """
        if self.parallel_tasks_per_job is None:
            raise ValueError(
                "SlurmConfig.to_sbatch_preamble requires that "
                f"{self.parallel_tasks_per_job=} is not None."
            )
        if self.extra_lines:
            if len(self.extra_lines) != len(set(self.extra_lines)):
                raise ValueError(f"{self.extra_lines=} contains repetitions")

        mem_per_job_MB = self.parallel_tasks_per_job * self.mem_per_task_MB
        lines = [
            self.shebang_line,
            f"{self.prefix} --partition={self.partition}",
            f"{self.prefix} --ntasks={self.parallel_tasks_per_job}",
            f"{self.prefix} --cpus-per-task={self.cpus_per_task}",
            f"{self.prefix} --mem={mem_per_job_MB}M",
        ]
        for key in [
            "job_name",
            "constraint",
            "gres",
            "gpus",
            "time",
            "account",
        ]:
            value = getattr(self, key)
            if value is not None:
                # Handle the `time` parameter
                if key == "time" and self.parallel_tasks_per_job > 1:
                    # FIXME SSH: time setting must be handled better. Right now
                    # we simply propagate `time`, but this is not enough when
                    # several `srun` are combined in a single script.
                    logger.warning(
                        f"`time` SLURM parameter is set to {self.time}, "
                        "but this does not take into account the number of "
                        f"SLURM tasks ({self.parallel_tasks_per_job})."
                    )
                option = key.replace("_", "-")
                lines.append(f"{self.prefix} --{option}={value}")

        if self.extra_lines:
            for line in self._sorted_extra_lines():
                lines.append(line)

        if self.user_local_exports:
            if remote_export_dir is None:
                raise ValueError(
                    f"remote_export_dir=None but {self.user_local_exports=}"
                )
            for key, value in self.user_local_exports.items():
                tmp_value = str(Path(remote_export_dir) / value)
                lines.append(f"export {key}={tmp_value}")

        """
        FIXME export SRUN_CPUS_PER_TASK
        # From https://slurm.schedmd.com/sbatch.html: Beginning with 22.05,
        # srun will not inherit the --cpus-per-task value requested by salloc
        # or sbatch.  It must be requested again with the call to srun or set
        # with the SRUN_CPUS_PER_TASK environment variable if desired for the
        # task(s).
        if config.cpus_per_task:
            #additional_setup_lines.append(
                f"export SRUN_CPUS_PER_TASK={config.cpus_per_task}"
            )
        """

        return lines

_sorted_extra_lines()

Return a copy of self.extra_lines, where lines starting with self.prefix are listed first.

Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
def _sorted_extra_lines(self) -> list[str]:
    """
    Return a copy of `self.extra_lines`, where lines starting with
    `self.prefix` are listed first.
    """

    def _no_prefix(_line):
        if _line.startswith(self.prefix):
            return 0
        else:
            return 1

    return sorted(self.extra_lines, key=_no_prefix)

sort_script_lines(script_lines)

Return a copy of script_lines, where lines are sorted as in:

  1. self.shebang_line (if present);
  2. Lines starting with self.prefix;
  3. Other lines.

Parameters:

    script_lines (list[str]): required.
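The ordering rule can be sketched as a standalone key function; the input lines here are hypothetical, and `shebang_line` and `prefix` mirror the `SlurmConfig` defaults (the real method reads them from the instance):

```python
# Standalone sketch of the ordering used by `sort_script_lines`;
# the input script lines below are hypothetical.
shebang_line = "#!/bin/sh"
prefix = "#SBATCH"

def sorting_key(line: str) -> int:
    if line == shebang_line:
        return 0  # shebang line first
    if line.startswith(prefix):
        return 1  # then #SBATCH configuration lines
    return 2      # then everything else

script = ["srun ./task", "#SBATCH --partition=main", "#!/bin/sh"]
print(sorted(script, key=sorting_key))
```

Since Python's `sorted` is stable, lines within each group keep their relative order.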
Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
def sort_script_lines(self, script_lines: list[str]) -> list[str]:
    """
    Return a copy of `script_lines`, where lines are sorted as in:

    1. `self.shebang_line` (if present);
    2. Lines starting with `self.prefix`;
    3. Other lines.

    Arguments:
        script_lines:
    """

    def _sorting_function(_line):
        if _line == self.shebang_line:
            return 0
        elif _line.startswith(self.prefix):
            return 1
        else:
            return 2

    return sorted(script_lines, key=_sorting_function)

to_sbatch_preamble(remote_export_dir=None)

Compile SlurmConfig object into the preamble of a SLURM submission script.

Parameters:

    remote_export_dir (Optional[str]): Base directory for exports defined in self.user_local_exports. Default: None.
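For illustration, with hypothetical values (partition "main", one CPU per task, 100 MB per task, two parallel tasks, default prefix and shebang), the fixed part of the generated preamble can be reproduced as follows; note that memory is requested per job, i.e. per-task memory times the number of parallel tasks:

```python
# Hypothetical configuration values; `SlurmConfig` objects are built
# internally by fractal-server, so this sketch only reproduces the
# preamble layout emitted by `to_sbatch_preamble`.
prefix = "#SBATCH"
partition = "main"
cpus_per_task = 1
mem_per_task_MB = 100
parallel_tasks_per_job = 2

# Job-level memory request: per-task memory times parallel tasks
mem_per_job_MB = parallel_tasks_per_job * mem_per_task_MB

preamble = [
    "#!/bin/sh",
    f"{prefix} --partition={partition}",
    f"{prefix} --ntasks={parallel_tasks_per_job}",
    f"{prefix} --cpus-per-task={cpus_per_task}",
    f"{prefix} --mem={mem_per_job_MB}M",
]
print("\n".join(preamble))
```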
Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
def to_sbatch_preamble(
    self,
    remote_export_dir: Optional[str] = None,
) -> list[str]:
    """
    Compile `SlurmConfig` object into the preamble of a SLURM submission
    script.

    Arguments:
        remote_export_dir:
            Base directory for exports defined in
            `self.user_local_exports`.
    """
    if self.parallel_tasks_per_job is None:
        raise ValueError(
            "SlurmConfig.to_sbatch_preamble requires that "
            f"{self.parallel_tasks_per_job=} is not None."
        )
    if self.extra_lines:
        if len(self.extra_lines) != len(set(self.extra_lines)):
            raise ValueError(f"{self.extra_lines=} contains repetitions")

    mem_per_job_MB = self.parallel_tasks_per_job * self.mem_per_task_MB
    lines = [
        self.shebang_line,
        f"{self.prefix} --partition={self.partition}",
        f"{self.prefix} --ntasks={self.parallel_tasks_per_job}",
        f"{self.prefix} --cpus-per-task={self.cpus_per_task}",
        f"{self.prefix} --mem={mem_per_job_MB}M",
    ]
    for key in [
        "job_name",
        "constraint",
        "gres",
        "gpus",
        "time",
        "account",
    ]:
        value = getattr(self, key)
        if value is not None:
            # Handle the `time` parameter
            if key == "time" and self.parallel_tasks_per_job > 1:
                # FIXME SSH: time setting must be handled better. Right now
                # we simply propagate `time`, but this is not enough when
                # several `srun` are combined in a single script.
                logger.warning(
                    f"`time` SLURM parameter is set to {self.time}, "
                    "but this does not take into account the number of "
                    f"SLURM tasks ({self.parallel_tasks_per_job})."
                )
            option = key.replace("_", "-")
            lines.append(f"{self.prefix} --{option}={value}")

    if self.extra_lines:
        for line in self._sorted_extra_lines():
            lines.append(line)

    if self.user_local_exports:
        if remote_export_dir is None:
            raise ValueError(
                f"remote_export_dir=None but {self.user_local_exports=}"
            )
        for key, value in self.user_local_exports.items():
            tmp_value = str(Path(remote_export_dir) / value)
            lines.append(f"export {key}={tmp_value}")

    """
    FIXME export SRUN_CPUS_PER_TASK
    # From https://slurm.schedmd.com/sbatch.html: Beginning with 22.05,
    # srun will not inherit the --cpus-per-task value requested by salloc
    # or sbatch.  It must be requested again with the call to srun or set
    # with the SRUN_CPUS_PER_TASK environment variable if desired for the
    # task(s).
    if config.cpus_per_task:
        #additional_setup_lines.append(
            f"export SRUN_CPUS_PER_TASK={config.cpus_per_task}"
        )
    """

    return lines

SlurmConfigError

Bases: ValueError

Slurm configuration error

Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
class SlurmConfigError(ValueError):
    """
    Slurm configuration error
    """

    pass

SlurmConfigFile

Bases: BaseModel

Specifications for the content of FRACTAL_SLURM_CONFIG_FILE

This must be a JSON file, and a valid example is

{
  "default_slurm_config": {
      "partition": "main",
      "cpus_per_task": 1
  },
  "gpu_slurm_config": {
      "partition": "gpu",
      "extra_lines": ["#SBATCH --gres=gpu:v100:1"]
  },
  "batching_config": {
      "target_cpus_per_job": 1,
      "max_cpus_per_job": 1,
      "target_mem_per_job": 200,
      "max_mem_per_job": 500,
      "target_num_jobs": 2,
      "max_num_jobs": 4
  },
  "user_local_exports": {
      "CELLPOSE_LOCAL_MODELS_PATH": "CELLPOSE_LOCAL_MODELS_PATH",
      "NAPARI_CONFIG": "napari_config.json"
  }
}

See _SlurmConfigSet and _BatchingConfigSet for more details.
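As a sketch, the example above (trimmed to the two required fields) can be parsed with the standard json module; the actual validation in fractal-server goes through the pydantic model:

```python
import json

# Trimmed version of the example file: only `default_slurm_config` and
# `batching_config` are required, while `gpu_slurm_config` and
# `user_local_exports` are optional.
config_text = """
{
  "default_slurm_config": {"partition": "main", "cpus_per_task": 1},
  "batching_config": {
      "target_cpus_per_job": 1,
      "max_cpus_per_job": 1,
      "target_mem_per_job": 200,
      "max_mem_per_job": 500,
      "target_num_jobs": 2,
      "max_num_jobs": 4
  }
}
"""
config = json.loads(config_text)
print(config["default_slurm_config"]["partition"])
```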

Attributes:

    default_slurm_config (_SlurmConfigSet): Common default options for all tasks.
    gpu_slurm_config (Optional[_SlurmConfigSet]): Default configuration for all GPU tasks.
    batching_config (_BatchingConfigSet): Configuration of the batching strategy.
    user_local_exports (Optional[dict[str, str]]): Key-value pairs to be included as exported variables in the SLURM submission script, after prepending values with the user's cache directory.

Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
class SlurmConfigFile(BaseModel, extra=Extra.forbid):
    """
    Specifications for the content of `FRACTAL_SLURM_CONFIG_FILE`

    This must be a JSON file, and a valid example is
    ```JSON
    {
      "default_slurm_config": {
          "partition": "main",
          "cpus_per_task": 1
      },
      "gpu_slurm_config": {
          "partition": "gpu",
          "extra_lines": ["#SBATCH --gres=gpu:v100:1"]
      },
      "batching_config": {
          "target_cpus_per_job": 1,
          "max_cpus_per_job": 1,
          "target_mem_per_job": 200,
          "max_mem_per_job": 500,
          "target_num_jobs": 2,
          "max_num_jobs": 4
      },
      "user_local_exports": {
          "CELLPOSE_LOCAL_MODELS_PATH": "CELLPOSE_LOCAL_MODELS_PATH",
          "NAPARI_CONFIG": "napari_config.json"
      }
    }
    ```

    See `_SlurmConfigSet` and `_BatchingConfigSet` for more details.

    Attributes:
        default_slurm_config:
            Common default options for all tasks.
        gpu_slurm_config:
            Default configuration for all GPU tasks.
        batching_config:
            Configuration of the batching strategy.
        user_local_exports:
            Key-value pairs to be included as `export`-ed variables in SLURM
            submission script, after prepending values with the user's cache
            directory.
    """

    default_slurm_config: _SlurmConfigSet
    gpu_slurm_config: Optional[_SlurmConfigSet]
    batching_config: _BatchingConfigSet
    user_local_exports: Optional[dict[str, str]]

_BatchingConfigSet

Bases: BaseModel

Options that can be set in FRACTAL_SLURM_CONFIG_FILE to configure the batching strategy (that is, how to combine several tasks in a single SLURM job). Only used as part of SlurmConfigFile.

Attributes:

    target_cpus_per_job (int)
    max_cpus_per_job (int)
    target_mem_per_job (Union[int, str]): See _parse_mem_value for details on allowed values.
    max_mem_per_job (Union[int, str]): See _parse_mem_value for details on allowed values.
    target_num_jobs (int)
    max_num_jobs (int)
Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
class _BatchingConfigSet(BaseModel, extra=Extra.forbid):
    """
    Options that can be set in `FRACTAL_SLURM_CONFIG_FILE` to configure the
    batching strategy (that is, how to combine several tasks in a single SLURM
    job). Only used as part of `SlurmConfigFile`.

    Attributes:
        target_cpus_per_job:
        max_cpus_per_job:
        target_mem_per_job:
            (see `_parse_mem_value` for details on allowed values)
        max_mem_per_job:
            (see `_parse_mem_value` for details on allowed values)
        target_num_jobs:
        max_num_jobs:
    """

    target_cpus_per_job: int
    max_cpus_per_job: int
    target_mem_per_job: Union[int, str]
    max_mem_per_job: Union[int, str]
    target_num_jobs: int
    max_num_jobs: int

_SlurmConfigSet

Bases: BaseModel

Options that can be set in FRACTAL_SLURM_CONFIG_FILE for the default/gpu SLURM config. Only used as part of SlurmConfigFile.

Attributes:

    partition (Optional[str])
    cpus_per_task (Optional[int])
    mem (Optional[Union[int, str]]): See _parse_mem_value for details on allowed values.
    constraint (Optional[str])
    gres (Optional[str])
    time (Optional[str])
    account (Optional[str])
    extra_lines (Optional[list[str]])
    pre_submission_commands (Optional[list[str]])
    gpus (Optional[str])
Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
class _SlurmConfigSet(BaseModel, extra=Extra.forbid):
    """
    Options that can be set in `FRACTAL_SLURM_CONFIG_FILE` for the default/gpu
    SLURM config. Only used as part of `SlurmConfigFile`.

    Attributes:
        partition:
        cpus_per_task:
        mem:
            See `_parse_mem_value` for details on allowed values.
        constraint:
        gres:
        time:
        account:
        extra_lines:
        pre_submission_commands:
        gpus:
    """

    partition: Optional[str]
    cpus_per_task: Optional[int]
    mem: Optional[Union[int, str]]
    constraint: Optional[str]
    gres: Optional[str]
    time: Optional[str]
    account: Optional[str]
    extra_lines: Optional[list[str]]
    pre_submission_commands: Optional[list[str]]
    gpus: Optional[str]

_parse_mem_value(raw_mem)

Convert a memory-specification string into an integer (in MB units), or simply return the input if it is already an integer.

Supported units are "M", "G", "T", with "M" being the default; some parsing examples are: "10M" -> 10, "3G" -> 3000.

Parameters:

    raw_mem (Union[str, int]): A string (e.g. "100M") or an integer (in MB); required.

Returns:

    int: Integer value of memory in MB units.
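The conversion rule can be sketched as a standalone function (simplified: the real implementation also logs and raises SlurmConfigError on invalid input):

```python
# Simplified standalone sketch of the `_parse_mem_value` conversion
# rule; the real function in fractal-server logs errors and raises
# `SlurmConfigError` instead of `ValueError`.
def parse_mem_value(raw_mem):
    if isinstance(raw_mem, int):
        return raw_mem  # already in MB
    if raw_mem.isdigit():
        return int(raw_mem)  # "M" (MB) is the default unit
    units = {"M": 1, "G": 10**3, "T": 10**6}
    number, unit = raw_mem[:-1], raw_mem[-1]
    if unit not in units or not number.isdigit():
        raise ValueError(f"Invalid memory specification: {raw_mem!r}")
    return int(number) * units[unit]

print(parse_mem_value("3G"))
```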

Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
def _parse_mem_value(raw_mem: Union[str, int]) -> int:
    """
    Convert a memory-specification string into an integer (in MB units), or
    simply return the input if it is already an integer.

    Supported units are `"M", "G", "T"`, with `"M"` being the default; some
    parsing examples are: `"10M" -> 10`, `"3G" -> 3000`.

    Arguments:
        raw_mem:
            A string (e.g. `"100M"`) or an integer (in MB).

    Returns:
        Integer value of memory in MB units.
    """

    info = f"[_parse_mem_value] {raw_mem=}"
    error_msg = (
        f"{info}, invalid specification of memory requirements "
        "(valid examples: 93, 71M, 93G, 71T)."
    )

    # Handle integer argument
    if isinstance(raw_mem, int):
        return raw_mem

    # Handle string argument
    if not raw_mem[0].isdigit():  # fail e.g. for raw_mem="M100"
        logger.error(error_msg)
        raise SlurmConfigError(error_msg)
    if raw_mem.isdigit():
        mem_MB = int(raw_mem)
    elif raw_mem.endswith("M"):
        stripped_raw_mem = raw_mem.strip("M")
        if not stripped_raw_mem.isdigit():
            logger.error(error_msg)
            raise SlurmConfigError(error_msg)
        mem_MB = int(stripped_raw_mem)
    elif raw_mem.endswith("G"):
        stripped_raw_mem = raw_mem.strip("G")
        if not stripped_raw_mem.isdigit():
            logger.error(error_msg)
            raise SlurmConfigError(error_msg)
        mem_MB = int(stripped_raw_mem) * 10**3
    elif raw_mem.endswith("T"):
        stripped_raw_mem = raw_mem.strip("T")
        if not stripped_raw_mem.isdigit():
            logger.error(error_msg)
            raise SlurmConfigError(error_msg)
        mem_MB = int(stripped_raw_mem) * 10**6
    else:
        logger.error(error_msg)
        raise SlurmConfigError(error_msg)

    logger.debug(f"{info}, return {mem_MB}")
    return mem_MB

load_slurm_config_file(config_path=None)

Load a SLURM configuration file and validate its content with SlurmConfigFile.

Parameters:

    config_path (Optional[Path]): Path of the SLURM configuration file; when None, settings.FRACTAL_SLURM_CONFIG_FILE is used. Default: None.
Source code in fractal_server/app/runner/executors/slurm/_slurm_config.py
def load_slurm_config_file(
    config_path: Optional[Path] = None,
) -> SlurmConfigFile:
    """
    Load a SLURM configuration file and validate its content with
    `SlurmConfigFile`.

    Arguments:
        config_path:
    """

    if not config_path:
        settings = Inject(get_settings)
        config_path = settings.FRACTAL_SLURM_CONFIG_FILE

    # Load file
    logger.debug(f"[get_slurm_config] Now loading {config_path=}")
    try:
        with config_path.open("r") as f:
            slurm_env = json.load(f)
    except Exception as e:
        raise SlurmConfigError(
            f"Error while loading {config_path=}. "
            f"Original error:\n{str(e)}"
        )

    # Validate file content
    logger.debug(f"[load_slurm_config_file] Now validating {config_path=}")
    logger.debug(f"[load_slurm_config_file] {slurm_env=}")
    try:
        obj = SlurmConfigFile(**slurm_env)
    except ValidationError as e:
        raise SlurmConfigError(
            f"Error while loading {config_path=}. "
            f"Original error:\n{str(e)}"
        )

    # Convert memory to MB units, in all relevant attributes
    if obj.default_slurm_config.mem:
        obj.default_slurm_config.mem = _parse_mem_value(
            obj.default_slurm_config.mem
        )
    if obj.gpu_slurm_config and obj.gpu_slurm_config.mem:
        obj.gpu_slurm_config.mem = _parse_mem_value(obj.gpu_slurm_config.mem)
    obj.batching_config.target_mem_per_job = _parse_mem_value(
        obj.batching_config.target_mem_per_job
    )
    obj.batching_config.max_mem_per_job = _parse_mem_value(
        obj.batching_config.max_mem_per_job
    )

    return obj