Skip to content

task_group

TaskGroupV2

Bases: SQLModel

Source code in fractal_server/app/models/v2/task_group.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class TaskGroupV2(SQLModel, table=True):
    # Table model describing a group of `TaskV2` rows together with the
    # package metadata (origin, name, version, paths, extras, pins) that the
    # two string-building properties below read.
    # NOTE(review): documented with `#` comments rather than a class
    # docstring so that the model's generated schema description is not
    # affected -- confirm whether a docstring is acceptable here.

    id: int | None = Field(default=None, primary_key=True)

    # Tasks belonging to this group, loaded with the "selectin" strategy and
    # removed together with the group ("all, delete-orphan" cascade).
    task_list: list[TaskV2] = Relationship(
        sa_relationship_kwargs={
            "lazy": "selectin",
            "cascade": "all, delete-orphan",
        },
    )

    # Owning user (mandatory) and optional user group; the group reference
    # is reset to NULL when the referenced group row is deleted.
    user_id: int = Field(foreign_key="user_oauth.id")
    user_group_id: int | None = Field(
        default=None, foreign_key="usergroup.id", ondelete="SET NULL"
    )

    # Package description. `origin == "pixi"` disables both pip-related
    # properties below.
    origin: str
    pkg_name: str
    version: str | None = None
    python_version: str | None = None
    pixi_version: str | None = None
    path: str | None = None
    archive_path: str | None = None
    pip_extras: str | None = None

    # Mapping of package name -> pinned version, stored as JSONB. The column
    # is nullable, hence the `None` check in `pinned_package_versions_string`.
    pinned_package_versions: dict[str, str] = Field(
        sa_column=Column(
            JSONB,
            server_default="{}",
            default={},
            nullable=True,
        ),
    )

    # Environment bookkeeping.
    env_info: str | None = None
    venv_path: str | None = None
    venv_size_in_kB: int | None = None
    venv_file_number: int | None = None

    active: bool = True

    # Both timestamps are timezone-aware and default to `get_timestamp()`.
    timestamp_created: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(DateTime(timezone=True), nullable=False),
    )
    # The server-side default is a fixed instant (2024-11-20, UTC).
    # NOTE(review): presumably a backfill value for rows that predate this
    # column -- confirm against the corresponding migration.
    timestamp_last_used: datetime = Field(
        default_factory=get_timestamp,
        sa_column=Column(
            DateTime(timezone=True),
            nullable=False,
            server_default=(
                datetime(2024, 11, 20, tzinfo=timezone.utc).isoformat()
            ),
        ),
    )

    @property
    def pip_install_string(self) -> str:
        """
        Build the requirement string to be used in `python -m pip install`.

        Returns:
            `{archive_path}{extras}` when `archive_path` is set, otherwise
            `{pkg_name}{extras}=={version}`. Here `extras` is
            `[{pip_extras}]`, or an empty string if `pip_extras` is `None`.

        Raises:
            ValueError: If `origin` is `"pixi"`, or if both `archive_path`
                and `version` are `None`.
        """
        if self.origin == "pixi":
            raise ValueError(
                f"Cannot call 'pip_install_string' if {self.origin=}."
            )

        extras = "" if self.pip_extras is None else f"[{self.pip_extras}]"

        # An archive takes precedence over a `name==version` specifier.
        if self.archive_path is not None:
            return f"{self.archive_path}{extras}"

        if self.version is None:
            raise ValueError(
                "Cannot run `pip_install_string` with "
                f"{self.pkg_name=}, {self.archive_path=}, {self.version=}."
            )
        return f"{self.pkg_name}{extras}=={self.version}"

    @property
    def pinned_package_versions_string(self) -> str:
        """
        Build the pinned-versions string to be used in
        `python -m pip install`.

        Returns:
            Space-separated `name==version` items, one per entry of
            `pinned_package_versions`; an empty string if that attribute is
            `None` or empty.

        Raises:
            ValueError: If `origin` is `"pixi"`.
        """
        if self.origin == "pixi":
            raise ValueError(
                "Cannot call 'pinned_package_versions_string' if "
                f"{self.origin=}."
            )

        pins = self.pinned_package_versions
        if pins is None:
            return ""
        return " ".join(f"{name}=={pin}" for name, pin in pins.items())

pinned_package_versions_string: str property

Prepare the space-separated `name==version` string of pinned packages to be used in `python -m pip install`.

pip_install_string: str property

Prepare the package requirement string (archive path, or `pkg_name==version`, with optional extras) to be used in `python -m pip install`.