_local

Local Backend

This backend runs Fractal workflows using FractalThreadPoolExecutor (a custom version of Python's ThreadPoolExecutor) to run tasks across several threads. It also serves as the reference implementation for a backend.
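As a rough illustration of the pattern this backend builds on, the sketch below uses Python's standard-library ThreadPoolExecutor; FractalThreadPoolExecutor is a custom subclass whose internals are not shown here, and `run_task` is a hypothetical stand-in for a real Fractal task.

```python
# Minimal sketch (not Fractal's actual code) of a thread-based backend:
# tasks are submitted to a thread pool and their results collected.
from concurrent.futures import ThreadPoolExecutor


def run_task(task_name: str, metadata: dict) -> dict:
    # Stand-in for a Fractal task: consume metadata, return an update.
    return {**metadata, "last_task": task_name}


with ThreadPoolExecutor(max_workers=4) as executor:
    # Independent units of work (e.g. per-well processing) map naturally
    # onto threads; each submit() returns a Future.
    futures = [
        executor.submit(run_task, name, {"plate": "A01"})
        for name in ("task-a", "task-b", "task-c")
    ]
    results = [future.result() for future in futures]

print(results)
```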

_process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, first_task_index, last_task_index)

Internal processing routine

Schedules the workflow using a FractalThreadPoolExecutor.

Cf. process_workflow for the call signature.

Source code in fractal_server/app/runner/v1/_local/__init__.py
def _process_workflow(
    *,
    workflow: Workflow,
    input_paths: list[Path],
    output_path: Path,
    input_metadata: dict[str, Any],
    input_history: list[dict[str, Any]],
    logger_name: str,
    workflow_dir_local: Path,
    first_task_index: int,
    last_task_index: int,
) -> dict[str, Any]:
    """
    Internal processing routine

    Schedules the workflow using a `FractalThreadPoolExecutor`.

    Cf.
    [process_workflow][fractal_server.app.runner.v1._local.process_workflow]
    for the call signature.
    """

    with FractalThreadPoolExecutor() as executor:
        output_task_pars = execute_tasks(
            executor=executor,
            task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)  # noqa
            ],  # noqa
            task_pars=TaskParameters(
                input_paths=input_paths,
                output_path=output_path,
                metadata=input_metadata,
                history=input_history,
            ),
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_local,
            logger_name=logger_name,
            submit_setup_call=_local_submit_setup,
        )
    output_dataset_metadata_history = dict(
        metadata=output_task_pars.metadata, history=output_task_pars.history
    )
    return output_dataset_metadata_history
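For orientation, here is a simplified, hypothetical sketch of the chaining that a routine like execute_tasks performs: each task receives the parameters produced by the previous one, so metadata and history accumulate along the workflow. The `SimpleTaskParameters` class and `chain_tasks` function below are illustrative stand-ins based on this page alone, not Fractal's actual classes.

```python
# Hypothetical sketch of sequential task chaining; not fractal-server code.
from dataclasses import dataclass, field, replace
from typing import Any, Callable


@dataclass
class SimpleTaskParameters:
    metadata: dict[str, Any] = field(default_factory=dict)
    history: list[dict[str, Any]] = field(default_factory=list)


def chain_tasks(
    task_list: list[Callable[[SimpleTaskParameters], dict[str, Any]]],
    task_pars: SimpleTaskParameters,
) -> SimpleTaskParameters:
    for task in task_list:
        # Each task sees the parameters left behind by its predecessor.
        new_metadata = task(task_pars)
        task_pars = replace(
            task_pars,
            metadata={**task_pars.metadata, **new_metadata},
            history=task_pars.history + [{"task": task.__name__}],
        )
    return task_pars


def illumination_correction(pars: SimpleTaskParameters) -> dict[str, Any]:
    return {"corrected": True}


final = chain_tasks([illumination_correction], SimpleTaskParameters())
print(final.metadata, final.history)
```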

process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, workflow_dir_remote=None, slurm_user=None, slurm_account=None, user_cache_dir=None, worker_init=None, first_task_index=None, last_task_index=None) async

Run a workflow

This function is responsible for running a workflow on some input data, saving the output and taking care of any exception raised during the run.

NOTE: This is the local backend's public interface, which also works as a reference implementation for other backends.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `Workflow` | The workflow to be run | *required* |
| `input_paths` | `list[Path]` | The paths to the input files to pass to the first task of the workflow | *required* |
| `output_path` | `Path` | The destination path for the last task of the workflow | *required* |
| `input_metadata` | `dict[str, Any]` | Initial metadata, passed to the first task | *required* |
| `logger_name` | `str` | Name of the logger to log information on the run to | *required* |
| `workflow_dir_local` | `Path` | Working directory for this run. | *required* |
| `workflow_dir_remote` | `Optional[Path]` | Working directory for this run, on the user side. This argument is present for compatibility with the standard backend interface, but for the local backend it cannot be different from `workflow_dir_local`. | `None` |
| `slurm_user` | `Optional[str]` | Username to impersonate to run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the local backend. | `None` |
| `slurm_account` | `Optional[str]` | SLURM account to use when running the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the local backend. | `None` |
| `user_cache_dir` | `Optional[str]` | Cache directory of the user who will run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the local backend. | `None` |
| `worker_init` | `Optional[str]` | Any additional, usually backend-specific, information to be passed to the backend executor. This argument is present for compatibility with the standard backend interface, but is ignored in the local backend. | `None` |
| `first_task_index` | `Optional[int]` | Positional index of the first task to execute; if `None`, start from `0`. | `None` |
| `last_task_index` | `Optional[int]` | Positional index of the last task to execute; if `None`, proceed until the last task. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `TaskExecutionError` | Wrapper for errors raised during tasks' execution (positive exit codes). |
| `JobExecutionError` | Wrapper for errors raised by the tasks' executors (negative exit codes). |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `output_dataset_metadata` | `dict[str, Any]` | The updated metadata for the dataset, as returned by the last task of the workflow |
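A hedged usage sketch follows. The import of process_workflow matches the module path shown on this page; `my_workflow` is a placeholder (in practice the Workflow object comes from the application's database layer), and the paths are illustrative only.

```python
import asyncio
from pathlib import Path

# This page documents fractal_server/app/runner/v1/_local/__init__.py,
# so process_workflow should be importable from that module.
from fractal_server.app.runner.v1._local import process_workflow


async def run_local(my_workflow) -> dict:
    # On failure this raises TaskExecutionError (a task failed, positive
    # exit code) or JobExecutionError (the executor failed, negative exit
    # code), as documented in the Raises table above.
    return await process_workflow(
        workflow=my_workflow,
        input_paths=[Path("/data/in")],
        output_path=Path("/data/out"),
        input_metadata={},
        input_history=[],
        logger_name="local-run",
        workflow_dir_local=Path("/tmp/fractal-job"),
    )


# output_metadata = asyncio.run(run_local(my_workflow))
```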

Source code in fractal_server/app/runner/v1/_local/__init__.py
async def process_workflow(
    *,
    workflow: Workflow,
    input_paths: list[Path],
    output_path: Path,
    input_metadata: dict[str, Any],
    input_history: list[dict[str, Any]],
    logger_name: str,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    user_cache_dir: Optional[str] = None,
    worker_init: Optional[str] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
) -> dict[str, Any]:
    """
    Run a workflow

    This function is responsible for running a workflow on some input data,
    saving the output and taking care of any exception raised during the run.

    NOTE: This is the `local` backend's public interface, which also works as
    a reference implementation for other backends.

    Args:
        workflow:
            The workflow to be run
        input_paths:
            The paths to the input files to pass to the first task of the
            workflow
        output_path:
            The destination path for the last task of the workflow
        input_metadata:
            Initial metadata, passed to the first task
        logger_name:
            Name of the logger to log information on the run to
        workflow_dir_local:
            Working directory for this run.
        workflow_dir_remote:
            Working directory for this run, on the user side. This argument is
            present for compatibility with the standard backend interface, but
            for the `local` backend it cannot be different from
            `workflow_dir_local`.
        slurm_user:
            Username to impersonate to run the workflow. This argument is
            present for compatibility with the standard backend interface, but
            is ignored in the `local` backend.
        slurm_account:
            SLURM account to use when running the workflow. This argument is
            present for compatibility with the standard backend interface, but
            is ignored in the `local` backend.
        user_cache_dir:
            Cache directory of the user who will run the workflow. This
            argument is present for compatibility with the standard backend
            interface, but is ignored in the `local` backend.
        worker_init:
            Any additional, usually backend specific, information to be passed
            to the backend executor. This argument is present for compatibility
            with the standard backend interface, but is ignored in the `local`
            backend.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.

    Raises:
        TaskExecutionError: wrapper for errors raised during tasks' execution
                            (positive exit codes).
        JobExecutionError: wrapper for errors raised by the tasks' executors
                           (negative exit codes).

    Returns:
        output_dataset_metadata:
            The updated metadata for the dataset, as returned by the last task
            of the workflow
    """

    if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
        raise NotImplementedError(
            "Local backend does not support different directories "
            f"{workflow_dir_local=} and {workflow_dir_remote=}"
        )

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    output_dataset_metadata_history = await async_wrap(_process_workflow)(
        workflow=workflow,
        input_paths=input_paths,
        output_path=output_path,
        input_metadata=input_metadata,
        input_history=input_history,
        logger_name=logger_name,
        workflow_dir_local=workflow_dir_local,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )
    return output_dataset_metadata_history
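The index defaulting performed by set_start_and_last_task_index can be pictured roughly as below. This is a hypothetical re-implementation of only the behavior documented above ("if `None`, start from `0`" / "proceed until the last task"); the real function may also validate the indices.

```python
from typing import Optional


def resolve_task_indices(
    num_tasks: int,
    first_task_index: Optional[int],
    last_task_index: Optional[int],
) -> tuple[int, int]:
    # Sketch of the documented defaulting behavior only.
    if first_task_index is None:
        first_task_index = 0
    if last_task_index is None:
        last_task_index = num_tasks - 1
    return first_task_index, last_task_index


# With 5 tasks and both indices None, the whole workflow runs:
assert resolve_task_indices(5, None, None) == (0, 4)
# Selecting tasks 1..3 runs the sub-workflow slice
# workflow.task_list[1:4], matching the slicing in _process_workflow:
assert resolve_task_indices(5, 1, 3) == (1, 3)
```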