Skip to content

_slurm_ssh

Slurm Backend

This backend runs Fractal workflows on a SLURM cluster.

process_workflow(*, job_id, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, resource, profile, fractal_ssh=None, slurm_account=None, worker_init=None, user_cache_dir)

Run a workflow through the `slurm_ssh` backend.

Args:
job_id: Job ID.
workflow: Workflow to be run
dataset: Dataset to be used.
workflow_dir_local: Local working directory for this job.
workflow_dir_remote:
    Remote working directory for this job - only relevant for
    `slurm_sudo` and `slurm_ssh` backends.
first_task_index:
    Positional index of the first task to execute; if `None`, start
    from `0`.
last_task_index:
    Positional index of the last task to execute; if `None`, proceed
    until the last task.
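logger_name: Logger name.
job_attribute_filters: Attribute filters for this job, forwarded to `execute_tasks`.
job_type_filters: Type filters for this job, forwarded to `execute_tasks`.
user_id: User ID.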
resource: Computational resource for running this job.
profile: Computational profile for running this job.
user_cache_dir:
    User-writeable folder (typically a subfolder of `project_dirs`).
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
fractal_ssh:
    `FractalSSH` object, only relevant for the `slurm_ssh` backend.
slurm_account:
    SLURM account to set.
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
worker_init:
    Additional preamble lines for SLURM submission script.
    Only relevant for `slurm_sudo` and `slurm_ssh` backends.
Source code in `fractal_server/runner/v2/_slurm_ssh.py` (lines 41–131):
def process_workflow(
    *,
    job_id: int,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Path | None = None,
    first_task_index: int | None = None,
    last_task_index: int | None = None,
    logger_name: str,
    job_attribute_filters: AttributeFilters,
    job_type_filters: dict[str, bool],
    user_id: int,
    resource: Resource,
    profile: Profile,
    fractal_ssh: FractalSSH | None = None,
    slurm_account: str | None = None,
    worker_init: str | None = None,
    user_cache_dir: str,
) -> None:
    """
    Run a workflow through a `slurm_ssh` backend.

    Args:
        job_id: Job ID.
        workflow: Workflow to be run.
        dataset: Dataset to be used.
        workflow_dir_local: Local working directory for this job.
        workflow_dir_remote:
            Remote working directory for this job - only relevant for
            `slurm_sudo` and `slurm_ssh` backends.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute (inclusive); if
            `None`, proceed until the last task.
        logger_name: Logger name.
        job_attribute_filters:
            Attribute filters for this job, forwarded to `execute_tasks`.
        job_type_filters:
            Type filters for this job, forwarded to `execute_tasks`.
        user_id: User ID.
        resource:
            Computational resource for running this job; its `id` is also
            forwarded to `execute_tasks`.
        profile: Computational profile for running this job.
        fractal_ssh:
            `FractalSSH` object, only relevant for the `slurm_ssh` backend.
        slurm_account:
            SLURM account to set.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        worker_init:
            Additional preamble lines for SLURM submission script, given as a
            single newline-separated string.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        user_cache_dir:
            User-writeable folder (typically a subfolder of `project_dirs`).
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
    """

    # Resolve the optional task-index bounds into concrete values.
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # The runner takes the preamble as a sequence of lines, while
    # `worker_init` arrives as one newline-separated string. Use a separate
    # name so `worker_init` keeps its declared `str | None` type; any
    # non-string value (e.g. `None`) is passed through unchanged.
    if isinstance(worker_init, str):
        common_script_lines = worker_init.split("\n")
    else:
        common_script_lines = worker_init

    # `last_task_index` is inclusive, hence the `+ 1` in the slice.
    wf_task_list = workflow.task_list[first_task_index : last_task_index + 1]

    with SlurmSSHRunner(
        fractal_ssh=fractal_ssh,
        root_dir_local=workflow_dir_local,
        root_dir_remote=workflow_dir_remote,
        slurm_account=slurm_account,
        resource=resource,
        profile=profile,
        common_script_lines=common_script_lines,
        user_cache_dir=user_cache_dir,
    ) as runner:
        execute_tasks(
            wf_task_list=wf_task_list,
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            get_runner_config=get_slurm_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
            resource_id=resource.id,
        )