Skip to content

_local

process_workflow(*, job_id, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, resource, profile, user_cache_dir=None, fractal_ssh=None, slurm_account=None, worker_init=None)

Run a workflow through a local backend.

PARAMETER DESCRIPTION
job_id

Job ID.

TYPE: int

workflow

Workflow to be run

TYPE: WorkflowV2

dataset

Dataset to be used.

TYPE: DatasetV2

workflow_dir_local

Local working directory for this job.

TYPE: Path

workflow_dir_remote

Remote working directory for this job - only relevant for slurm_sudo and slurm_ssh backends.

TYPE: Path | None DEFAULT: None

first_task_index

Positional index of the first task to execute; if None, start from 0.

TYPE: int | None DEFAULT: None

last_task_index

Positional index of the last task to execute; if None, proceed until the last task.

TYPE: int | None DEFAULT: None

logger_name

Logger name

TYPE: str

user_id

User ID.

TYPE: int

resource

Computational resource for running this job.

TYPE: Resource

profile

Computational profile for running this job.

TYPE: Profile

user_cache_dir

User-writeable folder (typically a subfolder of project_dir). Only relevant for slurm_sudo and slurm_ssh backends.

TYPE: str | None DEFAULT: None

fractal_ssh

FractalSSH object, only relevant for the slurm_ssh backend.

TYPE: FractalSSH | None DEFAULT: None

slurm_account

SLURM account to set. Only relevant for slurm_sudo and slurm_ssh backends.

TYPE: str | None DEFAULT: None

worker_init

Additional preamble lines for SLURM submission script. Only relevant for slurm_sudo and slurm_ssh backends.

TYPE: str | None DEFAULT: None

Source code in fractal_server/runner/v2/_local.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def process_workflow(
    *,
    job_id: int,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Path | None = None,
    first_task_index: int | None = None,
    last_task_index: int | None = None,
    logger_name: str,
    job_attribute_filters: AttributeFilters,
    job_type_filters: dict[str, bool],
    user_id: int,
    resource: Resource,
    profile: Profile,
    user_cache_dir: str | None = None,
    fractal_ssh: FractalSSH | None = None,
    slurm_account: str | None = None,
    worker_init: str | None = None,
) -> None:
    """
    Run a workflow through a local backend.

    Args:
        job_id: Job ID.
        workflow: Workflow to be run
        dataset: Dataset to be used.
        workflow_dir_local: Local working directory for this job.
        workflow_dir_remote:
            Remote working directory for this job - only relevant for
            `slurm_sudo` and `slurm_ssh` backends.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.
        logger_name: Logger name
        user_id: User ID.
        resource: Computational resource for running this job.
        profile: Computational profile for running this job.
        user_cache_dir:
            User-writeable folder (typically a subfolder of `project_dir`).
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        fractal_ssh:
            `FractalSSH` object, only relevant for the `slurm_ssh` backend.
        slurm_account:
            SLURM account to set.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
        worker_init:
            Additional preamble lines for SLURM submission script.
            Only relevant for `slurm_sudo` and `slurm_ssh` backends.
    """

    if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
        raise NotImplementedError(
            "Local backend does not support different directories "
            f"{workflow_dir_local=} and {workflow_dir_remote=}"
        )

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    with LocalRunner(
        root_dir_local=workflow_dir_local,
        resource=resource,
        profile=profile,
    ) as runner:
        execute_tasks_v2(
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_local,
            logger_name=logger_name,
            get_runner_config=get_local_backend_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
        )