Skip to content

models

Note that this module is imported from fractal_server/migrations/env.py, thus we should always export all relevant database models from here or they will not be picked up by alembic.

ApplyWorkflow

Bases: _ApplyWorkflowBaseV1, SQLModel

Represent a workflow run

This table is responsible for storing the state of a workflow execution in the database.

Attributes:

Name Type Description
id Optional[int]

Primary key.

project_id Optional[int]

ID of the project the workflow belongs to, or None if the project was deleted.

input_dataset_id Optional[int]

ID of the input dataset, or None if the dataset was deleted.

output_dataset_id Optional[int]

ID of the output dataset, or None if the dataset was deleted.

workflow_id Optional[int]

ID of the workflow being applied, or None if the workflow was deleted.

status str

Job status

workflow_dump dict[str, Any]

Copy of the submitted workflow at submission.

input_dataset_dump dict[str, Any]

Copy of the input_dataset at submission.

output_dataset_dump dict[str, Any]

Copy of the output_dataset at submission.

start_timestamp datetime

Timestamp of when the run began.

end_timestamp Optional[datetime]

Timestamp of when the run ended or failed.

status str

Status of the run.

log Optional[str]

Forward of the workflow logs.

user_email str

Email address of the user who submitted the job.

slurm_account Optional[str]

Account to be used when submitting the job to SLURM (see "account" option in sbatch documentation).

first_task_index int
last_task_index int
Source code in fractal_server/app/models/v1/job.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class ApplyWorkflow(_ApplyWorkflowBaseV1, SQLModel, table=True):
    """
    Represent a workflow run

    This table is responsible for storing the state of a workflow execution in
    the database.

    Attributes:
        id:
            Primary key.
        project_id:
            ID of the project the workflow belongs to, or `None` if the project
            was deleted.
        input_dataset_id:
            ID of the input dataset, or `None` if the dataset was deleted.
        output_dataset_id:
            ID of the output dataset, or `None` if the dataset was deleted.
        workflow_id:
            ID of the workflow being applied, or `None` if the workflow was
            deleted.
        workflow_dump:
            Copy of the submitted workflow at submission.
        input_dataset_dump:
            Copy of the input_dataset at submission.
        output_dataset_dump:
            Copy of the output_dataset at submission.
        project_dump:
            JSON dump stored in a non-nullable column; presumably a copy of
            the project at submission, like the other `*_dump` attributes
            (TODO confirm).
        working_dir:
            Optional string; presumably the working directory of the job
            (TODO confirm).
        working_dir_user:
            Optional string; presumably the user-side working directory of
            the job (TODO confirm).
        first_task_index:
            Required integer; presumably the index of the first workflow
            task to be run (TODO confirm).
        last_task_index:
            Required integer; presumably the index of the last workflow task
            to be run (TODO confirm).
        start_timestamp:
            Timestamp of when the run began.
        end_timestamp:
            Timestamp of when the run ended or failed.
        status:
            Status of the run (job status); defaults to
            `JobStatusTypeV1.SUBMITTED`.
        log:
            Forward of the workflow logs.
        user_email:
            Email address of the user who submitted the job.
        slurm_account:
            Account to be used when submitting the job to SLURM (see "account"
            option in [`sbatch`
            documentation](https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS)).
    """

    class Config:
        arbitrary_types_allowed = True

    id: Optional[int] = Field(default=None, primary_key=True)

    project_id: Optional[int] = Field(foreign_key="project.id")
    workflow_id: Optional[int] = Field(foreign_key="workflow.id")
    input_dataset_id: Optional[int] = Field(foreign_key="dataset.id")
    output_dataset_id: Optional[int] = Field(foreign_key="dataset.id")

    user_email: str = Field(nullable=False)
    slurm_account: Optional[str]

    input_dataset_dump: dict[str, Any] = Field(
        sa_column=Column(JSON, nullable=False)
    )
    output_dataset_dump: dict[str, Any] = Field(
        sa_column=Column(JSON, nullable=False)
    )
    workflow_dump: dict[str, Any] = Field(
        sa_column=Column(JSON, nullable=False)
    )
    project_dump: dict[str, Any] = Field(
        sa_column=Column(JSON, nullable=False)
    )

    working_dir: Optional[str]
    working_dir_user: Optional[str]
    first_task_index: int
    last_task_index: int

    start_timestamp: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    end_timestamp: Optional[datetime] = Field(
        default=None, sa_column=Column(DateTime(timezone=True))
    )
    status: str = JobStatusTypeV1.SUBMITTED
    log: Optional[str] = None

Dataset

Bases: _DatasetBaseV1, SQLModel

Represent a dataset

Attributes:

Name Type Description
id Optional[int]

Primary key

project_id int

ID of the project the dataset belongs to.

meta dict[str, Any]

Metadata of the Dataset

history list[dict[str, Any]]

History of the Dataset

resource_list list[Resource]

(Mapper attribute)

Source code in fractal_server/app/models/v1/dataset.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Dataset(_DatasetBaseV1, SQLModel, table=True):
    """
    Database model for a dataset

    Attributes:
        id:
            Primary key
        project_id:
            ID of the project the dataset belongs to.
        project:
            (Mapper attribute)
        resource_list:
            (Mapper attribute)
        meta:
            Metadata of the Dataset
        history:
            History of the Dataset
        timestamp_created:
            Timezone-aware timestamp, set through `get_timestamp` by default.

    """

    id: Optional[int] = Field(default=None, primary_key=True)
    project_id: int = Field(foreign_key="project.id")
    project: "Project" = Relationship(  # noqa: F821
        sa_relationship_kwargs={"lazy": "selectin"},
    )

    resource_list: list[Resource] = Relationship(
        sa_relationship_kwargs=dict(
            lazy="selectin",
            order_by="Resource.id",
            collection_class=ordering_list("id"),
            cascade="all, delete-orphan",
        )
    )

    meta: dict[str, Any] = Field(default={}, sa_column=Column(JSON))
    history: list[dict[str, Any]] = Field(
        sa_column=Column(JSON, nullable=False, server_default="[]")
    )

    timestamp_created: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False),
        default_factory=get_timestamp,
    )

    class Config:
        arbitrary_types_allowed = True

    @property
    def paths(self) -> list[str]:
        """
        Collect the `path` of every item of `resource_list`, in list order.
        """
        return [resource.path for resource in self.resource_list]

JobStatusTypeV1

Bases: str, Enum

Define the available job statuses

Attributes:

Name Type Description
SUBMITTED

The job was created. This does not guarantee that it was also submitted to an executor (e.g. other errors could have prevented this), nor that it is actually running (e.g. SLURM jobs could be still in the queue).

DONE

The job successfully reached its end.

FAILED

The workflow terminated with an error.

Source code in fractal_server/app/schemas/v1/applyworkflow.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class JobStatusTypeV1(str, Enum):
    """
    Enumerate the statuses a job can be in

    Attributes:
        SUBMITTED:
            The job has been created. Creation alone implies neither that
            the job reached an executor (e.g. some other error may have
            prevented it) nor that it is currently running (e.g. a SLURM
            job may still be waiting in the queue).
        DONE:
            The job ran to its end successfully.
        FAILED:
            The workflow ended with an error.
    """

    SUBMITTED = "submitted"
    DONE = "done"
    FAILED = "failed"

OAuthAccount

Bases: SQLModel

ORM model for OAuth accounts (oauthaccount database table).

This class is based on fastapi_users_db_sqlmodel::SQLModelBaseOAuthAccount. Original Copyright: 2021 François Voron, released under MIT licence.

Attributes:

Name Type Description
id Optional[int]
user_id int
user Optional[UserOAuth]
oauth_name str
access_token str
expires_at Optional[int]
refresh_token Optional[str]
account_id str
account_email str
Source code in fractal_server/app/models/security.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class OAuthAccount(SQLModel, table=True):
    """
    ORM model for OAuth accounts (`oauthaccount` database table).

    This class is based on fastapi_users_db_sqlmodel::SQLModelBaseOAuthAccount.
    Original Copyright: 2021 François Voron, released under MIT licence.

    Attributes:
        id:
            Primary key.
        user_id:
            Foreign key to `user_oauth.id` (non-nullable).
        user:
            (Mapper attribute) Related `UserOAuth`; the reverse side of the
            relationship is `UserOAuth.oauth_accounts`.
        oauth_name:
            Indexed, non-nullable string; presumably the name of the OAuth
            provider (TODO confirm).
        access_token:
            Non-nullable string; presumably the token issued by the OAuth
            provider (TODO confirm).
        expires_at:
            Nullable integer; presumably the expiration time of
            `access_token` (TODO confirm meaning and units).
        refresh_token:
            Nullable string; presumably the provider's refresh token (TODO
            confirm).
        account_id:
            Indexed, non-nullable string; presumably the account identifier
            on the provider side (TODO confirm).
        account_email:
            Non-nullable string; presumably the email address associated
            with the provider account (TODO confirm).
    """

    __tablename__ = "oauthaccount"

    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user_oauth.id", nullable=False)
    user: Optional["UserOAuth"] = Relationship(back_populates="oauth_accounts")
    oauth_name: str = Field(index=True, nullable=False)
    access_token: str = Field(nullable=False)
    expires_at: Optional[int] = Field(nullable=True)
    refresh_token: Optional[str] = Field(nullable=True)
    account_id: str = Field(index=True, nullable=False)
    account_email: str = Field(nullable=False)

    class Config:
        orm_mode = True

State

Bases: SQLModel

Store arbitrary data in the database

This table is just a state interchange that allows the system to store arbitrary data for later retrieval. This is particularly important for long background tasks, in which it is not possible to return a meaningful response to the client within a single request lifespan.

Attributes:

Name Type Description
id Optional[int]

Primary key

data dict[str, Any]

Content of the State

timestamp datetime

Timestamp of the State

Source code in fractal_server/app/models/v1/state.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class State(SQLModel, table=True):
    """
    Store arbitrary data in the database

    This table is just a state interchange that allows the system to store
    arbitrary data for later retrieval. This is particularly important for
    long background tasks, in which it is not possible to return a
    meaningful response to the client within a single request lifespan.

    Attributes:
        id: Primary key
        data: Content of the `State` (JSON column, defaults to `{}`)
        timestamp:
            Timestamp of the `State` (timezone-aware column, set through
            `get_timestamp` by default)
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    data: dict[str, Any] = Field(sa_column=Column(JSON), default={})
    timestamp: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True)),
    )

Task

Bases: _TaskBaseV1, SQLModel

Task model

Attributes:

Name Type Description
id Optional[int]

Primary key

command str

Executable command

input_type str

Expected type of input Dataset

output_type str

Expected type of output Dataset

meta Optional[dict[str, Any]]

Additional metadata related to execution (e.g. computational resources)

source str

inherited from _TaskBase

name str

inherited from _TaskBase

args_schema Optional[dict[str, Any]]

JSON schema of task arguments

args_schema_version Optional[str]

label pointing at how the JSON schema of task arguments was generated

Source code in fractal_server/app/models/v1/task.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Task(_TaskBaseV1, SQLModel, table=True):
    """
    Task model

    Attributes:
        id: Primary key
        command: Executable command
        input_type: Expected type of input `Dataset`
        output_type: Expected type of output `Dataset`
        meta:
            Additional metadata related to execution (e.g. computational
            resources)
        source: inherited from `_TaskBase`
        name: inherited from `_TaskBase`
        owner: Optional string, defaults to `None`
        version: Optional string, defaults to `None`
        args_schema: JSON schema of task arguments
        args_schema_version:
            label pointing at how the JSON schema of task arguments was
            generated
        docs_info:
            Optional string, defaults to `None`; presumably free-text
            documentation of the task (TODO confirm)
        docs_link:
            Optional URL, defaults to `None`; presumably a link to the task
            documentation (TODO confirm)
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    command: str
    source: str = Field(unique=True)
    input_type: str
    output_type: str
    meta: Optional[dict[str, Any]] = Field(sa_column=Column(JSON), default={})
    owner: Optional[str] = None
    version: Optional[str] = None
    args_schema: Optional[dict[str, Any]] = Field(
        sa_column=Column(JSON), default=None
    )
    args_schema_version: Optional[str]
    docs_info: Optional[str] = None
    docs_link: Optional[HttpUrl] = None

    @property
    def parallelization_level(self) -> Optional[str]:
        """
        Return `meta["parallelization_level"]`, or `None` if it is not set

        `None` is returned both when the key is missing from `meta` and when
        `meta` itself is `None`.
        """
        # `meta` is declared Optional and its JSON column is nullable, so it
        # can be `None`; subscripting it would raise an uncaught TypeError.
        if self.meta is None:
            return None
        return self.meta.get("parallelization_level")

    @property
    def is_parallel(self) -> bool:
        """
        Whether `parallelization_level` is set to a truthy value
        """
        return bool(self.parallelization_level)

    @property
    def default_args_from_args_schema(self) -> dict[str, Any]:
        """
        Extract default arguments from args_schema

        Returns:
            A dict mapping each property name of `args_schema["properties"]`
            to its `"default"` value, skipping properties whose default is
            missing or `None`. An empty dict is returned if `args_schema` is
            `None` or if a `KeyError` occurs (a warning is logged in the
            latter case).
        """
        # Return {} if there is no args_schema
        if self.args_schema is None:
            return {}
        # Try to construct default_args
        try:
            default_args = {}
            properties = self.args_schema["properties"]
            for prop_name, prop_schema in properties.items():
                default_value = prop_schema.get("default", None)
                if default_value is not None:
                    default_args[prop_name] = default_value
            return default_args
        except KeyError as e:
            logging.warning(
                "Cannot set default_args from args_schema="
                f"{json.dumps(self.args_schema)}\n"
                f"Original KeyError: {str(e)}"
            )
            return {}

default_args_from_args_schema: dict[str, Any] property

Extract default arguments from args_schema

TaskGroupV2

Bases: SQLModel

Source code in fractal_server/app/models/v2/task_group.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class TaskGroupV2(SQLModel, table=True):
    """
    Database model grouping a list of `TaskV2` objects (`task_list`)

    Besides the task list and its owner (`user_id`, `user_group_id`), the
    table stores package-related attributes (e.g. `pkg_name`, `version`,
    `wheel_path`, `pip_extras`, `pinned_package_versions`) from which the
    strings for `python -m pip install` are built.
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    task_list: list[TaskV2] = Relationship(
        sa_relationship_kwargs={
            "lazy": "selectin",
            "cascade": "all, delete-orphan",
        },
    )

    user_id: int = Field(foreign_key="user_oauth.id")
    user_group_id: Optional[int] = Field(foreign_key="usergroup.id")

    origin: str
    pkg_name: str
    version: Optional[str] = None
    python_version: Optional[str] = None
    path: Optional[str] = None
    wheel_path: Optional[str] = None
    pip_extras: Optional[str] = None
    pinned_package_versions: dict[str, str] = Field(
        sa_column=Column(
            JSON,
            server_default="{}",
            default={},
            nullable=True,
        ),
    )
    pip_freeze: Optional[str] = None
    venv_path: Optional[str] = None
    venv_size_in_kB: Optional[int] = None
    venv_file_number: Optional[int] = None

    active: bool = True
    timestamp_created: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False),
        default_factory=get_timestamp,
    )

    @property
    def pip_install_string(self) -> str:
        """
        Build the package specifier to be used in `python -m pip install`.

        The wheel path takes precedence when set; otherwise the result is
        `pkg_name` pinned to `version`. In both cases the extras, if any,
        are appended in square brackets.

        Raises:
            ValueError: If both `wheel_path` and `version` are `None`.
        """
        if self.pip_extras is None:
            extras = ""
        else:
            extras = f"[{self.pip_extras}]"

        if self.wheel_path is not None:
            return f"{self.wheel_path}{extras}"

        if self.version is None:
            raise ValueError(
                "Cannot run `pip_install_string` with "
                f"{self.pkg_name=}, {self.wheel_path=}, {self.version=}."
            )
        return f"{self.pkg_name}{extras}=={self.version}"

    @property
    def pinned_package_versions_string(self) -> str:
        """
        Build the pinned-packages string to be used in
        `python -m pip install`.

        Returns a space-separated sequence of `name==version` items, or an
        empty string if `pinned_package_versions` is `None`.
        """
        pinned = self.pinned_package_versions
        if pinned is None:
            return ""
        return " ".join(f"{pkg}=={pin}" for pkg, pin in pinned.items())

pinned_package_versions_string: str property

Prepare string to be used in python -m pip install.

pip_install_string: str property

Prepare string to be used in python -m pip install.

UserGroup

Bases: SQLModel

ORM model for the usergroup database table.

Attributes:

Name Type Description
id Optional[int]

ID of the group

name str

Name of the group

timestamp_created datetime

Time of creation

Source code in fractal_server/app/models/security.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class UserGroup(SQLModel, table=True):
    """
    ORM model for the `usergroup` database table.

    Attributes:
        id: ID of the group
        name: Name of the group
        timestamp_created: Time of creation
        viewer_paths:
            List of strings stored in a non-nullable JSON column (server
            default `[]`); presumably paths that group members are allowed
            to view (TODO confirm)
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(unique=True)
    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    viewer_paths: list[str] = Field(
        sa_column=Column(JSON, server_default="[]", nullable=False)
    )

UserOAuth

Bases: SQLModel

ORM model for the user_oauth database table.

This class is a modification of SQLModelBaseUserDB from fastapi_users_db_sqlmodel. Original Copyright: 2022 François Voron, released under MIT licence.

Attributes:

Name Type Description
id Optional[int]
email EmailStr
hashed_password str
is_active bool
is_superuser bool
is_verified bool
slurm_user bool
slurm_accounts bool
cache_dir bool
username Optional[str]
oauth_accounts list[OAuthAccount]
Source code in fractal_server/app/models/security.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class UserOAuth(SQLModel, table=True):
    """
    ORM model for the `user_oauth` database table.

    This class is a modification of SQLModelBaseUserDB from
    fastapi_users_db_sqlmodel. Original Copyright: 2022 François Voron,
    released under MIT licence.

    Attributes:
        id:
            Primary key.
        email:
            Email address of the user (unique, indexed, non-nullable).
        hashed_password:
            Required string; presumably the hash of the user password (TODO
            confirm).
        is_active:
            Non-nullable boolean, defaults to `True`.
        is_superuser:
            Non-nullable boolean, defaults to `False`.
        is_verified:
            Non-nullable boolean, defaults to `False`.
        username:
            Optional string.
        oauth_accounts:
            (Mapper attribute) List of related `OAuthAccount` objects; the
            reverse side of the relationship is `OAuthAccount.user`.
        user_settings_id:
            Foreign key to `user_settings.id`, defaults to `None`.
        settings:
            (Mapper attribute) Related `UserSettings` object, if any.
    """

    __tablename__ = "user_oauth"

    id: Optional[int] = Field(default=None, primary_key=True)

    email: EmailStr = Field(
        sa_column_kwargs={"unique": True, "index": True}, nullable=False
    )
    hashed_password: str
    is_active: bool = Field(True, nullable=False)
    is_superuser: bool = Field(False, nullable=False)
    is_verified: bool = Field(False, nullable=False)

    username: Optional[str]

    oauth_accounts: list["OAuthAccount"] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={"lazy": "joined", "cascade": "all, delete"},
    )

    user_settings_id: Optional[int] = Field(
        foreign_key="user_settings.id", default=None
    )
    settings: Optional[UserSettings] = Relationship(
        sa_relationship_kwargs=dict(lazy="selectin", cascade="all, delete")
    )

    class Config:
        orm_mode = True

Workflow

Bases: _WorkflowBaseV1, SQLModel

Workflow

Attributes:

Name Type Description
id Optional[int]

Primary key

project_id int

ID of the project the workflow belongs to.

task_list list[WorkflowTask]

List of associations to tasks.

Source code in fractal_server/app/models/v1/workflow.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Workflow(_WorkflowBaseV1, SQLModel, table=True):
    """
    Database model for a workflow

    Attributes:
        id:
            Primary key
        project_id:
            ID of the project the workflow belongs to.
        task_list:
            List of associations to tasks.
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    project_id: int = Field(foreign_key="project.id")
    project: "Project" = Relationship(  # noqa: F821
        sa_relationship_kwargs={"lazy": "selectin"},
    )

    task_list: list[WorkflowTask] = Relationship(
        sa_relationship_kwargs={
            "lazy": "selectin",
            "order_by": "WorkflowTask.order",
            "collection_class": ordering_list("order"),
            "cascade": "all, delete-orphan",
        },
    )
    timestamp_created: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False),
        default_factory=get_timestamp,
    )

    @property
    def input_type(self):
        """
        `input_type` of the task associated with the first item of
        `task_list`.
        """
        first_wftask = self.task_list[0]
        return first_wftask.task.input_type

    @property
    def output_type(self):
        """
        `output_type` of the task associated with the last item of
        `task_list`.
        """
        last_wftask = self.task_list[-1]
        return last_wftask.task.output_type

WorkflowTask

Bases: _WorkflowTaskBaseV1, SQLModel

A Task as part of a Workflow

This is a crossing table between Task and Workflow. In addition to the foreign keys, it allows for parameter overriding and keeps the order within the list of tasks of the workflow.

Attributes:

Name Type Description
id Optional[int]

Primary key

workflow_id int

ID of the Workflow the WorkflowTask belongs to

task_id int

ID of the task corresponding to the WorkflowTask

order Optional[int]

Positional order of the WorkflowTask in Workflow.task_list

meta Optional[dict[str, Any]]

Additional parameters useful for execution

args Optional[dict[str, Any]]

Task arguments

task Task

Task object associated with the current WorkflowTask

Source code in fractal_server/app/models/v1/workflow.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class WorkflowTask(_WorkflowTaskBaseV1, SQLModel, table=True):
    """
    A Task as part of a Workflow

    Crossing table between Task and Workflow. On top of the two foreign
    keys, it makes it possible to override parameters and it records the
    position of the task within the task list of the workflow.

    Attributes:
        id:
            Primary key
        workflow_id:
            ID of the `Workflow` the `WorkflowTask` belongs to
        task_id:
            ID of the task corresponding to the `WorkflowTask`
        order:
            Positional order of the `WorkflowTask` in `Workflow.task_list`
        meta:
            Additional parameters useful for execution
        args:
            Task arguments
        task:
            `Task` object associated with the current `WorkflowTask`

    """

    class Config:
        arbitrary_types_allowed = True
        fields = {"parent": {"exclude": True}}

    id: Optional[int] = Field(default=None, primary_key=True)

    workflow_id: int = Field(foreign_key="workflow.id")
    task_id: int = Field(foreign_key="task.id")
    order: Optional[int]
    meta: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))
    args: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))
    task: Task = Relationship(sa_relationship_kwargs={"lazy": "selectin"})

    @validator("args")
    def validate_args(cls, value: dict = None):
        """
        Reject `args` that use parameter names reserved for fractal tasks

        The reserved names are `input_paths`, `output_path`, `metadata` and
        `component`. A `None` value is accepted and returned as `None`.
        """
        if value is None:
            return None
        reserved_names = {
            "input_paths",
            "output_path",
            "metadata",
            "component",
        }
        provided_names = set(value.keys())
        clashing_names = reserved_names & provided_names
        if not clashing_names:
            return value
        raise ValueError(
            "`args` contains the following forbidden keys: "
            f"{clashing_names}"
        )

    @property
    def is_parallel(self) -> bool:
        """
        Forward `is_parallel` from the associated `Task`.
        """
        associated_task = self.task
        return associated_task.is_parallel

    @property
    def parallelization_level(self) -> Union[str, None]:
        """
        Forward `parallelization_level` from the associated `Task`.
        """
        associated_task = self.task
        return associated_task.parallelization_level

validate_args(value=None)

Prevent fractal task reserved parameter names from entering args

Forbidden argument names are input_paths, output_path, metadata, component.

Source code in fractal_server/app/models/v1/workflow.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@validator("args")
def validate_args(cls, value: dict = None):
    """
    Reject `args` that use parameter names reserved for fractal tasks

    The reserved names are `input_paths`, `output_path`, `metadata` and
    `component`. A `None` value is accepted and returned as `None`.
    """
    if value is None:
        return None
    reserved_names = {
        "input_paths",
        "output_path",
        "metadata",
        "component",
    }
    provided_names = set(value.keys())
    clashing_names = reserved_names & provided_names
    if not clashing_names:
        return value
    raise ValueError(
        "`args` contains the following forbidden keys: "
        f"{clashing_names}"
    )

get_timestamp()

Get timezone aware timestamp.

Source code in fractal_server/utils.py
28
29
30
31
32
def get_timestamp() -> datetime:
    """
    Return the current time as a timezone-aware (UTC) `datetime`.
    """
    utc_now = datetime.now(timezone.utc)
    return utc_now