
Runner Backends

Runner backends are responsible for scheduling and applying (running) tasks on your data. Fractal currently supports two backends:

  • local: This is the reference backend implementation, which runs tasks locally on the same host where the server is installed.
  • SLURM: Run tasks by scheduling them on a SLURM cluster.

Both the local and SLURM backends build on Python's concurrent.futures interface. As a result, writing a new backend based on concurrent executors should require little more effort than copying the reference local implementation and swapping the Executor used in the public interface coroutine.
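As a rough illustration of this idea (a hypothetical sketch, not Fractal's actual code; `square` and `run_tasks` are made-up names), a concurrent.futures-based backend can change its scheduling strategy by swapping the Executor class while keeping the submission logic unchanged:

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def square(x: int) -> int:
    # Stand-in for a real task; defined at module level so it can also
    # be pickled for process-based executors.
    return x * x

def run_tasks(task_args: list[int], executor_cls: type[Executor]) -> list[int]:
    """Run the toy task for each argument with the given executor class.

    Swapping `executor_cls` (e.g. ThreadPoolExecutor for a local backend,
    or a SLURM-aware Executor subclass) is the only change required.
    """
    with executor_cls() as executor:
        return list(executor.map(square, task_args))

results = run_tasks([1, 2, 3], ThreadPoolExecutor)  # → [1, 4, 9]
```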

Public interface

Each backend must implement the following common public interface.

process_workflow(*, workflow, dataset, workflow_dir_local, job_id, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, user_id, **kwargs)

Run a workflow through the local backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `WorkflowV2` | The workflow to be run. | required |
| `dataset` | `DatasetV2` | Initial dataset. | required |
| `workflow_dir_local` | `Path` | Working directory for this run. | required |
| `workflow_dir_remote` | `Optional[Path]` | Working directory for this run, on the user side. This argument is present for compatibility with the standard backend interface, but for the `local` backend it cannot be different from `workflow_dir_local`. | `None` |
| `first_task_index` | `Optional[int]` | Positional index of the first task to execute; if `None`, start from `0`. | `None` |
| `last_task_index` | `Optional[int]` | Positional index of the last task to execute; if `None`, proceed until the last task. | `None` |
| `logger_name` | `str` | Logger name. | required |
| `user_id` | `int` | | required |

Raises:

| Type | Description |
| --- | --- |
| `TaskExecutionError` | Wrapper for errors raised during tasks' execution (positive exit codes). |
| `JobExecutionError` | Wrapper for errors raised by the tasks' executors (negative exit codes). |
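The two error classes split failures by the sign of the exit code. A simplified sketch of how a backend might map a task's return code onto these two categories (hypothetical helper `check_returncode` and minimal exception classes, not fractal-server's actual implementation):

```python
class TaskExecutionError(RuntimeError):
    """Error raised by the task itself (positive exit code)."""

class JobExecutionError(RuntimeError):
    """Error raised by the task's executor, e.g. a kill signal (negative exit code)."""

def check_returncode(returncode: int) -> None:
    # Convention used e.g. by subprocess: a task that exits with a nonzero
    # status reports a positive code, while death by signal N is reported
    # as -N. The sign therefore tells us who failed.
    if returncode > 0:
        raise TaskExecutionError(f"Task failed with exit code {returncode}")
    elif returncode < 0:
        raise JobExecutionError(f"Executor terminated task (signal {-returncode})")
    # returncode == 0: success, nothing to raise
```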

Source code in fractal_server/app/runner/v2/_local.py
def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    job_id: int,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    job_attribute_filters: AttributeFiltersType,
    job_type_filters: dict[str, bool],
    user_id: int,
    **kwargs,
) -> None:
    """
    Run a workflow through the local backend.

    Args:
        workflow:
            The workflow to be run
        dataset:
            Initial dataset.
        workflow_dir_local:
            Working directory for this run.
        workflow_dir_remote:
            Working directory for this run, on the user side. This argument is
            present for compatibility with the standard backend interface, but
            for the `local` backend it cannot be different from
            `workflow_dir_local`.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.
        logger_name: Logger name
        user_id:

    Raises:
        TaskExecutionError: wrapper for errors raised during tasks' execution
                            (positive exit codes).
        JobExecutionError: wrapper for errors raised by the tasks' executors
                           (negative exit codes).
    """

    if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
        raise NotImplementedError(
            "Local backend does not support different directories "
            f"{workflow_dir_local=} and {workflow_dir_remote=}"
        )

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    with LocalRunner(root_dir_local=workflow_dir_local) as runner:
        execute_tasks_v2(
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_local,
            logger_name=logger_name,
            get_runner_config=get_local_backend_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
        )