Skip to content

_slurm_ssh

Slurm Backend

This backend runs fractal workflows in a SLURM cluster.

process_workflow(*, job_id, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, resource, profile, fractal_ssh=None, slurm_account=None, worker_init=None, user_cache_dir)

Run a workflow through a slurm_ssh backend.

PARAMETER DESCRIPTION
job_id

Job ID.

TYPE: int

workflow

Workflow to be run

TYPE: WorkflowV2

dataset

Dataset to be used.

TYPE: DatasetV2

workflow_dir_local

Local working directory for this job.

TYPE: Path

workflow_dir_remote

Remote working directory for this job - only relevant for slurm_sudo and slurm_ssh backends.

TYPE: Path | None DEFAULT: None

first_task_index

Positional index of the first task to execute; if None, start from 0.

TYPE: int | None DEFAULT: None

last_task_index

Positional index of the last task to execute; if None, proceed until the last task.

TYPE: int | None DEFAULT: None

logger_name

Logger name

TYPE: str

user_id

User ID.

TYPE: int

resource

Computational resource for running this job.

TYPE: Resource

profile

Computational profile for running this job.

TYPE: Profile

user_cache_dir

User-writeable folder to be exposed as FRACTAL_CACHE_DIR.

TYPE: str

fractal_ssh

FractalSSH object, only relevant for the slurm_ssh backend.

TYPE: FractalSSH | None DEFAULT: None

slurm_account

SLURM account to set. Only relevant for slurm_sudo and slurm_ssh backends.

TYPE: str | None DEFAULT: None

worker_init

Additional preamble lines for SLURM submission script. Only relevant for slurm_sudo and slurm_ssh backends.

TYPE: str | None DEFAULT: None

Source code in fractal_server/runner/v2/_slurm_ssh.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def process_workflow(
    *,
    job_id: int,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Path | None = None,
    first_task_index: int | None = None,
    last_task_index: int | None = None,
    logger_name: str,
    job_attribute_filters: AttributeFilters,
    job_type_filters: dict[str, bool],
    user_id: int,
    resource: Resource,
    profile: Profile,
    fractal_ssh: FractalSSH | None = None,
    slurm_account: str | None = None,
    worker_init: str | None = None,
    user_cache_dir: str,
) -> None:
    """
    Run a workflow through a `slurm_ssh` backend.

    Args:
        job_id: Job ID.
        workflow: Workflow to be run
        dataset: Dataset to be used.
        workflow_dir_local: Local working directory for this job.
        workflow_dir_remote:
            Remote working directory for this job - only relevant for
            `slurm_sudo` and `slurm_ssh` backends.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.
        logger_name: Logger name
        user_id: User ID.
        resource: Computational resource for running this job.
        profile: Computational profile for running this job.
        user_cache_dir:
            User-writeable folder to be exposed as `FRACTAL_CACHE_DIR`.
        fractal_ssh:
            `FractalSSH` object, only relevant for the `slurm_ssh` backend.
        slurm_account:
            SLURM account to set.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        worker_init:
            Additional preamble lines for SLURM submission script.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
    """

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    with SlurmSSHRunner(
        fractal_ssh=fractal_ssh,
        root_dir_local=workflow_dir_local,
        root_dir_remote=workflow_dir_remote,
        slurm_account=slurm_account,
        resource=resource,
        profile=profile,
        common_script_lines=_get_worker_init_lines(worker_init),
        user_cache_dir=user_cache_dir,
        fractal_job_id=job_id,
    ) as runner:
        execute_tasks(
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            get_runner_config=get_slurm_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
            resource_id=resource.id,
        )