Skip to content

_aux_task_group_disambiguation

_disambiguate_task_groups(*, matching_task_groups, user_id, default_group_id, db) async

Find ownership-based top-priority task group, if any.

PARAMETER DESCRIPTION
matching_task_groups

TYPE: list[TaskGroupV2]

user_id

TYPE: int

default_group_id

TYPE: int | None

db

TYPE: AsyncSession

RETURNS DESCRIPTION
TaskGroupV2 | None

The task group or None.

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
async def _disambiguate_task_groups(
    *,
    matching_task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int | None,
    db: AsyncSession,
) -> TaskGroupV2 | None:
    """
    Find ownership-based top-priority task group, if any.

    Args:
        matching_task_groups:
        user_id:
        default_group_id:
        db:

    Returns:
        The task group or `None`.
    """

    # Highest priority: task groups created by user
    list_user_ids = [tg.user_id for tg in matching_task_groups]
    try:
        ind_user_id = list_user_ids.index(user_id)
        task_group = matching_task_groups[ind_user_id]
        logger.debug(
            "[_disambiguate_task_groups] "
            f"Found task group {task_group.id} with {user_id=}, return."
        )
        return task_group
    except ValueError:
        logger.debug(
            "[_disambiguate_task_groups] "
            f"No task group with {user_id=}, continue."
        )

    # Medium priority: task groups owned by default user group
    settings = Inject(get_settings)
    list_user_group_ids = [tg.user_group_id for tg in matching_task_groups]
    if settings.FRACTAL_DEFAULT_GROUP_NAME is not None:
        try:
            ind_user_group_id = list_user_group_ids.index(default_group_id)
            task_group = matching_task_groups[ind_user_group_id]
            logger.debug(
                "[_disambiguate_task_groups] "
                f"Found task group {task_group.id} with {user_id=}, return."
            )
            return task_group
        except ValueError:
            logger.debug(
                "[_disambiguate_task_groups] "
                "No task group with user_group_id="
                f"{default_group_id}, continue."
            )

    # Lowest priority: task groups owned by other groups, sorted
    # according to age of the user/usergroup link
    logger.debug(
        "[_disambiguate_task_groups] "
        "Sort remaining task groups by oldest-user-link."
    )
    stm = (
        select(LinkUserGroup.group_id)
        .where(LinkUserGroup.user_id == user_id)
        .where(LinkUserGroup.group_id.in_(list_user_group_ids))
        .order_by(LinkUserGroup.timestamp_created.asc())
    )
    res = await db.execute(stm)
    oldest_user_group_id = res.scalars().first()
    logger.debug(
        "[_disambiguate_task_groups] " f"Result: {oldest_user_group_id=}."
    )
    task_group = next(
        iter(
            task_group
            for task_group in matching_task_groups
            if task_group.user_group_id == oldest_user_group_id
        ),
        None,
    )
    return task_group

_disambiguate_task_groups_not_none(*, matching_task_groups, user_id, default_group_id, db) async

Find ownership-based top-priority task group, and fail otherwise.

PARAMETER DESCRIPTION
matching_task_groups

TYPE: list[TaskGroupV2]

user_id

TYPE: int

default_group_id

TYPE: int | None

db

TYPE: AsyncSession

RETURNS DESCRIPTION
TaskGroupV2

The top-priority task group.

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def _disambiguate_task_groups_not_none(
    *,
    matching_task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int | None,
    db: AsyncSession,
) -> TaskGroupV2:
    """
    Find ownership-based top-priority task group, and fail otherwise.

    Args:
        matching_task_groups:
        user_id:
        default_group_id:
        db:

    Returns:
        The top-priority task group.
    """
    task_group = await _disambiguate_task_groups(
        matching_task_groups=matching_task_groups,
        user_id=user_id,
        default_group_id=default_group_id,
        db=db,
    )
    if task_group is None:
        error_msg = (
            "[_disambiguate_task_groups_not_none] Could not find a task "
            f"group ({user_id=}, {default_group_id=})."
        )
        logger.error(f"UnreachableBranchError {error_msg}")
        raise UnreachableBranchError(error_msg)
    else:
        return task_group

remove_duplicate_task_groups(*, task_groups, user_id, default_group_id, db) async

Extract an item for each version from a sorted task-group list.

PARAMETER DESCRIPTION
task_groups

A list of task groups with identical pkg_name

TYPE: list[TaskGroupV2]

user_id

User ID

TYPE: int

RETURNS DESCRIPTION
list[TaskGroupV2]

New list of task groups with no duplicate (pkg_name,version) entries

Source code in fractal_server/app/routes/api/v2/_aux_task_group_disambiguation.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def remove_duplicate_task_groups(
    *,
    task_groups: list[TaskGroupV2],
    user_id: int,
    default_group_id: int | None,
    db: AsyncSession,
) -> list[TaskGroupV2]:
    """
    Extract an item for each `version` from a *sorted* task-group list.

    Args:
        task_groups: A list of task groups with identical `pkg_name`
        user_id: User ID

    Returns:
        New list of task groups with no duplicate `(pkg_name,version)` entries
    """

    new_task_groups = [
        (
            await _disambiguate_task_groups_not_none(
                matching_task_groups=list(groups),
                user_id=user_id,
                default_group_id=default_group_id,
                db=db,
            )
        )
        for version, groups in itertools.groupby(
            task_groups, key=lambda tg: tg.version
        )
    ]
    return new_task_groups