
common

Common utilities and routines for runner backends (public API)

This module includes utilities and routines that are useful for implementing runner backends but that should also be exposed to the other components of Fractal Server.

TaskParameterEncoder

Bases: JSONEncoder

Convenience JSONEncoder that serialises Paths as strings

Source code in fractal_server/app/runner/v1/common.py
class TaskParameterEncoder(JSONEncoder):
    """
    Convenience JSONEncoder that serialises `Path`s as strings
    """

    def default(self, value):
        if isinstance(value, Path):
            return value.as_posix()
        return JSONEncoder.default(self, value)
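
For example, a dictionary containing Path values can be passed to json.dumps together with this encoder (a minimal usage sketch; the paths are hypothetical):

import json
from pathlib import Path

params = {"input_path": Path("/data/input"), "level": 2}

# Path values are serialised as POSIX strings; all other types fall back
# to the default JSONEncoder behaviour.
serialised = json.dumps(params, cls=TaskParameterEncoder)
# '{"input_path": "/data/input", "level": 2}'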

TaskParameters

Bases: BaseModel

Wrapper for task input parameters

Instances of this class are used to pass parameters from the output of a task to the input of the next one.

Attributes:

Name Type Description
input_paths list[Path]

Input paths as derived from the input dataset.

output_path Path

Output path as derived from the output dataset.

metadata dict[str, Any]

Dataset metadata, as found in the input dataset or as updated by the previous task.

history list[dict[str, Any]]

Dataset history, as found in the input dataset or as updated by the previous task.

Source code in fractal_server/app/runner/v1/common.py
class TaskParameters(BaseModel):
    """
    Wrapper for task input parameters

    Instances of this class are used to pass parameters from the output of a
    task to the input of the next one.

    Attributes:
        input_paths:
            Input paths as derived from the input dataset.
        output_path:
            Output path as derived from the output dataset.
        metadata:
            Dataset metadata, as found in the input dataset or as updated by
            the previous task.
        history:
            Dataset history, as found in the input dataset or as updated by
            the previous task.
    """

    input_paths: list[Path]
    output_path: Path
    metadata: dict[str, Any]
    history: list[dict[str, Any]]

    class Config:
        arbitrary_types_allowed = True
        extra = "forbid"
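
A minimal construction sketch (the field values are hypothetical):

from pathlib import Path

params = TaskParameters(
    input_paths=[Path("/data/input")],
    output_path=Path("/data/output"),
    metadata={"num_images": 4},
    history=[],
)

# Because the model is configured with extra = "forbid", passing an
# unexpected keyword argument raises a validation error.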

validate_workflow_compatibility(*, input_dataset, workflow, output_dataset, first_task_index, last_task_index)

Check compatibility of workflow and input / output dataset

Source code in fractal_server/app/runner/v1/common.py
def validate_workflow_compatibility(
    *,
    input_dataset: Dataset,
    workflow: Workflow,
    output_dataset: Dataset,
    first_task_index: int,
    last_task_index: int,
) -> None:
    """
    Check compatibility of workflow and input / output dataset
    """
    # Check input_dataset type
    workflow_input_type = workflow.task_list[first_task_index].task.input_type
    if (
        workflow_input_type != "Any"
        and workflow_input_type != input_dataset.type
    ):
        raise TypeError(
            f"Incompatible types `{workflow_input_type}` of workflow "
            f"`{workflow.name}` and `{input_dataset.type}` of dataset "
            f"`{input_dataset.name}`"
        )

    # Check output_dataset type
    workflow_output_type = workflow.task_list[last_task_index].task.output_type
    if (
        workflow_output_type != "Any"
        and workflow_output_type != output_dataset.type
    ):
        raise TypeError(
            f"Incompatible types `{workflow_output_type}` of workflow "
            f"`{workflow.name}` and `{output_dataset.type}` of dataset "
            f"`{output_dataset.name}`"
        )
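
The function only reads the type and name attributes of the two datasets and the input_type / output_type of the first and last tasks, so its failure mode can be illustrated with stand-in objects (a sketch only; these are not the real Dataset and Workflow models):

from types import SimpleNamespace

task = SimpleNamespace(input_type="zarr", output_type="zarr")
workflow = SimpleNamespace(name="wf", task_list=[SimpleNamespace(task=task)])
input_dataset = SimpleNamespace(name="ds-in", type="image")
output_dataset = SimpleNamespace(name="ds-out", type="zarr")

# Raises TypeError: Incompatible types `zarr` of workflow `wf` and
# `image` of dataset `ds-in`
validate_workflow_compatibility(
    input_dataset=input_dataset,
    workflow=workflow,
    output_dataset=output_dataset,
    first_task_index=0,
    last_task_index=0,
)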

write_args_file(*args, path)

Merge arbitrary dictionaries and write to file

Parameters:

Name Type Description Default
*args dict[str, Any]

One or more dictionaries that are merged into one, respecting the order in which they are passed; i.e., later dictionaries override earlier ones.

()
path Path

Destination for serialised file.

required
Source code in fractal_server/app/runner/v1/common.py
def write_args_file(
    *args: dict[str, Any],
    path: Path,
):
    """
    Merge arbitrary dictionaries and write to file

    Args:
        *args:
            One or more dictionaries that are merged into one, respecting
            the order in which they are passed; i.e., later dictionaries
            override earlier ones.
        path:
            Destination for serialised file.
    """
    out = {}
    for d in args:
        out.update(d)

    with open(path, "w") as f:
        json.dump(out, f, cls=TaskParameterEncoder, indent=4)
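
A short sketch of the merge order (the destination path is hypothetical); later dictionaries override earlier ones, and Path values are serialised by TaskParameterEncoder:

from pathlib import Path

write_args_file(
    {"level": 1, "input_path": Path("/data/input")},
    {"level": 2},
    path=Path("/tmp/args.json"),
)

# /tmp/args.json now contains:
# {
#     "level": 2,
#     "input_path": "/data/input"
# }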