Skip to content

_slurm_ssh

Slurm Backend

This backend runs fractal workflows in a SLURM cluster using Clusterfutures Executor objects.

process_workflow(*, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, fractal_ssh, worker_init=None, user_id, user_cache_dir=None, slurm_user=None, slurm_account=None)

Process workflow (SLURM backend public interface)

Source code in fractal_server/app/runner/v2/_slurm_ssh/__init__.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    job_attribute_filters: AttributeFiltersType,
    fractal_ssh: FractalSSH,
    worker_init: Optional[str] = None,
    user_id: int,
    # Not used
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
) -> None:
    """
    Process workflow (SLURM backend public interface)

    Creates the remote workflow folder over SSH, then runs the selected
    slice of `workflow.task_list` through `execute_tasks_v2` using a
    `FractalSlurmSSHExecutor`.

    Args:
        workflow: Workflow whose `task_list` is (partially) executed.
        dataset: Dataset forwarded to `execute_tasks_v2`.
        workflow_dir_local: Local working folder, forwarded to both the
            executor and `execute_tasks_v2`.
        workflow_dir_remote: Remote working folder; created via
            `fractal_ssh.mkdir` before any task is run.
        first_task_index: Index of the first task to run; resolved by
            `set_start_and_last_task_index`.
        last_task_index: Index of the last task to run (inclusive);
            resolved by `set_start_and_last_task_index`.
        logger_name: Logger name forwarded to `execute_tasks_v2`.
        job_attribute_filters: Filters forwarded to `execute_tasks_v2`.
        fractal_ssh: SSH handle used for creating the remote folder and
            given to the executor.
        worker_init: Optional newline-separated script lines; split on
            `"\\n"` and passed to the executor as `common_script_lines`.
        user_id: User ID forwarded to `execute_tasks_v2`.
        user_cache_dir: Not used by this backend.
        slurm_user: Not used by this backend.
        slurm_account: Not used by this backend.

    Raises:
        JobExecutionError: If the remote folder cannot be created via SSH.
    """

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # Keep `worker_init` as declared (Optional[str]) and store the split
    # lines under a separate name; a non-string value is passed unchanged.
    if isinstance(worker_init, str):
        common_script_lines = worker_init.split("\n")
    else:
        common_script_lines = worker_init

    # Create main remote folder
    # NOTE(review): `workflow_dir_remote` defaults to None, in which case
    # the folder name below is the literal string "None" - confirm callers
    # always provide it.
    try:
        fractal_ssh.mkdir(folder=str(workflow_dir_remote))
    except Exception as e:
        error_msg = (
            f"Could not create {str(workflow_dir_remote)} via SSH.\n"
            f"Original error: {str(e)}."
        )
        logger.error(error_msg)
        # Chain explicitly so the original SSH failure is kept as the cause.
        raise JobExecutionError(info=error_msg) from e
    else:
        logger.info(f"Created {str(workflow_dir_remote)} via SSH.")

    with FractalSlurmSSHExecutor(
        fractal_ssh=fractal_ssh,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        common_script_lines=common_script_lines,
    ) as executor:
        execute_tasks_v2(
            # Slice end is `last_task_index + 1` because the last index is
            # inclusive.
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            executor=executor,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            submit_setup_call=_slurm_submit_setup,
            job_attribute_filters=job_attribute_filters,
            user_id=user_id,
        )