Skip to content

_aux_functions

Auxiliary functions to get objects from the database or perform simple checks

_check_project_exists(*, project_name, user_id, db) async

Check that no other project with this name exists for this user.

PARAMETER DESCRIPTION
project_name

Project name

TYPE: str

user_id

User ID

TYPE: int

db

TYPE: AsyncSession

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a project already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def _check_project_exists(
    *,
    project_name: str,
    user_id: int,
    db: AsyncSession,
) -> None:
    """
    Ensure that the user does not already own a project with this name.

    Args:
        project_name: Project name to look for.
        user_id: ID of the user whose owned projects are searched.
        db: Async database session used to run the query.

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the user already owns a project with this name.
    """
    # Only projects linked to the user through an ownership link are
    # considered; projects the user merely has access to do not count.
    owned_projects_with_name = (
        select(ProjectV2)
        .join(LinkUserProjectV2, LinkUserProjectV2.project_id == ProjectV2.id)
        .where(
            ProjectV2.name == project_name,
            LinkUserProjectV2.user_id == user_id,
            LinkUserProjectV2.is_owner.is_(True),
        )
    )
    matches = (await db.execute(owned_projects_with_name)).scalars().all()
    if not matches:
        return
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        detail=f"Project name ({project_name}) already in use",
    )

_check_workflow_exists(*, name, project_id, db) async

Check that no other workflow of this project has the same name.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

project_id

Project ID

TYPE: int

db

TYPE: AsyncSession

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a workflow already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def _check_workflow_exists(
    *,
    name: str,
    project_id: int,
    db: AsyncSession,
) -> None:
    """
    Ensure that the project has no workflow with the given name yet.

    Args:
        name: Workflow name to look for.
        project_id: ID of the project whose workflows are searched.
        db: Async database session used to run the query.

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the project already has a workflow with this name.
    """
    same_name_workflows = select(WorkflowV2).where(
        WorkflowV2.name == name,
        WorkflowV2.project_id == project_id,
    )
    matches = (await db.execute(same_name_workflows)).scalars().all()
    if not matches:
        return
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        detail=f"Workflow with {name=} and {project_id=} already exists.",
    )

_get_dataset_check_access(*, project_id, dataset_id, user_id, required_permissions, db) async

Get a dataset and a project, after access control on the project

PARAMETER DESCRIPTION
project_id

TYPE: int

dataset_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
DatasetOrProject

Dictionary with the dataset and project objects (keys: dataset, project).

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the project or the dataset do not exist or if they are not associated

HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def _get_dataset_check_access(
    *,
    project_id: int,
    dataset_id: int,
    user_id: int,
    required_permissions: ProjectPermissions,
    db: AsyncSession,
) -> DatasetOrProject:
    """
    Fetch a dataset together with its project, after project access control.

    Args:
        project_id: ID of the project the dataset must belong to.
        dataset_id: ID of the dataset to fetch.
        user_id: ID of the user requesting access.
        required_permissions: Permissions the user must hold on the project.
        db: Async database session.

    Returns:
        Dictionary with the dataset and project objects (keys: `dataset`,
            `project`).

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist, or if no dataset with this ID
            belongs to the project.
        HTTPException(status_code=403_FORBIDDEN):
            If the project access check performed by
            `_get_project_check_access` fails.
    """
    # The project check runs first, so that the dataset lookup only happens
    # for users who are allowed to act on the project.
    project = await _get_project_check_access(
        project_id=project_id,
        user_id=user_id,
        required_permissions=required_permissions,
        db=db,
    )

    dataset_query = (
        select(DatasetV2)
        .where(
            DatasetV2.id == dataset_id,
            DatasetV2.project_id == project_id,
        )
        .execution_options(populate_existing=True)  # See issue 1087
    )
    dataset = (await db.execute(dataset_query)).scalars().one_or_none()
    if dataset is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Dataset not found"
        )

    return {"dataset": dataset, "project": project}

_get_dataset_or_404(*, dataset_id, db) async

Get a dataset or raise 404.

PARAMETER DESCRIPTION
dataset_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
async def _get_dataset_or_404(
    *,
    dataset_id: int,
    db: AsyncSession,
) -> DatasetV2:
    """
    Look up a dataset by primary key, failing with 404 if it is missing.

    Args:
        dataset_id: ID of the dataset to fetch.
        db: Async database session.

    Returns:
        The dataset object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no dataset with this ID exists.
    """
    dataset = await db.get(DatasetV2, dataset_id)
    if dataset is not None:
        return dataset
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Dataset {dataset_id} not found.",
    )

_get_job_check_access(*, project_id, job_id, user_id, required_permissions, db) async

Get a job and a project, after access control on the project

PARAMETER DESCRIPTION
project_id

TYPE: int

job_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
JobAndProject

Dictionary with the job and project objects (keys: job, project).

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the project or the job do not exist or if they are not associated

HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
async def _get_job_check_access(
    *,
    project_id: int,
    job_id: int,
    user_id: int,
    required_permissions: ProjectPermissions,
    db: AsyncSession,
) -> JobAndProject:
    """
    Fetch a job together with its project, after project access control.

    Args:
        project_id: ID of the project the job must belong to.
        job_id: ID of the job to fetch.
        user_id: ID of the user requesting access.
        required_permissions: Permissions the user must hold on the project.
        db: Async database session.

    Returns:
        Dictionary with the job and project objects (keys: `job`,
            `project`).

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist, or if no job with this ID
            belongs to the project.
        HTTPException(status_code=403_FORBIDDEN):
            If the project access check performed by
            `_get_project_check_access` fails.
    """
    # The project check runs first, so that the job lookup only happens for
    # users who are allowed to act on the project.
    project = await _get_project_check_access(
        project_id=project_id,
        user_id=user_id,
        required_permissions=required_permissions,
        db=db,
    )

    job_query = select(JobV2).where(
        JobV2.id == job_id,
        JobV2.project_id == project_id,
    )
    job = (await db.execute(job_query)).scalars().one_or_none()
    if job is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
        )

    return {"job": job, "project": project}

_get_project_check_access(*, project_id, user_id, required_permissions, db) async

Check that user is a member of project and return the project.

PARAMETER DESCRIPTION
project_id

TYPE: int

user_id

TYPE: int

required_permissions

TYPE: ProjectPermissions

db

TYPE: AsyncSession

RETURNS DESCRIPTION
ProjectV2

The project object

RAISES DESCRIPTION
HTTPException(status_code=403_FORBIDDEN)
  • If the user is not a member of the project;
  • If the user has not accepted the invitation yet;
  • If the user has not the target permissions.
HTTPException(status_code=404_NOT_FOUND)

If the project does not exist

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def _get_project_check_access(
    *,
    project_id: int,
    user_id: int,
    required_permissions: ProjectPermissions,
    db: AsyncSession,
) -> ProjectV2:
    """
    Return the project if the user is allowed to act on it.

    Args:
        project_id: ID of the project to fetch.
        user_id: ID of the user requesting access.
        required_permissions: Permissions that must be included in the
            user-project link permissions.
        db: Async database session.

    Returns:
        The project object

    Raises:
        HTTPException(status_code=403_FORBIDDEN):
            - If there is no link between the user and the project;
            - If the user-project link is not verified;
            - If the link permissions do not include the required ones.
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist
    """
    project = await db.get(ProjectV2, project_id)
    if project is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Project not found"
        )

    # The user-project link is looked up by its (project_id, user_id) key.
    link = await db.get(LinkUserProjectV2, (project_id, user_id))
    is_authorized = (
        link is not None
        and link.is_verified
        and required_permissions in link.permissions
    )
    if is_authorized:
        return project

    # Every authorization failure gets the same message, whatever its cause.
    forbidden_detail = (
        "You are not authorized to perform this action. "
        "If you think this is by mistake, "
        "please contact the project owner."
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=forbidden_detail,
    )

_get_submitted_jobs_statement()

RETURNS DESCRIPTION
SelectOfScalar

A sqlmodel statement that selects all Jobs with Job.status equal to submitted.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
367
368
369
370
371
372
373
374
def _get_submitted_jobs_statement() -> SelectOfScalar:
    """
    Build the query for jobs that are currently submitted.

    Returns:
        A sqlmodel statement that selects all `JobV2` rows whose `status`
        equals `JobStatusType.SUBMITTED`.
    """
    return select(JobV2).where(JobV2.status == JobStatusType.SUBMITTED)

_get_workflow_check_access(*, workflow_id, project_id, user_id, required_permissions, db) async

Get a workflow and a project, after access control on the project.

PARAMETER DESCRIPTION
workflow_id

TYPE: int

project_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
WorkflowV2

The workflow object.

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the project or the workflow do not exist or if they are not associated

HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
async def _get_workflow_check_access(
    *,
    workflow_id: int,
    project_id: int,
    user_id: int,
    required_permissions: ProjectPermissions,
    db: AsyncSession,
) -> WorkflowV2:
    """
    Get a workflow, after access control on its project.

    Args:
        workflow_id: ID of the workflow to fetch.
        project_id: ID of the project the workflow must belong to.
        user_id: ID of the user requesting access.
        required_permissions: Permissions the user must hold on the project.
        db: Async database session.

    Returns:
        The workflow object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist, or if no workflow with this ID
            belongs to the project.
        HTTPException(status_code=403_FORBIDDEN):
            If the project access check performed by
            `_get_project_check_access` fails.
    """

    # Access control for project (the returned project is not needed here)
    await _get_project_check_access(
        project_id=project_id,
        user_id=user_id,
        required_permissions=required_permissions,
        db=db,
    )

    res = await db.execute(
        select(WorkflowV2)
        .where(WorkflowV2.id == workflow_id)
        .where(WorkflowV2.project_id == project_id)
        .execution_options(populate_existing=True)  # See issue 1087
    )
    workflow = res.scalars().one_or_none()

    # Explicit `None` check, as in the other accessors of this module: a
    # missing row must not be confused with a falsy-but-present object.
    if workflow is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found"
        )

    return workflow

_get_workflow_or_404(*, workflow_id, db) async

Get a workflow or raise 404.

PARAMETER DESCRIPTION
workflow_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
async def _get_workflow_or_404(
    *,
    workflow_id: int,
    db: AsyncSession,
) -> WorkflowV2:
    """
    Look up a workflow by primary key, failing with 404 if it is missing.

    Args:
        workflow_id: ID of the workflow to fetch.
        db: Async database session.

    Returns:
        The workflow object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no workflow with this ID exists.
    """
    workflow = await db.get(WorkflowV2, workflow_id)
    if workflow is not None:
        return workflow
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Workflow {workflow_id} not found.",
    )

_get_workflow_task_check_access(*, project_id, workflow_id, workflow_task_id, user_id, required_permissions, db) async

Check that user has access to Workflow and WorkflowTask.

PARAMETER DESCRIPTION
project_id

TYPE: int

workflow_id

TYPE: int

workflow_task_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
tuple[WorkflowTaskV2, WorkflowV2]

Tuple of WorkflowTask and Workflow objects.

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the project, the workflow or the workflowtask do not exist or if they are not associated

HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
async def _get_workflow_task_check_access(
    *,
    project_id: int,
    workflow_id: int,
    workflow_task_id: int,
    user_id: int,
    required_permissions: ProjectPermissions,
    db: AsyncSession,
) -> tuple[WorkflowTaskV2, WorkflowV2]:
    """
    Fetch a WorkflowTask and its Workflow, after access control.

    Args:
        project_id: ID of the project the workflow must belong to.
        workflow_id: ID of the workflow the workflow task must belong to.
        workflow_task_id: ID of the workflow task to fetch.
        user_id: ID of the user requesting access.
        required_permissions: Permissions the user must hold on the project.
        db: Async database session.

    Returns:
        Tuple of WorkflowTask and Workflow objects.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the project, the workflow or the workflowtask do not exist or
            if they are not associated
        HTTPException(status_code=403_FORBIDDEN):
            If the project access check performed through
            `_get_workflow_check_access` fails.
    """

    # The workflow-level check also covers access control on the project.
    workflow = await _get_workflow_check_access(
        workflow_id=workflow_id,
        project_id=project_id,
        user_id=user_id,
        required_permissions=required_permissions,
        db=db,
    )

    wftask_query = select(WorkflowTaskV2).where(
        WorkflowTaskV2.id == workflow_task_id,
        WorkflowTaskV2.workflow_id == workflow_id,
    )
    workflow_task = (await db.execute(wftask_query)).scalars().one_or_none()
    if workflow_task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="WorkflowTask not found",
        )

    return workflow_task, workflow

_get_workflowtask_or_404(*, workflowtask_id, db) async

Get a workflow task or raise 404.

PARAMETER DESCRIPTION
workflowtask_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
async def _get_workflowtask_or_404(
    *,
    workflowtask_id: int,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Look up a workflow task by primary key, failing with 404 if missing.

    Args:
        workflowtask_id: ID of the workflow task to fetch.
        db: Async database session.

    Returns:
        The workflow task object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no workflow task with this ID exists.
    """
    workflow_task = await db.get(WorkflowTaskV2, workflowtask_id)
    if workflow_task is not None:
        return workflow_task
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"WorkflowTask {workflowtask_id} not found.",
    )

_workflow_insert_task(*, workflow_id, task_id, meta_parallel=None, meta_non_parallel=None, args_non_parallel=None, args_parallel=None, type_filters=None, db) async

Insert a new WorkflowTask into Workflow.task_list

PARAMETER DESCRIPTION
workflow_id

TYPE: int

task_id

TYPE: int

meta_parallel

TYPE: dict[str, Any] | None DEFAULT: None

meta_non_parallel

TYPE: dict[str, Any] | None DEFAULT: None

args_non_parallel

TYPE: dict[str, Any] | None DEFAULT: None

args_parallel

TYPE: dict[str, Any] | None DEFAULT: None

type_filters

TYPE: dict[str, bool] | None DEFAULT: None

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
async def _workflow_insert_task(
    *,
    workflow_id: int,
    task_id: int,
    meta_parallel: dict[str, Any] | None = None,
    meta_non_parallel: dict[str, Any] | None = None,
    args_non_parallel: dict[str, Any] | None = None,
    args_parallel: dict[str, Any] | None = None,
    type_filters: dict[str, bool] | None = None,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Append a new WorkflowTask to `Workflow.task_list` and commit it.

    Args:
        workflow_id: ID of the workflow that receives the new task.
        task_id: ID of the task the new workflow task is based on.

        meta_parallel: Overrides merged on top of the task `meta_parallel`.
        meta_non_parallel: Overrides merged on top of the task
            `meta_non_parallel`.
        args_non_parallel: Stored on the new workflow task as given.
        args_parallel: Stored on the new workflow task as given.
        type_filters: Stored on the new workflow task; a missing or empty
            value is stored as an empty dictionary.
        db: Async database session.

    Returns:
        The new workflow task, re-read from the database after the commit.

    Raises:
        ValueError: If the workflow or the task does not exist.
    """
    workflow = await db.get(WorkflowV2, workflow_id)
    if workflow is None:
        raise ValueError(f"Workflow {workflow_id} does not exist")

    task = await db.get(TaskV2, task_id)
    if task is None:
        raise ValueError(f"TaskV2 {task_id} not found.")
    inherited_task_type = task.type

    # Task-level defaults go first and caller-provided values last, so that
    # the caller wins on conflicting keys. An empty result is stored as None.
    merged_meta_parallel = {
        **(task.meta_parallel or {}),
        **(meta_parallel or {}),
    }
    merged_meta_non_parallel = {
        **(task.meta_non_parallel or {}),
        **(meta_non_parallel or {}),
    }

    new_wftask = WorkflowTaskV2(
        task_type=inherited_task_type,
        task_id=task_id,
        args_non_parallel=args_non_parallel,
        args_parallel=args_parallel,
        meta_parallel=merged_meta_parallel or None,
        meta_non_parallel=merged_meta_non_parallel or None,
        type_filters=type_filters or {},
    )
    workflow.task_list.append(new_wftask)
    flag_modified(workflow, "task_list")
    await db.commit()

    # Re-read the row after the commit, overwriting any state already held
    # by the session.
    return await db.get(
        WorkflowTaskV2,
        new_wftask.id,
        populate_existing=True,  # See issue 1087
    )

clean_app_job_list(db, jobs_list) async

Remove from a job list all jobs with status different from submitted.

PARAMETER DESCRIPTION
db

Async database session

TYPE: AsyncSession

jobs_list

List of job IDs currently associated to the app.

TYPE: list[int]

Return

List of IDs for submitted jobs.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
async def clean_app_job_list(
    db: AsyncSession,
    jobs_list: list[int],
) -> list[int]:
    """
    Remove from a job list all jobs with status different from submitted.

    The status filter is applied by the database and only the `id` column
    is selected. Compared to loading full `JobV2` rows and filtering in
    Python, this avoids fetching unused columns, and the status that is
    evaluated is the one stored in the database rather than the one of an
    object that the session may already hold.

    Args:
        db: Async database session
        jobs_list: List of job IDs currently associated to the app.

    Returns:
        List of IDs for submitted jobs. IDs from `jobs_list` that match no
        job in the database are dropped.
    """
    logger.info(f"[clean_app_job_list] START - {jobs_list=}.")
    stmt = (
        select(JobV2.id)
        .where(JobV2.id.in_(jobs_list))
        .where(JobV2.status == JobStatusType.SUBMITTED)
    )
    result = await db.execute(stmt)
    # `.all()` returns a generic sequence; callers are promised a list.
    submitted_job_ids = list(result.scalars().all())
    logger.info(f"[clean_app_job_list] END - {submitted_job_ids=}.")
    return submitted_job_ids