_local

Local Backend

This backend runs Fractal workflows using `FractalThreadPoolExecutor` (a custom version of Python's `ThreadPoolExecutor`) to run tasks in several threads.
Incidentally, it also represents the reference implementation for a backend.
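The execution pattern can be illustrated with the standard-library `ThreadPoolExecutor`. Note this is a sketch only: `FractalThreadPoolExecutor` is a custom fractal-server class whose internals are not shown here, and `run_task` is a hypothetical stand-in for a Fractal task.

```python
# Illustrative sketch: the stdlib ThreadPoolExecutor stands in for the
# custom FractalThreadPoolExecutor to show the thread-based pattern.
from concurrent.futures import ThreadPoolExecutor


def run_task(component: str) -> str:
    # Hypothetical stand-in for a Fractal task acting on one component.
    return f"processed-{component}"


with ThreadPoolExecutor(max_workers=4) as executor:
    # map() submits one call per component and yields results in order.
    results = list(executor.map(run_task, ["plate_A", "plate_B", "plate_C"]))
```

Here `executor.map` distributes the calls across worker threads while preserving input order in the results.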
_process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, first_task_index, last_task_index)

Internal processing routine.

Schedules the workflow using a `FractalThreadPoolExecutor`.

Cf. `process_workflow` for the call signature.
Source code in fractal_server/app/runner/v1/_local/__init__.py
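A scheduling routine of this kind can be sketched as follows. This is a hypothetical simplification, not the actual fractal-server code: `schedule_workflow` and its toy tasks are illustrative names, and the key idea shown is that each task runs through the executor and the metadata it returns feeds the next task.

```python
# Hypothetical sketch of a sequential scheduling loop: each task runs via
# the executor, and one task's output metadata becomes the next task's input.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def schedule_workflow(
    tasks: list[Callable[[dict[str, Any]], dict[str, Any]]],
    input_metadata: dict[str, Any],
) -> dict[str, Any]:
    metadata = dict(input_metadata)
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            # submit() returns a Future; result() blocks until the task is
            # done and re-raises any exception raised inside the task.
            metadata = executor.submit(task, metadata).result()
    return metadata


# Two toy tasks that each add a key to the accumulated metadata.
out = schedule_workflow(
    [lambda m: {**m, "step1": True}, lambda m: {**m, "step2": True}],
    {"initial": True},
)
```

Blocking on each `Future` keeps tasks strictly ordered while still running every task body inside an executor thread.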
process_workflow(*, workflow, input_paths, output_path, input_metadata, input_history, logger_name, workflow_dir_local, workflow_dir_remote=None, slurm_user=None, slurm_account=None, user_cache_dir=None, worker_init=None, first_task_index=None, last_task_index=None)

async

Run a workflow.

This function is responsible for running a workflow on some input data, saving the output and taking care of any exception raised during the run.

NOTE: This is the `local` backend's public interface, which also works as a reference implementation for other backends.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
workflow | Workflow | The workflow to be run | required |
input_paths | list[Path] | The paths to the input files to pass to the first task of the workflow | required |
output_path | Path | The destination path for the last task of the workflow | required |
input_metadata | dict[str, Any] | Initial metadata, passed to the first task | required |
logger_name | str | Name of the logger to log information on the run to | required |
workflow_dir_local | Path | Working directory for this run. | required |
workflow_dir_remote | Optional[Path] | Working directory for this run, on the user side. This argument is present for compatibility with the standard backend interface, but for the `local` backend it cannot differ from `workflow_dir_local`. | None |
slurm_user | Optional[str] | Username to impersonate to run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the `local` backend. | None |
slurm_account | Optional[str] | SLURM account to use when running the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the `local` backend. | None |
user_cache_dir | Optional[str] | Cache directory of the user who will run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the `local` backend. | None |
worker_init | Optional[str] | Any additional, usually backend-specific, information to be passed to the backend executor. This argument is present for compatibility with the standard backend interface, but is ignored in the `local` backend. | None |
first_task_index | Optional[int] | Positional index of the first task to execute; if `None`, start from the first task of the workflow. | None |
last_task_index | Optional[int] | Positional index of the last task to execute; if `None`, proceed until the last task of the workflow. | None |
Raises:

Type | Description |
---|---|
TaskExecutionError | wrapper for errors raised during tasks' execution (positive exit codes). |
JobExecutionError | wrapper for errors raised by the tasks' executors (negative exit codes). |
Returns:

Name | Type | Description |
---|---|---|
output_dataset_metadata | dict[str, Any] | The updated metadata for the dataset, as returned by the last task of the workflow |
Source code in fractal_server/app/runner/v1/_local/__init__.py
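The `first_task_index` / `last_task_index` semantics described above can be sketched with a small helper. This is a hypothetical illustration (the function `select_tasks` is not part of fractal-server), written under the assumption that both indices are inclusive, as the parameter descriptions suggest.

```python
# Hypothetical helper illustrating the task-range selection implied by
# first_task_index / last_task_index (assumed inclusive on both ends).
def select_tasks(task_list, first_task_index=None, last_task_index=None):
    first = 0 if first_task_index is None else first_task_index
    last = len(task_list) - 1 if last_task_index is None else last_task_index
    # Inclusive upper bound, hence the +1 in the slice.
    return task_list[first : last + 1]


tasks = ["illumination", "segmentation", "measurement", "export"]
subset = select_tasks(tasks, first_task_index=1, last_task_index=2)
```

With both arguments left as `None`, the whole task list is executed, matching the defaults in the signature above.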