
slurm_config

Submodule to handle the SLURM configuration for a WorkflowTask

SlurmConfig

Bases: BaseModel

Abstraction for SLURM parameters

NOTE: SlurmConfig objects are created internally in fractal-server, and they are not meant to be initialized by the user; the same holds for SlurmConfig attributes (e.g. mem_per_task_MB), which are not meant to be part of the superuser-defined resource.jobs_runner_config JSON field.

Some of the attributes map directly to SLURM options (see https://slurm.schedmd.com/sbatch.html), e.g. partition. Other attributes are metaparameters needed by fractal-server to combine multiple tasks into the same SLURM job (e.g. parallel_tasks_per_job or max_num_jobs).

Attributes:

partition (str): Corresponds to the --partition SLURM option.

cpus_per_task (int): Corresponds to the --cpus-per-task SLURM option.

mem_per_task_MB (int): Memory per task, in MB; multiplied by parallel_tasks_per_job to set the --mem SLURM option.

job_name (str | None): Corresponds to the --job-name SLURM option.

constraint (str | None): Corresponds to the --constraint SLURM option.

gres (str | None): Corresponds to the --gres SLURM option.

account (str | None): Corresponds to the --account SLURM option.

gpus (str | None): Corresponds to the --gpus SLURM option.

time (str | None): Corresponds to the --time SLURM option (WARNING: not fully supported).

nodelist (str | None): Corresponds to the --nodelist SLURM option.

exclude (str | None): Corresponds to the --exclude SLURM option.

prefix (str): Prefix of configuration lines in SLURM submission scripts.

shebang_line (str): Shebang line for SLURM submission scripts.

extra_lines (list[str] | None): Additional lines to include in SLURM submission scripts.

tasks_per_job (int | None): Number of tasks for each SLURM job.

parallel_tasks_per_job (int | None): Number of tasks to run in parallel within each SLURM job.

target_cpus_per_job (int): Optimal number of CPUs to request in each SLURM job.

max_cpus_per_job (int): Maximum number of CPUs that can be requested in each SLURM job.

target_mem_per_job (int): Optimal amount of memory (in MB) to request in each SLURM job.

max_mem_per_job (int): Maximum amount of memory (in MB) that can be requested in each SLURM job.

target_num_jobs (int): Optimal number of SLURM jobs for a given WorkflowTask.

max_num_jobs (int): Maximum number of SLURM jobs for a given WorkflowTask.

user_local_exports (dict[str, str] | None): Key-value pairs to be included as export-ed variables in the SLURM submission script, after prepending the user's cache directory to each value.
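For illustration only (recall that SlurmConfig objects are created internally by fractal-server, not by users), the following is a minimal sketch of constructing one. It assumes the import path matches the source path shown below; all field values are hypothetical. Later sketches on this page reuse this config object.

from fractal_server.runner.executors.slurm_common.slurm_config import SlurmConfig

# Hypothetical values, chosen only to show which fields are required
config = SlurmConfig(
    partition="main",
    cpus_per_task=2,
    mem_per_task_MB=4000,
    parallel_tasks_per_job=4,
    target_cpus_per_job=8,
    max_cpus_per_job=16,
    target_mem_per_job=16000,
    max_mem_per_job=32000,
    target_num_jobs=5,
    max_num_jobs=10,
)

# Direct SLURM options (partition, cpus_per_task, ...) end up in the sbatch
# preamble; the metaparameters (target_*/max_* and *_per_job) only steer how
# fractal-server batches tasks into SLURM jobs.
print(config.model_dump(exclude_none=True))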

Source code in fractal_server/runner/executors/slurm_common/slurm_config.py
class SlurmConfig(BaseModel):
    """
    Abstraction for SLURM parameters

    **NOTE**: `SlurmConfig` objects are created internally in `fractal-server`,
    and they are not meant to be initialized by the user; the same holds for
    `SlurmConfig` attributes (e.g. `mem_per_task_MB`), which are not meant to
    be part of the superuser-defined `resource.jobs_runner_config` JSON field.

    Part of the attributes map directly to some of the SLURM attributes (see
    https://slurm.schedmd.com/sbatch.html), e.g. `partition`. Other attributes
    are metaparameters which are needed in fractal-server to combine multiple
    tasks in the same SLURM job (e.g. `parallel_tasks_per_job` or
    `max_num_jobs`).

    Attributes:
        partition: Corresponds to SLURM option.
        cpus_per_task: Corresponds to SLURM option.
        mem_per_task_MB: Corresponds to `mem` SLURM option.
        job_name: Corresponds to `name` SLURM option.
        constraint: Corresponds to SLURM option.
        gres: Corresponds to SLURM option.
        account: Corresponds to SLURM option.
        gpus: Corresponds to SLURM option.
        time: Corresponds to SLURM option (WARNING: not fully supported).
        nodelist: Corresponds to SLURM option.
        exclude: Corresponds to SLURM option.
        prefix: Prefix of configuration lines in SLURM submission scripts.
        shebang_line: Shebang line for SLURM submission scripts.
        extra_lines: Additional lines to include in SLURM submission scripts.
        tasks_per_job: Number of tasks for each SLURM job.
        parallel_tasks_per_job: Number of tasks to run in parallel for
                                each SLURM job.
        target_cpus_per_job: Optimal number of CPUs to be requested in each
                             SLURM job.
        max_cpus_per_job: Maximum number of CPUs that can be requested in each
                          SLURM job.
        target_mem_per_job: Optimal amount of memory (in MB) to be requested in
                            each SLURM job.
        max_mem_per_job: Maximum amount of memory (in MB) that can be requested
                         in each SLURM job.
        target_num_jobs: Optimal number of SLURM jobs for a given WorkflowTask.
        max_num_jobs: Maximum number of SLURM jobs for a given WorkflowTask.
        user_local_exports:
            Key-value pairs to be included as `export`-ed variables in SLURM
            submission script, after prepending values with the user's cache
            directory.
    """

    model_config = ConfigDict(extra="forbid")

    # Required SLURM parameters (note that the integer attributes are those
    # that will need to scale up with the number of parallel tasks per job)
    partition: str
    cpus_per_task: int
    mem_per_task_MB: int
    prefix: str = "#SBATCH"
    shebang_line: str = "#!/bin/sh"

    # Optional SLURM parameters
    job_name: str | None = None
    constraint: str | None = None
    gres: str | None = None
    gpus: str | None = None
    time: str | None = None
    account: str | None = None
    nodelist: str | None = None
    exclude: str | None = None

    # Free-field attribute for extra lines to be added to the SLURM job
    # preamble
    extra_lines: list[str] | None = Field(default_factory=list)

    # Variables that will be `export`ed in the SLURM submission script
    user_local_exports: dict[str, str] | None = None

    # Metaparameters needed to combine multiple tasks in each SLURM job
    tasks_per_job: int | None = None
    parallel_tasks_per_job: int | None = None
    target_cpus_per_job: int
    max_cpus_per_job: int
    target_mem_per_job: int
    max_mem_per_job: int
    target_num_jobs: int
    max_num_jobs: int

    def _sorted_extra_lines(self) -> list[str]:
        """
        Return a copy of `self.extra_lines`, where lines starting with
        `self.prefix` are listed first.
        """

        def _no_prefix(_line):
            if _line.startswith(self.prefix):
                return 0
            else:
                return 1

        return sorted(self.extra_lines, key=_no_prefix)

    def sort_script_lines(self, script_lines: list[str]) -> list[str]:
        """
        Return a copy of `script_lines`, where lines are sorted as in:

        1. `self.shebang_line` (if present);
        2. Lines starting with `self.prefix`;
        3. Other lines.

        Args:
            script_lines:
        """

        def _sorting_function(_line):
            if _line == self.shebang_line:
                return 0
            elif _line.startswith(self.prefix):
                return 1
            else:
                return 2

        return sorted(script_lines, key=_sorting_function)

    def to_sbatch_preamble(
        self,
        remote_export_dir: str | None = None,
    ) -> list[str]:
        """
        Compile `SlurmConfig` object into the preamble of a SLURM submission
        script.

        Args:
            remote_export_dir:
                Base directory for exports defined in
                `self.user_local_exports`.
        """
        if self.parallel_tasks_per_job is None:
            raise ValueError(
                "SlurmConfig.sbatch_preamble requires that "
                f"{self.parallel_tasks_per_job=} is not None."
            )
        if self.extra_lines:
            if len(self.extra_lines) != len(set(self.extra_lines)):
                raise ValueError(f"{self.extra_lines=} contains repetitions")

        mem_per_job_MB = self.parallel_tasks_per_job * self.mem_per_task_MB
        lines = [
            self.shebang_line,
            f"{self.prefix} --partition={self.partition}",
            f"{self.prefix} --ntasks={self.parallel_tasks_per_job}",
            f"{self.prefix} --cpus-per-task={self.cpus_per_task}",
            f"{self.prefix} --mem={mem_per_job_MB}M",
        ]
        for key in [
            "job_name",
            "constraint",
            "gres",
            "gpus",
            "time",
            "account",
            "exclude",
            "nodelist",
        ]:
            value = getattr(self, key)
            if value is not None:
                # Handle the `time` parameter
                if key == "time" and self.parallel_tasks_per_job > 1:
                    # NOTE: see issue #1632
                    logger.warning(
                        f"`time` SLURM parameter is set to {self.time}, "
                        "but this does not take into account the number of "
                        f"SLURM tasks ({self.parallel_tasks_per_job})."
                    )
                option = key.replace("_", "-")
                lines.append(f"{self.prefix} --{option}={value}")

        if self.extra_lines:
            for line in self._sorted_extra_lines():
                lines.append(line)

        if self.user_local_exports:
            if remote_export_dir is None:
                raise ValueError(
                    f"remote_export_dir=None but {self.user_local_exports=}"
                )
            for key, value in self.user_local_exports.items():
                tmp_value = str(Path(remote_export_dir) / value)
                lines.append(f"export {key}={tmp_value}")

        """
        FIXME export SRUN_CPUS_PER_TASK
        # From https://slurm.schedmd.com/sbatch.html: Beginning with 22.05,
        # srun will not inherit the --cpus-per-task value requested by salloc
        # or sbatch.  It must be requested again with the call to srun or set
        # with the SRUN_CPUS_PER_TASK environment variable if desired for the
        # task(s).
        if config.cpus_per_task:
            #additional_setup_lines.append(
                f"export SRUN_CPUS_PER_TASK={config.cpus_per_task}"
            )
        """

        return lines

    @property
    def batch_size(self) -> int:
        return self.tasks_per_job

_sorted_extra_lines()

Return a copy of self.extra_lines, where lines starting with self.prefix are listed first.
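A small sketch of the ordering produced by this internal helper, reusing the hypothetical config from the class-level example above together with the default prefix #SBATCH:

# Copy the hypothetical config, adding two extra lines
cfg = config.model_copy(
    update={"extra_lines": ["module load gcc", "#SBATCH --exclusive"]}
)
print(cfg._sorted_extra_lines())
# ['#SBATCH --exclusive', 'module load gcc']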

Source code in fractal_server/runner/executors/slurm_common/slurm_config.py
def _sorted_extra_lines(self) -> list[str]:
    """
    Return a copy of `self.extra_lines`, where lines starting with
    `self.prefix` are listed first.
    """

    def _no_prefix(_line):
        if _line.startswith(self.prefix):
            return 0
        else:
            return 1

    return sorted(self.extra_lines, key=_no_prefix)

sort_script_lines(script_lines)

Return a copy of script_lines, where lines are sorted as in:

  1. self.shebang_line (if present);
  2. Lines starting with self.prefix;
  3. Other lines.

Parameters:

script_lines (list[str], required): The script lines to sort.
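A minimal usage sketch, reusing the hypothetical config from the class-level example above (default shebang_line and prefix):

lines = ["echo done", "#SBATCH --partition=main", "#!/bin/sh"]
print(config.sort_script_lines(lines))
# ['#!/bin/sh', '#SBATCH --partition=main', 'echo done']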
Source code in fractal_server/runner/executors/slurm_common/slurm_config.py
def sort_script_lines(self, script_lines: list[str]) -> list[str]:
    """
    Return a copy of `script_lines`, where lines are sorted as in:

    1. `self.shebang_line` (if present);
    2. Lines starting with `self.prefix`;
    3. Other lines.

    Args:
        script_lines:
    """

    def _sorting_function(_line):
        if _line == self.shebang_line:
            return 0
        elif _line.startswith(self.prefix):
            return 1
        else:
            return 2

    return sorted(script_lines, key=_sorting_function)

to_sbatch_preamble(remote_export_dir=None)

Compile SlurmConfig object into the preamble of a SLURM submission script.

Parameters:

remote_export_dir (str | None, default None): Base directory for exports defined in self.user_local_exports.
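A sketch of the generated preamble for the hypothetical config from the class-level example above; note that --mem is mem_per_task_MB multiplied by parallel_tasks_per_job (4 * 4000 MB here). The export names, subdirectory, and remote directory in the second part are hypothetical as well.

for line in config.to_sbatch_preamble():
    print(line)
# #!/bin/sh
# #SBATCH --partition=main
# #SBATCH --ntasks=4
# #SBATCH --cpus-per-task=2
# #SBATCH --mem=16000M

# When user_local_exports is set, each value is prefixed with remote_export_dir
cfg = config.model_copy(update={"user_local_exports": {"MY_CACHE": "my_subdir"}})
print(cfg.to_sbatch_preamble(remote_export_dir="/home/user/cache")[-1])
# export MY_CACHE=/home/user/cache/my_subdir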
Source code in fractal_server/runner/executors/slurm_common/slurm_config.py
def to_sbatch_preamble(
    self,
    remote_export_dir: str | None = None,
) -> list[str]:
    """
    Compile `SlurmConfig` object into the preamble of a SLURM submission
    script.

    Args:
        remote_export_dir:
            Base directory for exports defined in
            `self.user_local_exports`.
    """
    if self.parallel_tasks_per_job is None:
        raise ValueError(
            "SlurmConfig.sbatch_preamble requires that "
            f"{self.parallel_tasks_per_job=} is not None."
        )
    if self.extra_lines:
        if len(self.extra_lines) != len(set(self.extra_lines)):
            raise ValueError(f"{self.extra_lines=} contains repetitions")

    mem_per_job_MB = self.parallel_tasks_per_job * self.mem_per_task_MB
    lines = [
        self.shebang_line,
        f"{self.prefix} --partition={self.partition}",
        f"{self.prefix} --ntasks={self.parallel_tasks_per_job}",
        f"{self.prefix} --cpus-per-task={self.cpus_per_task}",
        f"{self.prefix} --mem={mem_per_job_MB}M",
    ]
    for key in [
        "job_name",
        "constraint",
        "gres",
        "gpus",
        "time",
        "account",
        "exclude",
        "nodelist",
    ]:
        value = getattr(self, key)
        if value is not None:
            # Handle the `time` parameter
            if key == "time" and self.parallel_tasks_per_job > 1:
                # NOTE: see issue #1632
                logger.warning(
                    f"`time` SLURM parameter is set to {self.time}, "
                    "but this does not take into account the number of "
                    f"SLURM tasks ({self.parallel_tasks_per_job})."
                )
            option = key.replace("_", "-")
            lines.append(f"{self.prefix} --{option}={value}")

    if self.extra_lines:
        for line in self._sorted_extra_lines():
            lines.append(line)

    if self.user_local_exports:
        if remote_export_dir is None:
            raise ValueError(
                f"remote_export_dir=None but {self.user_local_exports=}"
            )
        for key, value in self.user_local_exports.items():
            tmp_value = str(Path(remote_export_dir) / value)
            lines.append(f"export {key}={tmp_value}")

    """
    FIXME export SRUN_CPUS_PER_TASK
    # From https://slurm.schedmd.com/sbatch.html: Beginning with 22.05,
    # srun will not inherit the --cpus-per-task value requested by salloc
    # or sbatch.  It must be requested again with the call to srun or set
    # with the SRUN_CPUS_PER_TASK environment variable if desired for the
    # task(s).
    if config.cpus_per_task:
        #additional_setup_lines.append(
            f"export SRUN_CPUS_PER_TASK={config.cpus_per_task}"
        )
    """

    return lines