Skip to content

security

Auth subsystem

This module implements the authorisation/authentication subsystem of the Fractal Server. It is based on the FastAPI Users library with support for the SQLModel database adapter.

In particular, this module links the appropriate database models, sets up FastAPIUsers with Barer Token and cookie transports and register local routes. Then, for each OAuth client defined in the Fractal Settings configuration, it registers the client and the relative routes.

All routes are registered under the auth/ prefix.

SQLModelUserDatabaseAsync

Bases: Generic[UP, ID], BaseUserDatabase[UP, ID]

This class is from fastapi_users_db_sqlmodel Original Copyright: 2022 François Voron, released under MIT licence

Database adapter for SQLModel working purely asynchronously.

PARAMETER DESCRIPTION
user_model

SQLModel model of a DB representation of a user.

TYPE: type[UP]

session

SQLAlchemy async session.

TYPE: AsyncSession

Source code in fractal_server/app/security/__init__.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class SQLModelUserDatabaseAsync(Generic[UP, ID], BaseUserDatabase[UP, ID]):
    """
    This class is from fastapi_users_db_sqlmodel
    Original Copyright: 2022 François Voron, released under MIT licence

    Database adapter for SQLModel working purely asynchronously.

    Args:
        user_model: SQLModel model of a DB representation of a user.
        session: SQLAlchemy async session.
    """

    session: AsyncSession
    user_model: type[UP]
    oauth_account_model: type[OAuthAccount] | None = None

    def __init__(
        self,
        session: AsyncSession,
        user_model: type[UP],
        oauth_account_model: type[OAuthAccount] | None = None,
    ):
        self.session = session
        self.user_model = user_model
        self.oauth_account_model = oauth_account_model

    async def get(self, id: ID) -> UP | None:
        """Get a single user by id."""
        return await self.session.get(self.user_model, id)

    async def get_by_email(self, email: str) -> UP | None:
        """Get a single user by email."""
        statement = select(self.user_model).where(
            func.lower(self.user_model.email) == func.lower(email)
        )
        results = await self.session.execute(statement)
        object = results.first()
        if object is None:
            return None
        return object[0]

    async def get_by_oauth_account(
        self, oauth: str, account_id: str
    ) -> UP | None:  # noqa
        """Get a single user by OAuth account id."""
        if self.oauth_account_model is None:
            raise NotImplementedError()
        statement = (
            select(self.oauth_account_model)
            .where(self.oauth_account_model.oauth_name == oauth)
            .where(self.oauth_account_model.account_id == account_id)
            .options(selectinload(self.oauth_account_model.user))  # type: ignore  # noqa
        )
        results = await self.session.execute(statement)
        oauth_account = results.first()
        if oauth_account:
            user = oauth_account[0].user  # type: ignore
            return user
        return None

    async def create(self, create_dict: dict[str, Any]) -> UP:
        """Create a user."""
        user = self.user_model(**create_dict)
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def update(self, user: UP, update_dict: dict[str, Any]) -> UP:
        for key, value in update_dict.items():
            setattr(user, key, value)
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def delete(self, user: UP) -> None:
        await self.session.delete(user)
        await self.session.commit()

    async def add_oauth_account(
        self, user: UP, create_dict: dict[str, Any]
    ) -> UP:  # noqa
        if self.oauth_account_model is None:
            raise NotImplementedError()

        oauth_account = self.oauth_account_model(**create_dict)
        user.oauth_accounts.append(oauth_account)  # type: ignore
        self.session.add(user)

        await self.session.commit()

        return user

    async def update_oauth_account(
        self, user: UP, oauth_account: OAP, update_dict: dict[str, Any]
    ) -> UP:
        if self.oauth_account_model is None:
            raise NotImplementedError()

        for key, value in update_dict.items():
            setattr(oauth_account, key, value)
        self.session.add(oauth_account)
        await self.session.commit()

        return user

create(create_dict) async

Create a user.

Source code in fractal_server/app/security/__init__.py
121
122
123
124
125
126
127
async def create(self, create_dict: dict[str, Any]) -> UP:
    """Create a user."""
    user = self.user_model(**create_dict)
    self.session.add(user)
    await self.session.commit()
    await self.session.refresh(user)
    return user

get(id) async

Get a single user by id.

Source code in fractal_server/app/security/__init__.py
87
88
89
async def get(self, id: ID) -> UP | None:
    """Get a single user by id."""
    return await self.session.get(self.user_model, id)

get_by_email(email) async

Get a single user by email.

Source code in fractal_server/app/security/__init__.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def get_by_email(self, email: str) -> UP | None:
    """Get a single user by email."""
    statement = select(self.user_model).where(
        func.lower(self.user_model.email) == func.lower(email)
    )
    results = await self.session.execute(statement)
    object = results.first()
    if object is None:
        return None
    return object[0]

get_by_oauth_account(oauth, account_id) async

Get a single user by OAuth account id.

Source code in fractal_server/app/security/__init__.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def get_by_oauth_account(
    self, oauth: str, account_id: str
) -> UP | None:  # noqa
    """Get a single user by OAuth account id."""
    if self.oauth_account_model is None:
        raise NotImplementedError()
    statement = (
        select(self.oauth_account_model)
        .where(self.oauth_account_model.oauth_name == oauth)
        .where(self.oauth_account_model.account_id == account_id)
        .options(selectinload(self.oauth_account_model.user))  # type: ignore  # noqa
    )
    results = await self.session.execute(statement)
    oauth_account = results.first()
    if oauth_account:
        user = oauth_account[0].user  # type: ignore
        return user
    return None

UserManager

Bases: IntegerIDMixin, BaseUserManager[UserOAuth, int]

Source code in fractal_server/app/security/__init__.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
class UserManager(IntegerIDMixin, BaseUserManager[UserOAuth, int]):
    def __init__(self, user_db):
        """
        Override `__init__` of `BaseUserManager` to define custom
        `password_helper`.
        """
        super().__init__(
            user_db=user_db,
            password_helper=password_helper,
        )

    async def validate_password(self, password: str, user: UserOAuth) -> None:
        # check password length
        min_length = 4
        max_length = 100
        if len(password) < min_length:
            raise InvalidPasswordException(
                f"The password is too short (minimum length: {min_length})."
            )
        elif len(password) > max_length:
            raise InvalidPasswordException(
                f"The password is too long (maximum length: {min_length})."
            )

    async def oauth_callback(
        self: Self,
        oauth_name: str,
        access_token: str,
        account_id: str,
        account_email: str,
        expires_at: int | None = None,
        refresh_token: str | None = None,
        request: Request | None = None,
        *,
        associate_by_email: bool = False,
        is_verified_by_default: bool = False,
    ) -> UserOAuth:
        """
        Handle the callback after a successful OAuth authentication.

        This method extends the corresponding `BaseUserManager` method of
        > fastapi-users v14.0.1, Copyright (c) 2019 François Voron, MIT License

        If the user already exists with this OAuth account, the token is
        updated.

        If a user with the same e-mail already exists and `associate_by_email`
        is True, the OAuth account is associated to this user.
        Otherwise, the `UserNotExists` exception is raised.

        If the user does not exist, send an email to the Fractal admins (if
        configured) and respond with a 400 error status. NOTE: This is the
        function branch where the `fractal-server` implementation deviates
        from the original `fastapi-users` one.

        :param oauth_name: Name of the OAuth client.
        :param access_token: Valid access token for the service provider.
        :param account_id: models.ID of the user on the service provider.
        :param account_email: E-mail of the user on the service provider.
        :param expires_at: Optional timestamp at which the access token
        expires.
        :param refresh_token: Optional refresh token to get a
        fresh access token from the service provider.
        :param request: Optional FastAPI request that
        triggered the operation, defaults to None
        :param associate_by_email: If True, any existing user with the same
        e-mail address will be associated to this user. Defaults to False.
        :param is_verified_by_default: If True, the `is_verified` flag will be
        set to `True` on newly created user. Make sure the OAuth Provider you
        are using does verify the email address before enabling this flag.
        Defaults to False.
        :return: A user.
        """
        from fastapi import HTTPException
        from fastapi import status
        from fastapi_users import exceptions

        oauth_account_dict = {
            "oauth_name": oauth_name,
            "access_token": access_token,
            "account_id": account_id,
            "account_email": account_email,
            "expires_at": expires_at,
            "refresh_token": refresh_token,
        }

        try:
            user = await self.get_by_oauth_account(oauth_name, account_id)
        except exceptions.UserNotExists:
            try:
                # Associate account
                user = await self.get_by_email(account_email)
                if not associate_by_email:
                    raise exceptions.UserAlreadyExists()
                user = await self.user_db.add_oauth_account(
                    user, oauth_account_dict
                )
            except exceptions.UserNotExists:
                # (0) Log
                logger.warning(
                    f"Self-registration attempt by {account_email}."
                )

                # (1) Prepare user-facing error message
                error_msg = (
                    "Thank you for registering for the Fractal service. "
                    "Administrators have been informed to configure your "
                    "account and will get back to you."
                )
                settings = Inject(get_settings)
                if settings.FRACTAL_HELP_URL is not None:
                    error_msg = (
                        f"{error_msg}\n"
                        "You can find more information about the onboarding "
                        f"process at {settings.FRACTAL_HELP_URL}."
                    )

                # (2) Send email to admins
                email_settings = Inject(get_email_settings)
                send_fractal_email_or_log_failure(
                    subject="New OAuth self-registration",
                    msg=(
                        f"User '{account_email}' tried to "
                        "self-register through OAuth.\n"
                        "Please create the Fractal account manually.\n"
                        "Here is the error message displayed to the "
                        f"user:\n{error_msg}"
                    ),
                    email_settings=email_settings.public,
                )

                # (3) Raise
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=error_msg,
                )
        else:
            # Update oauth
            for existing_oauth_account in user.oauth_accounts:
                if (
                    existing_oauth_account.account_id == account_id
                    and existing_oauth_account.oauth_name == oauth_name
                ):
                    user = await self.user_db.update_oauth_account(
                        user, existing_oauth_account, oauth_account_dict
                    )

        return user

    async def on_after_register(
        self, user: UserOAuth, request: Request | None = None
    ):
        settings = Inject(get_settings)
        logger.info(
            f"New-user registration completed ({user.id=}, {user.email=})."
        )
        async for db in get_async_db():
            # Note: if `FRACTAL_DEFAULT_GROUP_NAME=None`, this query will
            # result into `None`
            settings = Inject(get_settings)
            stm = select(UserGroup.id).where(
                UserGroup.name == settings.FRACTAL_DEFAULT_GROUP_NAME
            )
            res = await db.execute(stm)
            default_group_id_or_none = res.scalars().one_or_none()
            if default_group_id_or_none is not None:
                link = LinkUserGroup(
                    user_id=user.id, group_id=default_group_id_or_none
                )
                db.add(link)
                await db.commit()
                logger.info(
                    f"Added {user.email} user to group "
                    f"{default_group_id_or_none=}."
                )
            elif settings.FRACTAL_DEFAULT_GROUP_NAME is not None:
                logger.error(
                    "No group found with name "
                    f"{settings.FRACTAL_DEFAULT_GROUP_NAME}"
                )

__init__(user_db)

Override __init__ of BaseUserManager to define custom password_helper.

Source code in fractal_server/app/security/__init__.py
180
181
182
183
184
185
186
187
188
def __init__(self, user_db):
    """
    Override `__init__` of `BaseUserManager` to define custom
    `password_helper`.
    """
    super().__init__(
        user_db=user_db,
        password_helper=password_helper,
    )

oauth_callback(oauth_name, access_token, account_id, account_email, expires_at=None, refresh_token=None, request=None, *, associate_by_email=False, is_verified_by_default=False) async

Handle the callback after a successful OAuth authentication.

This method extends the corresponding BaseUserManager method of

fastapi-users v14.0.1, Copyright (c) 2019 François Voron, MIT License

If the user already exists with this OAuth account, the token is updated.

If a user with the same e-mail already exists and associate_by_email is True, the OAuth account is associated to this user. Otherwise, the UserNotExists exception is raised.

If the user does not exist, send an email to the Fractal admins (if configured) and respond with a 400 error status. NOTE: This is the function branch where the fractal-server implementation deviates from the original fastapi-users one.

:param oauth_name: Name of the OAuth client. :param access_token: Valid access token for the service provider. :param account_id: models.ID of the user on the service provider. :param account_email: E-mail of the user on the service provider. :param expires_at: Optional timestamp at which the access token expires. :param refresh_token: Optional refresh token to get a fresh access token from the service provider. :param request: Optional FastAPI request that triggered the operation, defaults to None :param associate_by_email: If True, any existing user with the same e-mail address will be associated to this user. Defaults to False. :param is_verified_by_default: If True, the is_verified flag will be set to True on newly created user. Make sure the OAuth Provider you are using does verify the email address before enabling this flag. Defaults to False. :return: A user.

Source code in fractal_server/app/security/__init__.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
async def oauth_callback(
    self: Self,
    oauth_name: str,
    access_token: str,
    account_id: str,
    account_email: str,
    expires_at: int | None = None,
    refresh_token: str | None = None,
    request: Request | None = None,
    *,
    associate_by_email: bool = False,
    is_verified_by_default: bool = False,
) -> UserOAuth:
    """
    Handle the callback after a successful OAuth authentication.

    This method extends the corresponding `BaseUserManager` method of
    > fastapi-users v14.0.1, Copyright (c) 2019 François Voron, MIT License

    If the user already exists with this OAuth account, the token is
    updated.

    If a user with the same e-mail already exists and `associate_by_email`
    is True, the OAuth account is associated to this user.
    Otherwise, the `UserNotExists` exception is raised.

    If the user does not exist, send an email to the Fractal admins (if
    configured) and respond with a 400 error status. NOTE: This is the
    function branch where the `fractal-server` implementation deviates
    from the original `fastapi-users` one.

    :param oauth_name: Name of the OAuth client.
    :param access_token: Valid access token for the service provider.
    :param account_id: models.ID of the user on the service provider.
    :param account_email: E-mail of the user on the service provider.
    :param expires_at: Optional timestamp at which the access token
    expires.
    :param refresh_token: Optional refresh token to get a
    fresh access token from the service provider.
    :param request: Optional FastAPI request that
    triggered the operation, defaults to None
    :param associate_by_email: If True, any existing user with the same
    e-mail address will be associated to this user. Defaults to False.
    :param is_verified_by_default: If True, the `is_verified` flag will be
    set to `True` on newly created user. Make sure the OAuth Provider you
    are using does verify the email address before enabling this flag.
    Defaults to False.
    :return: A user.
    """
    from fastapi import HTTPException
    from fastapi import status
    from fastapi_users import exceptions

    oauth_account_dict = {
        "oauth_name": oauth_name,
        "access_token": access_token,
        "account_id": account_id,
        "account_email": account_email,
        "expires_at": expires_at,
        "refresh_token": refresh_token,
    }

    try:
        user = await self.get_by_oauth_account(oauth_name, account_id)
    except exceptions.UserNotExists:
        try:
            # Associate account
            user = await self.get_by_email(account_email)
            if not associate_by_email:
                raise exceptions.UserAlreadyExists()
            user = await self.user_db.add_oauth_account(
                user, oauth_account_dict
            )
        except exceptions.UserNotExists:
            # (0) Log
            logger.warning(
                f"Self-registration attempt by {account_email}."
            )

            # (1) Prepare user-facing error message
            error_msg = (
                "Thank you for registering for the Fractal service. "
                "Administrators have been informed to configure your "
                "account and will get back to you."
            )
            settings = Inject(get_settings)
            if settings.FRACTAL_HELP_URL is not None:
                error_msg = (
                    f"{error_msg}\n"
                    "You can find more information about the onboarding "
                    f"process at {settings.FRACTAL_HELP_URL}."
                )

            # (2) Send email to admins
            email_settings = Inject(get_email_settings)
            send_fractal_email_or_log_failure(
                subject="New OAuth self-registration",
                msg=(
                    f"User '{account_email}' tried to "
                    "self-register through OAuth.\n"
                    "Please create the Fractal account manually.\n"
                    "Here is the error message displayed to the "
                    f"user:\n{error_msg}"
                ),
                email_settings=email_settings.public,
            )

            # (3) Raise
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=error_msg,
            )
    else:
        # Update oauth
        for existing_oauth_account in user.oauth_accounts:
            if (
                existing_oauth_account.account_id == account_id
                and existing_oauth_account.oauth_name == oauth_name
            ):
                user = await self.user_db.update_oauth_account(
                    user, existing_oauth_account, oauth_account_dict
                )

    return user

_create_first_group()

Create a UserGroup named FRACTAL_DEFAULT_GROUP_NAME, if this variable is set and if such a group does not already exist.

Source code in fractal_server/app/security/__init__.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def _create_first_group():
    """
    Create a `UserGroup` named `FRACTAL_DEFAULT_GROUP_NAME`, if this variable
    is set and if such a group does not already exist.
    """
    settings = Inject(get_settings)
    function_logger = set_logger("fractal_server.create_first_group")

    if settings.FRACTAL_DEFAULT_GROUP_NAME is None:
        function_logger.info(
            f"SKIP because '{settings.FRACTAL_DEFAULT_GROUP_NAME=}'"
        )
        return

    function_logger.info(
        f"START, name '{settings.FRACTAL_DEFAULT_GROUP_NAME}'"
    )
    with next(get_sync_db()) as db:
        group_all = db.execute(
            select(UserGroup).where(
                UserGroup.name == settings.FRACTAL_DEFAULT_GROUP_NAME
            )
        )
        if group_all.scalars().one_or_none() is None:
            first_group = UserGroup(name=settings.FRACTAL_DEFAULT_GROUP_NAME)
            db.add(first_group)
            db.commit()
            function_logger.info(
                f"Created group '{settings.FRACTAL_DEFAULT_GROUP_NAME}'"
            )
        else:
            function_logger.info(
                f"Group '{settings.FRACTAL_DEFAULT_GROUP_NAME}' "
                "already exists, skip."
            )
    function_logger.info("END")
    close_logger(function_logger)

_create_first_user(email, password, project_dir, profile_id=None, is_superuser=False, is_verified=False) async

Private method to create the first fractal-server user

Create a user with the given default arguments and return a message with the relevant information. If the user already exists, for example after a restart, it returns a message to inform that user already exists.

WARNING: This function is only meant to create the first user, and then it catches and ignores IntegrityErrors (when multiple workers may be trying to concurrently create the first user). This is not the expected behavior for regular user creation, which must rather happen via the /auth/register endpoint.

See fastapi_users docs

PARAMETER DESCRIPTION
email

New user's email

TYPE: str

password

New user's password

TYPE: str

is_superuser

True if the new user is a superuser

TYPE: bool DEFAULT: False

is_verified

True if the new user is verified

TYPE: bool DEFAULT: False

Source code in fractal_server/app/security/__init__.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
async def _create_first_user(
    email: str,
    password: str,
    project_dir: str,
    profile_id: int | None = None,
    is_superuser: bool = False,
    is_verified: bool = False,
) -> None:
    """
    Private method to create the first fractal-server user

    Create a user with the given default arguments and return a message with
    the relevant information. If the user already exists, for example after a
    restart, it returns a message to inform that user already exists.

    **WARNING**: This function is only meant to create the first user, and then
    it catches and ignores `IntegrityError`s (when multiple workers may be
    trying to concurrently create the first user). This is not the expected
    behavior for regular user creation, which must rather happen via the
    /auth/register endpoint.

    See [fastapi_users docs](https://fastapi-users.github.io/fastapi-users/
    12.1/cookbook/create-user-programmatically)

    Args:
        email: New user's email
        password: New user's password
        is_superuser: `True` if the new user is a superuser
        is_verified: `True` if the new user is verified
    """
    function_logger = set_logger("fractal_server.create_first_user")
    function_logger.info(f"START _create_first_user, with email '{email}'")
    try:
        async with get_async_session_context() as session:
            if is_superuser is True:
                # If a superuser already exists, exit
                stm = select(UserOAuth).where(  # noqa
                    UserOAuth.is_superuser == True  # noqa
                )  # noqa
                res = await session.execute(stm)
                existing_superuser = res.scalars().first()
                if existing_superuser is not None:
                    function_logger.info(
                        f"'{existing_superuser.email}' superuser already "
                        f"exists, skip creation of '{email}'"
                    )
                    return None

            async with get_user_db_context(session) as user_db:
                async with get_user_manager_context(user_db) as user_manager:
                    kwargs = dict(
                        email=email,
                        password=password,
                        project_dir=project_dir,
                        profile_id=profile_id,
                        is_superuser=is_superuser,
                        is_verified=is_verified,
                    )
                    user = await user_manager.create(UserCreate(**kwargs))
                    function_logger.info(f"User '{user.email}' created")
    except UserAlreadyExists:
        function_logger.warning(f"User '{email}' already exists")
    except Exception as e:
        function_logger.error(
            f"ERROR in _create_first_user, original error {str(e)}"
        )
        raise e
    finally:
        function_logger.info(f"END   _create_first_user, with email '{email}'")
        close_logger(function_logger)