Skip to content

_aux_functions_tasks

Auxiliary functions to get task and task-group objects from the database or to perform simple checks.

_check_type_filters_compatibility(*, task_input_types, wftask_type_filters)

Wrap merge_type_filters and raise HTTPException if needed.

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def _check_type_filters_compatibility(
    *,
    task_input_types: dict[str, bool],
    wftask_type_filters: dict[str, bool],
) -> None:
    """
    Wrap `merge_type_filters` and raise `HTTPException` if needed.

    The value returned by `merge_type_filters` is discarded: the call is
    only used to detect whether the two sets of filters can be merged.

    Args:
        task_input_types: Forwarded to `merge_type_filters`.
        wftask_type_filters: Forwarded to `merge_type_filters`.

    Raises:
        HTTPException: With status 422 if `merge_type_filters` raises a
            `ValueError`; the original message is included in `detail`.
    """
    try:
        merge_type_filters(
            task_input_types=task_input_types,
            wftask_type_filters=wftask_type_filters,
        )
    except ValueError as e:
        # Chain explicitly so that the original validation error is
        # recorded as the direct cause of the HTTP error.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Incompatible type filters.\nOriginal error: {str(e)}",
        ) from e

_get_task_full_access(*, task_id, user_id, db) async

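Get an existing task, raising a 404 if it does not exist or a 403 if the user lacks full access to its task group or to the task group's resource.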

PARAMETER DESCRIPTION
task_id

ID of the required task.

TYPE: int

user_id

ID of the current user.

TYPE: int

db

An asynchronous db session.

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
async def _get_task_full_access(
    *,
    task_id: int,
    user_id: int,
    db: AsyncSession,
) -> TaskV2:
    """
    Fetch a task, requiring full access to its task group and resource.

    Args:
        task_id: ID of the required task.
        user_id: ID of the current user.
        db: An asynchronous db session.

    Returns:
        The task with the given ID.

    Raises:
        HTTPException: With status 403 if the user's resource ID is `None`
            or differs from the one of the task group. Errors raised by
            `_get_task_or_404` and `_get_task_group_full_access` are
            propagated unchanged.
    """
    task = await _get_task_or_404(task_id=task_id, db=db)
    owning_group = await _get_task_group_full_access(
        task_group_id=task.taskgroupv2_id,
        user_id=user_id,
        db=db,
    )
    user_resource_id = await _get_user_resource_id(user_id=user_id, db=db)

    # Access is granted only when the user has a resource and it is the
    # same one the task group belongs to.
    shares_resource = (
        user_resource_id is not None
        and user_resource_id == owning_group.resource_id
    )
    if shares_resource:
        return task

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"User {user_id} has no access to TaskGroup's Resource.",
    )

_get_task_group_full_access(*, task_group_id, user_id, db) async

Get a task group or raise a 403 if user has no full access.

PARAMETER DESCRIPTION
task_group_id

ID of the required task group.

TYPE: int

user_id

ID of the current user.

TYPE: int

db

An asynchronous db session

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
async def _get_task_group_full_access(
    *,
    task_group_id: int,
    user_id: int,
    db: AsyncSession,
) -> TaskGroupV2:
    """
    Get a task group or raise a 403 if user has no full access.

    Full access is granted only when the task group's `user_id` matches
    the current user.

    Args:
        task_group_id: ID of the required task group.
        user_id: ID of the current user.
        db: An asynchronous db session

    Returns:
        The task group with the given ID.

    Raises:
        HTTPException: With status 403 if the task group's `user_id`
            differs from `user_id`. The 404 raised by
            `_get_task_group_or_404` is propagated unchanged.
    """
    task_group = await _get_task_group_or_404(
        task_group_id=task_group_id, db=db
    )

    if task_group.user_id == user_id:
        return task_group

    # NOTE: `detail` must be a plain string. A trailing comma inside the
    # parentheses would turn it into a one-element tuple, which is then
    # serialized as a JSON list in the response body.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=(
            "Current user has no full access to "
            f"TaskGroupV2 {task_group_id}."
        ),
    )

_get_task_group_or_404(*, task_group_id, db) async

Get an existing task group or raise a 404.

PARAMETER DESCRIPTION
task_group_id

The TaskGroupV2 id

TYPE: int

db

An asynchronous db session

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def _get_task_group_or_404(
    *, task_group_id: int, db: AsyncSession
) -> TaskGroupV2:
    """
    Look up a task group by primary key, failing with a 404 if missing.

    Args:
        task_group_id: The TaskGroupV2 id
        db: An asynchronous db session

    Returns:
        The task group with the given ID.

    Raises:
        HTTPException: With status 404 if `db.get` returns `None`.
    """
    found = await db.get(TaskGroupV2, task_group_id)
    if found is not None:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"TaskGroupV2 {task_group_id} not found",
    )

_get_task_group_read_access(*, task_group_id, user_id, db) async

Get a task group or raise a 403 if user has no read access.

PARAMETER DESCRIPTION
task_group_id

ID of the required task group.

TYPE: int

user_id

ID of the current user.

TYPE: int

db

An asynchronous db session.

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def _get_task_group_read_access(
    *,
    task_group_id: int,
    user_id: int,
    db: AsyncSession,
) -> TaskGroupV2:
    """
    Fetch a task group, failing with a 403 if the user cannot read it.

    Read access is granted when the user owns the task group, or when the
    task group is linked to a user group and the membership query below
    returns a matching `LinkUserGroup` row.

    Args:
        task_group_id: ID of the required task group.
        user_id: ID of the current user.
        db: An asynchronous db session.

    Returns:
        The task group with the given ID.

    Raises:
        HTTPException: With status 403 if the user is not the owner and
            either the task group has no user group or no matching link
            is found. The 404 raised by `_get_task_group_or_404` is
            propagated unchanged.
    """
    task_group = await _get_task_group_or_404(
        task_group_id=task_group_id, db=db
    )

    # The owner can always read the task group.
    if task_group.user_id == user_id:
        return task_group

    # Shared by both failure paths below.
    access_denied = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=(
            "Current user has no read access to TaskGroupV2 "
            f"{task_group_id}."
        ),
    )

    # A task group with no user group is readable by its owner only.
    if task_group.user_group_id is None:
        raise access_denied

    # Look for a link between the user and the task group's user group,
    # restricted to a profile on the same resource as the task group.
    membership_query = (
        select(LinkUserGroup)
        .join(UserOAuth)
        .join(Profile)
        .where(
            LinkUserGroup.group_id == task_group.user_group_id,
            LinkUserGroup.user_id == user_id,
            UserOAuth.id == user_id,
            Profile.id == UserOAuth.profile_id,
            task_group.resource_id == Profile.resource_id,
        )
    )
    result = await db.execute(membership_query)
    membership = result.unique().scalars().one_or_none()
    if membership is None:
        raise access_denied

    return task_group

_get_task_or_404(*, task_id, db) async

Get an existing task or raise a 404.

PARAMETER DESCRIPTION
task_id

ID of the required task.

TYPE: int

db

An asynchronous db session

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
async def _get_task_or_404(*, task_id: int, db: AsyncSession) -> TaskV2:
    """
    Look up a task by primary key, failing with a 404 if it is missing.

    Args:
        task_id: ID of the required task.
        db: An asynchronous db session

    Returns:
        The task with the given ID.

    Raises:
        HTTPException: With status 404 if `db.get` returns `None`.
    """
    found = await db.get(TaskV2, task_id)
    if found is not None:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"TaskV2 {task_id} not found",
    )

_get_task_read_access(*, task_id, user_id, db, require_active=False) async

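Get an existing task, raising a 404 if it does not exist, a 403 if the user lacks read access to its task group or to the task group's resource, or a 422 if require_active is set and the task group is not active.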

PARAMETER DESCRIPTION
task_id

ID of the required task.

TYPE: int

user_id

ID of the current user.

TYPE: int

db

An asynchronous db session.

TYPE: AsyncSession

require_active

If set, fail when the task group is not active

TYPE: bool DEFAULT: False

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
async def _get_task_read_access(
    *,
    task_id: int,
    user_id: int,
    db: AsyncSession,
    require_active: bool = False,
) -> TaskV2:
    """
    Fetch a task, requiring read access to its task group and resource.

    Args:
        task_id: ID of the required task.
        user_id: ID of the current user.
        db: An asynchronous db session.
        require_active: If set, fail when the task group is not `active`

    Returns:
        The task with the given ID.

    Raises:
        HTTPException: With status 403 if the user's resource ID is `None`
            or differs from the one of the task group; with status 422 if
            `require_active` is set and the task group is not active.
            Errors raised by `_get_task_or_404` and
            `_get_task_group_read_access` are propagated unchanged.
    """
    task = await _get_task_or_404(task_id=task_id, db=db)
    readable_group = await _get_task_group_read_access(
        task_group_id=task.taskgroupv2_id,
        user_id=user_id,
        db=db,
    )

    # The user must have a resource, and it must be the task group's one.
    user_resource_id = await _get_user_resource_id(user_id=user_id, db=db)
    resource_mismatch = (
        user_resource_id is None
        or user_resource_id != readable_group.resource_id
    )
    if resource_mismatch:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"User {user_id} has no access to TaskGroup's Resource.",
        )

    # The active flag is only inspected when the caller asks for it.
    inactive_but_required = require_active and not readable_group.active
    if inactive_but_required:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"Error: task {task_id} ({task.name}) is not active.",
        )

    return task

_get_valid_user_group_id(*, user_group_id=None, private, user_id, db) async

Validate query parameters for endpoints that create some task(s).

PARAMETER DESCRIPTION
user_group_id

TYPE: int | None DEFAULT: None

private

TYPE: bool

user_id

ID of the current user

TYPE: int

db

An asynchronous db session.

TYPE: AsyncSession

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
async def _get_valid_user_group_id(
    *,
    user_group_id: int | None = None,
    private: bool,
    user_id: int,
    db: AsyncSession,
) -> int | None:
    """
    Resolve the user-group ID for endpoints that create some task(s).

    Args:
        user_group_id: Explicitly requested user-group ID, if any.
        private: If `True`, no user group is used; it cannot be combined
            with a non-`None` `user_group_id`.
        user_id: ID of the current user
        db: An asynchronous db session.

    Returns:
        `None` when `private` is `True`; the result of
        `_get_default_usergroup_id_or_none` when no group was requested;
        otherwise `user_group_id`, after `_verify_user_belongs_to_group`
        has been called for it.

    Raises:
        HTTPException: With status 422 if both `user_group_id` and
            `private` are set.
    """
    if private is True:
        # Private and an explicit group are mutually exclusive.
        if user_group_id is not None:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                detail=f"Cannot set both {user_group_id=} and {private=}",
            )
        return None

    if user_group_id is None:
        # Nothing requested: fall back to the default group (may be None).
        return await _get_default_usergroup_id_or_none(db=db)

    await _verify_user_belongs_to_group(
        user_id=user_id,
        user_group_id=user_group_id,
        db=db,
    )
    return user_group_id

_verify_non_duplication_group_path(*, path, resource_id, db) async

Verify uniqueness of non-None TaskGroupV2.path

Source code in fractal_server/app/routes/api/v2/_aux_functions_tasks.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
async def _verify_non_duplication_group_path(
    *,
    path: str | None,
    resource_id: int,
    db: AsyncSession,
) -> None:
    """
    Ensure no `TaskGroupV2` of the given resource already uses `path`.

    A `None` path is never considered a duplicate and skips the check.

    Args:
        path: Candidate `TaskGroupV2.path`, or `None`.
        resource_id: Resource within which the path must be unique.
        db: An asynchronous db session.

    Raises:
        HTTPException: With status 422 if at least one task group with
            the same `path` and `resource_id` exists; the sorted IDs of
            those task groups are listed in `detail`.
    """
    if path is None:
        return

    query = select(TaskGroupV2.id).where(
        TaskGroupV2.path == path,
        TaskGroupV2.resource_id == resource_id,
    )
    result = await db.execute(query)
    duplicate_ids = result.scalars().all()
    if not duplicate_ids:
        return

    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        detail=(
            f"Other TaskGroups already have {path=}: "
            f"{sorted(duplicate_ids)}."
        ),
    )