_common

Common utilities and routines for runner backends (private API)

This module includes utilities and routines that are useful for implementing runner backends and that should not be exposed outside of the runner subsystem.

_call_command_wrapper(cmd, stdout, stderr)

Call a command and write its stdout and stderr to files

Raises:

    TaskExecutionError:
        If the subprocess.run call returns a positive exit code.
    JobExecutionError:
        If the subprocess.run call returns a negative exit code (e.g. due to
        the subprocess receiving a TERM or KILL signal).
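
For reference, a minimal sketch (POSIX-only, not part of the module) of the subprocess.run convention that this distinction relies on: a child terminated by a signal is reported with a negative returncode.

import subprocess

completed = subprocess.run(["sh", "-c", "exit 3"])
print(completed.returncode)  # 3: positive, mapped to TaskExecutionError

killed = subprocess.run(["sh", "-c", "kill -TERM $$"])
print(killed.returncode)  # -15: negative (SIGTERM), mapped to JobExecutionError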

Source code in fractal_server/app/runner/v1/_common.py
def _call_command_wrapper(cmd: str, stdout: Path, stderr: Path) -> None:
    """
    Call a command and write its stdout and stderr to files

    Raises:
        TaskExecutionError: If the `subprocess.run` call returns a positive
                            exit code
        JobExecutionError:  If the `subprocess.run` call returns a negative
                            exit code (e.g. due to the subprocess receiving a
                            TERM or KILL signal)
    """

    validate_cmd(cmd)
    # Verify that task command is executable
    if shutil.which(shlex_split(cmd)[0]) is None:
        msg = (
            f'Command "{shlex_split(cmd)[0]}" is not valid. '
            "Hint: make sure that it is executable."
        )
        raise TaskExecutionError(msg)

    with open(stdout, "w") as fp_stdout, open(stderr, "w") as fp_stderr:
        result = subprocess.run(  # nosec
            shlex_split(cmd),
            stderr=fp_stderr,
            stdout=fp_stdout,
        )

    if result.returncode > 0:
        with stderr.open("r") as fp_stderr:
            err = fp_stderr.read()
        raise TaskExecutionError(err)
    elif result.returncode < 0:
        raise JobExecutionError(
            info=f"Task failed with returncode={result.returncode}"
        )
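
A hedged usage sketch of this wrapper, with hypothetical output paths: on success the command's streams end up in the given files and nothing is returned.

from pathlib import Path

# Hypothetical output locations, for illustration only
stdout = Path("/tmp/task.out")
stderr = Path("/tmp/task.err")

_call_command_wrapper("echo hello", stdout=stdout, stderr=stderr)
print(stdout.read_text())  # "hello\n"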

_task_needs_image_list(_task)

Whether a task requires metadata["image"] in its args.json file.

For details see https://github.com/fractal-analytics-platform/fractal-server/issues/1237

Parameters:

    _task (Task, required):
        The task to be checked.
Source code in fractal_server/app/runner/v1/_common.py
def _task_needs_image_list(_task: Task) -> bool:
    """
    Whether a task requires `metadata["image"]` in its `args.json` file.

    For details see
    https://github.com/fractal-analytics-platform/fractal-server/issues/1237

    Args:
        _task: The task to be checked.
    """
    settings = Inject(get_settings)
    exception_task_names = settings.FRACTAL_RUNNER_TASKS_INCLUDE_IMAGE.split(
        ";"
    )
    return _task.name in exception_task_names
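
For illustration, a minimal sketch of the lookup above, assuming a hypothetical value for the FRACTAL_RUNNER_TASKS_INCLUDE_IMAGE setting (a semicolon-separated list of task names):

FRACTAL_RUNNER_TASKS_INCLUDE_IMAGE = "Copy OME-Zarr structure;Some legacy task"
exception_task_names = FRACTAL_RUNNER_TASKS_INCLUDE_IMAGE.split(";")

print("Copy OME-Zarr structure" in exception_task_names)  # True
print("Another task" in exception_task_names)             # False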

call_parallel_task(*, executor, wftask, task_pars_depend, workflow_dir_local, workflow_dir_remote=None, submit_setup_call=no_op_submit_setup_call, logger_name=None)

Collect results from the parallel instances of a parallel task

Prepare and submit for execution all the single calls of a parallel task, and return a single TaskParameters instance to be passed on to the next task.

NOTE: this function is executed by the same user that runs fractal-server, and therefore may not have access to some of the user's files.

Parameters:

    executor (Executor, required):
        The concurrent.futures.Executor-compatible executor that will run the
        task.
    wftask (WorkflowTask, required):
        The parallel task to run.
    task_pars_depend (TaskParameters, required):
        The task parameters to be passed on to the parallel task.
    workflow_dir_local (Path, required):
        The server-side working directory for workflow execution.
    workflow_dir_remote (Optional[Path], default None):
        The user-side working directory for workflow execution (only relevant
        for multi-user executors).
    submit_setup_call (Callable, default no_op_submit_setup_call):
        An optional function that computes configuration parameters for the
        executor.
    logger_name (Optional[str], default None):
        Name of the logger.

Returns:

    out_task_parameters (TaskParameters):
        The output task parameters of the parallel task execution, ready to be
        passed on to the next task.

Source code in fractal_server/app/runner/v1/_common.py
def call_parallel_task(
    *,
    executor: Executor,
    wftask: WorkflowTask,
    task_pars_depend: TaskParameters,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    submit_setup_call: Callable = no_op_submit_setup_call,
    logger_name: Optional[str] = None,
) -> TaskParameters:
    """
    Collect results from the parallel instances of a parallel task

    Prepare and submit for execution all the single calls of a parallel task,
    and return a single TaskParameters instance to be passed on to the
    next task.

    **NOTE**: this function is executed by the same user that runs
    `fractal-server`, and therefore may not have access to some of the
    user's files.

    Args:
        executor:
            The `concurrent.futures.Executor`-compatible executor that will
            run the task.
        wftask:
            The parallel task to run.
        task_pars_depend:
            The task parameters to be passed on to the parallel task.
        workflow_dir_local:
            The server-side working directory for workflow execution.
        workflow_dir_remote:
            The user-side working directory for workflow execution (only
            relevant for multi-user executors).
        submit_setup_call:
            An optional function that computes configuration parameters for
            the executor.
        logger_name:
            Name of the logger

    Returns:
        out_task_parameters:
            The output task parameters of the parallel task execution, ready to
            be passed on to the next task.
    """
    logger = get_logger(logger_name)

    if not workflow_dir_remote:
        workflow_dir_remote = workflow_dir_local

    try:
        component_list = task_pars_depend.metadata[
            wftask.parallelization_level
        ]
    except KeyError:
        keys = list(task_pars_depend.metadata.keys())
        raise RuntimeError(
            "WorkflowTask parallelization_level "
            f"('{wftask.parallelization_level}') is missing "
            f"in metadata keys ({keys})."
        )

    # Backend-specific configuration
    try:
        extra_setup = submit_setup_call(
            wftask=wftask,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
        )
    except Exception as e:
        tb = "".join(traceback.format_tb(e.__traceback__))
        raise RuntimeError(
            f"{type(e)} error in {submit_setup_call=}\n"
            f"Original traceback:\n{tb}"
        )

    # Preliminary steps
    actual_task_pars_depend = trim_TaskParameters(
        task_pars_depend, wftask.task
    )

    partial_call_task = partial(
        call_single_parallel_task,
        wftask=wftask,
        task_pars=actual_task_pars_depend,
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
    )

    # Submit tasks for execution. Note that `for _ in map_iter:
    # pass` explicitly calls the .result() method for each future, and
    # therefore is blocking until the tasks are complete.
    map_iter = executor.map(partial_call_task, component_list, **extra_setup)

    # Wait for execution of parallel tasks, and aggregate updated metadata (ref
    # https://github.com/fractal-analytics-platform/fractal-server/issues/802).
    # NOTE: Even if we remove the need of aggregating metadata, we must keep
    # the iteration over `map_iter` (e.g. as in `for _ in map_iter: pass`), to
    # make this call blocking. This is required *also* because otherwise the
    # shutdown of a FractalSlurmExecutor while running map() may not work
    aggregated_metadata_update: dict[str, Any] = {}
    for this_meta_update in map_iter:
        # Cover the case where the task wrote `null`, rather than a
        # valid dictionary (ref fractal-server issue #878), or where the
        # metadiff file was missing.
        if this_meta_update is None:
            this_meta_update = {}
        # Include this_meta_update into aggregated_metadata_update
        for key, val in this_meta_update.items():
            aggregated_metadata_update.setdefault(key, []).append(val)
    if aggregated_metadata_update:
        logger.warning(
            "Aggregating parallel-taks updated metadata (with keys "
            f"{list(aggregated_metadata_update.keys())}).\n"
            "This feature is experimental and it may change in "
            "future releases."
        )

    # Prepare updated_metadata
    updated_metadata = task_pars_depend.metadata.copy()
    updated_metadata.update(aggregated_metadata_update)

    # Prepare updated_history (note: the expected type for history items is
    # defined in `_DatasetHistoryItem`)
    wftask_dump = wftask.model_dump(exclude={"task"})
    wftask_dump["task"] = wftask.task.model_dump()
    new_history_item = dict(
        workflowtask=wftask_dump,
        status=WorkflowTaskStatusTypeV1.DONE,
        parallelization=dict(
            parallelization_level=wftask.parallelization_level,
            component_list=component_list,
        ),
    )
    updated_history = task_pars_depend.history.copy()
    updated_history.append(new_history_item)

    # Assemble a TaskParameter object
    out_task_parameters = TaskParameters(
        input_paths=[task_pars_depend.output_path],
        output_path=task_pars_depend.output_path,
        metadata=updated_metadata,
        history=updated_history,
    )

    return out_task_parameters
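
A self-contained sketch of the aggregation loop above (aggregate is an illustration helper, not part of the module): per-component metadiff dictionaries, possibly None, are merged so that repeated keys collect their values into lists.

from typing import Any, Optional

def aggregate(metadiffs: list[Optional[dict[str, Any]]]) -> dict[str, list[Any]]:
    aggregated: dict[str, list[Any]] = {}
    for update in metadiffs:
        # None entries (missing or null metadiff files) count as empty updates
        for key, val in (update or {}).items():
            aggregated.setdefault(key, []).append(val)
    return aggregated

print(aggregate([{"plate": "A"}, None, {"plate": "B", "well": "B03"}]))
# {'plate': ['A', 'B'], 'well': ['B03']}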

call_single_parallel_task(component, *, wftask, task_pars, workflow_dir_local, workflow_dir_remote=None)

Call a single instance of a parallel task

Parallel tasks need to run in several instances across the parallelization parameters. This function is responsible for running each one of those instances.

Note

This function is directly submitted to a concurrent.futures-compatible executor, roughly as in

some_future = executor.map(call_single_parallel_task, ...)

If the executor then impersonates another user (as in the FractalSlurmExecutor), this function is run by that user.

Parameters:

    component (str, required):
        The parallelization parameter.
    wftask (WorkflowTask, required):
        The task to execute.
    task_pars (TaskParameters, required):
        The parameters to pass on to the task.
    workflow_dir_local (Path, required):
        The server-side working directory for workflow execution.
    workflow_dir_remote (Optional[Path], default None):
        The user-side working directory for workflow execution (only relevant
        for multi-user executors).

Returns:

    Any:
        The json.load-ed contents of the metadiff output file, or None if the
        file is missing.

Raises:

    TaskExecutionError:
        If the wrapped task raises a task-related error. This function is
        responsible for adding debugging information to the
        TaskExecutionError, such as task order and name.
    JobExecutionError:
        If the wrapped task raises a job-related error.
    RuntimeError:
        If workflow_dir_local is falsy.

Source code in fractal_server/app/runner/v1/_common.py
def call_single_parallel_task(
    component: str,
    *,
    wftask: WorkflowTask,
    task_pars: TaskParameters,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
) -> Any:
    """
    Call a single instance of a parallel task

    Parallel tasks need to run in several instances across the parallelization
    parameters. This function is responsible for running each one of those
    instances.

    Note:
        This function is directly submitted to a
        `concurrent.futures`-compatible executor, roughly as in

            some_future = executor.map(call_single_parallel_task, ...)

        If the executor then impersonates another user (as in the
        `FractalSlurmExecutor`), this function is run by that user.

    Args:
        component:
            The parallelization parameter.
        wftask:
            The task to execute.
        task_pars:
            The parameters to pass on to the task.
        workflow_dir_local:
            The server-side working directory for workflow execution.
        workflow_dir_remote:
            The user-side working directory for workflow execution (only
            relevant for multi-user executors).

    Returns:
        The `json.load`-ed contents of the metadiff output file, or `None` if
            the file is missing.

    Raises:
        TaskExecutionError: If the wrapped task raises a task-related error.
                            This function is responsible for adding debugging
                            information to the TaskExecutionError, such as task
                            order and name.
        JobExecutionError: If the wrapped task raises a job-related error.
        RuntimeError: If the `workflow_dir_local` is falsy.
    """
    if not workflow_dir_local:
        raise RuntimeError
    if not workflow_dir_remote:
        workflow_dir_remote = workflow_dir_local

    task_files = get_task_file_paths(
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        task_order=wftask.order,
        task_name=wftask.task.name,
        component=component,
    )

    # write args file (by assembling task_pars, wftask.args and component)
    write_args_file(
        task_pars.dict(exclude={"history"}),
        wftask.args or {},
        dict(component=component),
        path=task_files.args,
    )

    # assemble full command
    cmd = (
        f"{wftask.task.command} -j {task_files.args} "
        f"--metadata-out {task_files.metadiff}"
    )

    try:
        _call_command_wrapper(
            cmd, stdout=task_files.out, stderr=task_files.err
        )
    except TaskExecutionError as e:
        e.workflow_task_order = wftask.order
        e.workflow_task_id = wftask.id
        e.task_name = wftask.task.name
        raise e

    # JSON-load metadiff file and return its contents (or None)
    try:
        with task_files.metadiff.open("r") as f:
            this_meta_update = json.load(f)
    except FileNotFoundError:
        this_meta_update = None

    return this_meta_update
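
For illustration, roughly what a single per-component invocation looks like on disk. All paths, the task command, and the extra argument below are hypothetical; the real file locations come from get_task_file_paths.

import json
from pathlib import Path

# Hypothetical per-component paths
args_path = Path("/tmp/wf/3_my-task_B03/args.json")
metadiff_path = Path("/tmp/wf/3_my-task_B03/metadiff.json")
args_path.parent.mkdir(parents=True, exist_ok=True)

# The args file combines task_pars (minus history), wftask.args and the
# parallelization component
with args_path.open("w") as f:
    json.dump(
        {
            "input_paths": ["/data/in"],
            "output_path": "/data/out",
            "metadata": {"well": ["B03"]},
            "some_task_arg": 42,  # hypothetical wftask.args entry
            "component": "B03",
        },
        f,
        indent=2,
    )

# The assembled command, as in the function above
print(f"/usr/bin/my-task -j {args_path} --metadata-out {metadiff_path}")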

call_single_task(*, wftask, task_pars, workflow_dir_local, workflow_dir_remote=None, logger_name=None)

Call a single task

This assembles the runner arguments (input_paths, output_path, ...) and wftask arguments (i.e., arguments that are specific to the WorkflowTask, such as message or index in the dummy task), writes them to a file, calls the task executable passing the arguments file as an input, and assembles the output.

Note: This function is directly submitted to a concurrent.futures-compatible executor, as in

some_future = executor.submit(call_single_task, ...)

If the executor then impersonates another user (as in the FractalSlurmExecutor), this function is run by that user. For this reason, it should not write any file to workflow_dir_local, or it may yield permission errors.

Parameters:

    wftask (WorkflowTask, required):
        The workflow task to be called. This includes task-specific arguments
        via the wftask.args attribute.
    task_pars (TaskParameters, required):
        The parameters required to run the task which are not specific to the
        task, e.g., I/O paths.
    workflow_dir_local (Path, required):
        The server-side working directory for workflow execution.
    workflow_dir_remote (Optional[Path], default None):
        The user-side working directory for workflow execution (only relevant
        for multi-user executors). If None, it is set equal to
        workflow_dir_local.
    logger_name (Optional[str], default None):
        Name of the logger.

Returns:

    out_task_parameters (TaskParameters):
        A TaskParameters object in which the previous output becomes the input
        and where metadata is the metadata dictionary returned by the task
        being called.

Raises:

    TaskExecutionError:
        If the wrapped task raises a task-related error. This function is
        responsible for adding debugging information to the
        TaskExecutionError, such as task order and name.
    JobExecutionError:
        If the wrapped task raises a job-related error.

Source code in fractal_server/app/runner/v1/_common.py
def call_single_task(
    *,
    wftask: WorkflowTask,
    task_pars: TaskParameters,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    logger_name: Optional[str] = None,
) -> TaskParameters:
    """
    Call a single task

    This assembles the runner arguments (input_paths, output_path, ...) and
    wftask arguments (i.e., arguments that are specific to the WorkflowTask,
    such as message or index in the dummy task), writes them to a file, calls
    the task executable passing the arguments file as an input, and assembles
    the output.

    **Note**: This function is directly submitted to a
    `concurrent.futures`-compatible executor, as in

        some_future = executor.submit(call_single_task, ...)

    If the executor then impersonates another user (as in the
    `FractalSlurmExecutor`), this function is run by that user.  For this
    reason, it should not write any file to `workflow_dir_local`, or it may
    yield permission errors.

    Args:
        wftask:
            The workflow task to be called. This includes task specific
            arguments via the wftask.args attribute.
        task_pars:
            The parameters required to run the task which are not specific to
            the task, e.g., I/O paths.
        workflow_dir_local:
            The server-side working directory for workflow execution.
        workflow_dir_remote:
            The user-side working directory for workflow execution (only
            relevant for multi-user executors). If `None`, it is set to be
            equal to `workflow_dir_local`.
        logger_name:
            Name of the logger

    Returns:
        out_task_parameters:
            A TaskParameters in which the previous output becomes the input
            and where metadata is the metadata dictionary returned by the task
            being called.

    Raises:
        TaskExecutionError: If the wrapped task raises a task-related error.
                            This function is responsible for adding debugging
                            information to the TaskExecutionError, such as task
                            order and name.
        JobExecutionError: If the wrapped task raises a job-related error.
    """

    logger = get_logger(logger_name)

    if not workflow_dir_remote:
        workflow_dir_remote = workflow_dir_local

    task_files = get_task_file_paths(
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        task_order=wftask.order,
        task_name=wftask.task.name,
    )

    # write args file (by assembling task_pars and wftask.args)
    write_args_file(
        task_pars.dict(exclude={"history"}),
        wftask.args or {},
        path=task_files.args,
    )

    # assemble full command
    cmd = (
        f"{wftask.task.command} -j {task_files.args} "
        f"--metadata-out {task_files.metadiff}"
    )

    try:
        _call_command_wrapper(
            cmd, stdout=task_files.out, stderr=task_files.err
        )
    except TaskExecutionError as e:
        e.workflow_task_order = wftask.order
        e.workflow_task_id = wftask.id
        e.task_name = wftask.task.name
        raise e

    # This try/except block covers the case of a task that ran successfully but
    # did not write the expected metadiff file (ref fractal-server issue #854).
    try:
        with task_files.metadiff.open("r") as f_metadiff:
            diff_metadata = json.load(f_metadiff)
    except FileNotFoundError as e:
        logger.warning(
            f"Skip collection of updated metadata. Original error: {str(e)}"
        )
        diff_metadata = {}

    # Cover the case where the task wrote `null`, rather than a valid
    # dictionary (ref fractal-server issue #878).
    if diff_metadata is None:
        diff_metadata = {}

    # Prepare updated_metadata
    updated_metadata = task_pars.metadata.copy()
    updated_metadata.update(diff_metadata)
    # Prepare updated_history (note: the expected type for history items is
    # defined in `_DatasetHistoryItem`)
    wftask_dump = wftask.model_dump(exclude={"task"})
    wftask_dump["task"] = wftask.task.model_dump()
    new_history_item = dict(
        workflowtask=wftask_dump,
        status=WorkflowTaskStatusTypeV1.DONE,
        parallelization=None,
    )
    updated_history = task_pars.history.copy()
    updated_history.append(new_history_item)

    # Assemble a TaskParameter object
    out_task_parameters = TaskParameters(
        input_paths=[task_pars.output_path],
        output_path=task_pars.output_path,
        metadata=updated_metadata,
        history=updated_history,
    )

    return out_task_parameters
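
Note the chaining invariant encoded in the final assembly: the previous output_path becomes the (single) input path of the next task, while metadata and history accumulate. A minimal sketch with plain dicts standing in for TaskParameters:

def chain(prev: dict, diff_metadata: dict, history_item: dict) -> dict:
    # Same wiring as the TaskParameters assembly above
    return {
        "input_paths": [prev["output_path"]],
        "output_path": prev["output_path"],
        "metadata": {**prev["metadata"], **diff_metadata},
        "history": [*prev["history"], history_item],
    }

prev = {"output_path": "/data/out", "metadata": {"n": 1}, "history": []}
out = chain(prev, {"n": 2}, {"status": "done"})
assert out["input_paths"] == ["/data/out"]
assert out["metadata"] == {"n": 2}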

execute_tasks(*, executor, task_list, task_pars, workflow_dir_local, workflow_dir_remote=None, submit_setup_call=no_op_submit_setup_call, logger_name)

Submit a list of WorkflowTasks for execution

Note: At the end of each task, write current metadata to workflow_dir_local / METADATA_FILENAME, so that they can be read as part of the get_job endpoint.

Parameters:

    executor (Executor, required):
        The concurrent.futures.Executor-compatible executor that will run the
        task.
    task_list (list[WorkflowTask], required):
        The list of wftasks to be run.
    task_pars (TaskParameters, required):
        The task parameters to be passed on to the first task of the list.
    workflow_dir_local (Path, required):
        The server-side working directory for workflow execution.
    workflow_dir_remote (Optional[Path], default None):
        The user-side working directory for workflow execution (only relevant
        for multi-user executors). If None, it is set equal to
        workflow_dir_local.
    submit_setup_call (Callable, default no_op_submit_setup_call):
        An optional function that computes configuration parameters for the
        executor.
    logger_name (str, required):
        Name of the logger.

Returns:

    current_task_pars (TaskParameters):
        A TaskParameters object which constitutes the output of the last task
        in the list.

Source code in fractal_server/app/runner/v1/_common.py
def execute_tasks(
    *,
    executor: Executor,
    task_list: list[WorkflowTask],
    task_pars: TaskParameters,
    workflow_dir_local: Path,
    workflow_dir_remote: Optional[Path] = None,
    submit_setup_call: Callable = no_op_submit_setup_call,
    logger_name: str,
) -> TaskParameters:
    """
    Submit a list of WorkflowTasks for execution

    **Note:** At the end of each task, write current metadata to
    `workflow_dir_local / METADATA_FILENAME`, so that they can be read as part
    of the [`get_job`
    endpoint](../../api/v1/job/#fractal_server.app.routes.api.v1.job.get_job).

    Arguments:
        executor:
            The `concurrent.futures.Executor`-compatible executor that will
            run the task.
        task_list:
            The list of wftasks to be run
        task_pars:
            The task parameters to be passed on to the first task of the list.
        workflow_dir_local:
            The server-side working directory for workflow execution.
        workflow_dir_remote:
            The user-side working directory for workflow execution (only
            relevant for multi-user executors). If `None`, it is set to be
            equal to `workflow_dir_local`.
        submit_setup_call:
            An optional function that computes configuration parameters for
            the executor.
        logger_name:
            Name of the logger

    Returns:
        current_task_pars:
            A TaskParameters object which constitutes the output of the last
            task in the list.
    """
    if not workflow_dir_remote:
        workflow_dir_remote = workflow_dir_local

    logger = get_logger(logger_name)

    current_task_pars = task_pars.copy()

    for this_wftask in task_list:
        logger.debug(
            f"SUBMIT {this_wftask.order}-th task "
            f'(name="{this_wftask.task.name}")'
        )
        if this_wftask.is_parallel:
            current_task_pars = call_parallel_task(
                executor=executor,
                wftask=this_wftask,
                task_pars_depend=current_task_pars,
                workflow_dir_local=workflow_dir_local,
                workflow_dir_remote=workflow_dir_remote,
                submit_setup_call=submit_setup_call,
                logger_name=logger_name,
            )
        else:
            # Call backend-specific submit_setup_call
            try:
                extra_setup = submit_setup_call(
                    wftask=this_wftask,
                    workflow_dir_local=workflow_dir_local,
                    workflow_dir_remote=workflow_dir_remote,
                )
            except Exception as e:
                tb = "".join(traceback.format_tb(e.__traceback__))
                raise RuntimeError(
                    f"{type(e)} error in {submit_setup_call=}\n"
                    f"Original traceback:\n{tb}"
                )
            # NOTE: executor.submit(call_single_task, ...) is non-blocking,
            # i.e. the returned future may have `this_wftask_future.done() =
            # False`. We make it blocking right away, by calling `.result()`
            # NOTE: do not use trim_TaskParameters for non-parallel tasks,
            # since the `task_pars` argument in `call_single_task` is also used
            # as a basis for new `metadata`.
            this_wftask_future = executor.submit(
                call_single_task,
                wftask=this_wftask,
                task_pars=current_task_pars,
                workflow_dir_local=workflow_dir_local,
                workflow_dir_remote=workflow_dir_remote,
                logger_name=logger_name,
                **extra_setup,
            )
            # Wait for the future result (blocking)
            current_task_pars = this_wftask_future.result()
        logger.debug(
            f"END    {this_wftask.order}-th task "
            f'(name="{this_wftask.task.name}")'
        )

        # Write most recent metadata to METADATA_FILENAME
        with open(workflow_dir_local / METADATA_FILENAME, "w") as f:
            json.dump(current_task_pars.metadata, f, indent=2)

        # Write most recent metadata to HISTORY_FILENAME
        with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
            json.dump(current_task_pars.history, f, indent=2)

    return current_task_pars
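
A minimal sketch of the submission pattern above, using a standard ThreadPoolExecutor and dummy callables in place of WorkflowTasks: parallel steps go through executor.map (iterating the results is blocking) and sequential steps through executor.submit(...).result(), so each step completes before the next one starts.

from concurrent.futures import ThreadPoolExecutor

def single_step(x: int) -> int:
    return x + 1

def parallel_step(component: str) -> int:
    return len(component)

with ThreadPoolExecutor() as executor:
    # Sequential step: submit, then block immediately on the result
    value = executor.submit(single_step, 0).result()
    # Parallel step: fan out over components; list() consumes (and therefore
    # blocks on) every result
    results = list(executor.map(parallel_step, ["a", "bb", "ccc"]))
    print(value, results)  # 1 [1, 2, 3]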

no_op_submit_setup_call(*, wftask, workflow_dir_local, workflow_dir_remote)

Default (no-operation) interface of submit_setup_call.

Source code in fractal_server/app/runner/v1/_common.py
def no_op_submit_setup_call(
    *,
    wftask: WorkflowTask,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
) -> dict:
    """
    Default (no-operation) interface of submit_setup_call.
    """
    return {}
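
For comparison, a backend-specific submit_setup_call returns a dictionary of extra keyword arguments that gets splatted into executor.submit or executor.map (see the **extra_setup calls above). The sketch below is hypothetical; which keys an executor accepts depends entirely on its implementation.

from pathlib import Path

def example_submit_setup_call(
    *,
    wftask,
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
) -> dict:
    # Hypothetical: pass a per-task job name to a SLURM-like executor
    return {"job_name": f"task_{wftask.order}_{wftask.task.name}"}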

trim_TaskParameters(task_params, _task)

Return a smaller copy of a TaskParameters object.

Remove metadata["image"] key/value pair - see issues 1237 and 1242. (https://github.com/fractal-analytics-platform/fractal-server/issues/1237) This applies only to parallel tasks with names different from the ones defined in _task_needs_image_list.

Source code in fractal_server/app/runner/v1/_common.py
def trim_TaskParameters(
    task_params: TaskParameters,
    _task: Task,
) -> TaskParameters:
    """
    Return a smaller copy of a TaskParameters object.

    Remove metadata["image"] key/value pair - see issues 1237 and 1242.
    (https://github.com/fractal-analytics-platform/fractal-server/issues/1237)
    This applies only to parallel tasks whose names differ from the ones
    defined in `_task_needs_image_list`.
    """
    task_params_slim = deepcopy(task_params)
    if not _task_needs_image_list(_task) and _task.is_parallel:
        if "image" in task_params_slim.metadata.keys():
            task_params_slim.metadata.pop("image")
        task_params_slim.history = []
    return task_params_slim
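
A minimal sketch of the trimming effect on the metadata dictionary alone: for a parallel task that does not need the (potentially very long) image list, the image key is dropped before the per-component args files are written.

metadata = {"image": ["img_001.zarr", "img_002.zarr"], "well": ["B03"]}
trimmed = {k: v for k, v in metadata.items() if k != "image"}
print(trimmed)  # {'well': ['B03']}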