
_local_experimental

_process_workflow(*, workflow, dataset, logger_name, workflow_dir_local, first_task_index, last_task_index)

Internal processing routine

Schedules the workflow using a FractalProcessPoolExecutor.

Cf. process_workflow for the call signature.

Source code in fractal_server/app/runner/v2/_local_experimental/__init__.py
def _process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    logger_name: str,
    workflow_dir_local: Path,
    first_task_index: int,
    last_task_index: int,
) -> dict:
    """
    Internal processing routine

    Schedules the workflow using a `FractalProcessPoolExecutor`.

    Cf.
    [process_workflow][fractal_server.app.runner.v2._local_experimental.process_workflow]
    for the call signature.
    """
    with FractalProcessPoolExecutor(
        shutdown_file=workflow_dir_local / SHUTDOWN_FILENAME
    ) as executor:
        try:
            new_dataset_attributes = execute_tasks_v2(
                wf_task_list=workflow.task_list[
                    first_task_index : (last_task_index + 1)  # noqa
                ],
                dataset=dataset,
                executor=executor,
                workflow_dir_local=workflow_dir_local,
                workflow_dir_remote=workflow_dir_local,
                logger_name=logger_name,
                submit_setup_call=_local_submit_setup,
            )
        except BrokenProcessPool as e:
            raise JobExecutionError(
                info=(
                    "Job failed with BrokenProcessPool error, likely due to "
                    f"an executor shutdown.\nOriginal error:\n{e.args[0]}"
                )
            )

    return new_dataset_attributes
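
The shutdown_file argument is what makes the BrokenProcessPool branch above reachable: FractalProcessPoolExecutor presumably watches workflow_dir_local / SHUTDOWN_FILENAME and tears the pool down once that file appears. A minimal, hypothetical sketch of such a sentinel-file watcher follows; watch_shutdown_file is an illustrative name, not part of fractal_server, and the real executor internals differ.

import threading
import time
from pathlib import Path


def watch_shutdown_file(
    shutdown_file: Path,
    on_shutdown,  # zero-argument callback, e.g. executor.shutdown
    stop_event: threading.Event,
    interval: float = 1.0,
) -> threading.Thread:
    """Fire on_shutdown from a daemon thread once shutdown_file appears."""

    def _poll() -> None:
        while not stop_event.is_set():
            if shutdown_file.exists():
                on_shutdown()
                return
            time.sleep(interval)

    thread = threading.Thread(target=_poll, daemon=True)
    thread.start()
    return thread

Under this pattern, requesting a shutdown from another process amounts to touching the sentinel file, e.g. (workflow_dir_local / SHUTDOWN_FILENAME).touch().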

process_workflow(*, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, user_cache_dir=None, slurm_user=None, slurm_account=None, worker_init=None) async

Run a workflow

This function is responsible for running a workflow on some input data, saving the output and taking care of any exception raised during the run.

NOTE: This is the local_experimental backend's public interface, which also works as a reference implementation for other backends.

Parameters:

    workflow (WorkflowV2, required):
        The workflow to be run.

    dataset (DatasetV2, required):
        Initial dataset.

    workflow_dir_local (Path, required):
        Working directory for this run.

    workflow_dir_remote (Optional[Path], default None):
        Working directory for this run, on the user side. This argument is
        present for compatibility with the standard backend interface, but for
        the local backend it cannot be different from workflow_dir_local.

    first_task_index (Optional[int], default None):
        Positional index of the first task to execute; if None, start from 0.

    last_task_index (Optional[int], default None):
        Positional index of the last task to execute; if None, proceed until
        the last task.

    logger_name (str, required):
        Logger name.

    slurm_user (Optional[str], default None):
        Username to impersonate to run the workflow. This argument is present
        for compatibility with the standard backend interface, but is ignored
        in the local backend.

    slurm_account (Optional[str], default None):
        SLURM account to use when running the workflow. This argument is
        present for compatibility with the standard backend interface, but is
        ignored in the local backend.

    user_cache_dir (Optional[str], default None):
        Cache directory of the user who will run the workflow. This argument
        is present for compatibility with the standard backend interface, but
        is ignored in the local backend.

    worker_init (Optional[str], default None):
        Any additional, usually backend-specific, information to be passed to
        the backend executor. This argument is present for compatibility with
        the standard backend interface, but is ignored in the local backend.

Raises:

    TaskExecutionError:
        Wrapper for errors raised during tasks' execution (positive exit
        codes).

    JobExecutionError:
        Wrapper for errors raised by the tasks' executors (negative exit
        codes).

Returns:

    output_dataset_metadata (dict):
        The updated metadata for the dataset, as returned by the last task of
        the workflow.
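
Both index parameters are positional and the selected range is inclusive on both ends, matching the task_list[first_task_index : last_task_index + 1] slice in _process_workflow. A rough sketch of the normalization that set_start_and_last_task_index performs, per the defaults documented above (normalize_task_indices is a hypothetical stand-in; the real helper may validate differently):

from typing import Optional


def normalize_task_indices(
    num_tasks: int,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
) -> tuple[int, int]:
    # None means "start from the first task" / "run through the last task".
    first = 0 if first_task_index is None else first_task_index
    last = num_tasks - 1 if last_task_index is None else last_task_index
    # Reject out-of-range or inverted selections.
    if not 0 <= first <= last <= num_tasks - 1:
        raise ValueError(f"Invalid task-index range: {first=}, {last=}")
    return first, last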

Source code in fractal_server/app/runner/v2/_local_experimental/__init__.py
async def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    # Slurm-specific
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    worker_init: Optional[str] = None,
) -> dict:
    """
    Run a workflow

    This function is responsible for running a workflow on some input data,
    saving the output and taking care of any exception raised during the run.

    NOTE: This is the `local_experimental` backend's public interface,
    which also works as a reference implementation for other backends.

    Args:
        workflow:
            The workflow to be run
        dataset:
            Initial dataset.
        workflow_dir_local:
            Working directory for this run.
        workflow_dir_remote:
            Working directory for this run, on the user side. This argument is
            present for compatibility with the standard backend interface, but
            for the `local` backend it cannot be different from
            `workflow_dir_local`.
        first_task_index:
            Positional index of the first task to execute; if `None`, start
            from `0`.
        last_task_index:
            Positional index of the last task to execute; if `None`, proceed
            until the last task.
        logger_name: Logger name
        slurm_user:
            Username to impersonate to run the workflow. This argument is
            present for compatibility with the standard backend interface, but
            is ignored in the `local` backend.
        slurm_account:
            SLURM account to use when running the workflow. This argument is
            present for compatibility with the standard backend interface, but
            is ignored in the `local` backend.
        user_cache_dir:
            Cache directory of the user who will run the workflow. This
            argument is present for compatibility with the standard backend
            interface, but is ignored in the `local` backend.
        worker_init:
            Any additional, usually backend specific, information to be passed
            to the backend executor. This argument is present for compatibility
            with the standard backend interface, but is ignored in the `local`
            backend.

    Raises:
        TaskExecutionError: wrapper for errors raised during tasks' execution
                            (positive exit codes).
        JobExecutionError: wrapper for errors raised by the tasks' executors
                           (negative exit codes).

    Returns:
        output_dataset_metadata:
            The updated metadata for the dataset, as returned by the last task
            of the workflow
    """

    if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
        raise NotImplementedError(
            "LocalExperimental backend does not support different directories "
            f"{workflow_dir_local=} and {workflow_dir_remote=}"
        )

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    new_dataset_attributes = await async_wrap(_process_workflow)(
        workflow=workflow,
        dataset=dataset,
        logger_name=logger_name,
        workflow_dir_local=workflow_dir_local,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )
    return new_dataset_attributes
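
For reference, a usage sketch of this public entry point. The my_workflow and my_dataset placeholders are assumed to be valid WorkflowV2 / DatasetV2 instances obtained elsewhere (e.g. from the server's database layer); this illustrates the documented signature rather than reproducing code from fractal_server.

from pathlib import Path

from fractal_server.app.runner.v2._local_experimental import process_workflow


async def run_locally(my_workflow, my_dataset) -> dict:
    # Placeholders: my_workflow is a WorkflowV2, my_dataset a DatasetV2.
    return await process_workflow(
        workflow=my_workflow,
        dataset=my_dataset,
        workflow_dir_local=Path("/tmp/fractal-job-0001"),
        # workflow_dir_remote is omitted: for this backend it must equal
        # workflow_dir_local, otherwise NotImplementedError is raised.
        first_task_index=None,  # None -> start from task 0
        last_task_index=None,   # None -> run through the final task
        logger_name="fractal-local-experimental",
        # slurm_user / slurm_account / user_cache_dir / worker_init are
        # accepted for interface compatibility but ignored here.
    )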