Skip to content

workflowtask

WorkflowTaskImportV2

Bases: BaseModel

Source code in fractal_server/app/schemas/v2/workflowtask.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class WorkflowTaskImportV2(BaseModel):
    """
    Schema for a `WorkflowTask` payload that embeds its `task`.

    Unknown fields are rejected (`extra="forbid"`). Legacy payloads that
    still carry `input_filters` (created with fractal-server<2.11.0) are
    converted to `type_filters` before field validation, see
    `update_legacy_filters`.

    Attributes:
        meta_non_parallel: Optional dictionary, checked by
            `valdict_keys("meta_non_parallel")`.
        meta_parallel: Optional dictionary, checked by
            `valdict_keys("meta_parallel")`.
        args_non_parallel: Optional dictionary, checked by
            `valdict_keys("args_non_parallel")`.
        args_parallel: Optional dictionary, checked by
            `valdict_keys("args_parallel")`.
        type_filters: Optional mapping from type name to boolean, checked
            by `validate_type_filters`.
        input_filters: Legacy field, only accepted on input; it is always
            reset to `None` once converted into `type_filters`.
        task: The task description, in either its current or legacy form.
    """

    model_config = ConfigDict(extra="forbid")

    meta_non_parallel: Optional[dict[str, Any]] = None
    meta_parallel: Optional[dict[str, Any]] = None
    args_non_parallel: Optional[dict[str, Any]] = None
    args_parallel: Optional[dict[str, Any]] = None
    type_filters: Optional[dict[str, bool]] = None
    # Legacy field (fractal-server<2.11.0), see `update_legacy_filters`.
    input_filters: Optional[dict[str, Any]] = None

    task: Union[TaskImportV2, TaskImportV2Legacy]

    # Validators
    @model_validator(mode="before")
    @classmethod
    def update_legacy_filters(cls, values: dict):
        """
        Transform legacy filters (created with fractal-server<2.11.0)
        into type filters

        Args:
            values: Raw input data, before any field validation.

        Returns:
            The same `values` object; if it had a non-`None`
            `input_filters`, then `type_filters` is set from its `types`
            item and `input_filters` is reset to `None`.

        Raises:
            ValueError: If both `input_filters` and `type_filters` are
                provided, or if the legacy filters include non-empty
                attribute filters.
        """
        legacy_filters = values.get("input_filters")
        if legacy_filters is None:
            # Nothing to convert.
            return values

        if "type_filters" in values.keys():
            raise ValueError(
                "Cannot set filters both through the legacy field "
                "('input_filters') and the new one ('type_filters')."
            )

        # As of 2.11.0, WorkflowTask do not have attribute filters any
        # more. A missing "attributes" key is treated as "no attribute
        # filters" (same as a missing "types" key below), rather than
        # leaking a `KeyError` out of the validator.
        if legacy_filters.get("attributes", {}) != {}:
            raise ValueError("Cannot set attribute filters for WorkflowTasks.")

        # Convert legacy filters.types into new type_filters
        values["type_filters"] = legacy_filters.get("types", {})
        values["input_filters"] = None
        return values

    # Per-field validators, built from helpers defined elsewhere in the
    # project.
    _type_filters = field_validator("type_filters")(
        classmethod(validate_type_filters)
    )
    _meta_non_parallel = field_validator("meta_non_parallel")(
        classmethod(valdict_keys("meta_non_parallel"))
    )
    _meta_parallel = field_validator("meta_parallel")(
        classmethod(valdict_keys("meta_parallel"))
    )
    _args_non_parallel = field_validator("args_non_parallel")(
        classmethod(valdict_keys("args_non_parallel"))
    )
    _args_parallel = field_validator("args_parallel")(
        classmethod(valdict_keys("args_parallel"))
    )

update_legacy_filters(values) classmethod

Transform legacy filters (created with fractal-server<2.11.0) into type filters

Source code in fractal_server/app/schemas/v2/workflowtask.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@model_validator(mode="before")
@classmethod
def update_legacy_filters(cls, values: dict):
    """
    Transform legacy filters (created with fractal-server<2.11.0)
    into type filters

    Args:
        values: Raw input data, before any field validation.

    Returns:
        The same `values` object; if it had a non-`None` `input_filters`,
        then `type_filters` is set from its `types` item and
        `input_filters` is reset to `None`.

    Raises:
        ValueError: If both `input_filters` and `type_filters` are
            provided, or if the legacy filters include non-empty
            attribute filters.
    """
    legacy_filters = values.get("input_filters")
    if legacy_filters is None:
        # Nothing to convert.
        return values

    if "type_filters" in values.keys():
        raise ValueError(
            "Cannot set filters both through the legacy field "
            "('input_filters') and the new one ('type_filters')."
        )

    # As of 2.11.0, WorkflowTask do not have attribute filters any more.
    # A missing "attributes" key is treated as "no attribute filters"
    # (same as a missing "types" key below), rather than leaking a
    # `KeyError` out of the validator.
    if legacy_filters.get("attributes", {}) != {}:
        raise ValueError("Cannot set attribute filters for WorkflowTasks.")

    # Convert legacy filters.types into new type_filters
    values["type_filters"] = legacy_filters.get("types", {})
    values["input_filters"] = None
    return values

WorkflowTaskReplaceV2

Bases: BaseModel

Used by 'replace-task' endpoint

Source code in fractal_server/app/schemas/v2/workflowtask.py
 96
 97
 98
 99
100
class WorkflowTaskReplaceV2(BaseModel):
    """
    Used by 'replace-task' endpoint

    Attributes:
        args_non_parallel: Optional dictionary with string keys; defaults
            to `None`.
        args_parallel: Optional dictionary with string keys; defaults to
            `None`.
    """

    # NOTE(review): presumably these are the arguments to use for the
    # replacement task - confirm against the 'replace-task' endpoint.
    args_non_parallel: Optional[dict[str, Any]] = None
    args_parallel: Optional[dict[str, Any]] = None

WorkflowTaskStatusTypeV2

Bases: str, Enum

Define the available values for the status of a WorkflowTask.

This model is used within the Dataset.history attribute, which is constructed in the runner and then used in the API (e.g. in the api/v2/project/{project_id}/dataset/{dataset_id}/status endpoint).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| SUBMITTED | | The WorkflowTask is part of a running job. |
| DONE | | The most-recent execution of this WorkflowTask was successful. |
| FAILED | | The most-recent execution of this WorkflowTask failed. |

Source code in fractal_server/app/schemas/v2/workflowtask.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class WorkflowTaskStatusTypeV2(str, Enum):
    """
    Define the available values for the status of a `WorkflowTask`.

    This model is used within the `Dataset.history` attribute, which is
    constructed in the runner and then used in the API (e.g. in the
    `api/v2/project/{project_id}/dataset/{dataset_id}/status` endpoint).

    Since the class also inherits from `str`, each member compares equal
    to its lowercase string value (e.g. `"submitted"`).

    Attributes:
        SUBMITTED: The `WorkflowTask` is part of a running job.
        DONE: The most-recent execution of this `WorkflowTask` was successful.
        FAILED: The most-recent execution of this `WorkflowTask` failed.
    """

    SUBMITTED = "submitted"
    DONE = "done"
    FAILED = "failed"