Skip to content

applyworkflow

ApplyWorkflowCreateV1

Bases: _ApplyWorkflowBaseV1

Class for ApplyWorkflow creation.

Attributes:

Name Type Description
first_task_index Optional[int]
last_task_index Optional[int]
slurm_account Optional[StrictStr]
Source code in fractal_server/app/schemas/v1/applyworkflow.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class ApplyWorkflowCreateV1(_ApplyWorkflowBaseV1):
    """
    Schema for the payload used to create an `ApplyWorkflow`.

    Attributes:
        first_task_index:
            Optional index, `None` by default; must not be negative.
        last_task_index:
            Optional index, `None` by default; must not be negative and
            must not be smaller than `first_task_index`.
        slurm_account:
            Optional strict string, `None` by default.
    """

    first_task_index: Optional[int] = None
    last_task_index: Optional[int] = None
    slurm_account: Optional[StrictStr] = None

    # Validators
    _worker_init = validator("worker_init", allow_reuse=True)(
        valstr("worker_init")
    )

    @validator("first_task_index", always=True)
    def first_task_index_non_negative(cls, v, values):
        """
        Reject a negative `first_task_index`; `None` passes through.
        """
        # Guard clause: nothing to check for a missing or valid index.
        if v is None or v >= 0:
            return v
        raise ValueError(f"first_task_index cannot be negative (given: {v})")

    @validator("last_task_index", always=True)
    def first_last_task_indices(cls, v, values):
        """
        Reject a negative `last_task_index`, and reject a
        `last_task_index` that is smaller than `first_task_index`.
        """
        # A missing upper index cannot violate either constraint.
        if v is None:
            return v
        if v < 0:
            raise ValueError(
                f"last_task_index cannot be negative (given: {v})"
            )

        # The two local names below appear verbatim in the error message
        # through the `{name=}` f-string form, so they must not be renamed.
        first_task_index = values.get("first_task_index")
        last_task_index = v
        lower_is_set = first_task_index is not None
        if lower_is_set and first_task_index > last_task_index:
            raise ValueError(
                f"{first_task_index=} cannot be larger than "
                f"{last_task_index=}"
            )
        return v

first_last_task_indices(v, values)

Check that last_task_index is non-negative, and that it is not smaller than first_task_index.

Source code in fractal_server/app/schemas/v1/applyworkflow.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@validator("last_task_index", always=True)
def first_last_task_indices(cls, v, values):
    """
    Reject a negative `last_task_index`, and reject a `last_task_index`
    that is smaller than `first_task_index`.
    """
    # A missing upper index cannot violate either constraint.
    if v is None:
        return v
    if v < 0:
        raise ValueError(
            f"last_task_index cannot be negative (given: {v})"
        )

    # The two local names below appear verbatim in the error message
    # through the `{name=}` f-string form, so they must not be renamed.
    first_task_index = values.get("first_task_index")
    last_task_index = v
    lower_is_set = first_task_index is not None
    if lower_is_set and first_task_index > last_task_index:
        raise ValueError(
            f"{first_task_index=} cannot be larger than "
            f"{last_task_index=}"
        )
    return v

first_task_index_non_negative(v, values)

Check that first_task_index is non-negative.

Source code in fractal_server/app/schemas/v1/applyworkflow.py
74
75
76
77
78
79
80
81
82
83
@validator("first_task_index", always=True)
def first_task_index_non_negative(cls, v, values):
    """
    Reject a negative `first_task_index`; `None` passes through.
    """
    # Guard clause: nothing to check for a missing or valid index.
    if v is None or v >= 0:
        return v
    raise ValueError(f"first_task_index cannot be negative (given: {v})")

ApplyWorkflowReadV1

Bases: _ApplyWorkflowBaseV1

Class for ApplyWorkflow read from database.

Attributes:

Name Type Description
id int
project_id Optional[int]
project_dump ProjectDumpV1
user_email str
slurm_account Optional[str]
workflow_id Optional[int]
workflow_dump WorkflowDumpV1
input_dataset_id Optional[int]
input_dataset_dump DatasetDumpV1
output_dataset_id Optional[int]
output_dataset_dump DatasetDumpV1
start_timestamp datetime
end_timestamp Optional[datetime]
status str
log Optional[str]
working_dir Optional[str]
working_dir_user Optional[str]
first_task_index Optional[int]
last_task_index Optional[int]
Source code in fractal_server/app/schemas/v1/applyworkflow.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class ApplyWorkflowReadV1(_ApplyWorkflowBaseV1):
    """
    Class for `ApplyWorkflow` read from database.

    Extends `_ApplyWorkflowBaseV1` with the fields listed below. The two
    timestamp fields are additionally passed through validators built by
    `valutc`.

    Attributes:
        id:
        project_id:
        project_dump:
        user_email:
        slurm_account:
        workflow_id:
        workflow_dump:
        input_dataset_id:
        input_dataset_dump:
        output_dataset_id:
        output_dataset_dump:
        start_timestamp:
            Validated through `valutc("start_timestamp")`.
        end_timestamp:
            Validated through `valutc("end_timestamp")`.
        status:
        log:
        working_dir:
        working_dir_user:
        first_task_index:
        last_task_index:
    """

    id: int
    project_id: Optional[int]
    project_dump: ProjectDumpV1
    user_email: str
    slurm_account: Optional[str]
    workflow_id: Optional[int]
    workflow_dump: WorkflowDumpV1
    input_dataset_id: Optional[int]
    input_dataset_dump: DatasetDumpV1
    output_dataset_id: Optional[int]
    output_dataset_dump: DatasetDumpV1
    start_timestamp: datetime
    end_timestamp: Optional[datetime]
    # NOTE(review): `status` is a plain `str` here, not `JobStatusTypeV1`
    # as in `ApplyWorkflowUpdateV1` -- confirm this is intentional.
    status: str
    log: Optional[str]
    working_dir: Optional[str]
    working_dir_user: Optional[str]
    first_task_index: Optional[int]
    last_task_index: Optional[int]

    # Timestamp validators, produced by the `valutc` factory.
    # NOTE(review): presumably `valutc` normalizes datetimes to UTC --
    # confirm against its definition, which is not visible here.
    _start_timestamp = validator("start_timestamp", allow_reuse=True)(
        valutc("start_timestamp")
    )
    _end_timestamp = validator("end_timestamp", allow_reuse=True)(
        valutc("end_timestamp")
    )

ApplyWorkflowUpdateV1

Bases: BaseModel

Class for updating a job status.

Attributes:

Name Type Description
status JobStatusTypeV1

New job status.

Source code in fractal_server/app/schemas/v1/applyworkflow.py
161
162
163
164
165
166
167
168
169
class ApplyWorkflowUpdateV1(BaseModel):
    """
    Class for updating a job status.

    The payload carries a single required field.

    Attributes:
        status: New job status, given as a `JobStatusTypeV1` member.
    """

    status: JobStatusTypeV1

JobStatusTypeV1

Bases: str, Enum

Define the available job statuses

Attributes:

Name Type Description
SUBMITTED

The job was created. This does not guarantee that it was also submitted to an executor (e.g. other errors could have prevented this), nor that it is actually running (e.g. SLURM jobs could still be in the queue).

DONE

The job successfully reached its end.

FAILED

The workflow terminated with an error.

Source code in fractal_server/app/schemas/v1/applyworkflow.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class JobStatusTypeV1(str, Enum):
    """
    Enumeration of the statuses a job can have.

    Every member is also a `str`, so it compares equal to its lowercase
    string value.

    Attributes:
        SUBMITTED:
            The job has been created. Creation alone does not imply that
            the job reached an executor (other errors may have prevented
            that), nor that it is currently running (e.g. a SLURM job may
            still be waiting in the queue).
        DONE:
            The job ran to completion successfully.
        FAILED:
            The workflow ended with an error.
    """

    SUBMITTED = "submitted"
    DONE = "done"
    FAILED = "failed"

_ApplyWorkflowBaseV1

Bases: BaseModel

Base class for ApplyWorkflow.

Attributes:

Name Type Description
worker_init Optional[str]
Source code in fractal_server/app/schemas/v1/applyworkflow.py
44
45
46
47
48
49
50
51
52
class _ApplyWorkflowBaseV1(BaseModel):
    """
    Base class for `ApplyWorkflow`.

    Holds the single field shared by the `ApplyWorkflow` schemas that
    inherit from it.

    Attributes:
        worker_init:
            Optional string.
            NOTE(review): presumably initialization instructions for the
            job's worker -- confirm against the code that consumes it.
    """

    worker_init: Optional[str]