_local_experimental
_process_workflow(*, workflow, dataset, logger_name, workflow_dir_local, first_task_index, last_task_index)
Internal processing routine
Schedules the workflow using a FractalProcessPoolExecutor.
Cf. process_workflow for the call signature.
Source code in fractal_server/app/runner/v2/_local_experimental/__init__.py
process_workflow(*, workflow, dataset, workflow_dir_local, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, user_cache_dir=None, slurm_user=None, slurm_account=None, worker_init=None)
async
Run a workflow
This function is responsible for running a workflow on some input data, saving the output and taking care of any exception raised during the run.
NOTE: This is the local_experimental backend's public interface, which also works as a reference implementation for other backends.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow | WorkflowV2 | The workflow to be run. | required |
dataset | DatasetV2 | Initial dataset. | required |
workflow_dir_local | Path | Working directory for this run. | required |
workflow_dir_remote | Optional[Path] | Working directory for this run, on the user side. This argument is present for compatibility with the standard backend interface, but for the … | None |
first_task_index | Optional[int] | Positional index of the first task to execute; if … | None |
last_task_index | Optional[int] | Positional index of the last task to execute; if … | None |
logger_name | str | Logger name. | required |
slurm_user | Optional[str] | Username to impersonate to run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the … | None |
slurm_account | Optional[str] | SLURM account to use when running the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the … | None |
user_cache_dir | Optional[str] | Cache directory of the user who will run the workflow. This argument is present for compatibility with the standard backend interface, but is ignored in the … | None |
worker_init | Optional[str] | Any additional, usually backend-specific, information to be passed to the backend executor. This argument is present for compatibility with the standard backend interface, but is ignored in the … | None |
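The first_task_index and last_task_index parameters select a contiguous range of tasks by position. The sketch below illustrates one plausible way to resolve such a range. It is not taken from the fractal_server source: the helper name select_tasks is hypothetical, and it assumes that None means "start from the first task" or "stop at the last task" and that the last index is inclusive.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def select_tasks(
    tasks: Sequence[T],
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
) -> List[T]:
    """Return the tasks between two positional indices, both inclusive.

    ASSUMPTION (not confirmed by the documentation above): a value of
    None falls back to the first or the last task of the list.
    """
    first = 0 if first_task_index is None else first_task_index
    last = len(tasks) - 1 if last_task_index is None else last_task_index
    # Reject ranges that are empty, reversed, or out of bounds.
    if first < 0 or last >= len(tasks) or first > last:
        raise ValueError(
            f"Invalid task range: first={first}, last={last}, "
            f"number of tasks={len(tasks)}"
        )
    return list(tasks[first : last + 1])


task_names = ["convert", "illumination", "segment", "measure"]
selected = select_tasks(task_names, first_task_index=1, last_task_index=2)
```

With these assumptions, omitting both indices selects the whole list, while the call above keeps only the two middle tasks.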
Raises:
Type | Description |
---|---|
TaskExecutionError | Wrapper for errors raised during tasks' execution (positive exit codes). |
JobExecutionError | Wrapper for errors raised by the tasks' executors (negative exit codes). |
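The split between positive and negative exit codes can be illustrated with a small sketch. The two exception classes below are local stand-ins that only reuse the names from the table; the real classes live in fractal_server and their constructors are not shown here. The helper raise_for_exit_code is hypothetical. As background, Python's subprocess module reports a negative return code -N on POSIX when a child process is terminated by signal N, which is one way a negative code can arise outside the task's own logic.

```python
class TaskExecutionError(RuntimeError):
    """Local stand-in: a failure raised while a task was executing."""


class JobExecutionError(RuntimeError):
    """Local stand-in: a failure raised by the executor running the task."""


def raise_for_exit_code(exit_code: int) -> None:
    """Map an exit code to one of the two error types (hypothetical helper).

    Positive codes are treated as task failures, negative codes as
    executor-level failures, and zero as success.
    """
    if exit_code > 0:
        raise TaskExecutionError(f"Task failed with exit code {exit_code}")
    if exit_code < 0:
        raise JobExecutionError(f"Executor failed with exit code {exit_code}")


# A zero exit code raises nothing.
raise_for_exit_code(0)
```

A caller can then catch the two error types separately, for example to report a task failure to the user while treating an executor failure as an infrastructure problem.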
Returns:
Name | Type | Description |
---|---|---|
output_dataset_metadata | dict | The updated metadata for the dataset, as returned by the last task of the workflow. |
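Because process_workflow is an async function with keyword-only parameters, it has to be awaited (or driven by an event loop) and called with named arguments. The sketch below shows that calling pattern against a stand-in coroutine. fake_process_workflow is hypothetical: it copies the signature listed above but does no real work, and the argument values (plain strings instead of WorkflowV2 and DatasetV2 objects, the directory path, the logger name) are placeholders.

```python
import asyncio
from pathlib import Path
from typing import Any, Dict, Optional


async def fake_process_workflow(
    *,
    workflow: Any,
    dataset: Any,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    user_cache_dir: Optional[str] = None,
    slurm_user: Optional[str] = None,
    slurm_account: Optional[str] = None,
    worker_init: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in with the documented signature.

    It yields control to the event loop once and returns placeholder
    metadata; it does not run any workflow.
    """
    await asyncio.sleep(0)
    return {"placeholder": True}


# Only the required arguments are passed; the compatibility-only ones
# (SLURM user/account, cache directory, worker_init) keep their defaults.
metadata = asyncio.run(
    fake_process_workflow(
        workflow="my-workflow",
        dataset="my-dataset",
        workflow_dir_local=Path("/tmp/fractal-job"),
        logger_name="job-logger",
    )
)
```

Passing the same values positionally fails with a TypeError, since the leading `*` in the signature makes every parameter keyword-only.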
Source code in fractal_server/app/runner/v2/_local_experimental/__init__.py