Skip to content

_slurm_ssh

Slurm Backend

This backend runs fractal workflows in a SLURM cluster using Clusterfutures Executor objects.

_process_workflow(*, workflow, dataset, logger_name, workflow_dir_local, workflow_dir_remote, first_task_index, last_task_index, fractal_ssh, worker_init=None)

Internal processing routine for the SLURM backend

This function initialises a FractalSlurmExecutor, setting logging, workflow working dir and user to impersonate. It then schedules the workflow tasks and returns the new dataset attributes.

Returns:

Name Type Description
new_dataset_attributes dict[str, Any]
Source code in fractal_server/app/runner/v2/_slurm_ssh/__init__.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def _process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    logger_name: str,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
    first_task_index: int,
    last_task_index: int,
    fractal_ssh: FractalSSH,
    worker_init: Optional[Union[str, list[str]]] = None,
) -> dict[str, Any]:
    """
    Internal processing routine for the SLURM-over-SSH backend.

    Initialises a `FractalSlurmSSHExecutor` (configured with the local and
    remote working directories and the SSH connection), then schedules the
    selected slice of workflow tasks and returns the new dataset attributes.

    Returns:
        new_dataset_attributes: Output of `execute_tasks_v2`.
    """

    # Normalise worker_init into a list of script lines, since the executor
    # expects `common_script_lines` as a list.
    if isinstance(worker_init, str):
        worker_init = worker_init.split("\n")

    # Create the main remote folder up front; fail fast with a
    # JobExecutionError if the SSH mkdir does not succeed.
    try:
        fractal_ssh.mkdir(folder=str(workflow_dir_remote))
        logger.info(f"Created {str(workflow_dir_remote)} via SSH.")
    except Exception as e:
        error_msg = (
            f"Could not create {str(workflow_dir_remote)} via SSH.\n"
            f"Original error: {str(e)}."
        )
        logger.error(error_msg)
        raise JobExecutionError(info=error_msg)

    # Select the inclusive [first_task_index, last_task_index] task range.
    task_subset = workflow.task_list[first_task_index : last_task_index + 1]

    with FractalSlurmSSHExecutor(
        fractal_ssh=fractal_ssh,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        common_script_lines=worker_init,
    ) as executor:
        # The executor is shut down (context exit) before the value is
        # actually returned to the caller, as in a plain assign-then-return.
        return execute_tasks_v2(
            wf_task_list=task_subset,
            dataset=dataset,
            executor=executor,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            submit_setup_call=_slurm_submit_setup,
        )

process_workflow(*, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, fractal_ssh, user_cache_dir=None, slurm_user=None, slurm_account=None, worker_init=None) async

Process workflow (SLURM backend public interface)

Source code in fractal_server/app/runner/v2/_slurm_ssh/__init__.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    fractal_ssh: FractalSSH,
    # The following three parameters are not used by this backend (they never
    # appear in the function body) — presumably kept for signature parity
    # with the other runner backends; verify before removing.
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    worker_init: Optional[str] = None,
) -> dict:
    """
    Process workflow (SLURM backend public interface)

    Normalizes `first_task_index`/`last_task_index` against the workflow's
    task list, then delegates to the synchronous `_process_workflow` through
    `async_wrap`.

    Returns:
        The new dataset attributes produced by `_process_workflow`.
    """

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    new_dataset_attributes = await async_wrap(_process_workflow)(
        workflow=workflow,
        dataset=dataset,
        logger_name=logger_name,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
        worker_init=worker_init,
        fractal_ssh=fractal_ssh,
    )
    return new_dataset_attributes