Skip to content

get_slurm_config

get_slurm_config(wftask, workflow_dir_local, workflow_dir_remote, config_path=None)

Prepare a SlurmConfig configuration object

The sources for SlurmConfig attributes, in increasing priority order, are

  1. The general content of the Fractal SLURM configuration file.
  2. The GPU-specific content of the Fractal SLURM configuration file, if appropriate.
  3. Properties in wftask.meta (which, for WorkflowTasks added through Workflow.insert_task, also includes wftask.task.meta);

Note: wftask.meta may be None.

Parameters:

Name Type Description Default
wftask WorkflowTask

WorkflowTask for which the SLURM configuration is is to be prepared.

required
workflow_dir_local Path

Server-owned directory to store all task-execution-related relevant files (inputs, outputs, errors, and all meta files related to the job execution). Note: users cannot write directly to this folder.

required
workflow_dir_remote Path

User-side directory with the same scope as workflow_dir_local, and where a user can write.

required
config_path Optional[Path]

Path of aFractal SLURM configuration file; if None, use FRACTAL_SLURM_CONFIG_FILE variable from settings.

None

Returns:

Name Type Description
slurm_config SlurmConfig

The SlurmConfig object

Source code in fractal_server/app/runner/v1/_slurm/get_slurm_config.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def get_slurm_config(
    wftask: WorkflowTask,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
    config_path: Optional[Path] = None,
) -> SlurmConfig:
    """
    Prepare a `SlurmConfig` configuration object

    The sources for `SlurmConfig` attributes, in increasing priority order, are

    1. The general content of the Fractal SLURM configuration file.
    2. The GPU-specific content of the Fractal SLURM configuration file, if
        appropriate.
    3. Properties in `wftask.meta` (which, for `WorkflowTask`s added through
       `Workflow.insert_task`, also includes `wftask.task.meta`);

    Note: `wftask.meta` may be `None`.

    Arguments:
        wftask:
            WorkflowTask for which the SLURM configuration is is to be
            prepared.
        workflow_dir_local:
            Server-owned directory to store all task-execution-related relevant
            files (inputs, outputs, errors, and all meta files related to the
            job execution). Note: users cannot write directly to this folder.
        workflow_dir_remote:
            User-side directory with the same scope as `workflow_dir_local`,
            and where a user can write.
        config_path:
            Path of aFractal  SLURM configuration file; if `None`, use
            `FRACTAL_SLURM_CONFIG_FILE` variable from settings.

    Returns:
        slurm_config:
            The SlurmConfig object
    """

    logger.debug(
        "[get_slurm_config] WorkflowTask meta attribute: {wftask.meta=}"
    )

    # Incorporate slurm_env.default_slurm_config
    slurm_env = load_slurm_config_file(config_path=config_path)
    slurm_dict = slurm_env.default_slurm_config.dict(
        exclude_unset=True, exclude={"mem"}
    )
    if slurm_env.default_slurm_config.mem:
        slurm_dict["mem_per_task_MB"] = slurm_env.default_slurm_config.mem

    # Incorporate slurm_env.batching_config
    for key, value in slurm_env.batching_config.dict().items():
        slurm_dict[key] = value

    # Incorporate slurm_env.user_local_exports
    slurm_dict["user_local_exports"] = slurm_env.user_local_exports

    logger.debug(
        "[get_slurm_config] Fractal SLURM configuration file: "
        f"{slurm_env.dict()=}"
    )

    # GPU-related options
    # Notes about priority:
    # 1. This block of definitions takes priority over other definitions from
    #    slurm_env which are not under the `needs_gpu` subgroup
    # 2. This block of definitions has lower priority than whatever comes next
    #    (i.e. from WorkflowTask.meta).
    if wftask.meta is not None:
        needs_gpu = wftask.meta.get("needs_gpu", False)
    else:
        needs_gpu = False
    logger.debug(f"[get_slurm_config] {needs_gpu=}")
    if needs_gpu:
        for key, value in slurm_env.gpu_slurm_config.dict(
            exclude_unset=True, exclude={"mem"}
        ).items():
            slurm_dict[key] = value
        if slurm_env.gpu_slurm_config.mem:
            slurm_dict["mem_per_task_MB"] = slurm_env.gpu_slurm_config.mem

    # Number of CPUs per task, for multithreading
    if wftask.meta is not None and "cpus_per_task" in wftask.meta:
        cpus_per_task = int(wftask.meta["cpus_per_task"])
        slurm_dict["cpus_per_task"] = cpus_per_task

    # Required memory per task, in MB
    if wftask.meta is not None and "mem" in wftask.meta:
        raw_mem = wftask.meta["mem"]
        mem_per_task_MB = _parse_mem_value(raw_mem)
        slurm_dict["mem_per_task_MB"] = mem_per_task_MB

    # Job name
    job_name = wftask.task.name.replace(" ", "_")
    slurm_dict["job_name"] = job_name

    # Optional SLURM arguments and extra lines
    if wftask.meta is not None:
        account = wftask.meta.get("account", None)
        if account is not None:
            error_msg = (
                f"Invalid {account=} property in WorkflowTask `meta` "
                "attribute.\n"
                "SLURM account must be set in the request body of the "
                "apply-workflow endpoint, or by modifying the user properties."
            )
            logger.error(error_msg)
            raise SlurmConfigError(error_msg)
        for key in ["time", "gres", "constraint"]:
            value = wftask.meta.get(key, None)
            if value:
                slurm_dict[key] = value
    if wftask.meta is not None:
        extra_lines = wftask.meta.get("extra_lines", [])
    else:
        extra_lines = []
    extra_lines = slurm_dict.get("extra_lines", []) + extra_lines
    if len(set(extra_lines)) != len(extra_lines):
        logger.debug(
            "[get_slurm_config] Removing repeated elements "
            f"from {extra_lines=}."
        )
        extra_lines = list(set(extra_lines))
    slurm_dict["extra_lines"] = extra_lines

    # Job-batching parameters (if None, they will be determined heuristically)
    if wftask.meta is not None:
        tasks_per_job = wftask.meta.get("tasks_per_job", None)
        parallel_tasks_per_job = wftask.meta.get(
            "parallel_tasks_per_job", None
        )
    else:
        tasks_per_job = None
        parallel_tasks_per_job = None
    slurm_dict["tasks_per_job"] = tasks_per_job
    slurm_dict["parallel_tasks_per_job"] = parallel_tasks_per_job

    # Put everything together
    logger.debug(
        "[get_slurm_config] Now create a SlurmConfig object based "
        f"on {slurm_dict=}"
    )
    slurm_config = SlurmConfig(**slurm_dict)

    return slurm_config