Skip to content

_local

process_workflow(*, workflow, dataset, workflow_dir_local, job_id, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, **kwargs)

Run a workflow through the local backend.

Parameters:

Name Type Description Default
workflow WorkflowV2

The workflow to be run

required
dataset DatasetV2

Initial dataset.

required
workflow_dir_local Path

Working directory for this run.

required
workflow_dir_remote Optional[Path]

Working directory for this run, on the user side. This argument is present for compatibility with the standard backend interface, but for the local backend it cannot be different from workflow_dir_local.

None
first_task_index Optional[int]

Positional index of the first task to execute; if None, start from 0.

None
last_task_index Optional[int]

Positional index of the last task to execute; if None, proceed until the last task.

None
logger_name str

Logger name

required
user_id int
required

Raises:

Type Description
TaskExecutionError

wrapper for errors raised during tasks' execution (positive exit codes).

JobExecutionError

wrapper for errors raised by the tasks' executors (negative exit codes).

Source code in fractal_server/app/runner/v2/_local.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    job_id: int,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    job_attribute_filters: AttributeFiltersType,
    job_type_filters: dict[str, bool],
    user_id: int,
    **kwargs,
) -> None:
    """
    Run a workflow, or a contiguous slice of its tasks, via `LocalRunner`.

    Args:
        workflow:
            The workflow to be run; its `task_list` is sliced between the
            resolved first and last task indices (both inclusive).
        dataset:
            Initial dataset.
        workflow_dir_local:
            Working directory for this run. It is also used as the root
            directory of the `LocalRunner` and as the remote working
            directory handed to `execute_tasks_v2`.
        job_id:
            Passed through unchanged to `execute_tasks_v2`.
        workflow_dir_remote:
            Working directory for this run, on the user side. Kept only for
            compatibility with the standard backend interface: when set, it
            must be equal to `workflow_dir_local`.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.
        logger_name:
            Logger name, passed through to `execute_tasks_v2`.
        job_attribute_filters:
            Passed through unchanged to `execute_tasks_v2`.
        job_type_filters:
            Passed through unchanged to `execute_tasks_v2`.
        user_id:
            Passed through unchanged to `execute_tasks_v2`.
        **kwargs:
            Accepted but not used by this function.

    Raises:
        NotImplementedError: if `workflow_dir_remote` is set and differs
                             from `workflow_dir_local`.
        TaskExecutionError: wrapper for errors raised during tasks' execution
                            (positive exit codes).
        JobExecutionError: wrapper for errors raised by the tasks' executors
                           (negative exit codes).
    """

    # The local backend works in a single directory: refuse a distinct
    # remote directory before doing any other work.
    if workflow_dir_remote and workflow_dir_remote != workflow_dir_local:
        mismatch_message = (
            "Local backend does not support different directories "
            f"{workflow_dir_local=} and {workflow_dir_remote=}"
        )
        raise NotImplementedError(mismatch_message)

    # Turn the optional bounds into concrete (inclusive) task positions.
    total_tasks = len(workflow.task_list)
    start_index, end_index = set_start_and_last_task_index(
        total_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    with LocalRunner(root_dir_local=workflow_dir_local) as runner:
        # `end_index` is inclusive, hence the `+ 1` on the slice upper bound.
        selected_wftasks = workflow.task_list[start_index : end_index + 1]
        execute_tasks_v2(
            wf_task_list=selected_wftasks,
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            # Local and remote directories coincide for this backend.
            workflow_dir_remote=workflow_dir_local,
            logger_name=logger_name,
            get_runner_config=get_local_backend_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
        )