Skip to content

get_slurm_config

get_slurm_config(wftask, which_type, config_path=None)

Prepare a SlurmConfig configuration object

The argument which_type determines whether we use wftask.meta_parallel or wftask.meta_non_parallel. In the following description, let us assume that which_type="parallel".

The sources for SlurmConfig attributes, in increasing priority order, are:

  1. The general content of the Fractal SLURM configuration file.
  2. The GPU-specific content of the Fractal SLURM configuration file, if appropriate.
  3. Properties in wftask.meta_parallel (which typically include those in wftask.task.meta_parallel). Note that wftask.meta_parallel may be None.

Parameters:

Name Type Description Default
wftask WorkflowTaskV2

WorkflowTask for which the SLURM configuration is to be prepared.

required
config_path Optional[Path]

Path of a Fractal SLURM configuration file; if None, use FRACTAL_SLURM_CONFIG_FILE variable from settings.

None
which_type Literal['non_parallel', 'parallel']

Determines whether to use meta_parallel or meta_non_parallel.

required

Returns:

Name Type Description
slurm_config SlurmConfig

The SlurmConfig object

Source code in fractal_server/app/runner/v2/_slurm_common/get_slurm_config.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get_slurm_config(
    wftask: WorkflowTaskV2,
    which_type: Literal["non_parallel", "parallel"],
    config_path: Optional[Path] = None,
) -> SlurmConfig:
    """
    Prepare a `SlurmConfig` configuration object

    The argument `which_type` determines whether we use `wftask.meta_parallel`
    or `wftask.meta_non_parallel`. In the following description, let us assume
    that `which_type="parallel"`.

    The sources for `SlurmConfig` attributes, in increasing priority order, are:

    1. The general content of the Fractal SLURM configuration file.
    2. The GPU-specific content of the Fractal SLURM configuration file, if
        appropriate.
    3. Properties in `wftask.meta_parallel` (which typically include those in
       `wftask.task.meta_parallel`). Note that `wftask.meta_parallel` may be
       `None`.

    The `extra_lines` from the configuration file and from the WorkflowTask
    meta attribute are concatenated (configuration file first); repeated
    entries are dropped, keeping the first occurrence of each line and
    preserving their relative order.

    Arguments:
        wftask:
            WorkflowTask for which the SLURM configuration is to be prepared.
        config_path:
            Path of a Fractal SLURM configuration file; if `None`, use
            `FRACTAL_SLURM_CONFIG_FILE` variable from settings.
        which_type:
            Determines whether to use `meta_parallel` or `meta_non_parallel`.

    Returns:
        slurm_config:
            The SlurmConfig object

    Raises:
        ValueError:
            If `which_type` is neither `"non_parallel"` nor `"parallel"`.
        SlurmConfigError:
            If the WorkflowTask meta attribute sets an `account` property.
    """

    if which_type == "non_parallel":
        wftask_meta = wftask.meta_non_parallel
    elif which_type == "parallel":
        wftask_meta = wftask.meta_parallel
    else:
        raise ValueError(
            f"get_slurm_config received invalid argument {which_type=}."
        )

    logger.debug(
        f"[get_slurm_config] WorkflowTask meta attribute: {wftask_meta=}"
    )
    # A missing (`None`) meta attribute behaves exactly like an empty one:
    # every lookup below falls back to its default.
    meta = wftask_meta if wftask_meta is not None else {}

    # Incorporate slurm_env.default_slurm_config
    slurm_env = load_slurm_config_file(config_path=config_path)
    slurm_dict = slurm_env.default_slurm_config.dict(
        exclude_unset=True, exclude={"mem"}
    )
    # The configuration file calls it `mem`; SlurmConfig expects
    # `mem_per_task_MB`.
    if slurm_env.default_slurm_config.mem:
        slurm_dict["mem_per_task_MB"] = slurm_env.default_slurm_config.mem

    # Incorporate slurm_env.batching_config
    slurm_dict.update(slurm_env.batching_config.dict())

    # Incorporate slurm_env.user_local_exports
    slurm_dict["user_local_exports"] = slurm_env.user_local_exports

    logger.debug(
        "[get_slurm_config] Fractal SLURM configuration file: "
        f"{slurm_env.dict()=}"
    )

    # GPU-related options
    # Notes about priority:
    # 1. This block of definitions takes priority over other definitions from
    #    slurm_env which are not under the `needs_gpu` subgroup
    # 2. This block of definitions has lower priority than whatever comes next
    #    (i.e. from WorkflowTask.meta_parallel).
    needs_gpu = meta.get("needs_gpu", False)
    logger.debug(f"[get_slurm_config] {needs_gpu=}")
    if needs_gpu:
        slurm_dict.update(
            slurm_env.gpu_slurm_config.dict(
                exclude_unset=True, exclude={"mem"}
            )
        )
        if slurm_env.gpu_slurm_config.mem:
            slurm_dict["mem_per_task_MB"] = slurm_env.gpu_slurm_config.mem

    # Number of CPUs per task, for multithreading
    if "cpus_per_task" in meta:
        slurm_dict["cpus_per_task"] = int(meta["cpus_per_task"])

    # Required memory per task, in MB
    if "mem" in meta:
        slurm_dict["mem_per_task_MB"] = _parse_mem_value(meta["mem"])

    # Job name (SLURM job names are built without spaces)
    slurm_dict["job_name"] = wftask.task.name.replace(" ", "_")

    # Optional SLURM arguments and extra lines.
    # The SLURM account may not be set through the WorkflowTask meta
    # attribute (see the error message below for where it belongs).
    account = meta.get("account", None)
    if account is not None:
        error_msg = (
            f"Invalid {account=} property in WorkflowTask `meta` "
            "attribute.\n"
            "SLURM account must be set in the request body of the "
            "apply-workflow endpoint, or by modifying the user properties."
        )
        logger.error(error_msg)
        raise SlurmConfigError(error_msg)
    for key in ["time", "gres", "gpus", "constraint"]:
        value = meta.get(key, None)
        if value is not None:
            slurm_dict[key] = value

    extra_lines = slurm_dict.get("extra_lines", []) + meta.get(
        "extra_lines", []
    )
    if len(set(extra_lines)) != len(extra_lines):
        logger.debug(
            "[get_slurm_config] Removing repeated elements "
            f"from {extra_lines=}."
        )
        # `dict.fromkeys` keeps the first occurrence of each line in its
        # original position; `set` would scramble the order (and make it
        # vary between processes because of string-hash randomization).
        extra_lines = list(dict.fromkeys(extra_lines))
    slurm_dict["extra_lines"] = extra_lines

    # Job-batching parameters (if None, they will be determined heuristically)
    slurm_dict["tasks_per_job"] = meta.get("tasks_per_job", None)
    slurm_dict["parallel_tasks_per_job"] = meta.get(
        "parallel_tasks_per_job", None
    )

    # Put everything together
    logger.debug(
        "[get_slurm_config] Now create a SlurmConfig object based "
        f"on {slurm_dict=}"
    )
    slurm_config = SlurmConfig(**slurm_dict)

    return slurm_config