Skip to content

remote

This module provides a simple standalone script that executes arbitrary Python code received via pickled files on a cluster node.

ExceptionProxy

Proxy class to serialise exceptions

In general exceptions are not serialisable. This proxy class saves the serialisable content of an exception. On the receiving end, it can be used to reconstruct a TaskExecutionError.

Attributes:

Name Type Description
exc_type_name str

Name of the exception type

tb str

Formatted traceback text of the original exception

args

Positional arguments of the original exception

kwargs dict

Keyword arguments saved from the original exception (its instance attributes)

Source code in fractal_server/app/runner/executors/slurm/remote.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ExceptionProxy:
    """
    Serialisable stand-in for an exception

    Exception objects cannot, in general, be pickled. An instance of this
    class keeps only the serialisable pieces of an exception, so that the
    receiving side can rebuild a TaskExecutionError from them.

    Attributes:
        exc_type_name: Name of the exception type
        tb: Traceback text, as handed to the constructor
        args: Extra positional arguments handed to the constructor
        kwargs: Extra keyword arguments handed to the constructor
    """

    def __init__(
        self,
        exc_type: Type[BaseException],
        tb: str,
        *args,
        **kwargs,
    ):
        # Only the type *name* is stored, never the exception class itself
        self.exc_type_name: str = exc_type.__name__
        self.tb: str = tb
        self.args: tuple = args
        self.kwargs: dict = kwargs

FractalVersionMismatch

Bases: RuntimeError

Custom exception for version mismatch

Source code in fractal_server/app/runner/executors/slurm/remote.py
54
55
56
57
58
59
class FractalVersionMismatch(RuntimeError):
    """
    Error signalling a version mismatch between server and worker
    """

_check_versions_mismatch(server_versions)

Compare the server {python,cloudpickle,fractal_server} versions with the ones available to the current worker

Parameters:

Name Type Description Default
server_versions dict[Literal['python', 'fractal_server', 'cloudpickle'], Union[str, tuple[int]]]

The version used in the fractal-server instance that created the cloudpickle file

required

Raises:

Type Description
FractalVersionMismatch

If the cloudpickle or fractal_server versions do not match the ones on the server

Source code in fractal_server/app/runner/executors/slurm/remote.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def _check_versions_mismatch(
    server_versions: dict[
        Literal["python", "fractal_server", "cloudpickle"],
        Union[str, tuple[int]],
    ]
):
    """
    Check the worker's python, cloudpickle and fractal_server versions
    against the ones recorded by the server

    A python-version difference is only logged (at CRITICAL level), while a
    cloudpickle or fractal_server difference raises an exception.

    Arguments:
        server_versions:
            The version used in the fractal-server instance that created the
            cloudpickle file

    Raises:
        FractalVersionMismatch: If the cloudpickle or fractal_server versions
                                do not match with the ones on the server
    """

    # NOTE: the local names below appear verbatim in the messages (through
    # the `{name=}` f-string specifier), so renaming them changes the output.

    # Python version: report, but do not fail
    worker_python_version = sys.version_info[:3]
    server_python_version = server_versions["python"]
    if worker_python_version != server_python_version:
        # FIXME: turn this into an error, after fixing a broader CI issue, see
        # https://github.com/fractal-analytics-platform/fractal-server/issues/375
        python_msg = (
            f"{server_python_version=} but {worker_python_version=}. "
            "cloudpickle is not guaranteed to correctly load "
            "pickle files created with different python versions. "
            "Note, however, that if you reached this line it means that "
            "the pickle file was likely loaded correctly."
        )
        logging.critical(python_msg)

    # cloudpickle version: must be identical
    server_cloudpickle_version = server_versions["cloudpickle"]
    worker_cloudpickle_version = cloudpickle.__version__
    if worker_cloudpickle_version != server_cloudpickle_version:
        cloudpickle_msg = (
            f"{server_cloudpickle_version=} but "
            f"{worker_cloudpickle_version=}"
        )
        raise FractalVersionMismatch(cloudpickle_msg)

    # fractal_server version: must be identical
    server_fractal_server_version = server_versions["fractal_server"]
    worker_fractal_server_version = __VERSION__
    if worker_fractal_server_version != server_fractal_server_version:
        fractal_server_msg = (
            f"{server_fractal_server_version=} but "
            f"{worker_fractal_server_version=}"
        )
        raise FractalVersionMismatch(fractal_server_msg)

worker(*, in_fname, out_fname, extra_import_paths=None)

Execute a job, possibly on a remote node.

Parameters:

Name Type Description Default
in_fname str

Absolute path to the input pickle file (must be readable).

required
out_fname str

Absolute path of the output pickle file (must be writeable).

required
extra_import_paths Optional[str]

Additional import paths

None
Source code in fractal_server/app/runner/executors/slurm/remote.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def worker(
    *,
    in_fname: str,
    out_fname: str,
    extra_import_paths: Optional[str] = None,
) -> None:
    """
    Execute a job, possibly on a remote node.

    The input file is loaded with cloudpickle and unpacked as
    `(server_versions, fun, args, kwargs)`. After the version check,
    `fun(*args, **kwargs)` is called and `(True, <return value>)` is pickled
    to the output file. If any of these steps raises an `Exception`, then
    `(False, <ExceptionProxy>)` is pickled instead. The output is first
    written to `out_fname + ".tmp"` and then moved onto `out_fname`.

    Arguments:
        in_fname: Absolute path to the input pickle file (must be readable).
        out_fname: Absolute path of the output pickle file (must be writeable).
        extra_import_paths:
            Additional import paths, as a colon-separated string; they are
            inserted at the beginning of `sys.path`.
    """

    # Create output folder, if missing. An empty `out_dir` (that is, an
    # `out_fname` with no directory part) is skipped.
    out_dir = os.path.dirname(out_fname)
    if out_dir and not os.path.exists(out_dir):
        logging.debug(f"_slurm.remote.worker: create {out_dir=}")
        try:
            os.mkdir(out_dir)
        except FileExistsError:
            # Another process created the folder between the existence check
            # and the mkdir call; this is not an error.
            pass

    if extra_import_paths:
        _extra_import_paths = extra_import_paths.split(":")
        sys.path[:0] = _extra_import_paths

    # Execute the job and capture exceptions
    try:
        with open(in_fname, "rb") as f:
            indata = f.read()
        # NOTE: unpickling can execute arbitrary code, so `in_fname` must
        # come from a trusted source.
        server_versions, fun, args, kwargs = cloudpickle.loads(indata)
        _check_versions_mismatch(server_versions)

        result = True, fun(*args, **kwargs)
        out = cloudpickle.dumps(result)
    except Exception as e:
        import traceback

        typ, value, tb = sys.exc_info()
        # Skip the first traceback entry (the frame of this function)
        tb = tb.tb_next
        # NOTE(review): an `exc_type` or `tb` key in `e.__dict__` would clash
        # with the named parameters of ExceptionProxy; presumably this never
        # happens in practice, to be confirmed.
        exc_proxy = ExceptionProxy(
            typ,
            "".join(traceback.format_exception(typ, value, tb)),
            *e.args,
            **e.__dict__,
        )

        result = False, exc_proxy
        out = cloudpickle.dumps(result)

    # Write the output pickle file: write to a temporary file first, so that
    # `out_fname` only appears once its content is complete
    tempfile = out_fname + ".tmp"
    with open(tempfile, "wb") as f:
        f.write(out)
    # os.replace overwrites an existing destination on every platform
    os.replace(tempfile, out_fname)