Skip to content

workflow

Workflow

Bases: _WorkflowBaseV1, SQLModel

Workflow

Attributes:

Name Type Description
id Optional[int]

Primary key

project_id int

ID of the project the workflow belongs to.

task_list list[WorkflowTask]

List of associations to tasks.

Source code in fractal_server/app/models/v1/workflow.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Workflow(_WorkflowBaseV1, SQLModel, table=True):
    """
    Database table describing a workflow: an ordered list of tasks that
    belongs to a project.

    Attributes:
        id:
            Primary key
        project_id:
            ID of the project the workflow belongs to.
        project:
            The related `Project` object (loaded with `lazy="selectin"`).
        task_list:
            List of associations to tasks, ordered by `WorkflowTask.order`.
        timestamp_created:
            Creation timestamp; defaults to the value of `get_timestamp`
            and is stored in a non-nullable, timezone-aware column.
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    project_id: int = Field(foreign_key="project.id")
    project: "Project" = Relationship(  # noqa: F821
        sa_relationship_kwargs={"lazy": "selectin"},
    )

    # `ordering_list("order")` keeps each item's `order` attribute in sync
    # with its position in the list; removed items are deleted through the
    # "delete-orphan" cascade.
    task_list: list[WorkflowTask] = Relationship(
        sa_relationship_kwargs={
            "lazy": "selectin",
            "order_by": "WorkflowTask.order",
            "collection_class": ordering_list("order"),
            "cascade": "all, delete-orphan",
        },
    )
    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )

    @property
    def input_type(self):
        """
        `input_type` of the task attached to the first item of `task_list`.

        Indexing raises `IndexError` when `task_list` is empty.
        """
        first_wftask = self.task_list[0]
        return first_wftask.task.input_type

    @property
    def output_type(self):
        """
        `output_type` of the task attached to the last item of `task_list`.

        Indexing raises `IndexError` when `task_list` is empty.
        """
        last_wftask = self.task_list[-1]
        return last_wftask.task.output_type

WorkflowTask

Bases: _WorkflowTaskBaseV1, SQLModel

A Task as part of a Workflow

This is a crossing table between Task and Workflow. In addition to the foreign keys, it allows for parameter overriding and keeps the order within the list of tasks of the workflow.

Attributes:

Name Type Description
id Optional[int]

Primary key

workflow_id int

ID of the Workflow the WorkflowTask belongs to

task_id int

ID of the task corresponding to the WorkflowTask

order Optional[int]

Positional order of the WorkflowTask in Workflow.task_list

meta Optional[dict[str, Any]]

Additional parameters useful for execution

args Optional[dict[str, Any]]

Task arguments

task Task

Task object associated with the current WorkflowTask

Source code in fractal_server/app/models/v1/workflow.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class WorkflowTask(_WorkflowTaskBaseV1, SQLModel, table=True):
    """
    One step of a `Workflow`, pointing at the `Task` to be run.

    This table links `Task` and `Workflow` rows. Besides the two foreign
    keys, each row stores its own `args`/`meta` values and the position of
    the step within the workflow's list of tasks.

    Attributes:
        id:
            Primary key
        workflow_id:
            ID of the `Workflow` the `WorkflowTask` belongs to
        task_id:
            ID of the task corresponding to the `WorkflowTask`
        order:
            Positional order of the `WorkflowTask` in `Workflow.task_list`
        meta:
            Additional parameters useful for execution
        args:
            Task arguments
        task:
            `Task` object associated with the current `WorkflowTask`
    """

    class Config:
        arbitrary_types_allowed = True
        fields = {"parent": {"exclude": True}}

    id: Optional[int] = Field(default=None, primary_key=True)

    workflow_id: int = Field(foreign_key="workflow.id")
    task_id: int = Field(foreign_key="task.id")
    order: Optional[int]
    # Both dictionaries are stored in JSON columns.
    meta: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))
    args: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))
    task: Task = Relationship(sa_relationship_kwargs={"lazy": "selectin"})

    @validator("args")
    def validate_args(cls, value: dict = None):
        """
        Reject `args` dictionaries that use reserved task parameter names.

        The reserved names are `input_paths`, `output_path`, `metadata`
        and `component`.

        Returns:
            `value` unchanged, or `None` when `value` is `None`.

        Raises:
            ValueError: If at least one reserved name is a key of `value`.
        """
        if value is None:
            return None

        reserved_names = {
            "input_paths",
            "output_path",
            "metadata",
            "component",
        }
        provided_names = set(value.keys())
        clashing_names = reserved_names & provided_names
        if not clashing_names:
            return value

        message = (
            "`args` contains the following forbidden keys: "
            f"{clashing_names}"
        )
        raise ValueError(message)

    @property
    def is_parallel(self) -> bool:
        """Value of `is_parallel` on the associated task."""
        related_task = self.task
        return related_task.is_parallel

    @property
    def parallelization_level(self) -> Union[str, None]:
        """Value of `parallelization_level` on the associated task."""
        related_task = self.task
        return related_task.parallelization_level

validate_args(value=None)

Prevent fractal task reserved parameter names from entering args

Forbidden argument names are input_paths, output_path, metadata, component.

Source code in fractal_server/app/models/v1/workflow.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@validator("args")
def validate_args(cls, value: dict = None):
    """
    Reject `args` dictionaries that use reserved task parameter names.

    The reserved names are `input_paths`, `output_path`, `metadata` and
    `component`.

    Returns:
        `value` unchanged, or `None` when `value` is `None`.

    Raises:
        ValueError: If at least one reserved name is a key of `value`.
    """
    if value is None:
        return None

    reserved_names = {
        "input_paths",
        "output_path",
        "metadata",
        "component",
    }
    provided_names = set(value.keys())
    clashing_names = reserved_names & provided_names
    if not clashing_names:
        return value

    message = (
        "`args` contains the following forbidden keys: "
        f"{clashing_names}"
    )
    raise ValueError(message)