Skip to content

_fabric

FractalSSH

Bases: object

Wrapper of fabric.Connection object, enriched with locks.

Note: methods marked as _unsafe should not be used directly, since they do not enforce locking.

Attributes:

Name Type Description
_lock Lock
_connection Connection
default_lock_timeout float
default_max_attempts int
default_base_interval float
sftp_get_prefetch bool
sftp_get_max_requests int
logger_name str
Source code in fractal_server/ssh/_fabric.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
class FractalSSH(object):
    """
    Wrapper of `fabric.Connection` object, enriched with locks.

    Note: methods marked as `_unsafe` should not be used directly,
    since they do not enforce locking.

    Attributes:
        _lock: Serializes access to the underlying SSH/SFTP sockets.
        _connection: The wrapped `fabric.Connection` object.
        default_lock_timeout: Default timeout (seconds) for lock acquisition.
        default_max_attempts: Default number of attempts in `run_command`.
        default_base_interval: Base (seconds) of the exponential backoff
            used between `run_command` attempts.
        sftp_get_prefetch: Whether SFTP `get` should prefetch.
        sftp_get_max_requests: Max concurrent prefetch requests for SFTP
            `get`.
        logger_name: Name of the logger used by this object.
    """

    _lock: Lock
    _connection: Connection
    default_lock_timeout: float
    default_max_attempts: int
    default_base_interval: float
    sftp_get_prefetch: bool
    sftp_get_max_requests: int
    logger_name: str

    def __init__(
        self,
        connection: Connection,
        default_timeout: float = 250,
        default_max_attempts: int = 5,
        default_base_interval: float = 3.0,
        sftp_get_prefetch: bool = False,
        sftp_get_max_requests: int = 64,
        logger_name: str = __name__,
    ):
        self._lock = Lock()
        self._connection = connection
        self.default_lock_timeout = default_timeout
        self.default_base_interval = default_base_interval
        self.default_max_attempts = default_max_attempts
        self.sftp_get_prefetch = sftp_get_prefetch
        self.sftp_get_max_requests = sftp_get_max_requests
        self.logger_name = logger_name
        set_logger(self.logger_name)

    @property
    def is_connected(self) -> bool:
        return self._connection.is_connected

    @property
    def logger(self) -> logging.Logger:
        return get_logger(self.logger_name)

    def log_and_raise(self, *, e: Exception, message: str) -> None:
        """
        Log and re-raise an exception from a FractalSSH method.

        Arguments:
            message: Additional message to be logged.
            e: Original exception
        """
        try:
            self.logger.error(message)
            self.logger.error(f"Original Error {type(e)} : \n{str(e)}")
            # Handle the specific case of `NoValidConnectionsError`s from
            # paramiko, which store relevant information in the `errors`
            # attribute
            if hasattr(e, "errors"):
                self.logger.error(f"{type(e)=}")
                for err in e.errors:
                    self.logger.error(f"{err}")
        except Exception as exception:
            # Handle unexpected cases, e.g. (1) `e` has no `type`, or
            # (2) `errors` is not iterable.
            self.logger.error(
                "Unexpected Error while handling exception above: "
                f"{str(exception)}"
            )

        raise e

    def _run(
        self, *args, label: str, lock_timeout: Optional[float] = None, **kwargs
    ) -> Any:
        # Acquire the lock (with a timeout) before delegating to the
        # underlying `fabric.Connection.run`.
        actual_lock_timeout = self.default_lock_timeout
        if lock_timeout is not None:
            actual_lock_timeout = lock_timeout
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label=label,
            timeout=actual_lock_timeout,
        ):
            return self._connection.run(*args, **kwargs)

    def _sftp_unsafe(self) -> paramiko.sftp_client.SFTPClient:
        """
        This is marked as unsafe because you should only use its methods
        after acquiring a lock.
        """
        return self._connection.sftp()

    def read_remote_json_file(self, filepath: str) -> dict[str, Any]:
        """
        Read and parse a remote JSON file via SFTP, while holding the lock.
        """
        self.logger.info(f"START reading remote JSON file {filepath}.")
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label="read_remote_json_file",
            timeout=self.default_lock_timeout,
        ):

            try:
                with self._sftp_unsafe().open(filepath, "r") as f:
                    data = json.load(f)
            except Exception as e:
                self.log_and_raise(
                    e=e,
                    message=(
                        f"Error in `read_remote_json_file`, for {filepath=}."
                    ),
                )
        self.logger.info(f"END reading remote JSON file {filepath}.")
        return data

    def check_connection(self) -> None:
        """
        Open the SSH connection and handle exceptions.

        This method should always be called at the beginning of background
        operations that use FractalSSH, so that:

        1. We try to restore unusable connections (e.g. due to closed socket).
        2. We provide an informative error if connection cannot be established.
        """
        self.logger.debug(
            f"[check_connection] {self._connection.is_connected=}"
        )
        if self._connection.is_connected:
            # Even if the connection appears open, it could be broken for
            # external reasons (e.g. the socket is closed because the SSH
            # server was restarted). In these cases, we catch the error and
            # try to re-open the connection.
            try:
                self.logger.info(
                    "[check_connection] Run dummy command to check connection."
                )
                # Run both an SFTP and an SSH command, as they correspond to
                # different sockets
                self.remote_exists("/dummy/path/")
                self.run_command(cmd="whoami")
                self.logger.info(
                    "[check_connection] SSH connection is already OK, exit."
                )
                return
            except (OSError, EOFError) as e:
                self.logger.warning(
                    f"[check_connection] Detected error {str(e)}, re-open."
                )
        # Try opening the connection (if it was closed) or to re-open it (if
        # an error happened).
        try:
            self.close()
            with _acquire_lock_with_timeout(
                lock=self._lock,
                label="_connection.open",
                timeout=self.default_lock_timeout,
                logger_name=self.logger_name,
            ):
                self._connection.open()
                self._connection.client.open_sftp()
                self.logger.info(
                    "[check_connection] SSH connection opened, exit."
                )

        except Exception as e:
            # Chain the original exception for better tracebacks.
            raise RuntimeError(
                f"Cannot open SSH connection. Original error:\n{str(e)}"
            ) from e

    def close(self) -> None:
        """
        Aggressively close `self._connection`.

        When `Connection.is_connected` is `False`, `Connection.close()` does
        not call `Connection.client.close()`. Thus we do this explicitly here,
        because we observed cases where `is_connected=False` but the underlying
        `Transport` object was not closed.
        """
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label="_connection.close",
            timeout=self.default_lock_timeout,
        ):
            self._connection.close()
            if self._connection.client is not None:
                self._connection.client.close()

    def run_command(
        self,
        *,
        cmd: str,
        allow_char: Optional[str] = None,
        max_attempts: Optional[int] = None,
        base_interval: Optional[float] = None,
        lock_timeout: Optional[float] = None,
    ) -> str:
        """
        Run a command within an open SSH connection.

        Connection errors are retried with exponential backoff
        (`base_interval ** attempt` seconds); other failures raise
        immediately.

        Args:
            cmd: Command to be run
            allow_char: Forbidden chars to allow for this command
            max_attempts: Number of attempts (overrides default).
            base_interval: Backoff base in seconds (overrides default).
            lock_timeout: Timeout for lock acquisition (overrides default).

        Returns:
            Standard output of the command, if successful.
        """

        validate_cmd(cmd, allow_char=allow_char)

        actual_max_attempts = self.default_max_attempts
        if max_attempts is not None:
            actual_max_attempts = max_attempts

        actual_base_interval = self.default_base_interval
        if base_interval is not None:
            actual_base_interval = base_interval

        actual_lock_timeout = self.default_lock_timeout
        if lock_timeout is not None:
            actual_lock_timeout = lock_timeout

        t_0 = time.perf_counter()
        ind_attempt = 0
        while ind_attempt <= actual_max_attempts:
            ind_attempt += 1
            prefix = f"[attempt {ind_attempt}/{actual_max_attempts}]"
            self.logger.info(f"{prefix} START running '{cmd}' over SSH.")
            try:
                # Case 1: Command runs successfully
                res = self._run(
                    cmd,
                    label=f"run {cmd}",
                    lock_timeout=actual_lock_timeout,
                    hide=True,
                )
                t_1 = time.perf_counter()
                self.logger.info(
                    f"{prefix} END   running '{cmd}' over SSH, "
                    f"elapsed {t_1-t_0:.3f}"
                )
                self.logger.debug("STDOUT:")
                self.logger.debug(res.stdout)
                self.logger.debug("STDERR:")
                self.logger.debug(res.stderr)
                return res.stdout
            except NoValidConnectionsError as e:
                # Case 2: Command fails with a connection error; retry with
                # exponential backoff, up to `actual_max_attempts` times.
                self.logger.warning(
                    f"{prefix} Running command `{cmd}` over SSH failed.\n"
                    f"Original NoValidConnectionError:\n{str(e)}.\n"
                    f"{e.errors=}\n"
                )
                if ind_attempt < actual_max_attempts:
                    sleeptime = actual_base_interval**ind_attempt
                    self.logger.warning(
                        f"{prefix} Now sleep {sleeptime:.3f} "
                        "seconds and continue."
                    )
                    time.sleep(sleeptime)
                else:
                    self.logger.error(f"{prefix} Reached last attempt")
                    break
            except UnexpectedExit as e:
                # Case 3: Command fails with an actual error
                error_msg = (
                    f"{prefix} Running command `{cmd}` over SSH failed.\n"
                    f"Original error:\n{str(e)}."
                )
                self.logger.error(error_msg)
                raise RuntimeError(error_msg)
            except Exception as e:
                self.logger.error(
                    f"Running command `{cmd}` over SSH failed.\n"
                    f"Original Error:\n{str(e)}."
                )
                raise e

        # Report the effective number of attempts (the raw `max_attempts`
        # argument may be `None` when the default value was used).
        raise RuntimeError(
            f"Reached last attempt ({actual_max_attempts=}) for running "
            f"'{cmd}' over SSH"
        )

    def send_file(
        self,
        *,
        local: str,
        remote: str,
        lock_timeout: Optional[float] = None,
    ) -> None:
        """
        Transfer a file via SSH

        Args:
            local: Local path to file.
            remote: Target path on remote host.
            lock_timeout: Timeout for lock acquisition (overrides default).
        """
        try:
            self.logger.info(
                f"[send_file] START transfer of '{local}' over SSH."
            )
            actual_lock_timeout = self.default_lock_timeout
            if lock_timeout is not None:
                actual_lock_timeout = lock_timeout
            with _acquire_lock_with_timeout(
                lock=self._lock,
                label=f"send_file {local=} {remote=}",
                timeout=actual_lock_timeout,
            ):
                self._sftp_unsafe().put(local, remote)
            self.logger.info(
                f"[send_file] END transfer of '{local}' over SSH."
            )
        except Exception as e:
            self.log_and_raise(
                e=e,
                message=(
                    "Error in `send_file`, while "
                    f"transferring {local=} to {remote=}."
                ),
            )

    def fetch_file(
        self,
        *,
        local: str,
        remote: str,
        lock_timeout: Optional[float] = None,
    ) -> None:
        """
        Transfer a file via SSH

        Args:
            local: Local path to file.
            remote: Target path on remote host.
            lock_timeout: Timeout for lock acquisition (overrides default).
        """
        try:
            prefix = "[fetch_file] "
            self.logger.info(f"{prefix} START fetching '{remote}' over SSH.")
            actual_lock_timeout = self.default_lock_timeout
            if lock_timeout is not None:
                actual_lock_timeout = lock_timeout
            with _acquire_lock_with_timeout(
                lock=self._lock,
                label=f"fetch_file {local=} {remote=}",
                timeout=actual_lock_timeout,
            ):
                self._sftp_unsafe().get(
                    remote,
                    local,
                    prefetch=self.sftp_get_prefetch,
                    max_concurrent_prefetch_requests=self.sftp_get_max_requests,  # noqa E501
                )
            self.logger.info(f"{prefix} END fetching '{remote}' over SSH.")
        except Exception as e:
            self.log_and_raise(
                e=e,
                message=(
                    "Error in `fetch_file`, while "
                    f"Transferring {remote=} to {local=}."
                ),
            )

    def mkdir(self, *, folder: str, parents: bool = True) -> None:
        """
        Create a folder remotely via SSH.

        Args:
            folder: Path of the remote folder to create.
            parents: If set, pass `-p` (create parents, no error if the
                folder already exists).
        """
        if parents:
            cmd = f"mkdir -p {folder}"
        else:
            cmd = f"mkdir {folder}"
        self.run_command(cmd=cmd)

    def remove_folder(
        self,
        *,
        folder: str,
        safe_root: str,
    ) -> None:
        """
        Removes a folder remotely via SSH.

        This function calls `rm -r`, after a few checks on `folder`.

        Args:
            folder: Absolute path to a folder that should be removed.
            safe_root: If `folder` is not a subfolder of the absolute
                `safe_root` path, raise an error.
        """
        validate_cmd(folder)
        validate_cmd(safe_root)

        if " " in folder:
            raise ValueError(f"folder='{folder}' includes whitespace.")
        elif " " in safe_root:
            raise ValueError(f"safe_root='{safe_root}' includes whitespace.")
        elif not Path(folder).is_absolute():
            raise ValueError(f"{folder=} is not an absolute path.")
        elif not Path(safe_root).is_absolute():
            raise ValueError(f"{safe_root=} is not an absolute path.")
        elif not (
            Path(folder).resolve().is_relative_to(Path(safe_root).resolve())
        ):
            raise ValueError(f"{folder=} is not a subfolder of {safe_root=}.")
        else:
            cmd = f"rm -r {folder}"
            self.run_command(cmd=cmd)

    def write_remote_file(
        self,
        *,
        path: str,
        content: str,
        lock_timeout: Optional[float] = None,
    ) -> None:
        """
        Open a remote file via SFTP and write it.

        Args:
            path: Absolute path of remote file.
            content: Contents to be written to file.
            lock_timeout: Timeout for lock acquisition (overrides default).
        """
        self.logger.info(f"START writing to remote file {path}.")
        actual_lock_timeout = self.default_lock_timeout
        if lock_timeout is not None:
            actual_lock_timeout = lock_timeout
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label=f"write_remote_file {path=}",
            timeout=actual_lock_timeout,
        ):
            try:
                with self._sftp_unsafe().open(filename=path, mode="w") as f:
                    f.write(content)
            except Exception as e:
                self.log_and_raise(
                    e=e, message=f"Error in `write_remote_file`, for {path=}."
                )

        self.logger.info(f"END writing to remote file {path}.")

    def remote_exists(self, path: str) -> bool:
        """
        Return whether a remote file/folder exists
        """
        # NOTE: log labels use `remote_file_exists` (historical name).
        self.logger.info(f"START remote_file_exists {path}")
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label=f"remote_file_exists {path=}",
            timeout=self.default_lock_timeout,
        ):
            try:
                self._sftp_unsafe().stat(path)
                self.logger.info(f"END   remote_file_exists {path} / True")
                return True
            except FileNotFoundError:
                self.logger.info(f"END   remote_file_exists {path} / False")
                return False
            except Exception as e:
                self.log_and_raise(
                    e=e, message=f"Error in `remote_exists`, for {path=}."
                )

_sftp_unsafe()

This is marked as unsafe because you should only use its methods after acquiring a lock.

Source code in fractal_server/ssh/_fabric.py
159
160
161
162
163
164
def _sftp_unsafe(self) -> paramiko.sftp_client.SFTPClient:
    """
    Return the SFTP client of the wrapped connection.

    Marked as unsafe: only use its methods after acquiring a lock.
    """
    connection = self._connection
    return connection.sftp()

check_connection()

Open the SSH connection and handle exceptions.

This method should always be called at the beginning of background operations that use FractalSSH, so that:

  1. We try to restore unusable connections (e.g. due to closed socket).
  2. We provide an informative error if connection cannot be established.
Source code in fractal_server/ssh/_fabric.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def check_connection(self) -> None:
    """
    Open the SSH connection and handle exceptions.

    This method should always be called at the beginning of background
    operations that use FractalSSH, so that:

    1. We try to restore unusable connections (e.g. due to closed socket).
    2. We provide an informative error if connection cannot be established.

    Raises:
        RuntimeError: If the connection cannot be (re)opened.
    """
    self.logger.debug(
        f"[check_connection] {self._connection.is_connected=}"
    )
    if self._connection.is_connected:
        # Even if the connection appears open, it could be broken for
        # external reasons (e.g. the socket is closed because the SSH
        # server was restarted). In these cases, we catch the error and
        # try to re-open the connection.
        try:
            self.logger.info(
                "[check_connection] Run dummy command to check connection."
            )
            # Run both an SFTP and an SSH command, as they correspond to
            # different sockets
            self.remote_exists("/dummy/path/")
            self.run_command(cmd="whoami")
            self.logger.info(
                "[check_connection] SSH connection is already OK, exit."
            )
            return
        except (OSError, EOFError) as e:
            # Only socket-level errors trigger a re-open; other exceptions
            # propagate to the caller.
            self.logger.warning(
                f"[check_connection] Detected error {str(e)}, re-open."
            )
    # Try opening the connection (if it was closed) or to re-open it (if
    # an error happened).
    try:
        # `close()` acquires the lock internally, so it must complete
        # before we acquire the lock again for `open()`.
        self.close()
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label="_connection.open",
            timeout=self.default_lock_timeout,
            logger_name=self.logger_name,
        ):
            self._connection.open()
            # Also open the SFTP channel, which lives on a separate socket.
            self._connection.client.open_sftp()
            self.logger.info(
                "[check_connection] SSH connection opened, exit."
            )

    except Exception as e:
        raise RuntimeError(
            f"Cannot open SSH connection. Original error:\n{str(e)}"
        )

close()

Aggressively close self._connection.

When Connection.is_connected is False, Connection.close() does not call Connection.client.close(). Thus we do this explicitly here, because we observed cases where is_connected=False but the underlying Transport object was not closed.

Source code in fractal_server/ssh/_fabric.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def close(self) -> None:
    """
    Aggressively close `self._connection`.

    When `Connection.is_connected` is `False`, `Connection.close()` does
    not call `Connection.client.close()`. Thus we do this explicitly here,
    because we observed cases where `is_connected=False` but the underlying
    `Transport` object was not closed.
    """
    lock_ctx = _acquire_lock_with_timeout(
        lock=self._lock,
        label="_connection.close",
        timeout=self.default_lock_timeout,
    )
    with lock_ctx:
        self._connection.close()
        # Explicitly close the low-level client, if any (see docstring).
        client = self._connection.client
        if client is not None:
            client.close()

fetch_file(*, local, remote, lock_timeout=None)

Transfer a file via SSH

Parameters:

Name Type Description Default
local str

Local path to file.

required
remote str

Target path on remote host.

required
lock_timeout Optional[float]

Timeout for lock acquisition (overrides default).

None
Source code in fractal_server/ssh/_fabric.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def fetch_file(
    self,
    *,
    local: str,
    remote: str,
    lock_timeout: Optional[float] = None,
) -> None:
    """
    Transfer a file via SSH

    Args:
        local: Local path to file.
        remote: Target path on remote host.
        lock_timeout: Timeout for lock acquisition (overrides default).
    """
    try:
        prefix = "[fetch_file] "
        self.logger.info(f"{prefix} START fetching '{remote}' over SSH.")
        if lock_timeout is None:
            actual_lock_timeout = self.default_lock_timeout
        else:
            actual_lock_timeout = lock_timeout
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label=f"fetch_file {local=} {remote=}",
            timeout=actual_lock_timeout,
        ):
            # SFTP `get` runs while the lock is held.
            sftp_client = self._sftp_unsafe()
            sftp_client.get(
                remote,
                local,
                prefetch=self.sftp_get_prefetch,
                max_concurrent_prefetch_requests=self.sftp_get_max_requests,  # noqa E501
            )
        self.logger.info(f"{prefix} END fetching '{remote}' over SSH.")
    except Exception as e:
        self.log_and_raise(
            e=e,
            message=(
                "Error in `fetch_file`, while "
                f"Transferring {remote=} to {local=}."
            ),
        )

log_and_raise(*, e, message)

Log and re-raise an exception from a FractalSSH method.

Parameters:

Name Type Description Default
message str

Additional message to be logged.

required
e Exception

Original exception

required
Source code in fractal_server/ssh/_fabric.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def log_and_raise(self, *, e: Exception, message: str) -> None:
    """
    Log and re-raise an exception from a FractalSSH method.

    Arguments:
        message: Additional message to be logged.
        e: Original exception
    """
    log = self.logger
    try:
        log.error(message)
        log.error(f"Original Error {type(e)} : \n{str(e)}")
        # `NoValidConnectionsError`s from paramiko store relevant
        # information in the `errors` attribute; log each entry.
        if hasattr(e, "errors"):
            log.error(f"{type(e)=}")
            for err in e.errors:
                log.error(f"{err}")
    except Exception as exception:
        # Logging must never mask the original exception; swallow any
        # unexpected issue (e.g. non-iterable `errors`) with a note.
        log.error(
            "Unexpected Error while handling exception above: "
            f"{str(exception)}"
        )

    raise e

mkdir(*, folder, parents=True)

Create a folder remotely via SSH.

Parameters:

Name Type Description Default
folder str
required
parents bool
True
Source code in fractal_server/ssh/_fabric.py
440
441
442
443
444
445
446
447
448
449
450
451
452
def mkdir(self, *, folder: str, parents: bool = True) -> None:
    """
    Create a folder remotely via SSH.

    Args:
        folder: Path of the remote folder to create.
        parents: If set, pass `-p` to `mkdir` (create intermediate
            folders as needed).
    """
    mkdir_options = "-p " if parents else ""
    self.run_command(cmd=f"mkdir {mkdir_options}{folder}")

remote_exists(path)

Return whether a remote file/folder exists

Source code in fractal_server/ssh/_fabric.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def remote_exists(self, path: str) -> bool:
    """
    Return whether a remote file/folder exists
    """
    self.logger.info(f"START remote_file_exists {path}")
    with _acquire_lock_with_timeout(
        lock=self._lock,
        label=f"remote_file_exists {path=}",
        timeout=self.default_lock_timeout,
    ):
        try:
            # `stat` raises FileNotFoundError for missing remote paths.
            self._sftp_unsafe().stat(path)
        except FileNotFoundError:
            self.logger.info(f"END   remote_file_exists {path} / False")
            return False
        except Exception as e:
            self.log_and_raise(
                e=e, message=f"Error in `remote_exists`, for {path=}."
            )
        self.logger.info(f"END   remote_file_exists {path} / True")
        return True

remove_folder(*, folder, safe_root)

Removes a folder remotely via SSH.

This function calls rm -r, after a few checks on folder.

Parameters:

Name Type Description Default
folder str

Absolute path to a folder that should be removed.

required
safe_root str

If folder is not a subfolder of the absolute safe_root path, raise an error.

required
Source code in fractal_server/ssh/_fabric.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def remove_folder(
    self,
    *,
    folder: str,
    safe_root: str,
) -> None:
    """
    Removes a folder remotely via SSH.

    This function calls `rm -r`, but only after checking that `folder` is
    a whitespace-free absolute path located below `safe_root`.

    Args:
        folder: Absolute path to a folder that should be removed.
        safe_root: If `folder` is not a subfolder of the absolute
            `safe_root` path, raise an error.
    """
    validate_cmd(folder)
    validate_cmd(safe_root)

    # Guard clauses (equivalent to the original if/elif chain).
    if " " in folder:
        raise ValueError(f"folder='{folder}' includes whitespace.")
    if " " in safe_root:
        raise ValueError(f"safe_root='{safe_root}' includes whitespace.")
    if not Path(folder).is_absolute():
        raise ValueError(f"{folder=} is not an absolute path.")
    if not Path(safe_root).is_absolute():
        raise ValueError(f"{safe_root=} is not an absolute path.")
    resolved_folder = Path(folder).resolve()
    resolved_root = Path(safe_root).resolve()
    if not resolved_folder.is_relative_to(resolved_root):
        raise ValueError(f"{folder=} is not a subfolder of {safe_root=}.")

    self.run_command(cmd=f"rm -r {folder}")

run_command(*, cmd, allow_char=None, max_attempts=None, base_interval=None, lock_timeout=None)

Run a command within an open SSH connection.

Parameters:

Name Type Description Default
cmd str

Command to be run

required
allow_char Optional[str]

Forbidden chars to allow for this command

None
max_attempts Optional[int]
None
base_interval Optional[int]
None
lock_timeout Optional[int]
None

Returns:

Type Description
str

Standard output of the command, if successful.

Source code in fractal_server/ssh/_fabric.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def run_command(
    self,
    *,
    cmd: str,
    allow_char: Optional[str] = None,
    max_attempts: Optional[int] = None,
    base_interval: Optional[int] = None,
    lock_timeout: Optional[int] = None,
) -> str:
    """
    Run a command within an open SSH connection.

    Connection failures (`NoValidConnectionsError`) are retried with
    exponential backoff (`base_interval ** attempt`); a non-zero exit
    (`UnexpectedExit`) or any other error aborts immediately.

    Args:
        cmd: Command to be run
        allow_char: Forbidden chars to allow for this command
        max_attempts: Maximum number of attempts (overrides default).
        base_interval: Backoff base, in seconds (overrides default).
        lock_timeout: Timeout for lock acquisition (overrides default).

    Returns:
        Standard output of the command, if successful.

    Raises:
        RuntimeError: If the command exits with an error, or if all
            attempts failed with connection errors.
    """

    validate_cmd(cmd, allow_char=allow_char)

    actual_max_attempts = self.default_max_attempts
    if max_attempts is not None:
        actual_max_attempts = max_attempts

    actual_base_interval = self.default_base_interval
    if base_interval is not None:
        actual_base_interval = base_interval

    actual_lock_timeout = self.default_lock_timeout
    if lock_timeout is not None:
        actual_lock_timeout = lock_timeout

    t_0 = time.perf_counter()
    ind_attempt = 0
    while ind_attempt <= actual_max_attempts:
        ind_attempt += 1
        prefix = f"[attempt {ind_attempt}/{actual_max_attempts}]"
        self.logger.info(f"{prefix} START running '{cmd}' over SSH.")
        try:
            # Case 1: Command runs successfully
            res = self._run(
                cmd,
                label=f"run {cmd}",
                lock_timeout=actual_lock_timeout,
                hide=True,
            )
            t_1 = time.perf_counter()
            self.logger.info(
                f"{prefix} END   running '{cmd}' over SSH, "
                f"elapsed {t_1-t_0:.3f}"
            )
            self.logger.debug("STDOUT:")
            self.logger.debug(res.stdout)
            self.logger.debug("STDERR:")
            self.logger.debug(res.stderr)
            return res.stdout
        except NoValidConnectionsError as e:
            # Case 2: Command fails with a connection error; retry with
            # exponential backoff, unless this was the last attempt.
            self.logger.warning(
                f"{prefix} Running command `{cmd}` over SSH failed.\n"
                f"Original NoValidConnectionError:\n{str(e)}.\n"
                f"{e.errors=}\n"
            )
            if ind_attempt < actual_max_attempts:
                sleeptime = actual_base_interval**ind_attempt
                self.logger.warning(
                    f"{prefix} Now sleep {sleeptime:.3f} "
                    "seconds and continue."
                )
                time.sleep(sleeptime)
            else:
                self.logger.error(f"{prefix} Reached last attempt")
                break
        except UnexpectedExit as e:
            # Case 3: Command fails with an actual error; do not retry.
            error_msg = (
                f"{prefix} Running command `{cmd}` over SSH failed.\n"
                f"Original error:\n{str(e)}."
            )
            self.logger.error(error_msg)
            # Chain the original exception for full tracebacks.
            raise RuntimeError(error_msg) from e
        except Exception as e:
            self.logger.error(
                f"Running command `{cmd}` over SSH failed.\n"
                f"Original Error:\n{str(e)}."
            )
            raise e

    # FIX: report the effective number of attempts (`actual_max_attempts`),
    # not the raw `max_attempts` argument, which may be None.
    raise RuntimeError(
        f"Reached last attempt ({actual_max_attempts=}) for running "
        f"'{cmd}' over SSH"
    )

send_file(*, local, remote, lock_timeout=None)

Transfer a file via SSH

Parameters:

Name Type Description Default
local str

Local path to file.

required
remote str

Target path on remote host.

required
lock_timeout Optional[float]

Timeout for lock acquisition (overrides default).

None
Source code in fractal_server/ssh/_fabric.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def send_file(
    self,
    *,
    local: str,
    remote: str,
    lock_timeout: Optional[float] = None,
) -> None:
    """
    Transfer a file via SSH

    Args:
        local: Local path to file.
        remote: Target path on remote host.
        lock_timeout: Timeout for lock acquisition (overrides default).
    """
    try:
        self.logger.info(
            f"[send_file] START transfer of '{local}' over SSH."
        )
        # Fall back to the instance-level default timeout.
        if lock_timeout is None:
            actual_lock_timeout = self.default_lock_timeout
        else:
            actual_lock_timeout = lock_timeout
        lock_label = f"send_file {local=} {remote=}"
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label=lock_label,
            timeout=actual_lock_timeout,
        ):
            self._sftp_unsafe().put(local, remote)
        self.logger.info(
            f"[send_file] END transfer of '{local}' over SSH."
        )
    except Exception as e:
        error_message = (
            "Error in `send_file`, while "
            f"transferring {local=} to {remote=}."
        )
        self.log_and_raise(e=e, message=error_message)

write_remote_file(*, path, content, lock_timeout=None)

Open a remote file via SFTP and write it.

Parameters:

Name Type Description Default
path str

Absolute path of remote file.

required
content str

Contents to be written to file.

required
lock_timeout Optional[float]

Timeout for lock acquisition (overrides default).

None
Source code in fractal_server/ssh/_fabric.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def write_remote_file(
    self,
    *,
    path: str,
    content: str,
    lock_timeout: Optional[float] = None,
) -> None:
    """
    Open a remote file via SFTP and write it.

    Args:
        path: Absolute path of remote file.
        content: Contents to be written to file.
        lock_timeout: Timeout for lock acquisition (overrides default).
    """
    self.logger.info(f"START writing to remote file {path}.")
    # Fall back to the instance-level default timeout.
    actual_lock_timeout = (
        lock_timeout
        if lock_timeout is not None
        else self.default_lock_timeout
    )
    with _acquire_lock_with_timeout(
        lock=self._lock,
        label=f"write_remote_file {path=}",
        timeout=actual_lock_timeout,
    ):
        try:
            remote_file = self._sftp_unsafe().open(filename=path, mode="w")
            with remote_file as f:
                f.write(content)
        except Exception as e:
            self.log_and_raise(
                e=e, message=f"Error in `write_remote_file`, for {path=}."
            )

    self.logger.info(f"END writing to remote file {path}.")

FractalSSHList

Bases: object

Collection of FractalSSH objects

Attributes are all private, and access to this collection must be through methods (mostly the get one).

Attributes:

Name Type Description
_data dict[tuple[str, str, str], FractalSSH]

Mapping of unique keys (the SSH-credentials tuples) to FractalSSH objects.

_lock Lock

A threading.Lock object, to be acquired when changing _data.

_timeout float

Timeout for _lock acquisition.

_logger_name str

Logger name.

Source code in fractal_server/ssh/_fabric.py
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
class FractalSSHList(object):
    """
    Collection of `FractalSSH` objects

    Attributes are all private, and access to this collection must be
    through methods (mostly the `get` one).

    Attributes:
        _data:
            Mapping of unique keys (the SSH-credentials tuples) to
            `FractalSSH` objects.
        _lock:
            A `threading.Lock` object, to be acquired when changing `_data`.
        _timeout: Timeout for `_lock` acquisition.
        _logger_name: Logger name.
    """

    _data: dict[tuple[str, str, str], FractalSSH]
    _lock: Lock
    _timeout: float
    _logger_name: str

    def __init__(
        self,
        *,
        timeout: float = 5.0,
        logger_name: str = "fractal_server.FractalSSHList",
    ):
        self._lock = Lock()
        self._data = {}
        self._timeout = timeout
        self._logger_name = logger_name
        set_logger(self._logger_name)

    @property
    def logger(self) -> logging.Logger:
        """
        This property exists so that we never have to propagate the
        `Logger` object.
        """
        return get_logger(self._logger_name)

    @property
    def size(self) -> int:
        """
        Number of current key-value pairs in `self._data`.
        """
        # FIX: `len(self._data)` is equivalent to
        # `len(self._data.values())`, without building the values view.
        return len(self._data)

    def get(self, *, host: str, user: str, key_path: str) -> FractalSSH:
        """
        Get the `FractalSSH` for the current credentials, or create one.

        Note: Changing `_data` requires acquiring `_lock`.

        Arguments:
            host:
            user:
            key_path:
        """
        key = (host, user, key_path)
        fractal_ssh = self._data.get(key, None)
        if fractal_ssh is not None:
            self.logger.info(
                f"Return existing FractalSSH object for {user}@{host}"
            )
            return fractal_ssh
        else:
            self.logger.info(f"Add new FractalSSH object for {user}@{host}")
            connection = Connection(
                host=host,
                user=user,
                forward_agent=False,
                connect_kwargs={
                    "key_filename": key_path,
                    "look_for_keys": False,
                },
            )
            with _acquire_lock_with_timeout(
                lock=self._lock,
                label="FractalSSHList.get",
                timeout=self._timeout,
            ):
                # FIX: re-check under the lock, since another thread may
                # have inserted this key after the unlocked lookup above;
                # never clobber an existing FractalSSH object.
                if key not in self._data:
                    self._data[key] = FractalSSH(connection=connection)
                return self._data[key]

    def contains(
        self,
        *,
        host: str,
        user: str,
        key_path: str,
    ) -> bool:
        """
        Return whether a given key is present in the collection.

        Arguments:
            host:
            user:
            key_path:
        """
        key = (host, user, key_path)
        # Membership test on the dict itself (no need for `.keys()`).
        return key in self._data

    def remove(
        self,
        *,
        host: str,
        user: str,
        key_path: str,
    ) -> None:
        """
        Remove a key from `_data` and close the corresponding connection.

        Note: Changing `_data` requires acquiring `_lock`.

        Arguments:
            host:
            user:
            key_path:
        """
        key = (host, user, key_path)
        with _acquire_lock_with_timeout(
            lock=self._lock,
            timeout=self._timeout,
            label="FractalSSHList.remove",
        ):
            self.logger.info(
                f"Removing FractalSSH object for {user}@{host} "
                "from collection."
            )
            fractal_ssh_obj = self._data.pop(key)
            self.logger.info(
                f"Closing FractalSSH object for {user}@{host} "
                f"({fractal_ssh_obj.is_connected=})."
            )
            fractal_ssh_obj.close()

    def close_all(self, *, timeout: float = 5.0):
        """
        Close all `FractalSSH` objects in the collection.

        Arguments:
            timeout:
                Timeout for `FractalSSH._lock` acquisition, to be obtained
                before closing.
                NOTE(review): currently unused in the body — confirm
                whether it should be forwarded to `close()`.
        """
        for key, fractal_ssh_obj in self._data.items():
            host, user, _ = key[:]
            self.logger.info(
                f"Closing FractalSSH object for {user}@{host} "
                f"({fractal_ssh_obj.is_connected=})."
            )
            fractal_ssh_obj.close()

logger: logging.Logger property

This property exists so that we never have to propagate the Logger object.

size: int property

Number of current key-value pairs in self._data.

close_all(*, timeout=5.0)

Close all FractalSSH objects in the collection.

Parameters:

Name Type Description Default
timeout float

Timeout for FractalSSH._lock acquisition, to be obtained before closing.

5.0
Source code in fractal_server/ssh/_fabric.py
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def close_all(self, *, timeout: float = 5.0):
    """
    Close all `FractalSSH` objects in the collection.

    Arguments:
        timeout:
            Timeout for `FractalSSH._lock` acquisition, to be obtained
            before closing.
            NOTE(review): this parameter is never referenced in the body
            below — confirm whether it should be forwarded to `close()`.
    """
    # NOTE(review): `_lock` is not acquired here; assumes `_data` is not
    # mutated concurrently during shutdown — confirm with callers.
    for key, fractal_ssh_obj in self._data.items():
        # Unpack the credentials tuple (host, user, key_path);
        # `key_path` is not needed for logging.
        host, user, _ = key[:]
        self.logger.info(
            f"Closing FractalSSH object for {user}@{host} "
            f"({fractal_ssh_obj.is_connected=})."
        )
        fractal_ssh_obj.close()

contains(*, host, user, key_path)

Return whether a given key is present in the collection.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
def contains(
    self,
    *,
    host: str,
    user: str,
    key_path: str,
) -> bool:
    """
    Return whether a given key is present in the collection.

    Arguments:
        host:
        user:
        key_path:
    """
    key = (host, user, key_path)
    # Idiom fix: membership test on the dict itself, not `.keys()`.
    return key in self._data

get(*, host, user, key_path)

Get the FractalSSH for the current credentials, or create one.

Note: Changing _data requires acquiring _lock.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
def get(self, *, host: str, user: str, key_path: str) -> FractalSSH:
    """
    Get the `FractalSSH` for the current credentials, or create one.

    Note: Changing `_data` requires acquiring `_lock`.

    Arguments:
        host:
        user:
        key_path:
    """
    key = (host, user, key_path)
    fractal_ssh = self._data.get(key, None)
    if fractal_ssh is not None:
        self.logger.info(
            f"Return existing FractalSSH object for {user}@{host}"
        )
        return fractal_ssh
    else:
        self.logger.info(f"Add new FractalSSH object for {user}@{host}")
        connection = Connection(
            host=host,
            user=user,
            forward_agent=False,
            connect_kwargs={
                "key_filename": key_path,
                "look_for_keys": False,
            },
        )
        with _acquire_lock_with_timeout(
            lock=self._lock,
            label="FractalSSHList.get",
            timeout=self._timeout,
        ):
            # FIX: re-check under the lock, since another thread may have
            # inserted this key after the unlocked lookup above; never
            # clobber an existing FractalSSH object.
            if key not in self._data:
                self._data[key] = FractalSSH(connection=connection)
            return self._data[key]

remove(*, host, user, key_path)

Remove a key from _data and close the corresponding connection.

Note: Changing _data requires acquiring _lock.

Parameters:

Name Type Description Default
host str
required
user str
required
key_path str
required
Source code in fractal_server/ssh/_fabric.py
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
def remove(
    self,
    *,
    host: str,
    user: str,
    key_path: str,
) -> None:
    """
    Remove a key from `_data` and close the corresponding connection.

    Note: Changing `_data` requires acquiring `_lock`.

    Arguments:
        host:
        user:
        key_path:
    """
    credentials = (host, user, key_path)
    lock_context = _acquire_lock_with_timeout(
        lock=self._lock,
        timeout=self._timeout,
        label="FractalSSHList.remove",
    )
    with lock_context:
        self.logger.info(
            f"Removing FractalSSH object for {user}@{host} "
            "from collection."
        )
        # Drop the entry first, then close its connection.
        fractal_ssh_obj = self._data.pop(credentials)
        self.logger.info(
            f"Closing FractalSSH object for {user}@{host} "
            f"({fractal_ssh_obj.is_connected=})."
        )
        fractal_ssh_obj.close()

_acquire_lock_with_timeout(lock, label, timeout, logger_name=__name__)

Given a threading.Lock object, try to acquire it within a given timeout.

Parameters:

Name Type Description Default
lock Lock
required
label str
required
timeout float
required
logger_name str
__name__
Source code in fractal_server/ssh/_fabric.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@contextmanager
def _acquire_lock_with_timeout(
    lock: Lock,
    label: str,
    timeout: float,
    logger_name: str = __name__,
) -> Generator[Literal[True], Any, None]:
    """
    Given a `threading.Lock` object, try to acquire it within a given timeout.

    Arguments:
        lock:
        label:
        timeout:
        logger_name:
    """
    logger = get_logger(logger_name)
    logger.info(f"Trying to acquire lock for '{label}', with {timeout=}")
    acquired = lock.acquire(timeout=timeout)
    try:
        if acquired:
            logger.info(f"Lock for '{label}' was acquired.")
            yield acquired
        else:
            logger.error(f"Lock for '{label}' was *not* acquired.")
            raise FractalSSHTimeoutError(
                f"Failed to acquire lock for '{label}' within "
                f"{timeout} seconds"
            )
    finally:
        # Release only if we actually hold the lock.
        if acquired:
            lock.release()
            logger.info(f"Lock for '{label}' was released.")