Skip to content

_aux_functions_history

_verify_workflow_and_dataset_access(*, project_id, workflow_id, dataset_id, user_id, db) async

Verify user access to a dataset/workflow pair.

PARAMETER DESCRIPTION
project_id

TYPE: int

workflow_id

TYPE: int

dataset_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_history.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def _verify_workflow_and_dataset_access(
    *,
    project_id: int,
    workflow_id: int,
    dataset_id: int,
    user_id: int,
    db: AsyncSession,
) -> dict[Literal["dataset", "workflow"], DatasetV2 | WorkflowV2]:
    """
    Verify user access to a dataset/workflow pair.

    Args:
        project_id:
        workflow_id:
        dataset_id:
        user_id:
        db:
    """
    # Fail fast if the user cannot access the project at all.
    await _get_project_check_owner(
        project_id=project_id,
        user_id=user_id,
        db=db,
    )

    # Both objects must exist *and* belong to the given project.
    workflow = await _get_workflow_or_404(workflow_id=workflow_id, db=db)
    if workflow.project_id != project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Workflow does not belong to expected project.",
        )

    dataset = await _get_dataset_or_404(dataset_id=dataset_id, db=db)
    if dataset.project_id != project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Dataset does not belong to expected project.",
        )

    return {"dataset": dataset, "workflow": workflow}

get_history_run_or_404(*, history_run_id, db) async

Get an existing HistoryRun or raise a 404.

PARAMETER DESCRIPTION
history_run_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_history.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def get_history_run_or_404(
    *, history_run_id: int, db: AsyncSession
) -> HistoryRun:
    """
    Get an existing `HistoryRun` or raise a 404.

    Args:
        history_run_id:
        db:
    """
    run = await db.get(HistoryRun, history_run_id)
    if run is not None:
        return run
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"HistoryRun {history_run_id} not found",
    )

get_history_unit_or_404(*, history_unit_id, db) async

Get an existing HistoryUnit or raise a 404.

PARAMETER DESCRIPTION
history_unit_id

The HistoryUnit id

TYPE: int

db

An asynchronous db session

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_history.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
async def get_history_unit_or_404(
    *, history_unit_id: int, db: AsyncSession
) -> HistoryUnit:
    """
    Get an existing `HistoryUnit` or raise a 404.

    Args:
        history_unit_id: The `HistoryUnit` id
        db: An asynchronous db session
    """
    unit = await db.get(HistoryUnit, history_unit_id)
    if unit is not None:
        return unit
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"HistoryUnit {history_unit_id} not found",
    )

get_wftask_check_owner(*, project_id, dataset_id, workflowtask_id, user_id, db) async

Verify user access for the history of this dataset and workflowtask.

PARAMETER DESCRIPTION
project_id

TYPE: int

dataset_id

TYPE: int

workflowtask_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_history.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
async def get_wftask_check_owner(
    *,
    project_id: int,
    dataset_id: int,
    workflowtask_id: int,
    user_id: int,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Verify user access for the history of this dataset and workflowtask.

    Args:
        project_id:
        dataset_id:
        workflowtask_id:
        user_id:
        db:
    """
    # Resolve the WorkflowTask first: its workflow_id is needed for the
    # project/workflow/dataset access check below.
    wftask = await _get_workflowtask_or_404(
        workflowtask_id=workflowtask_id,
        db=db,
    )
    await _verify_workflow_and_dataset_access(
        project_id=project_id,
        dataset_id=dataset_id,
        workflow_id=wftask.workflow_id,
        user_id=user_id,
        db=db,
    )
    return wftask

read_log_file(*, task_name, dataset_id, logfile, job_working_dir)

Returns the contents of a Job's log file, either directly from the working directory or from the corresponding ZIP archive.

The function first checks if logfile exists on disk.

If not, it checks if the Job working directory has been zipped and tries to read logfile from within the archive. (Note: it is assumed that logfile is relative to job_working_dir)

Source code in fractal_server/app/routes/api/v2/_aux_functions_history.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def read_log_file(
    *,
    task_name: str,
    dataset_id: int,
    logfile: str,
    job_working_dir: str,
) -> str:
    """
    Return the contents of a Job's log file, either directly from the working
    directory or from the corresponding ZIP archive.

    The function first checks whether `logfile` exists on disk; if not, it
    looks for a zipped copy of the Job working directory and reads `logfile`
    from within that archive. (Note: it is assumed that `logfile` is
    relative to `job_working_dir`.)
    """
    archive_path = os.path.normpath(job_working_dir) + ".zip"
    try:
        logfile_path = Path(logfile)
        if logfile_path.exists():
            return logfile_path.read_text()
        if Path(archive_path).exists():
            # Path of the log file inside the archive, relative to the
            # (zipped) job working directory.
            inner_path = logfile_path.relative_to(job_working_dir).as_posix()
            return _read_single_file_from_zip(
                file_path=inner_path, archive_path=archive_path
            )
        logger.error(
            f"Error while retrieving logs for {logfile=} and "
            f"{archive_path=}: both files do not exist."
        )
        return (
            f"Logs for task '{task_name}' in dataset "
            f"{dataset_id} are not available."
        )
    except Exception as e:
        # Any failure (e.g. a `logfile` not relative to `job_working_dir`,
        # or a corrupt archive) degrades to an error message, not a raise.
        logger.error(
            f"Error while retrieving logs for {logfile=} and {archive_path=}. "
            f"Original error: {str(e)}"
        )
        return (
            f"Error while retrieving logs for task '{task_name}' "
            f"in dataset {dataset_id}."
        )