Skip to content

_fabric

FractalSSH

Bases: object

FIXME SSH: Fix docstring

Attributes:

Name Type Description
_lock Lock
_connection Connection
default_lock_timeout float
logger_name str
Source code in fractal_server/ssh/_fabric.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class FractalSSH(object):
    """
    Lock-protected wrapper around a single SSH `Connection` object.

    The `put`, `get`, `run` and `write_remote_file` methods acquire `_lock`
    before using the wrapped connection, so that these operations do not
    overlap. The higher-level helpers (`run_command`, `send_file`, `mkdir`,
    `remove_folder`) are built on top of them.

    Attributes:
        _lock: Lock acquired around operations on `_connection`.
        _connection: The wrapped `Connection` object.
        default_lock_timeout: Default timeout (in seconds) for acquiring
            `_lock`.
        default_max_attempts: Default maximum number of attempts made by
            `run_command`.
        default_base_interval: Default base of the exponential sleep time
            used by `run_command` between attempts.
        logger_name: Name of the logger used by this object.
    """

    _lock: Lock
    _connection: Connection
    default_lock_timeout: float
    default_max_attempts: int
    default_base_interval: float
    logger_name: str

    def __init__(
        self,
        connection: Connection,
        default_timeout: float = 250,
        default_max_attempts: int = 5,
        default_base_interval: float = 3.0,
        logger_name: str = __name__,
    ):
        """
        Args:
            connection: Connection object to wrap.
            default_timeout: Stored as `default_lock_timeout`.
            default_max_attempts: Stored as `default_max_attempts`.
            default_base_interval: Stored as `default_base_interval`.
            logger_name: Name of the logger to set up and use.
        """
        self._lock = Lock()
        self._connection = connection
        self.default_lock_timeout = default_timeout
        self.default_base_interval = default_base_interval
        self.default_max_attempts = default_max_attempts
        self.logger_name = logger_name
        set_logger(self.logger_name)

    @contextmanager
    def acquire_timeout(
        self, timeout: float
    ) -> Generator[Literal[True], Any, None]:
        """
        Context manager holding `_lock` for the duration of the `with` body.

        Args:
            timeout: Maximum time (in seconds) to wait for the lock.

        Raises:
            FractalSSHTimeoutError: If the lock is not acquired in time.
        """
        self.logger.debug(f"Trying to acquire lock, with {timeout=}")
        result = self._lock.acquire(timeout=timeout)
        try:
            if not result:
                self.logger.error("Lock was *NOT* acquired.")
                raise FractalSSHTimeoutError(
                    f"Failed to acquire lock within {timeout} seconds"
                )
            self.logger.debug("Lock was acquired.")
            yield result
        finally:
            # Only release a lock that was actually acquired.
            if result:
                self._lock.release()
                self.logger.debug("Lock was released")

    @property
    def is_connected(self) -> bool:
        """Value of `is_connected` for the wrapped connection."""
        return self._connection.is_connected

    @property
    def logger(self) -> logging.Logger:
        """Logger looked up by `logger_name` on every access."""
        return get_logger(self.logger_name)

    def put(
        self, *args, lock_timeout: Optional[float] = None, **kwargs
    ) -> Result:
        """
        Call `put` on the wrapped connection while holding `_lock`.

        Args:
            lock_timeout: Lock timeout; `default_lock_timeout` if `None`.
        """
        actual_lock_timeout = (
            self.default_lock_timeout if lock_timeout is None else lock_timeout
        )
        with self.acquire_timeout(timeout=actual_lock_timeout):
            return self._connection.put(*args, **kwargs)

    def get(
        self, *args, lock_timeout: Optional[float] = None, **kwargs
    ) -> Result:
        """
        Call `get` on the wrapped connection while holding `_lock`.

        Args:
            lock_timeout: Lock timeout; `default_lock_timeout` if `None`.
        """
        actual_lock_timeout = (
            self.default_lock_timeout if lock_timeout is None else lock_timeout
        )
        with self.acquire_timeout(timeout=actual_lock_timeout):
            return self._connection.get(*args, **kwargs)

    def run(
        self, *args, lock_timeout: Optional[float] = None, **kwargs
    ) -> Any:
        """
        Call `run` on the wrapped connection while holding `_lock`.

        Args:
            lock_timeout: Lock timeout; `default_lock_timeout` if `None`.
        """
        actual_lock_timeout = (
            self.default_lock_timeout if lock_timeout is None else lock_timeout
        )
        with self.acquire_timeout(timeout=actual_lock_timeout):
            return self._connection.run(*args, **kwargs)

    def sftp(self) -> paramiko.sftp_client.SFTPClient:
        """
        Return the SFTP client of the wrapped connection.

        Note: this method does not acquire `_lock` by itself.
        """
        return self._connection.sftp()

    def check_connection(self) -> None:
        """
        Open the SSH connection and handle exceptions.

        This function can be called from within other functions that use
        `connection`, so that we can provide a meaningful error in case the
        SSH connection cannot be opened.

        Raises:
            RuntimeError: If opening the connection fails; the original
                exception is attached as `__cause__`.
        """
        if not self._connection.is_connected:
            try:
                self._connection.open()
            except Exception as e:
                raise RuntimeError(
                    f"Cannot open SSH connection. Original error:\n{str(e)}"
                ) from e

    def close(self) -> None:
        """
        Aggressively close `self._connection`.

        When `Connection.is_connected` is `False`, `Connection.close()` does
        not call `Connection.client.close()`. Thus we do this explicitly here,
        because we observed cases where `is_connected=False` but the underlying
        `Transport` object was not closed.
        """

        self._connection.close()

        if self._connection.client is not None:
            self._connection.client.close()

    def run_command(
        self,
        *,
        cmd: str,
        allow_char: Optional[str] = None,
        max_attempts: Optional[int] = None,
        base_interval: Optional[float] = None,
        lock_timeout: Optional[float] = None,
    ) -> str:
        """
        Run a command within an open SSH connection.

        Connection errors (`NoValidConnectionsError`) are retried, sleeping
        `base_interval**attempt` seconds after each failed attempt; any other
        failure is raised immediately.

        Args:
            cmd: Command to be run
            allow_char: Forbidden chars to allow for this command
            max_attempts: Maximum number of attempts;
                `default_max_attempts` if `None`.
            base_interval: Base of the exponential sleep time between
                attempts; `default_base_interval` if `None`.
            lock_timeout: Lock timeout for each attempt;
                `default_lock_timeout` if `None`.

        Returns:
            Standard output of the command, if successful.

        Raises:
            RuntimeError: If the command exits unexpectedly, or if the last
                attempt fails with a connection error.
        """

        validate_cmd(cmd, allow_char=allow_char)

        actual_max_attempts = self.default_max_attempts
        if max_attempts is not None:
            actual_max_attempts = max_attempts

        actual_base_interval = self.default_base_interval
        if base_interval is not None:
            actual_base_interval = base_interval

        actual_lock_timeout = self.default_lock_timeout
        if lock_timeout is not None:
            actual_lock_timeout = lock_timeout

        # Started once, so the logged elapsed time spans all attempts.
        t_0 = time.perf_counter()
        ind_attempt = 0
        while ind_attempt <= actual_max_attempts:
            ind_attempt += 1
            prefix = f"[attempt {ind_attempt}/{actual_max_attempts}]"
            self.logger.info(f"{prefix} START running '{cmd}' over SSH.")
            try:
                # Case 1: Command runs successfully
                res = self.run(
                    cmd, lock_timeout=actual_lock_timeout, hide=True
                )
                t_1 = time.perf_counter()
                self.logger.info(
                    f"{prefix} END   running '{cmd}' over SSH, "
                    f"elapsed {t_1-t_0:.3f}"
                )
                self.logger.debug(f"STDOUT: {res.stdout}")
                self.logger.debug(f"STDERR: {res.stderr}")
                return res.stdout
            except NoValidConnectionsError as e:
                # Case 2: Command fails with a connection error
                self.logger.warning(
                    f"{prefix} Running command `{cmd}` over SSH failed.\n"
                    f"Original NoValidConnectionError:\n{str(e)}.\n"
                    f"{e.errors=}\n"
                )
                if ind_attempt < actual_max_attempts:
                    sleeptime = actual_base_interval**ind_attempt
                    self.logger.warning(
                        f"{prefix} Now sleep {sleeptime:.3f} "
                        "seconds and continue."
                    )
                    time.sleep(sleeptime)
                else:
                    self.logger.error(f"{prefix} Reached last attempt")
                    break
            except UnexpectedExit as e:
                # Case 3: Command fails with an actual error
                error_msg = (
                    f"{prefix} Running command `{cmd}` over SSH failed.\n"
                    f"Original error:\n{str(e)}."
                )
                self.logger.error(error_msg)
                raise RuntimeError(error_msg)
            except Exception as e:
                self.logger.error(
                    f"Running command `{cmd}` over SSH failed.\n"
                    f"Original Error:\n{str(e)}."
                )
                raise e

        # Report the effective number of attempts, not the raw argument
        # (which is `None` whenever the default was used).
        raise RuntimeError(
            f"Reached last attempt (max_attempts={actual_max_attempts}) "
            f"for running '{cmd}' over SSH"
        )

    def send_file(
        self,
        *,
        local: str,
        remote: str,
        logger_name: Optional[str] = None,
        lock_timeout: Optional[float] = None,
    ) -> None:
        """
        Transfer a file via SSH

        Args:
            local: Local path to file
            remote: Target path on remote host
            logger_name: Name of the logger used to report a failure
            lock_timeout: Lock timeout, forwarded to `put`

        Raises:
            Exception: Any exception raised by `put` is logged and re-raised.
        """
        try:
            self.put(local=local, remote=remote, lock_timeout=lock_timeout)
        except Exception as e:
            logger = get_logger(logger_name=logger_name)
            logger.error(
                f"Transferring {local=} to {remote=} over SSH failed.\n"
                f"Original Error:\n{str(e)}."
            )
            raise e

    def mkdir(self, *, folder: str, parents: bool = True) -> None:
        """
        Create a folder remotely via SSH.

        Args:
            folder: Path of the folder to create.
            parents: If `True`, run `mkdir -p` rather than plain `mkdir`.
        """
        # FIXME SSH: try using `mkdir` method of `paramiko.SFTPClient`
        if parents:
            cmd = f"mkdir -p {folder}"
        else:
            cmd = f"mkdir {folder}"
        self.run_command(cmd=cmd)

    def remove_folder(
        self,
        *,
        folder: str,
        safe_root: str,
    ) -> None:
        """
        Removes a folder remotely via SSH.

        This function calls `rm -r`, after a few checks on `folder`.

        Args:
            folder: Absolute path to a folder that should be removed.
            safe_root: If `folder` is not a subfolder of the absolute
                `safe_root` path, raise an error.

        Raises:
            ValueError: If either path contains a space, is not absolute, or
                if `folder` is not located under `safe_root`.
        """
        validate_cmd(folder)
        validate_cmd(safe_root)

        if " " in folder:
            raise ValueError(f"folder='{folder}' includes whitespace.")
        elif " " in safe_root:
            raise ValueError(f"safe_root='{safe_root}' includes whitespace.")
        elif not Path(folder).is_absolute():
            raise ValueError(f"{folder=} is not an absolute path.")
        elif not Path(safe_root).is_absolute():
            raise ValueError(f"{safe_root=} is not an absolute path.")
        elif not (
            Path(folder).resolve().is_relative_to(Path(safe_root).resolve())
        ):
            raise ValueError(f"{folder=} is not a subfolder of {safe_root=}.")
        else:
            cmd = f"rm -r {folder}"
            self.run_command(cmd=cmd)

    def write_remote_file(
        self,
        *,
        path: str,
        content: str,
        lock_timeout: Optional[float] = None,
    ) -> None:
        """
        Open a remote file via SFTP and write it.

        Args:
            path: Absolute path
            content: File contents
            lock_timeout: Lock timeout; `default_lock_timeout` if `None`.
        """
        actual_lock_timeout = self.default_lock_timeout
        if lock_timeout is not None:
            actual_lock_timeout = lock_timeout
        with self.acquire_timeout(timeout=actual_lock_timeout):
            with self.sftp().open(filename=path, mode="w") as f:
                f.write(content)

check_connection()

Open the SSH connection and handle exceptions.

This function can be called from within other functions that use connection, so that we can provide a meaningful error in case the SSH connection cannot be opened.

Source code in fractal_server/ssh/_fabric.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def check_connection(self) -> None:
    """
    Open the SSH connection and handle exceptions.

    This function can be called from within other functions that use
    `connection`, so that we can provide a meaningful error in case the
    SSH connection cannot be opened.

    Raises:
        RuntimeError: If opening the connection fails; the original
            exception is attached as `__cause__`.
    """
    if not self._connection.is_connected:
        try:
            self._connection.open()
        except Exception as e:
            # Chain explicitly, so the original traceback is reported as the
            # direct cause of this error.
            raise RuntimeError(
                f"Cannot open SSH connection. Original error:\n{str(e)}"
            ) from e

close()

Aggressively close self._connection.

When Connection.is_connected is False, Connection.close() does not call Connection.client.close(). Thus we do this explicitly here, because we observed cases where is_connected=False but the underlying Transport object was not closed.

Source code in fractal_server/ssh/_fabric.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def close(self) -> None:
    """
    Close the wrapped connection and then, explicitly, its client.

    `Connection.close()` skips `Connection.client.close()` whenever
    `Connection.is_connected` is `False`. Cases were observed where
    `is_connected=False` while the underlying `Transport` object was still
    open, hence the client is closed here regardless.
    """
    connection = self._connection
    connection.close()

    # Look the client up only after the regular close has run.
    client = connection.client
    if client is None:
        return
    client.close()

mkdir(*, folder, parents=True)

Create a folder remotely via SSH.

Parameters:

Name Type Description Default
folder str
required
fractal_ssh
required
parents bool
True
Source code in fractal_server/ssh/_fabric.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def mkdir(self, *, folder: str, parents: bool = True) -> None:
    """
    Create a remote folder by running `mkdir` through `run_command`.

    Args:
        folder: Path of the folder to create.
        parents: When true, `mkdir -p` is used instead of plain `mkdir`.
    """
    # FIXME SSH: try using `mkdir` method of `paramiko.SFTPClient`
    cmd = f"mkdir -p {folder}" if parents else f"mkdir {folder}"
    self.run_command(cmd=cmd)

remove_folder(*, folder, safe_root)

Removes a folder remotely via SSH.

This function calls rm -r, after a few checks on folder.

Parameters:

Name Type Description Default
folder str

Absolute path to a folder that should be removed.

required
safe_root str

If folder is not a subfolder of the absolute safe_root path, raise an error.

required
Source code in fractal_server/ssh/_fabric.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def remove_folder(
    self,
    *,
    folder: str,
    safe_root: str,
) -> None:
    """
    Run `rm -r` on a remote folder, after validating both arguments.

    Args:
        folder: Absolute path to the folder to be removed.
        safe_root: Absolute path that `folder` must be located under.

    Raises:
        ValueError: If either path contains a space or is not absolute, or
            if `folder` is not relative to `safe_root` once both paths are
            resolved.
    """
    validate_cmd(folder)
    validate_cmd(safe_root)

    # Guard clauses, checked in a fixed order: spaces first, then absolute
    # paths, then containment.
    if " " in folder:
        raise ValueError(f"folder='{folder}' includes whitespace.")
    if " " in safe_root:
        raise ValueError(f"safe_root='{safe_root}' includes whitespace.")

    folder_path = Path(folder)
    if not folder_path.is_absolute():
        raise ValueError(f"{folder=} is not an absolute path.")

    root_path = Path(safe_root)
    if not root_path.is_absolute():
        raise ValueError(f"{safe_root=} is not an absolute path.")

    if not folder_path.resolve().is_relative_to(root_path.resolve()):
        raise ValueError(f"{folder=} is not a subfolder of {safe_root=}.")

    self.run_command(cmd=f"rm -r {folder}")

run_command(*, cmd, allow_char=None, max_attempts=None, base_interval=None, lock_timeout=None)

Run a command within an open SSH connection.

Parameters:

Name Type Description Default
cmd str

Command to be run

required
allow_char Optional[str]

Forbidden chars to allow for this command

None
max_attempts Optional[int]
None
base_interval Optional[int]
None
lock_timeout Optional[int]
None

Returns:

Type Description
str

Standard output of the command, if successful.

Source code in fractal_server/ssh/_fabric.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def run_command(
    self,
    *,
    cmd: str,
    allow_char: Optional[str] = None,
    max_attempts: Optional[int] = None,
    base_interval: Optional[float] = None,
    lock_timeout: Optional[float] = None,
) -> str:
    """
    Run a command within an open SSH connection.

    Connection errors (`NoValidConnectionsError`) are retried, sleeping
    `base_interval**attempt` seconds after each failed attempt; any other
    failure is raised immediately.

    Args:
        cmd: Command to be run
        allow_char: Forbidden chars to allow for this command
        max_attempts: Maximum number of attempts;
            `default_max_attempts` if `None`.
        base_interval: Base of the exponential sleep time between
            attempts; `default_base_interval` if `None`.
        lock_timeout: Lock timeout for each attempt;
            `default_lock_timeout` if `None`.

    Returns:
        Standard output of the command, if successful.

    Raises:
        RuntimeError: If the command exits unexpectedly, or if the last
            attempt fails with a connection error.
    """

    validate_cmd(cmd, allow_char=allow_char)

    actual_max_attempts = self.default_max_attempts
    if max_attempts is not None:
        actual_max_attempts = max_attempts

    actual_base_interval = self.default_base_interval
    if base_interval is not None:
        actual_base_interval = base_interval

    actual_lock_timeout = self.default_lock_timeout
    if lock_timeout is not None:
        actual_lock_timeout = lock_timeout

    # Started once, so the logged elapsed time spans all attempts.
    t_0 = time.perf_counter()
    ind_attempt = 0
    while ind_attempt <= actual_max_attempts:
        ind_attempt += 1
        prefix = f"[attempt {ind_attempt}/{actual_max_attempts}]"
        self.logger.info(f"{prefix} START running '{cmd}' over SSH.")
        try:
            # Case 1: Command runs successfully
            res = self.run(
                cmd, lock_timeout=actual_lock_timeout, hide=True
            )
            t_1 = time.perf_counter()
            self.logger.info(
                f"{prefix} END   running '{cmd}' over SSH, "
                f"elapsed {t_1-t_0:.3f}"
            )
            self.logger.debug(f"STDOUT: {res.stdout}")
            self.logger.debug(f"STDERR: {res.stderr}")
            return res.stdout
        except NoValidConnectionsError as e:
            # Case 2: Command fails with a connection error
            self.logger.warning(
                f"{prefix} Running command `{cmd}` over SSH failed.\n"
                f"Original NoValidConnectionError:\n{str(e)}.\n"
                f"{e.errors=}\n"
            )
            if ind_attempt < actual_max_attempts:
                sleeptime = actual_base_interval**ind_attempt
                self.logger.warning(
                    f"{prefix} Now sleep {sleeptime:.3f} "
                    "seconds and continue."
                )
                time.sleep(sleeptime)
            else:
                self.logger.error(f"{prefix} Reached last attempt")
                break
        except UnexpectedExit as e:
            # Case 3: Command fails with an actual error
            error_msg = (
                f"{prefix} Running command `{cmd}` over SSH failed.\n"
                f"Original error:\n{str(e)}."
            )
            self.logger.error(error_msg)
            raise RuntimeError(error_msg)
        except Exception as e:
            self.logger.error(
                f"Running command `{cmd}` over SSH failed.\n"
                f"Original Error:\n{str(e)}."
            )
            raise e

    # Report the effective number of attempts, not the raw argument (which
    # is `None` whenever the default was used).
    raise RuntimeError(
        f"Reached last attempt (max_attempts={actual_max_attempts}) "
        f"for running '{cmd}' over SSH"
    )

send_file(*, local, remote, logger_name=None, lock_timeout=None)

Transfer a file via SSH

Parameters:

Name Type Description Default
local str

Local path to file

required
remote str

Target path on remote host

required
fractal_ssh

FractalSSH connection object with custom lock

required
logger_name Optional[str]

Name of the logger

None
Source code in fractal_server/ssh/_fabric.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def send_file(
    self,
    *,
    local: str,
    remote: str,
    logger_name: Optional[str] = None,
    lock_timeout: Optional[float] = None,
) -> None:
    """
    Upload a local file to the remote host through `put`.

    Args:
        local: Local path to file
        remote: Target path on remote host
        logger_name: Name of the logger used to report a failure
        lock_timeout: Lock timeout, forwarded to `put`

    Raises:
        Exception: Whatever `put` raised, after it has been logged.
    """
    try:
        self.put(local=local, remote=remote, lock_timeout=lock_timeout)
    except Exception as e:
        # The logger is only looked up on failure.
        failure_logger = get_logger(logger_name=logger_name)
        failure_message = (
            f"Transferring {local=} to {remote=} over SSH failed.\n"
            f"Original Error:\n{str(e)}."
        )
        failure_logger.error(failure_message)
        raise e

write_remote_file(*, path, content, lock_timeout=None)

Open a remote file via SFTP and write it.

Parameters:

Name Type Description Default
path str

Absolute path

required
content str

File contents

required
lock_timeout Optional[float]
None
Source code in fractal_server/ssh/_fabric.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def write_remote_file(
    self,
    *,
    path: str,
    content: str,
    lock_timeout: Optional[float] = None,
) -> None:
    """
    Write `content` to a remote file opened over SFTP, holding the lock.

    Args:
        path: Absolute path
        content: File contents
        lock_timeout: Lock timeout; `default_lock_timeout` is used when
            this is `None`.
    """
    if lock_timeout is None:
        timeout = self.default_lock_timeout
    else:
        timeout = lock_timeout

    with self.acquire_timeout(timeout=timeout):
        sftp_client = self.sftp()
        with sftp_client.open(filename=path, mode="w") as remote_file:
            remote_file.write(content)

FractalSSHList

Bases: object

Collection of FractalSSH objects

Attributes are all private, and access to this collection must be through methods (mostly the get one).

Attributes:

Name Type Description
_data dict[tuple[str, str, str], FractalSSH]

Mapping of unique keys (the SSH-credentials tuples) to FractalSSH objects.

_lock Lock

A threading.Lock object, to be acquired when changing _data.

_timeout float

Timeout for _lock acquisition.

_logger_name str

Logger name.

Source code in fractal_server/ssh/_fabric.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
class FractalSSHList(object):
    """
    Collection of `FractalSSH` objects

    Attributes are all private, and access to this collection must be
    through methods (mostly the `get` one).

    Attributes:
        _data:
            Mapping of unique keys (the SSH-credentials tuples) to
            `FractalSSH` objects.
        _lock:
            A `threading.Lock object`, to be acquired when changing `_data`.
        _timeout: Timeout for `_lock` acquisition.
        _logger_name: Logger name.
    """

    _data: dict[tuple[str, str, str], FractalSSH]
    _lock: Lock
    _timeout: float
    _logger_name: str

    def __init__(
        self,
        *,
        timeout: float = 5.0,
        logger_name: str = "fractal_server.FractalSSHList",
    ):
        """
        Args:
            timeout: Timeout for `_lock` acquisition.
            logger_name: Name of the logger to set up and use.
        """
        self._lock = Lock()
        self._data = {}
        self._timeout = timeout
        self._logger_name = logger_name
        set_logger(self._logger_name)

    @property
    def logger(self) -> logging.Logger:
        """
        This property exists so that we never have to propagate the
        `Logger` object.
        """
        return get_logger(self._logger_name)

    @property
    def size(self) -> int:
        """
        Number of current key-value pairs in `self._data`.
        """
        return len(self._data)

    def get(self, *, host: str, user: str, key_path: str) -> FractalSSH:
        """
        Get the `FractalSSH` for the current credentials, or create one.

        Note: Changing `_data` requires acquiring `_lock`.

        Arguments:
            host: Host name, first element of the key.
            user: User name, second element of the key.
            key_path: Path of the SSH key file, third element of the key.

        Raises:
            FractalSSHListTimeoutError: If `_lock` cannot be acquired within
                `_timeout` when a new object has to be added.
        """
        key = (host, user, key_path)

        # Fast path: no lock is needed to return an existing object.
        fractal_ssh = self._data.get(key, None)
        if fractal_ssh is not None:
            self.logger.info(
                f"Return existing FractalSSH object for {user}@{host}"
            )
            return fractal_ssh

        with self.acquire_lock_with_timeout():
            # Check again while holding the lock: another caller may have
            # added this key after the lookup above, and overwriting its
            # entry would leave an orphaned object that is never closed.
            fractal_ssh = self._data.get(key, None)
            if fractal_ssh is not None:
                self.logger.info(
                    f"Return existing FractalSSH object for {user}@{host}"
                )
                return fractal_ssh

            self.logger.info(f"Add new FractalSSH object for {user}@{host}")
            connection = Connection(
                host=host,
                user=user,
                forward_agent=False,
                connect_kwargs={
                    "key_filename": key_path,
                    "look_for_keys": False,
                },
            )
            self._data[key] = FractalSSH(connection=connection)
            return self._data[key]

    def contains(
        self,
        *,
        host: str,
        user: str,
        key_path: str,
    ) -> bool:
        """
        Return whether a given key is present in the collection.

        Arguments:
            host: Host name, first element of the key.
            user: User name, second element of the key.
            key_path: Path of the SSH key file, third element of the key.
        """
        key = (host, user, key_path)
        return key in self._data

    def remove(
        self,
        *,
        host: str,
        user: str,
        key_path: str,
    ) -> None:
        """
        Remove a key from `_data` and close the corresponding connection.

        Note: Changing `_data` requires acquiring `_lock`.

        Arguments:
            host: Host name, first element of the key.
            user: User name, second element of the key.
            key_path: Path of the SSH key file, third element of the key.

        Raises:
            KeyError: If the key is not present in the collection.
        """
        key = (host, user, key_path)
        with self.acquire_lock_with_timeout():
            self.logger.info(
                f"Removing FractalSSH object for {user}@{host} "
                "from collection."
            )
            fractal_ssh_obj = self._data.pop(key)
            self.logger.info(
                f"Closing FractalSSH object for {user}@{host} "
                f"({fractal_ssh_obj.is_connected=})."
            )
            fractal_ssh_obj.close()

    def close_all(self, *, timeout: float = 5.0):
        """
        Close all `FractalSSH` objects in the collection.

        Arguments:
            timeout:
                Timeout for `FractalSSH._lock` acquisition, to be obtained
                before closing.
        """
        # Iterate over a snapshot: waiting for each object's lock can take
        # up to `timeout`, and a change of `_data` in the meantime would
        # otherwise abort the iteration with a `RuntimeError`.
        for key, fractal_ssh_obj in list(self._data.items()):
            host, user, _ = key[:]
            self.logger.info(
                f"Closing FractalSSH object for {user}@{host} "
                f"({fractal_ssh_obj.is_connected=})."
            )
            with fractal_ssh_obj.acquire_timeout(timeout=timeout):
                fractal_ssh_obj.close()

    @contextmanager
    def acquire_lock_with_timeout(self) -> Generator[Literal[True], Any, None]:
        """
        Context manager holding `_lock` for the duration of the `with` body.

        Raises:
            FractalSSHListTimeoutError: If the lock is not acquired within
                `_timeout`.
        """
        self.logger.debug(
            f"Trying to acquire lock, with timeout {self._timeout} s"
        )
        result = self._lock.acquire(timeout=self._timeout)
        try:
            if not result:
                self.logger.error("Lock was *NOT* acquired.")
                raise FractalSSHListTimeoutError(
                    f"Failed to acquire lock within {self._timeout} s"
                )
            self.logger.debug("Lock was acquired.")
            yield result
        finally:
            # Only release a lock that was actually acquired.
            if result:
                self._lock.release()
                self.logger.debug("Lock was released")

logger: logging.Logger property

This property exists so that we never have to propagate the Logger object.

size: int property

Number of current key-value pairs in self._data.

close_all(*, timeout=5.0)

Close all FractalSSH objects in the collection.

Parameters:

Name Type Description Default
timeout float

Timeout for FractalSSH._lock acquisition, to be obtained before closing.

5.0
Source code in fractal_server/ssh/_fabric.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def close_all(self, *, timeout: float = 5.0):
    """
    Close all `FractalSSH` objects in the collection.

    Arguments:
        timeout:
            Timeout for `FractalSSH._lock` acquisition, to be obtained
            before closing.
    """
    # Iterate over a snapshot: waiting for each object's lock can take up to
    # `timeout`, and a change of `_data` in the meantime would otherwise
    # abort the iteration with a `RuntimeError`.
    for key, fractal_ssh_obj in list(self._data.items()):
        host, user, _ = key[:]
        self.logger.info(
            f"Closing FractalSSH object for {user}@{host} "
            f"({fractal_ssh_obj.is_connected=})."
        )
        with fractal_ssh_obj.acquire_timeout(timeout=timeout):
            fractal_ssh_obj.close()

contains(*, host, user, key_path)

Return whether a given key is present in the collection.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def contains(
    self,
    *,
    host: str,
    user: str,
    key_path: str,
) -> bool:
    """
    Tell whether the `(host, user, key_path)` key is in the collection.

    Arguments:
        host: First element of the key.
        user: Second element of the key.
        key_path: Third element of the key.
    """
    return (host, user, key_path) in self._data

get(*, host, user, key_path)

Get the FractalSSH for the current credentials, or create one.

Note: Changing _data requires acquiring _lock.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def get(self, *, host: str, user: str, key_path: str) -> "FractalSSH":
    """
    Get the `FractalSSH` for the current credentials, or create one.

    Note: Changing `_data` requires acquiring `_lock`.

    Arguments:
        host: Host name, first element of the key.
        user: User name, second element of the key.
        key_path: Path of the SSH key file, third element of the key.

    Raises:
        FractalSSHListTimeoutError: If `_lock` cannot be acquired within
            `_timeout` when a new object has to be added.
    """
    key = (host, user, key_path)

    # Fast path: no lock is needed to return an existing object.
    fractal_ssh = self._data.get(key, None)
    if fractal_ssh is not None:
        self.logger.info(
            f"Return existing FractalSSH object for {user}@{host}"
        )
        return fractal_ssh

    with self.acquire_lock_with_timeout():
        # Check again while holding the lock: another caller may have added
        # this key after the lookup above, and overwriting its entry would
        # leave an orphaned object that is never closed.
        fractal_ssh = self._data.get(key, None)
        if fractal_ssh is not None:
            self.logger.info(
                f"Return existing FractalSSH object for {user}@{host}"
            )
            return fractal_ssh

        self.logger.info(f"Add new FractalSSH object for {user}@{host}")
        connection = Connection(
            host=host,
            user=user,
            forward_agent=False,
            connect_kwargs={
                "key_filename": key_path,
                "look_for_keys": False,
            },
        )
        self._data[key] = FractalSSH(connection=connection)
        return self._data[key]

remove(*, host, user, key_path)

Remove a key from _data and close the corresponding connection.

Note: Changing _data requires acquiring _lock.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def remove(
    self,
    *,
    host: str,
    user: str,
    key_path: str,
) -> None:
    """
    Pop the `(host, user, key_path)` entry from `_data` and close it.

    Both steps happen while holding `_lock`, since `_data` is modified.

    Arguments:
        host: First element of the key.
        user: Second element of the key.
        key_path: Third element of the key.

    Raises:
        KeyError: If the key is not present in `_data`.
    """
    target = f"{user}@{host}"
    with self.acquire_lock_with_timeout():
        removal_message = (
            f"Removing FractalSSH object for {target} from collection."
        )
        self.logger.info(removal_message)

        # NOTE: the local name below appears verbatim in the log message
        # (through the `=` f-string specifier), so it must not be renamed.
        fractal_ssh_obj = self._data.pop((host, user, key_path))
        closing_message = (
            f"Closing FractalSSH object for {target} "
            f"({fractal_ssh_obj.is_connected=})."
        )
        self.logger.info(closing_message)
        fractal_ssh_obj.close()