Skip to content

utils_database

create_db_tasks_and_update_task_group(*, task_group_id, task_list, db)

Create a TaskGroupV2 with N TaskV2s, and insert them into the database.

Parameters:

Name Type Description Default
task_group_id int

ID of an existing TaskGroupV2 object.

required
task_list list[TaskCreateV2]

List of TaskCreateV2 objects to be inserted into the db.

required
db Session

Synchronous database session

required

Returns:

Type Description
TaskGroupV2

Updated TaskGroupV2 object.

Source code in fractal_server/tasks/v2/utils_database.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def create_db_tasks_and_update_task_group(
    *,
    task_group_id: int,
    task_list: list[TaskCreateV2],
    db: DBSyncSession,
) -> TaskGroupV2:
    """
    Create a `TaskGroupV2` with N `TaskV2`s, and insert them into the database.

    Arguments:
        task_group_id: ID of an existing `TaskGroupV2` object.
        task_list: List of `TaskCreateV2` objects to be inserted into the db.
        db: Synchronous database session

    Returns:
        Updated `TaskGroupV2` object.
    """
    actual_task_list = [
        TaskV2(
            **task.dict(),
            type=_get_task_type(task),
        )
        for task in task_list
    ]
    task_group = db.get(TaskGroupV2, task_group_id)
    task_group.task_list = actual_task_list
    db.add(task_group)
    db.commit()
    db.refresh(task_group)

    return task_group