Skip to content

_slurm

Slurm Backend

This backend runs fractal workflows in a SLURM cluster using Clusterfutures Executor objects.

_process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, workflow_dir_remote, first_task_index, last_task_index, slurm_user=None, slurm_account=None, user_cache_dir, worker_init=None)

Internal processing routine for the SLURM backend

This function initialises a FractalSlurmExecutor, setting logging, workflow working dir and user to impersonate. It then schedules the workflow tasks and returns the output dataset metadata.

Cf. process_workflow

Returns:

Name Type Description
output_dataset_metadata dict[str, Any]

Metadata of the output dataset

Source code in fractal_server/app/runner/v1/_slurm/__init__.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def _process_workflow(
    *,
    workflow: Workflow,
    input_paths: list[Path],
    output_path: Path,
    input_metadata: dict[str, Any],
    input_history: list[dict[str, Any]],
    logger_name: str,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
    first_task_index: int,
    last_task_index: int,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    user_cache_dir: str,
    worker_init: Optional[Union[str, list[str]]] = None,
) -> dict[str, Any]:
    """
    Internal processing routine for the SLURM backend

    This function opens a `FractalSlurmExecutor` as a context manager,
    passing it the SLURM user and account, the user cache directory, the
    local/remote workflow directories and the `worker_init` lines. Within
    that context, the tasks with indices from `first_task_index` to
    `last_task_index` (both included) are handed to `execute_tasks`.

    Cf.
    [process_workflow][fractal_server.app.runner.v1._local.process_workflow]

    Args:
        worker_init: Lines forwarded to the executor as
            `common_script_lines`. A single string is split on newline
            characters; a list (or `None`) is forwarded unchanged.

    Returns:
        output_dataset_metadata: Dictionary with the `metadata` and
            `history` attributes of the object returned by `execute_tasks`

    Raises:
        RuntimeError: If `slurm_user` is `None` or empty.
    """

    # Guard clause: this backend cannot run without a SLURM user.
    if not slurm_user:
        raise RuntimeError(
            "slurm_user argument is required, for slurm backend"
        )

    # Normalise a multi-line string into a list of lines.
    common_script_lines = (
        worker_init.split("\n")
        if isinstance(worker_init, str)
        else worker_init
    )

    executor_kwargs = dict(
        debug=True,
        keep_logs=True,
        slurm_user=slurm_user,
        user_cache_dir=user_cache_dir,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        common_script_lines=common_script_lines,
        slurm_account=slurm_account,
    )

    with FractalSlurmExecutor(**executor_kwargs) as executor:
        # `last_task_index` is inclusive, hence the +1 on the slice end.
        slice_end = last_task_index + 1
        selected_tasks = workflow.task_list[first_task_index:slice_end]
        initial_task_pars = TaskParameters(
            input_paths=input_paths,
            output_path=output_path,
            metadata=input_metadata,
            history=input_history,
        )
        final_task_pars = execute_tasks(
            executor=executor,
            task_list=selected_tasks,
            task_pars=initial_task_pars,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            submit_setup_call=_slurm_submit_setup,
            logger_name=logger_name,
        )

    # Built after the executor context has been exited.
    return {
        "metadata": final_task_pars.metadata,
        "history": final_task_pars.history,
    }

process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, workflow_dir_remote=None, user_cache_dir=None, slurm_user=None, slurm_account=None, worker_init=None, first_task_index=None, last_task_index=None) async

Process workflow (SLURM backend public interface)

Cf. process_workflow

Source code in fractal_server/app/runner/v1/_slurm/__init__.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def process_workflow(
    *,
    workflow: Workflow,
    input_paths: list[Path],
    output_path: Path,
    input_metadata: dict[str, Any],
    input_history: list[dict[str, Any]],
    logger_name: str,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    worker_init: Optional[str] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
) -> dict[str, Any]:
    """
    Process workflow (SLURM backend public interface)

    The first/last task indices are resolved through
    `set_start_and_last_task_index` (based on the number of tasks in the
    workflow), and all arguments are then forwarded to `_process_workflow`,
    wrapped with `async_wrap` so that it can be awaited.

    Cf.
    [process_workflow][fractal_server.app.runner.v1._local.process_workflow]

    Returns:
        output_dataset_metadata_history: The value returned by
            `_process_workflow`
    """

    # Resolve the task range from the (possibly unset) indices
    first_task_index, last_task_index = set_start_and_last_task_index(
        len(workflow.task_list),
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # Awaitable version of the internal processing routine
    wrapped_process_workflow = async_wrap(_process_workflow)

    return await wrapped_process_workflow(
        workflow=workflow,
        input_paths=input_paths,
        output_path=output_path,
        input_metadata=input_metadata,
        input_history=input_history,
        logger_name=logger_name,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        slurm_user=slurm_user,
        slurm_account=slurm_account,
        user_cache_dir=user_cache_dir,
        worker_init=worker_init,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )