Skip to content

security

Auth subsystem

This module implements the authorisation/authentication subsystem of the Fractal Server. It is based on the FastAPI Users library with support for the SQLModel database adapter.

In particular, this module links the appropriate database models, sets up FastAPIUsers with Bearer Token and cookie transports, and registers local routes. Then, for each OAuth client defined in the Fractal Settings configuration, it registers the client and the related routes.

All routes are registered under the auth/ prefix.

SQLModelUserDatabaseAsync

Bases: Generic[UP, ID], BaseUserDatabase[UP, ID]

This class is from fastapi_users_db_sqlmodel. Original copyright: 2022 François Voron, released under the MIT licence.

Database adapter for SQLModel working purely asynchronously.

Parameters:

Name Type Description Default
user_model Type[UP]

SQLModel model of a DB representation of a user.

required
session AsyncSession

SQLAlchemy async session.

required
Source code in fractal_server/app/security/__init__.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class SQLModelUserDatabaseAsync(Generic[UP, ID], BaseUserDatabase[UP, ID]):
    """
    Fully asynchronous user-database adapter backed by SQLModel.

    Adapted from `fastapi_users_db_sqlmodel`
    (original copyright: 2022 François Voron, released under MIT licence).

    Args:
        session: SQLAlchemy async session.
        user_model: SQLModel model of a DB representation of a user.
        oauth_account_model: Optional model of an OAuth account; when it is
            `None`, the OAuth-related methods raise `NotImplementedError`.
    """

    session: AsyncSession
    user_model: Type[UP]
    oauth_account_model: Optional[Type[OAuthAccount]]

    def __init__(
        self,
        session: AsyncSession,
        user_model: Type[UP],
        oauth_account_model: Optional[Type[OAuthAccount]] = None,
    ):
        self.oauth_account_model = oauth_account_model
        self.user_model = user_model
        self.session = session

    async def get(self, id: ID) -> Optional[UP]:
        """Look up a single user by primary key."""
        found = await self.session.get(self.user_model, id)
        return found

    async def get_by_email(self, email: str) -> Optional[UP]:
        """Look up a single user by email, comparing lower-cased values."""
        lowered_column = func.lower(self.user_model.email)
        query = select(self.user_model).where(
            lowered_column == func.lower(email)
        )
        rows = await self.session.execute(query)
        first_row = rows.first()
        # Each row wraps the selected entity as its only element
        return None if first_row is None else first_row[0]

    async def get_by_oauth_account(
        self, oauth: str, account_id: str
    ) -> Optional[UP]:  # noqa
        """
        Look up the user that owns a given OAuth account.

        The account is matched on both `oauth_name` and `account_id`.

        Raises:
            NotImplementedError: If no OAuth account model was configured.
        """
        account_model = self.oauth_account_model
        if account_model is None:
            raise NotImplementedError()
        query = select(account_model)
        query = query.where(account_model.oauth_name == oauth)
        query = query.where(account_model.account_id == account_id)
        # Load the related user together with the account
        query = query.options(selectinload(account_model.user))  # type: ignore  # noqa
        rows = await self.session.execute(query)
        first_row = rows.first()
        if not first_row:
            return None
        return first_row[0].user  # type: ignore

    async def create(self, create_dict: dict[str, Any]) -> UP:
        """Build a user from `create_dict`, persist it and return it."""
        new_user = self.user_model(**create_dict)
        db = self.session
        db.add(new_user)
        await db.commit()
        await db.refresh(new_user)
        return new_user

    async def update(self, user: UP, update_dict: dict[str, Any]) -> UP:
        """Set every `update_dict` item on `user`, persist and return it."""
        for field_name, new_value in update_dict.items():
            setattr(user, field_name, new_value)
        db = self.session
        db.add(user)
        await db.commit()
        await db.refresh(user)
        return user

    async def delete(self, user: UP) -> None:
        """Delete `user` and commit."""
        db = self.session
        await db.delete(user)
        await db.commit()

    async def add_oauth_account(
        self, user: UP, create_dict: dict[str, Any]
    ) -> UP:  # noqa
        """
        Attach a new OAuth account, built from `create_dict`, to `user`.

        Raises:
            NotImplementedError: If no OAuth account model was configured.
        """
        account_model = self.oauth_account_model
        if account_model is None:
            raise NotImplementedError()

        new_account = account_model(**create_dict)
        user.oauth_accounts.append(new_account)  # type: ignore
        self.session.add(user)
        await self.session.commit()
        return user

    async def update_oauth_account(
        self, user: UP, oauth_account: OAP, update_dict: dict[str, Any]
    ) -> UP:
        """
        Set every `update_dict` item on `oauth_account` and commit.

        Returns:
            The `user` argument, as received.

        Raises:
            NotImplementedError: If no OAuth account model was configured.
        """
        if self.oauth_account_model is None:
            raise NotImplementedError()

        for field_name, new_value in update_dict.items():
            setattr(oauth_account, field_name, new_value)
        self.session.add(oauth_account)
        await self.session.commit()
        return user

create(create_dict) async

Create a user.

Source code in fractal_server/app/security/__init__.py
132
133
134
135
136
137
138
async def create(self, create_dict: dict[str, Any]) -> UP:
    """Build a user from `create_dict`, persist it and return it."""
    new_user = self.user_model(**create_dict)
    db = self.session
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user)
    return new_user

get(id) async

Get a single user by id.

Source code in fractal_server/app/security/__init__.py
 98
 99
100
async def get(self, id: ID) -> Optional[UP]:
    """Look up a single user by primary key."""
    found = await self.session.get(self.user_model, id)
    return found

get_by_email(email) async

Get a single user by email.

Source code in fractal_server/app/security/__init__.py
102
103
104
105
106
107
108
109
110
111
async def get_by_email(self, email: str) -> Optional[UP]:
    """Look up a single user by email, comparing lower-cased values."""
    lowered_column = func.lower(self.user_model.email)
    query = select(self.user_model).where(
        lowered_column == func.lower(email)
    )
    rows = await self.session.execute(query)
    first_row = rows.first()
    # Each row wraps the selected entity as its only element
    return None if first_row is None else first_row[0]

get_by_oauth_account(oauth, account_id) async

Get a single user by OAuth account id.

Source code in fractal_server/app/security/__init__.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def get_by_oauth_account(
    self, oauth: str, account_id: str
) -> Optional[UP]:  # noqa
    """
    Look up the user that owns a given OAuth account.

    The account is matched on both `oauth_name` and `account_id`.

    Raises:
        NotImplementedError: If no OAuth account model was configured.
    """
    account_model = self.oauth_account_model
    if account_model is None:
        raise NotImplementedError()
    query = select(account_model)
    query = query.where(account_model.oauth_name == oauth)
    query = query.where(account_model.account_id == account_id)
    # Load the related user together with the account
    query = query.options(selectinload(account_model.user))  # type: ignore  # noqa
    rows = await self.session.execute(query)
    first_row = rows.first()
    if not first_row:
        return None
    return first_row[0].user  # type: ignore

UserManager

Bases: IntegerIDMixin, BaseUserManager[UserOAuth, int]

Source code in fractal_server/app/security/__init__.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class UserManager(IntegerIDMixin, BaseUserManager[UserOAuth, int]):
    """
    User manager with custom password validation and a post-registration
    hook.
    """

    def __init__(self, user_db):
        """
        Override `__init__` of `BaseUserManager` to define custom
        `password_helper`.
        """
        super().__init__(
            user_db=user_db,
            password_helper=password_helper,
        )

    async def validate_password(self, password: str, user: UserOAuth) -> None:
        """
        Check that the length of `password` is within the allowed bounds.

        Args:
            password: The candidate password.
            user: The user the password belongs to (not used by this check).

        Raises:
            InvalidPasswordException: If `password` has fewer than 4 or more
                than 100 characters.
        """
        # check password length
        min_length = 4
        max_length = 100
        if len(password) < min_length:
            raise InvalidPasswordException(
                f"The password is too short (minimum length: {min_length})."
            )
        elif len(password) > max_length:
            # Report the upper bound here (the lower bound was previously
            # interpolated by mistake)
            raise InvalidPasswordException(
                f"The password is too long (maximum length: {max_length})."
            )

    async def on_after_register(
        self, user: UserOAuth, request: Optional[Request] = None
    ):
        """
        Finalise the setup of a newly registered user.

        Using a database session obtained from `get_async_db`, this hook:

        1. links the user to the group named `FRACTAL_DEFAULT_GROUP_NAME`,
           or logs a warning if no such group exists;
        2. associates an empty `UserSettings` object to the user;
        3. if the user has OAuth accounts and `MAIL_SETTINGS` is set, calls
           `mail_new_oauth_signup`; any exception raised by that call is
           logged and not propagated.

        Args:
            user: The user that has just been registered.
            request: Not used by this hook.
        """
        logger.info(
            f"New-user registration completed ({user.id=}, {user.email=})."
        )
        async for db in get_async_db():
            # Find default group
            stm = select(UserGroup).where(
                UserGroup.name == FRACTAL_DEFAULT_GROUP_NAME
            )
            res = await db.execute(stm)
            default_group = res.scalar_one_or_none()
            if default_group is None:
                logger.warning(
                    f"No group found with name {FRACTAL_DEFAULT_GROUP_NAME}"
                )
            else:
                link = LinkUserGroup(
                    user_id=user.id, group_id=default_group.id
                )
                db.add(link)
                await db.commit()
                logger.info(
                    f"Added {user.email} user to group {default_group.id=}."
                )

            # Load the user within this session before attaching settings
            this_user = await db.get(UserOAuth, user.id)

            this_user.settings = UserSettings()
            await db.merge(this_user)
            await db.commit()
            await db.refresh(this_user)
            logger.info(
                f"Associated empty settings (id={this_user.user_settings_id}) "
                f"to '{this_user.email}'."
            )

            # Send mail section
            settings = Inject(get_settings)

            if this_user.oauth_accounts and settings.MAIL_SETTINGS is not None:
                try:
                    mail_new_oauth_signup(
                        msg=f"New user registered: '{this_user.email}'.",
                        mail_settings=settings.MAIL_SETTINGS,
                    )
                except Exception as e:
                    # Best effort: a mail failure is only logged
                    logger.error(
                        "ERROR sending notification email after oauth "
                        f"registration of {this_user.email}. "
                        f"Original error: '{e}'."
                    )

__init__(user_db)

Override __init__ of BaseUserManager to define custom password_helper.

Source code in fractal_server/app/security/__init__.py
191
192
193
194
195
196
197
198
199
def __init__(self, user_db):
    """
    Initialise `BaseUserManager` with the given `user_db` and the custom
    `password_helper`.
    """
    super().__init__(user_db=user_db, password_helper=password_helper)

_create_first_group()

Create a UserGroup with name=FRACTAL_DEFAULT_GROUP_NAME, if missing.

Source code in fractal_server/app/security/__init__.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def _create_first_group():
    """
    Make sure that a `UserGroup` named `FRACTAL_DEFAULT_GROUP_NAME` exists.

    The group is looked up by name; it is created and committed only when no
    matching row is found. Start, end and outcome are logged.
    """
    log = set_logger("fractal_server.create_first_group")
    log.info(
        f"START _create_first_group, with name '{FRACTAL_DEFAULT_GROUP_NAME}'"
    )
    with next(get_sync_db()) as db:
        query = select(UserGroup).where(
            UserGroup.name == FRACTAL_DEFAULT_GROUP_NAME
        )
        existing_group = db.execute(query).scalars().one_or_none()
        if existing_group is not None:
            log.info(
                f"Group '{FRACTAL_DEFAULT_GROUP_NAME}' already exists, skip."
            )
        else:
            db.add(UserGroup(name=FRACTAL_DEFAULT_GROUP_NAME))
            db.commit()
            log.info(f"Created group '{FRACTAL_DEFAULT_GROUP_NAME}'")
    log.info(
        f"END   _create_first_group, with name '{FRACTAL_DEFAULT_GROUP_NAME}'"
    )

_create_first_user(email, password, is_superuser=False, is_verified=False, username=None) async

Private method to create the first fractal-server user

Create a user with the given default arguments and return a message with the relevant information. If the user already exists, for example after a restart, it returns a message to inform that the user already exists.

WARNING: This function is only meant to create the first user, and then it catches and ignores IntegrityErrors (when multiple workers may be trying to concurrently create the first user). This is not the expected behavior for regular user creation, which must rather happen via the /auth/register endpoint.

See fastapi_users docs

Parameters:

Name Type Description Default
email str

New user's email

required
password str

New user's password

required
is_superuser bool

True if the new user is a superuser

False
is_verified bool

True if the new user is verified

False
username Optional[str]
None
Source code in fractal_server/app/security/__init__.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
async def _create_first_user(
    email: str,
    password: str,
    is_superuser: bool = False,
    is_verified: bool = False,
    username: Optional[str] = None,
) -> None:
    """
    Private method to create the first fractal-server user.

    Create a user with the given arguments and log the outcome. If the user
    already exists (for example after a restart), a warning is logged and no
    exception is raised. When `is_superuser` is `True` and a superuser is
    already present in the database, creation is skipped.

    **WARNING**: This function is only meant to create the first user. It is
    not the expected path for regular user creation, which must rather happen
    via the /auth/register endpoint.

    NOTE(review): earlier documentation states that `IntegrityError`s from
    concurrent workers are caught and ignored; in this function only
    `UserAlreadyExists` is swallowed, while any other exception is logged and
    re-raised. Confirm where the `IntegrityError` handling takes place.

    See [fastapi_users docs](https://fastapi-users.github.io/fastapi-users/
    12.1/cookbook/create-user-programmatically)

    Arguments:
        email: New user's email
        password: New user's password
        is_superuser: `True` if the new user is a superuser
        is_verified: `True` if the new user is verified
        username: Optional username, forwarded only when not `None`
    """
    log = set_logger("fractal_server.create_first_user")
    log.info(f"START _create_first_user, with email '{email}'")
    try:
        async with get_async_session_context() as session:
            if is_superuser is True:
                # Do nothing when the database already holds a superuser
                superuser_query = select(UserOAuth).where(
                    UserOAuth.is_superuser == True  # noqa
                )
                query_result = await session.execute(superuser_query)
                current_superuser = query_result.scalars().first()
                if current_superuser is not None:
                    log.info(
                        f"'{current_superuser.email}' superuser already "
                        f"exists, skip creation of '{email}'"
                    )
                    return None

            user_payload = {
                "email": email,
                "password": password,
                "is_superuser": is_superuser,
                "is_verified": is_verified,
            }
            if username is not None:
                user_payload["username"] = username

            async with get_user_db_context(session) as user_db:
                async with get_user_manager_context(user_db) as user_manager:
                    new_user = await user_manager.create(
                        UserCreate(**user_payload)
                    )
                    log.info(f"User '{new_user.email}' created")
    except UserAlreadyExists:
        log.warning(f"User '{email}' already exists")
    except Exception as exc:
        log.error(
            f"ERROR in _create_first_user, original error {str(exc)}"
        )
        raise exc
    finally:
        log.info(f"END   _create_first_user, with email '{email}'")