Skip to content

workflowtask

WorkflowTaskImportV2

Bases: BaseModel

Source code in fractal_server/app/schemas/v2/workflowtask.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class WorkflowTaskImportV2(BaseModel):
    model_config = ConfigDict(extra="forbid")

    meta_non_parallel: Optional[dict[str, Any]] = None
    meta_parallel: Optional[dict[str, Any]] = None
    args_non_parallel: Optional[dict[str, Any]] = None
    args_parallel: Optional[dict[str, Any]] = None
    type_filters: Optional[dict[str, bool]] = None
    input_filters: Optional[dict[str, Any]] = None

    task: Union[TaskImportV2, TaskImportV2Legacy]

    # Validators
    @model_validator(mode="before")
    @classmethod
    def update_legacy_filters(cls, values: dict):
        """
        Transform legacy filters (created with fractal-server<2.11.0)
        into type filters
        """
        if values.get("input_filters") is not None:
            if "type_filters" in values.keys():
                raise ValueError(
                    "Cannot set filters both through the legacy field "
                    "('filters') and the new one ('type_filters')."
                )
            else:
                # As of 2.11.0, WorkflowTask do not have attribute filters
                # any more.
                if values["input_filters"]["attributes"] != {}:
                    raise ValueError(
                        "Cannot set attribute filters for WorkflowTasks."
                    )
                # Convert legacy filters.types into new type_filters
                values["type_filters"] = values["input_filters"].get(
                    "types", {}
                )
                values["input_filters"] = None

        return values

    _type_filters = field_validator("type_filters")(
        classmethod(validate_type_filters)
    )
    _meta_non_parallel = field_validator("meta_non_parallel")(
        classmethod(valdict_keys("meta_non_parallel"))
    )
    _meta_parallel = field_validator("meta_parallel")(
        classmethod(valdict_keys("meta_parallel"))
    )
    _args_non_parallel = field_validator("args_non_parallel")(
        classmethod(valdict_keys("args_non_parallel"))
    )
    _args_parallel = field_validator("args_parallel")(
        classmethod(valdict_keys("args_parallel"))
    )

update_legacy_filters(values) classmethod

Transform legacy filters (created with fractal-server<2.11.0) into type filters

Source code in fractal_server/app/schemas/v2/workflowtask.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@model_validator(mode="before")
@classmethod
def update_legacy_filters(cls, values: dict):
    """
    Transform legacy filters (created with fractal-server<2.11.0)
    into type filters
    """
    if values.get("input_filters") is not None:
        if "type_filters" in values.keys():
            raise ValueError(
                "Cannot set filters both through the legacy field "
                "('filters') and the new one ('type_filters')."
            )
        else:
            # As of 2.11.0, WorkflowTask do not have attribute filters
            # any more.
            if values["input_filters"]["attributes"] != {}:
                raise ValueError(
                    "Cannot set attribute filters for WorkflowTasks."
                )
            # Convert legacy filters.types into new type_filters
            values["type_filters"] = values["input_filters"].get(
                "types", {}
            )
            values["input_filters"] = None

    return values

WorkflowTaskReplaceV2

Bases: BaseModel

Used by 'replace-task' endpoint

Source code in fractal_server/app/schemas/v2/workflowtask.py
77
78
79
80
81
class WorkflowTaskReplaceV2(BaseModel):
    """Used by 'replace-task' endpoint"""

    args_non_parallel: Optional[dict[str, Any]] = None
    args_parallel: Optional[dict[str, Any]] = None