Skip to content

v2

v2 models module

AccountingRecord

Bases: SQLModel

AccountingRecord table.

Source code in fractal_server/app/models/v2/accounting.py
13
14
15
16
17
18
19
20
21
22
23
24
25
class AccountingRecord(SQLModel, table=True):
    """
    AccountingRecord table.
    """

    id: int | None = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user_oauth.id", nullable=False)
    timestamp: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    num_tasks: int
    num_new_images: int

AccountingRecordSlurm

Bases: SQLModel

AccountingRecordSlurm table.

Source code in fractal_server/app/models/v2/accounting.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class AccountingRecordSlurm(SQLModel, table=True):
    """
    AccountingRecordSlurm table.
    """

    id: int | None = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user_oauth.id", nullable=False)
    timestamp: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    slurm_job_ids: list[int] = Field(
        default_factory=list,
        sa_column=Column(ARRAY(Integer)),
    )

DatasetV2

Bases: SQLModel

Dataset table.

Source code in fractal_server/app/models/v2/dataset.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class DatasetV2(SQLModel, table=True):
    """
    Dataset table.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: int | None = Field(default=None, primary_key=True)
    name: str

    project_id: int = Field(foreign_key="projectv2.id", ondelete="CASCADE")
    project: "ProjectV2" = Relationship(  # noqa: F821
        sa_relationship_kwargs=dict(lazy="selectin"),
    )

    history: list[dict[str, Any]] = Field(
        sa_column=Column(JSONB, server_default="[]", nullable=False)
    )

    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )

    zarr_dir: str
    images: list[dict[str, Any]] = Field(
        sa_column=Column(JSONB, server_default="[]", nullable=False)
    )

    @property
    def image_zarr_urls(self) -> list[str]:
        return [image["zarr_url"] for image in self.images]

HistoryImageCache

Bases: SQLModel

HistoryImageCache table.

Source code in fractal_server/app/models/v2/history.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class HistoryImageCache(SQLModel, table=True):
    """
    HistoryImageCache table.
    """

    zarr_url: str = Field(primary_key=True)
    dataset_id: int = Field(
        primary_key=True,
        foreign_key="datasetv2.id",
        ondelete="CASCADE",
        index=True,
    )
    workflowtask_id: int = Field(
        primary_key=True,
        foreign_key="workflowtaskv2.id",
        ondelete="CASCADE",
        index=True,
    )

    latest_history_unit_id: int = Field(
        foreign_key="historyunit.id",
        ondelete="CASCADE",
    )

HistoryRun

Bases: SQLModel

HistoryRun table.

Source code in fractal_server/app/models/v2/history.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class HistoryRun(SQLModel, table=True):
    """
    HistoryRun table.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: int | None = Field(default=None, primary_key=True)
    dataset_id: int = Field(
        foreign_key="datasetv2.id",
        ondelete="CASCADE",
    )
    workflowtask_id: int | None = Field(
        foreign_key="workflowtaskv2.id",
        default=None,
        ondelete="SET NULL",
    )
    job_id: int = Field(foreign_key="jobv2.id")
    task_id: int | None = Field(foreign_key="taskv2.id", ondelete="SET NULL")

    workflowtask_dump: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False),
    )
    task_group_dump: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False),
    )

    timestamp_started: datetime = Field(
        sa_column=Column(DateTime(timezone=True), nullable=False),
        default_factory=get_timestamp,
    )
    status: str
    num_available_images: int

HistoryUnit

Bases: SQLModel

HistoryUnit table.

Source code in fractal_server/app/models/v2/history.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class HistoryUnit(SQLModel, table=True):
    """
    HistoryUnit table.
    """

    id: int | None = Field(default=None, primary_key=True)
    history_run_id: int = Field(
        foreign_key="historyrun.id",
        ondelete="CASCADE",
    )

    logfile: str
    status: str
    zarr_urls: list[str] = Field(
        sa_column=Column(ARRAY(String)),
        default_factory=list,
    )

JobV2

Bases: SQLModel

Job table.

Source code in fractal_server/app/models/v2/job.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class JobV2(SQLModel, table=True):
    """
    Job table.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: int | None = Field(default=None, primary_key=True)
    project_id: int | None = Field(
        foreign_key="projectv2.id", default=None, ondelete="SET NULL"
    )
    workflow_id: int | None = Field(
        foreign_key="workflowv2.id", default=None, ondelete="SET NULL"
    )
    dataset_id: int | None = Field(
        foreign_key="datasetv2.id", default=None, ondelete="SET NULL"
    )

    user_email: str = Field(nullable=False)
    slurm_account: str | None = None

    dataset_dump: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False)
    )
    workflow_dump: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False)
    )
    project_dump: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False)
    )

    worker_init: str | None = None
    working_dir: str | None = None
    working_dir_user: str | None = None
    first_task_index: int
    last_task_index: int

    start_timestamp: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    end_timestamp: datetime | None = Field(
        default=None, sa_column=Column(DateTime(timezone=True))
    )
    status: str = JobStatusTypeV2.SUBMITTED
    log: str | None = None
    executor_error_log: str | None = None

    attribute_filters: dict[str, list[int | float | str | bool]] = Field(
        sa_column=Column(JSONB, nullable=False, server_default="{}")
    )
    type_filters: dict[str, bool] = Field(
        sa_column=Column(JSONB, nullable=False, server_default="{}")
    )

LinkUserProjectV2

Bases: SQLModel

Crossing table between User and ProjectV2

Source code in fractal_server/app/models/linkuserproject.py
 5
 6
 7
 8
 9
10
11
class LinkUserProjectV2(SQLModel, table=True):
    """
    Crossing table between User and ProjectV2
    """

    project_id: int = Field(foreign_key="projectv2.id", primary_key=True)
    user_id: int = Field(foreign_key="user_oauth.id", primary_key=True)

Profile

Bases: SQLModel

Profile table.

Source code in fractal_server/app/models/v2/profile.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Profile(SQLModel, table=True):
    """
    Profile table.
    """

    id: int | None = Field(default=None, primary_key=True)

    resource_id: int = Field(foreign_key="resource.id", ondelete="RESTRICT")

    resource_type: str
    """
    Type of resource (either `local`, `slurm_sudo` or `slurm_ssh`).
    """

    name: str = Field(unique=True)
    """
    Profile name.
    """

    username: str | None = None
    """
    Username to be impersonated, either via `sudo -u` or via `ssh`.
    """

    ssh_key_path: str | None = None
    """
    Path to the private SSH key of user `username` (only relevant if
    `resource_type="slurm_ssh"`).
    """

    jobs_remote_dir: str | None = None
    """
    Remote path of the job folder (only relevant if
    `resource_type="slurm_ssh"`).
    """

    tasks_remote_dir: str | None = None
    """
    Remote path of the task folder (only relevant if
    `resource_type="slurm_ssh"`).
    """

jobs_remote_dir = None class-attribute instance-attribute

Remote path of the job folder (only relevant if resource_type="slurm_ssh").

name = Field(unique=True) class-attribute instance-attribute

Profile name.

resource_type instance-attribute

Type of resource (either local, slurm_sudo or slurm_ssh).

ssh_key_path = None class-attribute instance-attribute

Path to the private SSH key of user username (only relevant if resource_type="slurm_ssh").

tasks_remote_dir = None class-attribute instance-attribute

Remote path of the task folder (only relevant if resource_type="slurm_ssh").

username = None class-attribute instance-attribute

Username to be impersonated, either via sudo -u or via ssh.

ProjectV2

Bases: SQLModel

Project table.

Source code in fractal_server/app/models/v2/project.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ProjectV2(SQLModel, table=True):
    """
    Project table.
    """

    id: int | None = Field(default=None, primary_key=True)
    name: str

    resource_id: int = Field(foreign_key="resource.id", ondelete="RESTRICT")
    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )

    user_list: list[UserOAuth] = Relationship(
        link_model=LinkUserProjectV2,
        sa_relationship_kwargs={
            "lazy": "selectin",
        },
    )

Resource

Bases: SQLModel

Resource table.

Source code in fractal_server/app/models/v2/resource.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class Resource(SQLModel, table=True):
    """
    Resource table.
    """

    id: int | None = Field(default=None, primary_key=True)

    type: str
    """
    One of `local`, `slurm_sudo` or `slurm_ssh` - matching with
    `settings.FRACTAL_RUNNER_BACKEND`.
    """

    name: str = Field(unique=True)
    """
    Resource name.
    """

    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    """
    Creation timestamp (autogenerated).
    """

    host: str | None = None
    """
    Address for ssh connections, when `type="slurm_ssh"`.
    """

    jobs_local_dir: str
    """
    Base local folder for job subfolders (containing artifacts and logs).
    """

    jobs_runner_config: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False, server_default="{}")
    )
    """
    Runner configuration, matching one of `JobRunnerConfigLocal` or
    `JobRunnerConfigSLURM` schemas.
    """

    jobs_slurm_python_worker: str | None = None
    """
    On SLURM deloyments, this is the Python interpreter that runs the
    `fractal-server` worker from within the SLURM jobs.
    """

    jobs_poll_interval: int
    """
    On SLURM resources: the interval to wait before new `squeue` calls.
    On local resources: ignored.
    """

    # task_settings
    tasks_local_dir: str
    """
    Base local folder for task-package subfolders.
    """

    tasks_python_config: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False, server_default="{}")
    )
    """
    Python configuration for task collection. Example:
    ```json
    {
      "default_version": "3.10",
      "versions:{
        "3.10": "/xxx/venv-3.10/bin/python",
        "3.11": "/xxx/venv-3.11/bin/python",
        "3.12": "/xxx/venv-3.12/bin/python"
       }
    }
    ```
    """

    tasks_pixi_config: dict[str, Any] = Field(
        sa_column=Column(JSONB, nullable=False, server_default="{}")
    )
    """
    Pixi configuration for task collection. Basic example:
    ```json
    {
        "default_version": "0.41.0",
        "versions": {
            "0.40.0": "/xxx/pixi/0.40.0/",
            "0.41.0": "/xxx/pixi/0.41.0/"
        },
    }
    ```
    """

    @property
    def pip_cache_dir_arg(self: Self) -> str:
        """
        If `pip_cache_dir` is set (in `self.tasks_python_config`), then
        return `--cache_dir /something`; else return `--no-cache-dir`.
        """
        _pip_cache_dir = self.tasks_python_config.get("pip_cache_dir", None)
        if _pip_cache_dir is not None:
            return f"--cache-dir {_pip_cache_dir}"
        else:
            return "--no-cache-dir"

    # Check constraints
    __table_args__ = (
        # `type` column must be one of "local", "slurm_sudo" or "slurm_ssh"
        CheckConstraint(
            "type IN ('local', 'slurm_sudo', 'slurm_ssh')",
            name="correct_type",
        ),
        # If `type` is not "local", `jobs_slurm_python_worker` must be set
        CheckConstraint(
            "(type = 'local') OR (jobs_slurm_python_worker IS NOT NULL)",
            name="jobs_slurm_python_worker_set",
        ),
    )

host = None class-attribute instance-attribute

Address for ssh connections, when type="slurm_ssh".

jobs_local_dir instance-attribute

Base local folder for job subfolders (containing artifacts and logs).

jobs_poll_interval instance-attribute

On SLURM resources: the interval to wait before new squeue calls. On local resources: ignored.

jobs_runner_config = Field(sa_column=(Column(JSONB, nullable=False, server_default='{}'))) class-attribute instance-attribute

Runner configuration, matching one of JobRunnerConfigLocal or JobRunnerConfigSLURM schemas.

jobs_slurm_python_worker = None class-attribute instance-attribute

On SLURM deloyments, this is the Python interpreter that runs the fractal-server worker from within the SLURM jobs.

name = Field(unique=True) class-attribute instance-attribute

Resource name.

pip_cache_dir_arg property

If pip_cache_dir is set (in self.tasks_python_config), then return --cache_dir /something; else return --no-cache-dir.

tasks_local_dir instance-attribute

Base local folder for task-package subfolders.

tasks_pixi_config = Field(sa_column=(Column(JSONB, nullable=False, server_default='{}'))) class-attribute instance-attribute

Pixi configuration for task collection. Basic example:

{
    "default_version": "0.41.0",
    "versions": {
        "0.40.0": "/xxx/pixi/0.40.0/",
        "0.41.0": "/xxx/pixi/0.41.0/"
    },
}

tasks_python_config = Field(sa_column=(Column(JSONB, nullable=False, server_default='{}'))) class-attribute instance-attribute

Python configuration for task collection. Example:

{
  "default_version": "3.10",
  "versions:{
    "3.10": "/xxx/venv-3.10/bin/python",
    "3.11": "/xxx/venv-3.11/bin/python",
    "3.12": "/xxx/venv-3.12/bin/python"
   }
}

timestamp_created = Field(default_factory=get_timestamp, sa_column=(Column(DateTime(timezone=True), nullable=False))) class-attribute instance-attribute

Creation timestamp (autogenerated).

type instance-attribute

One of local, slurm_sudo or slurm_ssh - matching with settings.FRACTAL_RUNNER_BACKEND.

TaskGroupV2

Bases: SQLModel

Source code in fractal_server/app/models/v2/task_group.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class TaskGroupV2(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    task_list: list[TaskV2] = Relationship(
        sa_relationship_kwargs=dict(
            lazy="selectin", cascade="all, delete-orphan"
        ),
    )

    user_id: int = Field(foreign_key="user_oauth.id")
    user_group_id: int | None = Field(
        foreign_key="usergroup.id", default=None, ondelete="SET NULL"
    )
    resource_id: int = Field(foreign_key="resource.id", ondelete="RESTRICT")

    origin: str
    pkg_name: str
    version: str | None = None
    python_version: str | None = None
    pixi_version: str | None = None
    path: str | None = None
    archive_path: str | None = None
    pip_extras: str | None = None
    pinned_package_versions_pre: dict[str, str] = Field(
        sa_column=Column(
            JSONB,
            server_default="{}",
            default={},
            nullable=True,
        ),
    )
    pinned_package_versions_post: dict[str, str] = Field(
        sa_column=Column(
            JSONB,
            server_default="{}",
            default={},
            nullable=True,
        ),
    )
    env_info: str | None = None
    venv_path: str | None = None
    venv_size_in_kB: int | None = None
    venv_file_number: int | None = None

    active: bool = True
    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    timestamp_last_used: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(
            DateTime(timezone=True),
            nullable=False,
            server_default=(
                datetime(2024, 11, 20, tzinfo=timezone.utc).isoformat()
            ),
        ),
    )

    @property
    def pip_install_string(self) -> str:
        """
        Prepare string to be used in `python -m pip install`.
        """
        _check_origin_not_pixi(self.origin)

        extras = f"[{self.pip_extras}]" if self.pip_extras is not None else ""

        if self.archive_path is not None:
            return f"{self.archive_path}{extras}"
        else:
            if self.version is None:
                raise ValueError(
                    "Cannot run `pip_install_string` with "
                    f"{self.pkg_name=}, {self.archive_path=}, {self.version=}."
                )
            return f"{self.pkg_name}{extras}=={self.version}"

    @property
    def pinned_package_versions_pre_string(self) -> str:
        """
        Prepare string to be used in `python -m pip install`.
        """
        _check_origin_not_pixi(self.origin)

        if self.pinned_package_versions_pre is None:
            return ""
        output = _create_dependency_string(self.pinned_package_versions_pre)
        return output

    @property
    def pinned_package_versions_post_string(self) -> str:
        """
        Prepare string to be used in `python -m pip install`.
        """
        _check_origin_not_pixi(self.origin)

        if self.pinned_package_versions_post is None:
            return ""
        output = _create_dependency_string(self.pinned_package_versions_post)
        return output

pinned_package_versions_post_string property

Prepare string to be used in python -m pip install.

pinned_package_versions_pre_string property

Prepare string to be used in python -m pip install.

pip_install_string property

Prepare string to be used in python -m pip install.