Skip to content

remote

This module provides a simple self-standing script that executes arbitrary python code received via pickled files on a cluster node.

FractalVersionMismatch

Bases: RuntimeError

Custom exception for version mismatch

Source code in fractal_server/app/runner/executors/slurm_common/remote.py
29
30
31
32
33
34
class FractalVersionMismatch(RuntimeError):
    """
    Raised when the worker's library versions differ from the server's.
    """

_check_versions_mismatch(server_versions)

Compare the server {python,cloudpickle,fractal_server} versions with the ones available to the current worker

Parameters:

Name Type Description Default
server_versions dict[Literal['python', 'fractal_server', 'cloudpickle'], Union[str, tuple[int]]]

The versions used in the fractal-server instance that created the cloudpickle file

required

Raises:

Type Description
FractalVersionMismatch

If the worker's cloudpickle or fractal_server versions do not match the server's ones

Source code in fractal_server/app/runner/executors/slurm_common/remote.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def _check_versions_mismatch(
    server_versions: dict[
        Literal["python", "fractal_server", "cloudpickle"],
        # NOTE: `tuple[int, ...]` (variable length) rather than `tuple[int]`
        # (exactly one item), since the python version is a 3-tuple.
        Union[str, tuple[int, ...]],
    ]
):
    """
    Compare the server {python,cloudpickle,fractal_server} versions with the
    ones available to the current worker

    Arguments:
        server_versions:
            The versions used in the fractal-server instance that created the
            cloudpickle file

    Raises:
        FractalVersionMismatch: If the worker's cloudpickle or fractal_server
            versions do not match the server's ones
    """

    # A python-version mismatch is only logged, not raised: reaching this
    # line means the pickle file was already loaded successfully.
    server_python_version = server_versions["python"]
    worker_python_version = sys.version_info[:3]
    if worker_python_version != server_python_version:
        # FIXME: turn this into an error, after fixing a broader CI issue, see
        # https://github.com/fractal-analytics-platform/fractal-server/issues/375
        logging.warning(
            f"{server_python_version=} but {worker_python_version=}. "
            "cloudpickle is not guaranteed to correctly load "
            "pickle files created with different python versions. "
            "Note, however, that if you reached this line it means that "
            "the pickle file was likely loaded correctly."
        )

    # cloudpickle's serialization format is not guaranteed stable across
    # versions, so require an exact match.
    server_cloudpickle_version = server_versions["cloudpickle"]
    worker_cloudpickle_version = cloudpickle.__version__
    if worker_cloudpickle_version != server_cloudpickle_version:
        raise FractalVersionMismatch(
            f"{server_cloudpickle_version=} but "
            f"{worker_cloudpickle_version=}"
        )

    # fractal-server versions must also match exactly.
    server_fractal_server_version = server_versions["fractal_server"]
    worker_fractal_server_version = __VERSION__
    if worker_fractal_server_version != server_fractal_server_version:
        raise FractalVersionMismatch(
            f"{server_fractal_server_version=} but "
            f"{worker_fractal_server_version=}"
        )

worker(*, in_fname, out_fname, extra_import_paths=None)

Execute a job, possibly on a remote node.

Parameters:

Name Type Description Default
in_fname str

Absolute path to the input pickle file (must be readable).

required
out_fname str

Absolute path of the output pickle file (must be writeable).

required
extra_import_paths Optional[str]

Additional import paths

None
Source code in fractal_server/app/runner/executors/slurm_common/remote.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def worker(
    *,
    in_fname: str,
    out_fname: str,
    extra_import_paths: Optional[str] = None,
) -> None:
    """
    Execute a job, possibly on a remote node.

    The input pickle encodes `(server_versions, fun, args, kwargs)`; the
    output pickle encodes either `(True, result)` on success or
    `(False, exc_proxy)` on failure.

    Arguments:
        in_fname: Absolute path to the input pickle file (must be readable).
        out_fname: Absolute path of the output pickle file (must be writeable).
        extra_import_paths: Additional import paths (colon-separated).
    """

    # Create output folder if missing. `makedirs(..., exist_ok=True)` also
    # creates missing parents and is safe against a concurrent creator; the
    # `out_dir` guard handles an `out_fname` with no directory component.
    out_dir = os.path.dirname(out_fname)
    if out_dir and not os.path.exists(out_dir):
        logging.debug(f"_slurm.remote.worker: create {out_dir=}")
        os.makedirs(out_dir, exist_ok=True)

    if extra_import_paths:
        # Prepend, so the extra paths take precedence over existing entries
        _extra_import_paths = extra_import_paths.split(":")
        sys.path[:0] = _extra_import_paths

    # Execute the job and capture exceptions
    try:
        with open(in_fname, "rb") as f:
            indata = f.read()
        # SECURITY NOTE: unpickling executes arbitrary code; `in_fname` must
        # only ever come from a trusted fractal-server instance.
        server_versions, fun, args, kwargs = cloudpickle.loads(indata)
        _check_versions_mismatch(server_versions)

        result = (True, fun(*args, **kwargs))
        out = cloudpickle.dumps(result)
    except Exception as e:
        # Exception objects are not serialisable. Here we save the relevant
        # exception contents in a serializable dictionary. Note that whenever
        # the task failed "properly", the exception is a `TaskExecutionError`
        # and it has additional attributes.

        import traceback

        exc_type, exc_value, traceback_obj = sys.exc_info()
        # Drop the current (worker) frame from the reported traceback
        traceback_obj = traceback_obj.tb_next
        traceback_list = traceback.format_exception(
            exc_type,
            exc_value,
            traceback_obj,
        )
        traceback_string = "".join(traceback_list)
        exc_proxy = dict(
            exc_type_name=exc_type.__name__,
            traceback_string=traceback_string,
            workflow_task_order=getattr(e, "workflow_task_order", None),
            workflow_task_id=getattr(e, "workflow_task_id", None),
            task_name=getattr(e, "task_name", None),
        )
        result = (False, exc_proxy)
        out = cloudpickle.dumps(result)

    # Write the output pickle file atomically: write to a temp file in the
    # same directory, then rename (rename is atomic on POSIX filesystems).
    tempfile = out_fname + ".tmp"
    with open(tempfile, "wb") as f:
        f.write(out)
    os.rename(tempfile, out_fname)