Skip to content

_slurm_ssh

Slurm Backend

This backend runs Fractal workflows on a SLURM cluster.

process_workflow(*, job_id, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, resource, profile, fractal_ssh=None, slurm_account=None, worker_init=None, user_cache_dir)

Run a workflow through a slurm_ssh backend.

Args:
job_id: Job ID.
workflow: Workflow to be run.
dataset: Dataset to be used.
workflow_dir_local: Local working directory for this job.
workflow_dir_remote:
    Remote working directory for this job - only relevant for
    `slurm_sudo` and `slurm_ssh` backends.
first_task_index:
    Positional index of the first task to execute; if `None`, start
    from `0`.
last_task_index:
    Positional index of the last task to execute; if `None`, proceed
    until the last task.
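logger_name: Logger name.
job_attribute_filters: Attribute filters for this job, forwarded to `execute_tasks_v2`.
job_type_filters: Type filters for this job, forwarded to `execute_tasks_v2`.
user_id: User ID.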
resource: Computational resource for running this job.
profile: Computational profile for running this job.
user_cache_dir:
    User-writeable folder (typically a subfolder of `project_dir`).
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
fractal_ssh:
    `FractalSSH` object, only relevant for the `slurm_ssh` backend.
slurm_account:
    SLURM account to set.
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
worker_init:
    Additional preamble lines for SLURM submission script.
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
Source code in fractal_server/runner/v2/_slurm_ssh.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def process_workflow(
    *,
    job_id: int,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Path | None = None,
    first_task_index: int | None = None,
    last_task_index: int | None = None,
    logger_name: str,
    job_attribute_filters: AttributeFilters,
    job_type_filters: dict[str, bool],
    user_id: int,
    resource: Resource,
    profile: Profile,
    fractal_ssh: FractalSSH | None = None,
    slurm_account: str | None = None,
    worker_init: str | None = None,
    user_cache_dir: str,
) -> None:
    """
    Run a workflow through a `slurm_ssh` backend.

    Args:
        job_id: Job ID.
        workflow: Workflow to be run.
        dataset: Dataset to be used.
        workflow_dir_local: Local working directory for this job.
        workflow_dir_remote:
            Remote working directory for this job - only relevant for
            `slurm_sudo` and `slurm_ssh` backends.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute (inclusive); if
            `None`, proceed until the last task.
        logger_name: Logger name.
        job_attribute_filters:
            Attribute filters for this job, forwarded to `execute_tasks_v2`.
        job_type_filters:
            Type filters for this job, forwarded to `execute_tasks_v2`.
        user_id: User ID.
        resource: Computational resource for running this job.
        profile: Computational profile for running this job.
        user_cache_dir:
            User-writeable folder (typically a subfolder of `project_dir`).
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        fractal_ssh:
            `FractalSSH` object, only relevant for the `slurm_ssh` backend.
        slurm_account:
            SLURM account to set.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        worker_init:
            Additional preamble lines for SLURM submission script, as a
            single newline-separated string.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
    """

    # Set values of first_task_index and last_task_index (the helper turns
    # the optional `None` bounds into concrete indices for `task_list`).
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # The runner receives the preamble as separate lines, so a multi-line
    # string is split on "\n"; any non-string value (e.g. `None`) is passed
    # through unchanged. A dedicated local avoids rebinding the
    # `str`-annotated parameter to a list.
    common_script_lines = (
        worker_init.split("\n") if isinstance(worker_init, str) else worker_init
    )

    with SlurmSSHRunner(
        fractal_ssh=fractal_ssh,
        root_dir_local=workflow_dir_local,
        root_dir_remote=workflow_dir_remote,
        slurm_account=slurm_account,
        resource=resource,
        profile=profile,
        common_script_lines=common_script_lines,
        user_cache_dir=user_cache_dir,
    ) as runner:
        execute_tasks_v2(
            # `last_task_index` is inclusive, hence the `+ 1` slice bound.
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            get_runner_config=get_slurm_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
        )