Skip to content

_aux_task_group_disambiguation

_disambiguate_task_groups(*, matching_task_groups, user_id, default_group_id, db) async

Find ownership-based top-priority task group, if any.

Parameters:

Name Type Description Default
matching_task_groups list[TaskGroupV2]
required
user_id int
required
default_group_id int
required
db AsyncSession
required

Returns:

Type Description
TaskGroupV2 | None

The task group or None.

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def _disambiguate_task_groups(
    *,
    matching_task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int,
    db: AsyncSession,
) -> TaskGroupV2 | None:
    """
    Find ownership-based top-priority task group, if any.

    Candidates are ranked in three tiers:

    1. A task group whose `user_id` equals `user_id`.
    2. A task group whose `user_group_id` equals `default_group_id`.
    3. A task group whose `user_group_id` is the group with the oldest
       `LinkUserGroup.timestamp_created` for this user, among the groups
       that own one of the candidates.

    Within a tier, the first match in `matching_task_groups` order wins.

    Args:
        matching_task_groups: Candidate task groups to choose from.
        user_id: ID of the user whose own task groups take precedence.
        default_group_id: ID of the default user group (second priority).
        db: Async database session; only queried for the third tier.

    Returns:
        The task group or `None`.
    """

    # Highest priority: task groups created by user
    task_group = next(
        (tg for tg in matching_task_groups if tg.user_id == user_id),
        None,
    )
    if task_group is not None:
        logger.debug(
            "[_disambiguate_task_groups] "
            f"Found task group {task_group.id} with {user_id=}, return."
        )
        return task_group
    logger.debug(
        "[_disambiguate_task_groups] "
        f"No task group with {user_id=}, continue."
    )

    # Medium priority: task groups owned by default user group
    task_group = next(
        (
            tg
            for tg in matching_task_groups
            if tg.user_group_id == default_group_id
        ),
        None,
    )
    if task_group is not None:
        # Report the criterion that actually matched (the user group),
        # not the user ID.
        logger.debug(
            "[_disambiguate_task_groups] "
            f"Found task group {task_group.id} with "
            f"user_group_id={default_group_id}, return."
        )
        return task_group
    logger.debug(
        "[_disambiguate_task_groups] "
        "No task group with user_group_id="
        f"{default_group_id}, continue."
    )

    # Lowest priority: task groups owned by other groups, sorted
    # according to age of the user/usergroup link
    logger.debug(
        "[_disambiguate_task_groups] "
        "Sort remaining task groups by oldest-user-link."
    )
    list_user_group_ids = [tg.user_group_id for tg in matching_task_groups]
    stm = (
        select(LinkUserGroup.group_id)
        .where(LinkUserGroup.user_id == user_id)
        .where(LinkUserGroup.group_id.in_(list_user_group_ids))
        .order_by(LinkUserGroup.timestamp_created.asc())
    )
    res = await db.execute(stm)
    # Only the first row matters: it is the group the user joined earliest.
    oldest_user_group_id = res.scalars().first()
    logger.debug(
        "[_disambiguate_task_groups] " f"Result: {oldest_user_group_id=}."
    )
    # NOTE(review): when the query returns no row, `oldest_user_group_id`
    # is `None`, so a candidate with `user_group_id=None` would be selected
    # here -- confirm that this is intended.
    return next(
        (
            tg
            for tg in matching_task_groups
            if tg.user_group_id == oldest_user_group_id
        ),
        None,
    )

_disambiguate_task_groups_not_none(*, matching_task_groups, user_id, default_group_id, db) async

Find ownership-based top-priority task group, and fail otherwise.

Parameters:

Name Type Description Default
matching_task_groups list[TaskGroupV2]
required
user_id int
required
default_group_id int
required
db AsyncSession
required

Returns:

Type Description
TaskGroupV2

The top-priority task group.

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def _disambiguate_task_groups_not_none(
    *,
    matching_task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int,
    db: AsyncSession,
) -> TaskGroupV2:
    """
    Find ownership-based top-priority task group, and fail otherwise.

    Strict variant of `_disambiguate_task_groups`: all arguments are
    forwarded unchanged, and a `None` result is turned into an error.

    Args:
        matching_task_groups: Candidate task groups to choose from.
        user_id: Forwarded to `_disambiguate_task_groups`.
        default_group_id: Forwarded to `_disambiguate_task_groups`.
        db: Forwarded to `_disambiguate_task_groups`.

    Returns:
        The top-priority task group.

    Raises:
        UnreachableBranchError: If no task group could be selected.
    """
    selected = await _disambiguate_task_groups(
        matching_task_groups=matching_task_groups,
        user_id=user_id,
        default_group_id=default_group_id,
        db=db,
    )
    if selected is not None:
        return selected

    # No candidate was selected: log the failure, then raise.
    error_msg = (
        "[_disambiguate_task_groups_not_none] Could not find a task "
        f"group ({user_id=}, {default_group_id=})."
    )
    logger.error(f"UnreachableBranchError {error_msg}")
    raise UnreachableBranchError(error_msg)

remove_duplicate_task_groups(*, task_groups, user_id, default_group_id, db) async

Extract an item for each version from a sorted task-group list.

Parameters:

Name Type Description Default
task_groups list[TaskGroupV2]

A list of task groups with identical pkg_name

required
user_id int

User ID

required
default_group_id int
required
db AsyncSession
required

Returns:

Type Description
list[TaskGroupV2]

New list of task groups with no duplicate (pkg_name,version) entries

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def remove_duplicate_task_groups(
    *,
    task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int,
    db: AsyncSession,
) -> list[TaskGroupV2]:
    """
    Extract an item for each `version` from a *sorted* task-group list.

    Consecutive task groups sharing the same `version` are collapsed into
    the single one selected by `_disambiguate_task_groups_not_none`. Only
    adjacent items are grouped together, which is why the input list must
    already be sorted.

    Args:
        task_groups: A list of task groups with identical `pkg_name`
        user_id: User ID
        default_group_id: Forwarded to `_disambiguate_task_groups_not_none`.
        db: Forwarded to `_disambiguate_task_groups_not_none`.

    Returns:
        New list of task groups with no duplicate `(pkg_name,version)` entries
    """

    deduplicated = []
    for _version, same_version_groups in itertools.groupby(
        task_groups, key=lambda tg: tg.version
    ):
        # Materialize the group before awaiting, since `groupby` yields
        # lazy sub-iterators.
        candidates = list(same_version_groups)
        chosen = await _disambiguate_task_groups_not_none(
            matching_task_groups=candidates,
            user_id=user_id,
            default_group_id=default_group_id,
            db=db,
        )
        deduplicated.append(chosen)
    return deduplicated