Skip to content

_slurm_sudo

Slurm Backend

This backend runs fractal workflows in a SLURM cluster using Clusterfutures Executor objects.

_process_workflow(*, workflow, dataset, logger_name, workflow_dir_local, workflow_dir_remote, first_task_index, last_task_index, slurm_user=None, slurm_account=None, user_cache_dir, worker_init=None)

Internal processing routine for the SLURM backend

This function initialises a FractalSlurmExecutor, setting logging, workflow working directory and user to impersonate. It then schedules the workflow tasks and returns the new dataset attributes.

Cf. process_workflow

Returns:

Name Type Description
new_dataset_attributes dict[str, Any]
Source code in fractal_server/app/runner/v2/_slurm_sudo/__init__.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def _process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    logger_name: str,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
    first_task_index: int,
    last_task_index: int,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    user_cache_dir: str,
    worker_init: Optional[Union[str, list[str]]] = None,
) -> dict[str, Any]:
    """
    Internal processing routine for the SLURM backend

    Open a `FractalSlurmExecutor` configured with the SLURM user, the
    working directories and the common script lines, run the selected
    range of workflow tasks through `execute_tasks_v2`, and return its
    result.

    Cf.
    [process_workflow][fractal_server.app.runner.v2._local.process_workflow]

    Args:
        workflow: Workflow whose `task_list` is sliced and executed.
        dataset: Forwarded unchanged to `execute_tasks_v2`.
        logger_name: Forwarded unchanged to `execute_tasks_v2`.
        workflow_dir_local: Forwarded both to the executor and to
            `execute_tasks_v2`.
        workflow_dir_remote: Forwarded both to the executor and to
            `execute_tasks_v2`.
        first_task_index: Index of the first task to run.
        last_task_index: Index of the last task to run (inclusive).
        slurm_user: Forwarded to the executor; must be set, even though
            the signature allows `None`.
        slurm_account: Forwarded to the executor.
        user_cache_dir: Forwarded to the executor.
        worker_init: Either a list of lines or a single string, which is
            split on `"\\n"`; handed to the executor as
            `common_script_lines`.

    Returns:
        new_dataset_attributes: The value returned by `execute_tasks_v2`.

    Raises:
        RuntimeError: If `slurm_user` is `None` or empty.
    """

    # Fail early, before any executor is created
    if not slurm_user:
        raise RuntimeError(
            "slurm_user argument is required, for slurm backend"
        )

    # Normalise a multi-line string into one list entry per line; a list
    # (or None) is passed through untouched
    script_lines = (
        worker_init.split("\n")
        if isinstance(worker_init, str)
        else worker_init
    )

    executor_options = dict(
        debug=True,
        keep_logs=True,
        slurm_user=slurm_user,
        user_cache_dir=user_cache_dir,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        common_script_lines=script_lines,
        slurm_account=slurm_account,
    )

    with FractalSlurmExecutor(**executor_options) as slurm_executor:
        new_dataset_attributes = execute_tasks_v2(
            # `last_task_index` is inclusive, hence the `+ 1`
            wf_task_list=workflow.task_list[
                slice(first_task_index, last_task_index + 1)
            ],
            dataset=dataset,
            executor=slurm_executor,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            submit_setup_call=_slurm_submit_setup,
        )
    return new_dataset_attributes

process_workflow(*, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, user_cache_dir=None, slurm_user=None, slurm_account=None, worker_init=None) async

Process workflow (SLURM backend public interface)

Cf. process_workflow

Source code in fractal_server/app/runner/v2/_slurm_sudo/__init__.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    # Slurm-specific
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    worker_init: Optional[str] = None,
) -> dict:
    """
    Process workflow (SLURM backend public interface)

    Resolve the range of tasks to run, then await `_process_workflow`
    (wrapped through `async_wrap`) with that range and all the other
    arguments forwarded as they were received.

    Cf.
    [process_workflow][fractal_server.app.runner.v2._local.process_workflow]

    Returns:
        The value produced by `_process_workflow`.
    """

    # Turn the optional bounds into the pair of indices actually used;
    # the helper receives the total number of tasks as its first argument
    start_index, end_index = set_start_and_last_task_index(
        len(workflow.task_list),
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # `_process_workflow` is a plain function: wrap it so that it can be
    # awaited from this coroutine
    awaitable_runner = async_wrap(_process_workflow)
    return await awaitable_runner(
        workflow=workflow,
        dataset=dataset,
        logger_name=logger_name,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        first_task_index=start_index,
        last_task_index=end_index,
        user_cache_dir=user_cache_dir,
        slurm_user=slurm_user,
        slurm_account=slurm_account,
        worker_init=worker_init,
    )