Skip to content

_aux_functions

Auxiliary functions to get objects from the database or perform simple checks.

_check_project_exists(*, project_name, user_id, db) async

Check that no other project with this name exists for this user.

PARAMETER DESCRIPTION
project_name

Project name

TYPE: str

user_id

User ID

TYPE: int

db

TYPE: AsyncSession

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a project already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def _check_project_exists(
    *,
    project_name: str,
    user_id: int,
    db: AsyncSession,
) -> None:
    """
    Ensure that the given user has no project with the given name.

    Args:
        project_name: Name to look for among the user's projects.
        user_id: ID of the user whose projects are searched.
        db: Async database session used to run the query.

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the user is already linked to a project with this name.
    """
    # Users and projects are related through `LinkUserProjectV2`, hence
    # the join before filtering on the user ID.
    query = (
        select(ProjectV2)
        .join(LinkUserProjectV2)
        .where(
            ProjectV2.name == project_name,
            LinkUserProjectV2.user_id == user_id,
        )
    )
    matching_projects = (await db.execute(query)).scalars().all()
    if not matching_projects:
        return
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        detail=f"Project name ({project_name}) already in use",
    )

_check_workflow_exists(*, name, project_id, db) async

Check that no other workflow of this project has the same name.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

project_id

Project ID

TYPE: int

db

TYPE: AsyncSession

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If such a workflow already exists

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def _check_workflow_exists(
    *,
    name: str,
    project_id: int,
    db: AsyncSession,
) -> None:
    """
    Ensure that the given project has no workflow with the given name.

    Args:
        name: Workflow name to look for.
        project_id: ID of the project whose workflows are searched.
        db: Async database session used to run the query.

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the project already has a workflow with this name.
    """
    query = select(WorkflowV2).where(
        WorkflowV2.name == name,
        WorkflowV2.project_id == project_id,
    )
    matching_workflows = (await db.execute(query)).scalars().all()
    if not matching_workflows:
        return
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        detail=f"Workflow with {name=} and {project_id=} already exists.",
    )

_get_dataset_check_owner(*, project_id, dataset_id, user_id, db) async

Get a dataset and a project, after access control on the project

PARAMETER DESCRIPTION
project_id

TYPE: int

dataset_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
dict[Literal['dataset', 'project'], DatasetV2 | ProjectV2]

Dictionary with the dataset and project objects (keys: dataset, project).

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the dataset is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
async def _get_dataset_check_owner(
    *,
    project_id: int,
    dataset_id: int,
    user_id: int,
    db: AsyncSession,
) -> dict[Literal["dataset", "project"], DatasetV2 | ProjectV2]:
    """
    Fetch a dataset together with its project, enforcing project access.

    Errors raised by `_get_project_check_owner` (project not found, user
    not linked to the project) propagate to the caller unchanged.

    Args:
        project_id: ID of the project the dataset must belong to.
        dataset_id: ID of the dataset to fetch.
        user_id: ID of the user requesting access.
        db: Async database session.

    Returns:
        Dictionary with two entries, `dataset` and `project`, holding the
            corresponding database objects.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the dataset does not exist
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the dataset is not associated to the project
    """
    # Project-level access control comes first
    project = await _get_project_check_owner(
        project_id=project_id,
        user_id=user_id,
        db=db,
    )

    # (See issue 1087 for 'populate_existing=True')
    dataset = await db.get(DatasetV2, dataset_id, populate_existing=True)
    if not dataset:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Dataset not found",
        )

    # The dataset must belong to the project that was just authorized
    if project_id != dataset.project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Invalid {project_id=} for {dataset_id=}",
        )

    return {"dataset": dataset, "project": project}

_get_dataset_or_404(*, dataset_id, db) async

Get a dataset or raise 404.

PARAMETER DESCRIPTION
dataset_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
async def _get_dataset_or_404(
    *,
    dataset_id: int,
    db: AsyncSession,
) -> DatasetV2:
    """
    Return the dataset with the given ID, or fail with a 404 response.

    Args:
        dataset_id: ID of the dataset to fetch.
        db: Async database session.

    Returns:
        The dataset object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no dataset with this ID exists
    """
    dataset = await db.get(DatasetV2, dataset_id)
    if dataset is not None:
        return dataset
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Dataset {dataset_id} not found.",
    )

_get_job_check_owner(*, project_id, job_id, user_id, db) async

Get a job and a project, after access control on the project

PARAMETER DESCRIPTION
project_id

TYPE: int

job_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
dict[Literal['job', 'project'], JobV2 | ProjectV2]

Dictionary with the job and project objects (keys: job, project).

RAISES DESCRIPTION
HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the job is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
async def _get_job_check_owner(
    *,
    project_id: int,
    job_id: int,
    user_id: int,
    db: AsyncSession,
) -> dict[Literal["job", "project"], JobV2 | ProjectV2]:
    """
    Fetch a job together with its project, enforcing project access.

    Errors raised by `_get_project_check_owner` (project not found, user
    not linked to the project) propagate to the caller unchanged.

    Args:
        project_id: ID of the project the job must belong to.
        job_id: ID of the job to fetch.
        user_id: ID of the user requesting access.
        db: Async database session.

    Returns:
        Dictionary with two entries, `job` and `project`, holding the
            corresponding database objects.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the job does not exist
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the job is not associated to the project
    """
    # Project-level access control comes first
    project = await _get_project_check_owner(
        project_id=project_id, user_id=user_id, db=db
    )

    # Get job
    job = await db.get(JobV2, job_id)
    if not job:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Job not found",
        )

    # The job must belong to the project that was just authorized
    if project_id != job.project_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Invalid {project_id=} for {job_id=}",
        )

    return {"job": job, "project": project}

_get_project_check_owner(*, project_id, user_id, db) async

Check that user is a member of project and return the project.

PARAMETER DESCRIPTION
project_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
ProjectV2

The project object

RAISES DESCRIPTION
HTTPException(status_code=403_FORBIDDEN)

If the user is not a member of the project

HTTPException(status_code=404_NOT_FOUND)

If the project does not exist

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
async def _get_project_check_owner(
    *,
    project_id: int,
    user_id: int,
    db: AsyncSession,
) -> ProjectV2:
    """
    Check that user is a member of project and return the project.

    Args:
        project_id: ID of the project to fetch.
        user_id: ID of the user requesting access.
        db: Async database session.

    Returns:
        The project object

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the project does not exist
        HTTPException(status_code=403_FORBIDDEN):
            If the user is not a member of the project
    """
    project = await db.get(ProjectV2, project_id)
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Project not found"
        )

    # Membership is looked up only once the project is known to exist, so
    # that the not-found path does not pay for a second database query.
    # The link row is keyed by the (project_id, user_id) pair.
    link_user_project = await db.get(LinkUserProjectV2, (project_id, user_id))
    if not link_user_project:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Not allowed on project {project_id}",
        )

    return project

_get_submitted_job_or_none(*, dataset_id, workflow_id, db) async

Get the submitted job for given dataset/workflow, if any.

This function also handles the invalid branch where more than one job is found.

PARAMETER DESCRIPTION
dataset_id

TYPE: int

workflow_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
async def _get_submitted_job_or_none(
    *,
    dataset_id: int,
    workflow_id: int,
    db: AsyncSession,
) -> JobV2 | None:
    """
    Get the submitted job for given dataset/workflow, if any.

    This function also handles the invalid branch where more than one job
    is found.

    Args:
        dataset_id: ID of the dataset the job must refer to.
        workflow_id: ID of the workflow the job must refer to.
        db: Async database session.

    Returns:
        The only submitted job matching both IDs, or `None` if there is
            no such job.

    Raises:
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If more than one submitted job matches
    """
    res = await db.execute(
        _get_submitted_jobs_statement()
        .where(JobV2.dataset_id == dataset_id)
        .where(JobV2.workflow_id == workflow_id)
    )
    try:
        return res.scalars().one_or_none()
    except MultipleResultsFound as e:
        error_msg = (
            "Multiple running jobs found for "
            f"{dataset_id=} and {workflow_id=}."
        )
        logger.error(f"{error_msg} Original error: {str(e)}.")
        # Chain explicitly so that the database-level error is recorded as
        # the cause of the HTTP error.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=error_msg,
        ) from e

_get_submitted_jobs_statement()

RETURNS DESCRIPTION
SelectOfScalar

A sqlmodel statement that selects all Jobs with Job.status equal to submitted.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
325
326
327
328
329
330
331
332
def _get_submitted_jobs_statement() -> SelectOfScalar:
    """
    Build the base query for submitted jobs.

    Returns:
        A sqlmodel statement that selects every `JobV2` whose `status` is
            `JobStatusTypeV2.SUBMITTED`.
    """
    return select(JobV2).where(JobV2.status == JobStatusTypeV2.SUBMITTED)

_get_workflow_check_owner(*, workflow_id, project_id, user_id, db) async

Get a workflow, after access control on its project.

PARAMETER DESCRIPTION
workflow_id

TYPE: int

project_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
WorkflowV2

The workflow object.

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the workflow does not exist

HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the workflow is not associated to the project

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def _get_workflow_check_owner(
    *,
    workflow_id: int,
    project_id: int,
    user_id: int,
    db: AsyncSession,
) -> WorkflowV2:
    """
    Fetch a workflow, enforcing access control on its project.

    Errors raised by `_get_project_check_owner` (project not found, user
    not linked to the project) propagate to the caller unchanged.

    Args:
        workflow_id: ID of the workflow to fetch.
        project_id: ID of the project the workflow must belong to.
        user_id: ID of the user requesting access.
        db: Async database session.

    Returns:
        The workflow object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the workflow does not exist
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the workflow is not associated to the project
    """
    # Project-level access control comes first
    project = await _get_project_check_owner(
        project_id=project_id,
        user_id=user_id,
        db=db,
    )

    # (See issue 1087 for 'populate_existing=True')
    workflow = await db.get(WorkflowV2, workflow_id, populate_existing=True)
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found",
        )

    # The workflow must belong to the project that was just authorized
    if workflow.project_id != project.id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Invalid {project_id=} for {workflow_id=}.",
        )

    return workflow

_get_workflow_or_404(*, workflow_id, db) async

Get a workflow or raise 404.

PARAMETER DESCRIPTION
workflow_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
async def _get_workflow_or_404(
    *,
    workflow_id: int,
    db: AsyncSession,
) -> WorkflowV2:
    """
    Return the workflow with the given ID, or fail with a 404 response.

    Args:
        workflow_id: ID of the workflow to fetch.
        db: Async database session.

    Returns:
        The workflow object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no workflow with this ID exists
    """
    workflow = await db.get(WorkflowV2, workflow_id)
    if workflow is not None:
        return workflow
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Workflow {workflow_id} not found.",
    )

_get_workflow_task_check_owner(*, project_id, workflow_id, workflow_task_id, user_id, db) async

Check that user has access to Workflow and WorkflowTask.

PARAMETER DESCRIPTION
project_id

TYPE: int

workflow_id

TYPE: int

workflow_task_id

TYPE: int

user_id

TYPE: int

db

TYPE: AsyncSession

RETURNS DESCRIPTION
tuple[WorkflowTaskV2, WorkflowV2]

Tuple of WorkflowTask and Workflow objects.

RAISES DESCRIPTION
HTTPException(status_code=404_NOT_FOUND)

If the WorkflowTask does not exist

HTTPException(status_code=422_UNPROCESSABLE_ENTITY)

If the WorkflowTask is not associated to the Workflow

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def _get_workflow_task_check_owner(
    *,
    project_id: int,
    workflow_id: int,
    workflow_task_id: int,
    user_id: int,
    db: AsyncSession,
) -> tuple[WorkflowTaskV2, WorkflowV2]:
    """
    Fetch a WorkflowTask and its Workflow, enforcing access control.

    Errors raised by `_get_workflow_check_owner` propagate to the caller
    unchanged.

    Args:
        project_id: ID of the project the workflow must belong to.
        workflow_id: ID of the workflow the WorkflowTask must belong to.
        workflow_task_id: ID of the WorkflowTask to fetch.
        user_id: ID of the user requesting access.
        db: Async database session.

    Returns:
        Tuple of WorkflowTask and Workflow objects, in this order.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If the WorkflowTask does not exist
        HTTPException(status_code=422_UNPROCESSABLE_CONTENT):
            If the WorkflowTask is not associated to the Workflow
    """
    # Workflow-level (and, through it, project-level) access control
    workflow = await _get_workflow_check_owner(
        workflow_id=workflow_id, project_id=project_id, user_id=user_id, db=db
    )

    # The WorkflowTask must exist ...
    wftask = await db.get(WorkflowTaskV2, workflow_task_id)
    if not wftask:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="WorkflowTask not found",
        )

    # ... and it must be part of the workflow that was just authorized
    if wftask.workflow_id != workflow_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Invalid {workflow_id=} for {workflow_task_id=}",
        )

    return (wftask, workflow)

_get_workflowtask_or_404(*, workflowtask_id, db) async

Get a workflow task or raise 404.

PARAMETER DESCRIPTION
workflowtask_id

TYPE: int

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
async def _get_workflowtask_or_404(
    *,
    workflowtask_id: int,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Return the workflow task with the given ID, or fail with a 404 response.

    Args:
        workflowtask_id: ID of the workflow task to fetch.
        db: Async database session.

    Returns:
        The workflow-task object.

    Raises:
        HTTPException(status_code=404_NOT_FOUND):
            If no workflow task with this ID exists
    """
    workflowtask = await db.get(WorkflowTaskV2, workflowtask_id)
    if workflowtask is not None:
        return workflowtask
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"WorkflowTask {workflowtask_id} not found.",
    )

_workflow_insert_task(*, workflow_id, task_id, meta_parallel=None, meta_non_parallel=None, args_non_parallel=None, args_parallel=None, type_filters=None, db) async

Insert a new WorkflowTask into Workflow.task_list

PARAMETER DESCRIPTION
workflow_id

TYPE: int

task_id

TYPE: int

meta_parallel

TYPE: dict[str, Any] | None DEFAULT: None

meta_non_parallel

TYPE: dict[str, Any] | None DEFAULT: None

args_non_parallel

TYPE: dict[str, Any] | None DEFAULT: None

args_parallel

TYPE: dict[str, Any] | None DEFAULT: None

type_filters

TYPE: dict[str, bool] | None DEFAULT: None

db

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
async def _workflow_insert_task(
    *,
    workflow_id: int,
    task_id: int,
    meta_parallel: dict[str, Any] | None = None,
    meta_non_parallel: dict[str, Any] | None = None,
    args_non_parallel: dict[str, Any] | None = None,
    args_parallel: dict[str, Any] | None = None,
    type_filters: dict[str, bool] | None = None,
    db: AsyncSession,
) -> WorkflowTaskV2:
    """
    Append a new WorkflowTask to `Workflow.task_list` and commit it.

    Args:
        workflow_id: ID of the workflow that receives the new task.
        task_id: ID of the task the new WorkflowTask refers to.
        meta_parallel: Overrides applied on top of the task's own
            `meta_parallel`.
        meta_non_parallel: Overrides applied on top of the task's own
            `meta_non_parallel`.
        args_non_parallel: Stored as-is on the new WorkflowTask.
        args_parallel: Stored as-is on the new WorkflowTask.
        type_filters: Stored on the new WorkflowTask; an empty dictionary
            is used when this is missing or empty.
        db: Async database session.

    Returns:
        The newly created WorkflowTask, re-read from the database after
            the commit.

    Raises:
        ValueError: If either the workflow or the task does not exist.
    """
    workflow = await db.get(WorkflowV2, workflow_id)
    if workflow is None:
        raise ValueError(f"Workflow {workflow_id} does not exist")

    task = await db.get(TaskV2, task_id)
    if task is None:
        raise ValueError(f"TaskV2 {task_id} not found.")

    # Explicit arguments (unpacked last) win over the values stored on the
    # task; an empty merge result is stored as `None`.
    merged_meta_parallel = {
        **(task.meta_parallel or {}),
        **(meta_parallel or {}),
    }
    if not merged_meta_parallel:
        merged_meta_parallel = None
    merged_meta_non_parallel = {
        **(task.meta_non_parallel or {}),
        **(meta_non_parallel or {}),
    }
    if not merged_meta_non_parallel:
        merged_meta_non_parallel = None

    # Build the new entry and attach it at the end of the task list
    new_wftask = WorkflowTaskV2(
        task_type=task.type,
        task_id=task_id,
        args_non_parallel=args_non_parallel,
        args_parallel=args_parallel,
        meta_parallel=merged_meta_parallel,
        meta_non_parallel=merged_meta_non_parallel,
        type_filters=(type_filters or {}),
    )
    workflow.task_list.append(new_wftask)
    flag_modified(workflow, "task_list")
    await db.commit()

    # See issue 1087 for 'populate_existing=True'
    return await db.get(
        WorkflowTaskV2, new_wftask.id, populate_existing=True
    )

clean_app_job_list_v2(db, jobs_list) async

Remove from a job list all jobs with status different from submitted.

PARAMETER DESCRIPTION
db

Async database session

TYPE: AsyncSession

jobs_list

List of job IDs currently associated to the app.

TYPE: list[int]

RETURNS DESCRIPTION
list[int]

List of IDs for submitted jobs.

Source code in fractal_server/app/routes/api/v2/_aux_functions.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
async def clean_app_job_list_v2(
    db: AsyncSession, jobs_list: list[int]
) -> list[int]:
    """
    Remove from a job list all jobs with status different from submitted.

    Args:
        db: Async database session
        jobs_list: List of job IDs currently associated to the app.

    Returns:
        List of IDs for submitted jobs.
    """
    # Filter on the status in the query itself (through the shared
    # submitted-jobs statement), so that non-submitted jobs are not loaded
    # only to be discarded.
    stmt = _get_submitted_jobs_statement().where(JobV2.id.in_(jobs_list))
    result = await db.execute(stmt)
    return [job.id for job in result.scalars().all()]