Skip to content

remote

FractalVersionMismatch

Bases: RuntimeError

Custom exception raised when the fractal-server version of the worker differs from the one of the server

Source code in fractal_server/app/runner/executors/slurm_common/remote.py
11
12
13
14
15
16
class FractalVersionMismatch(RuntimeError):
    """
    Error signalling a fractal-server version mismatch.

    It carries no extra state: it is a plain `RuntimeError` subclass, so
    that a version mismatch can be told apart from other runtime errors
    by its type name.
    """

worker(*, in_fname, out_fname)

Execute a job, possibly on a remote node.

Parameters:

Name Type Description Default
in_fname str

Absolute path to the input file (must be readable).

required
out_fname str

Absolute path of the output file (must be writeable).

required
Source code in fractal_server/app/runner/executors/slurm_common/remote.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def worker(
    *,
    in_fname: str,
    out_fname: str,
) -> None:
    """
    Execute a job, possibly on a remote node.

    The input file is a JSON object from which the keys `python_version`,
    `fractal_server_version`, `metadiff_file_remote`, `log_file_remote` and
    `full_command` are read. The outcome is written to `out_fname` as a JSON
    two-item array:

    * `[true, <content of the metadiff file, or null if it is missing>]`
      when the command completed;
    * `[false, {"exc_type_name": ..., "traceback_string": ...}]` when any
      exception was raised while reading the input, checking versions or
      running the command.

    Exceptions raised while creating the output folder or while writing the
    output file are not captured, and propagate to the caller.

    Arguments:
        in_fname: Absolute path to the input file (must be readable).
        out_fname: Absolute path of the output file (must be writeable).
    """

    # Create output folder, if missing. `makedirs(..., exist_ok=True)` also
    # creates missing parents and does not fail if another process creates
    # the same folder between the check and the call. An empty `out_dir`
    # (file name without a folder component) means "current directory", and
    # there is nothing to create.
    out_dir = os.path.dirname(out_fname)
    if out_dir and not os.path.exists(out_dir):
        logging.debug(f"_slurm.remote.worker: create {out_dir=}")
        os.makedirs(out_dir, exist_ok=True)

    # Execute the job and capture exceptions
    try:
        with open(in_fname) as f:
            input_data = json.load(f)

        # JSON has no tuple type, so the version arrives as a list; convert
        # it to a tuple to make it comparable with `sys.version_info[:3]`
        # (a tuple never compares equal to a list).
        server_python_version = tuple(input_data["python_version"])
        server_fractal_server_version = input_data["fractal_server_version"]

        # Fractal-server version must be identical
        worker_fractal_server_version = __VERSION__
        if worker_fractal_server_version != server_fractal_server_version:
            raise FractalVersionMismatch(
                f"{server_fractal_server_version=} but "
                f"{worker_fractal_server_version=}"
            )

        # Python version mismatch only raises a warning, and only when the
        # major.minor part differs (a different patch level is tolerated).
        worker_python_version = tuple(sys.version_info[:3])
        if worker_python_version[:2] != server_python_version[:2]:
            logging.warning(
                f"{server_python_version=} but {worker_python_version=}."
            )

        # Extract some useful paths
        metadiff_file_remote = input_data["metadiff_file_remote"]
        log_path = input_data["log_file_remote"]

        # Execute command
        full_command = input_data["full_command"]
        call_command_wrapper(cmd=full_command, log_path=log_path)

        try:
            with open(metadiff_file_remote) as f:
                out_meta = json.load(f)
            result = (True, out_meta)
        except FileNotFoundError:
            # Command completed, but it produced no metadiff file
            result = (True, None)

    except Exception as e:
        # Exception objects are not serialisable. Here we save the relevant
        # exception contents in a serializable dictionary. Note that whenever
        # the task failed "properly", the exception is a `TaskExecutionError`
        # and it has additional attributes.
        import traceback

        # Skip the first traceback entry (the frame of this function), so
        # that the report starts from the call that actually failed. When
        # the exception was raised directly in this function, `tb_next` is
        # `None` and only the exception line is reported.
        traceback_list = traceback.format_exception(
            type(e),
            e,
            e.__traceback__.tb_next,
        )
        exc_proxy = dict(
            exc_type_name=type(e).__name__,
            traceback_string="".join(traceback_list),
        )
        result = (False, exc_proxy)

    # Write output file
    with open(out_fname, "w") as f:
        json.dump(result, f, indent=2)