Skip to content

_aux_functions

Auxiliary functions to get object from the database or perform simple checks

_check_project_exists(*, project_name, user_id, db) async

Check that no other project with this name exists for this user.

Parameters:

Name Type Description Default
project_name str

Project name

required
user_id int

User ID

required
db AsyncSession
required

Raises:

Type Description
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a project already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
async def _check_project_exists(
    *,
    project_name: str,
    user_id: int,
    db: AsyncSession,
) -> None:
    """
    Check that no other project with this name exists for this user.

    Args:
        project_name: Project name
        user_id: User ID
        db:

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If such a project already exists
    """
    stm = (
        select(ProjectV2)
        .join(LinkUserProjectV2)
        .where(ProjectV2.name == project_name)
        .where(LinkUserProjectV2.user_id == user_id)
    )
    res = await db.execute(stm)
    if res.scalars().all():
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Project name ({project_name}) already in use",
        )

_check_workflow_exists(*, name, project_id, db) async

Check that no other workflow of this project has the same name.

Parameters:

Name Type Description Default
name str

Workflow name

required
project_id int

Project ID

required
db AsyncSession
required

Raises:

Type Description
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a workflow already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
async def _check_workflow_exists(
    *,
    name: str,
    project_id: int,
    db: AsyncSession,
) -> None:
    """
    Check that no other workflow of this project has the same name.

    Args:
        name: Workflow name
        project_id: Project ID
        db:

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If such a workflow already exists
    """
    stm = (
        select(WorkflowV2)
        .where(WorkflowV2.name == name)
        .where(WorkflowV2.project_id == project_id)
    )
    res = await db.execute(stm)
    if res.scalars().all():
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Workflow with {name=} and {project_id=} already exists.",
        )

_get_dataset_check_owner(*, project_id, dataset_id, user_id, db) async

Get a dataset and a project, after access control on the project

Parameters:

Name Type Description Default
project_id int
required
dataset_id int
required
user_id int
required
db AsyncSession
required

Returns:

Type Description
dict[Literal['dataset', 'project'], Union[DatasetV2, ProjectV2]]

Dictionary with the dataset and project objects (keys: dataset, project).

Raises:

Type Description
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the dataset is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
async def _get_dataset_check_owner(
    *,
    project_id: int,
    dataset_id: int,
    user_id: int,
    db: AsyncSession,
) -> dict[Literal["dataset", "project"], Union[DatasetV2, ProjectV2]]:
    """
    Get a dataset and a project, after access control on the project

    Args:
        project_id:
        dataset_id:
        user_id:
        db:

    Returns:
        Dictionary with the dataset and project objects (keys: `dataset`,
            `project`).

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If the dataset is not associated to the project
    """
    # Access control for project
    project = await _get_project_check_owner(
        project_id=project_id, user_id=user_id, db=db
    )

    # Get dataset
    # (See issue 1087 for 'populate_existing=True')
    dataset = await db.get(DatasetV2, dataset_id, populate_existing=True)

    if not dataset:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Dataset not found"
        )
    if dataset.project_id != project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid {project_id=} for {dataset_id=}",
        )

    return dict(dataset=dataset, project=project)

_get_job_check_owner(*, project_id, job_id, user_id, db) async

Get a job and a project, after access control on the project

Parameters:

Name Type Description Default
project_id int
required
job_id int
required
user_id int
required
db AsyncSession
required

Returns:

Type Description
dict[Literal['job', 'project'], Union[JobV2, ProjectV2]]

Dictionary with the job and project objects (keys: job, project).

Raises:

Type Description
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the job is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
async def _get_job_check_owner(
    *,
    project_id: int,
    job_id: int,
    user_id: int,
    db: AsyncSession,
) -> dict[Literal["job", "project"], Union[JobV2, ProjectV2]]:
    """
    Get a job and a project, after access control on the project

    Args:
        project_id:
        job_id:
        user_id:
        db:

    Returns:
        Dictionary with the job and project objects (keys: `job`,
            `project`).

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If the job is not associated to the project
    """
    # Access control for project
    project = await _get_project_check_owner(
        project_id=project_id,
        user_id=user_id,
        db=db,
    )
    # Get dataset
    job = await db.get(JobV2, job_id)
    if not job:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
        )
    if job.project_id != project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid {project_id=} for {job_id=}",
        )
    return dict(job=job, project=project)

_get_project_check_owner(*, project_id, user_id, db) async

Check that user is a member of project and return the project.

Parameters:

Name Type Description Default
project_id int
required
user_id int
required
db AsyncSession
required

Returns:

Type Description
ProjectV2

The project object

Raises:

Type Description
HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

HTTPException(status_code=404_NOT_FOUND)

If the project does not exist

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def _get_project_check_owner(
    *,
    project_id: int,
    user_id: int,
    db: AsyncSession,
) -> ProjectV2:
    """
    Check that user is a member of project and return the project.

    Args:
        project_id:
        user_id:
        db:

    Returns:
        The project object

    Raises:
        HTTPException(status_code=403_FORBIDDEN):
            If the user is not a member of the project
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist
    """
    project = await db.get(ProjectV2, project_id)

    link_user_project = await db.get(LinkUserProjectV2, (project_id, user_id))
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Project not found"
        )
    if not link_user_project:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Not allowed on project {project_id}",
        )

    return project

_get_submitted_jobs_statement()

Returns:

Type Description
SelectOfScalar

A sqlmodel statement that selects all Jobs with

SelectOfScalar

Job.status equal to submitted.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
382
383
384
385
386
387
388
389
def _get_submitted_jobs_statement() -> SelectOfScalar:
    """
    Returns:
        A sqlmodel statement that selects all `Job`s with
        `Job.status` equal to `submitted`.
    """
    stm = select(JobV2).where(JobV2.status == JobStatusTypeV2.SUBMITTED)
    return stm

_get_task_check_owner(*, task_id, user, db) async

Get a task, after access control.

This check constitutes a preliminary version of access control: if the current user is not a superuser and differs from the task owner (including when owner is None), we raise an 403 HTTP Exception.

Parameters:

Name Type Description Default
task_id int
required
user UserOAuth
required
db AsyncSession
required

Returns:

Type Description
TaskV2

The task object.

Raises:

Type Description
HTTPException(status_code=404_NOT_FOUND)

If the task does not exist

HTTPException(status_code=403_FORBIDDEN)

If the user does not have rights to edit this task.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
async def _get_task_check_owner(
    *,
    task_id: int,
    user: UserOAuth,
    db: AsyncSession,
) -> TaskV2:
    """
    Get a task, after access control.

    This check constitutes a preliminary version of access control:
    if the current user is not a superuser and differs from the task owner
    (including when `owner is None`), we raise an 403 HTTP Exception.

    Args:
        task_id:
        user:
        db:

    Returns:
        The task object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the task does not exist
        HTTPException(status_code=403_FORBIDDEN):
            If the user does not have rights to edit this task.
    """
    task = await db.get(TaskV2, task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"TaskV2 {task_id} not found.",
        )

    if not user.is_superuser:
        if task.owner is None:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=(
                    "Only a superuser can modify a TaskV2 with `owner=None`."
                ),
            )
        else:
            if user.username:
                owner = user.username
            else:
                verify_user_has_settings(user)
                owner = user.settings.slurm_user
            if owner != task.owner:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=(
                        f"Current user ({owner}) cannot modify TaskV2 "
                        f"{task.id} with different owner ({task.owner})."
                    ),
                )
    return task

_get_workflow_check_owner(*, workflow_id, project_id, user_id, db) async

Get a workflow and a project, after access control on the project.

Parameters:

Name Type Description Default
workflow_id int
required
project_id int
required
user_id int
required
db AsyncSession
required

Returns:

Type Description
WorkflowV2

The workflow object.

Raises:

Type Description
HTTPException(status_code=404_NOT_FOUND)

If the workflow does not exist

HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the workflow is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def _get_workflow_check_owner(
    *,
    workflow_id: int,
    project_id: int,
    user_id: int,
    db: AsyncSession,
) -> WorkflowV2:
    """
    Get a workflow and a project, after access control on the project.

    Args:
        workflow_id:
        project_id:
        user_id:
        db:

    Returns:
        The workflow object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the workflow does not exist
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If the workflow is not associated to the project
    """

    # Access control for project
    project = await _get_project_check_owner(
        project_id=project_id, user_id=user_id, db=db
    )

    # Get workflow
    # (See issue 1087 for 'populate_existing=True')
    workflow = await db.get(WorkflowV2, workflow_id, populate_existing=True)

    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found"
        )
    if workflow.project_id != project.id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(f"Invalid {project_id=} for {workflow_id=}."),
        )

    return workflow

_get_workflow_task_check_owner(*, project_id, workflow_id, workflow_task_id, user_id, db) async

Check that user has access to Workflow and WorkflowTask.

Parameters:

Name Type Description Default
project_id int
required
workflow_id int
required
workflow_task_id int
required
user_id int
required
db AsyncSession
required

Returns:

Type Description
tuple[WorkflowTaskV2, WorkflowV2]

Tuple of WorkflowTask and Workflow objects.

Raises:

Type Description
HTTPException(status_code=404_NOT_FOUND)

If the WorkflowTask does not exist

HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the WorkflowTask is not associated to the Workflow

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def _get_workflow_task_check_owner(
    *,
    project_id: int,
    workflow_id: int,
    workflow_task_id: int,
    user_id: int,
    db: AsyncSession,
) -> tuple[WorkflowTaskV2, WorkflowV2]:
    """
    Check that user has access to Workflow and WorkflowTask.

    Args:
        project_id:
        workflow_id:
        workflow_task_id:
        user_id:
        db:

    Returns:
        Tuple of WorkflowTask and Workflow objects.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the WorkflowTask does not exist
        HTTPException(status_code=422_UNPROCESSABLE_ENTITY):
            If the WorkflowTask is not associated to the Workflow
    """

    # Access control for workflow
    workflow = await _get_workflow_check_owner(
        workflow_id=workflow_id,
        project_id=project_id,
        user_id=user_id,
        db=db,
    )

    # If WorkflowTask is not in the db, exit
    workflow_task = await db.get(WorkflowTaskV2, workflow_task_id)
    if not workflow_task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="WorkflowTask not found",
        )

    # If WorkflowTask is not part of the expected Workflow, exit
    if workflow_id != workflow_task.workflow_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid {workflow_id=} for {workflow_task_id=}",
        )

    return workflow_task, workflow

_workflow_insert_task(*, workflow_id, task_id, order=None, meta_parallel=None, meta_non_parallel=None, args_non_parallel=None, args_parallel=None, input_filters=None, db) async

Insert a new WorkflowTask into Workflow.task_list

Parameters:

Name Type Description Default
workflow_id int
required
task_id int
required
order Optional[int]
None
meta_parallel Optional[dict[str, Any]]
None
meta_non_parallel Optional[dict[str, Any]]
None
args_non_parallel Optional[dict[str, Any]]
None
args_parallel Optional[dict[str, Any]]
None
input_filters Optional[Filters]
None
db AsyncSession
required
Source code in fractal_server/app/routes/api/v2/_aux_functions.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
async def _workflow_insert_task(
    *,
    workflow_id: int,
    task_id: int,
    order: Optional[int] = None,
    meta_parallel: Optional[dict[str, Any]] = None,
    meta_non_parallel: Optional[dict[str, Any]] = None,
    args_non_parallel: Optional[dict[str, Any]] = None,
    args_parallel: Optional[dict[str, Any]] = None,
    input_filters: Optional[Filters] = None,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Insert a new WorkflowTask into Workflow.task_list

    Args:
        workflow_id:
        task_id:

        order:
        meta_parallel:
        meta_non_parallel:
        args_non_parallel:
        args_parallel:
        input_filters:
        db:
    """
    db_workflow = await db.get(WorkflowV2, workflow_id)
    if db_workflow is None:
        raise ValueError(f"Workflow {workflow_id} does not exist")

    if order is None:
        order = len(db_workflow.task_list)

    # Get task from db
    db_task = await db.get(TaskV2, task_id)
    if db_task is None:
        raise ValueError(f"TaskV2 {task_id} not found.")
    task_type = db_task.type

    # Combine meta_parallel (higher priority)
    # and db_task.meta_parallel (lower priority)
    final_meta_parallel = (db_task.meta_parallel or {}).copy()
    final_meta_parallel.update(meta_parallel or {})
    if final_meta_parallel == {}:
        final_meta_parallel = None
    # Combine meta_non_parallel (higher priority)
    # and db_task.meta_non_parallel (lower priority)
    final_meta_non_parallel = (db_task.meta_non_parallel or {}).copy()
    final_meta_non_parallel.update(meta_non_parallel or {})
    if final_meta_non_parallel == {}:
        final_meta_non_parallel = None

    # Prepare input_filters attribute
    if input_filters is None:
        input_filters_kwarg = {}
    else:
        input_filters_kwarg = dict(input_filters=input_filters)

    # Create DB entry
    wf_task = WorkflowTaskV2(
        task_type=task_type,
        task_id=task_id,
        args_non_parallel=args_non_parallel,
        args_parallel=args_parallel,
        meta_parallel=final_meta_parallel,
        meta_non_parallel=final_meta_non_parallel,
        **input_filters_kwarg,
    )
    db_workflow.task_list.insert(order, wf_task)
    db_workflow.task_list.reorder()  # type: ignore
    flag_modified(db_workflow, "task_list")
    await db.commit()

    # See issue 1087 for 'populate_existing=True'
    wf_task = await db.get(WorkflowTaskV2, wf_task.id, populate_existing=True)

    return wf_task

clean_app_job_list_v2(db, jobs_list) async

Remove from a job list all jobs with status different from submitted.

Parameters:

Name Type Description Default
db AsyncSession

Async database session

required
jobs_list list[int]

List of job IDs currently associated to the app.

required
Return

List of IDs for submitted jobs.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
async def clean_app_job_list_v2(
    db: AsyncSession, jobs_list: list[int]
) -> list[int]:
    """
    Remove from a job list all jobs with status different from submitted.

    Args:
        db: Async database session
        jobs_list: List of job IDs currently associated to the app.

    Return:
        List of IDs for submitted jobs.
    """
    stmt = select(JobV2).where(JobV2.id.in_(jobs_list))
    result = await db.execute(stmt)
    db_jobs_list = result.scalars().all()
    submitted_job_ids = [
        job.id
        for job in db_jobs_list
        if job.status == JobStatusTypeV2.SUBMITTED
    ]
    return submitted_job_ids