Execute a job, possibly on a remote node.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| in_fname | str | Absolute path to the input file (must be readable). | required |
| out_fname | str | Absolute path of the output file (must be writeable). | required |
Source code in fractal_server/app/runner/executors/slurm_common/remote.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
def worker(
    *,
    in_fname: str,
    out_fname: str,
) -> None:
    """
    Execute a job, possibly on a remote node.

    The job description is read as JSON from `in_fname`, and the outcome is
    written as JSON to `out_fname` as a two-item array `[success, payload]`:

    * `[true, <metadiff>]` if the command completed and its metadiff file
      could be loaded;
    * `[true, null]` if the command completed but produced no metadiff file;
    * `[false, {"exc_type_name": ..., "traceback_string": ...}]` if anything
      raised while reading the input, checking versions or running the
      command.

    Failures while creating the output folder or writing `out_fname` are not
    captured and propagate to the caller.

    Arguments:
        in_fname: Absolute path to the input file (must be readable).
        out_fname: Absolute path of the output file (must be writeable).
    """

    # Create output folder, if missing. `exist_ok=True` tolerates another
    # process creating the same folder between the check and the call, and
    # `makedirs` also creates missing parent folders. An empty dirname (bare
    # file name) means "current directory": nothing to create.
    out_dir = os.path.dirname(out_fname)
    if out_dir and not os.path.exists(out_dir):
        logging.debug(f"_slurm.remote.worker: create {out_dir=}")
        os.makedirs(out_dir, exist_ok=True)

    # Execute the job and capture exceptions
    try:
        with open(in_fname) as f:
            input_data = json.load(f)

        server_python_version = input_data["python_version"]
        server_fractal_server_version = input_data["fractal_server_version"]

        # Fractal-server version must be identical
        worker_fractal_server_version = __VERSION__
        if worker_fractal_server_version != server_fractal_server_version:
            raise FractalVersionMismatch(
                f"{server_fractal_server_version=} but "
                f"{worker_fractal_server_version=}"
            )

        # Python version mismatch only raises a warning, and only when
        # major.minor differ. JSON has no tuple type, so a version that was
        # serialized as a sequence is loaded back as a list; convert it to a
        # tuple, otherwise the comparison with the local tuple is never equal
        # and the warning would be emitted on every run.
        # NOTE(review): assumes the server stores the version as a sequence
        # of integers (e.g. `[3, 11, 5]`) - confirm against the writer.
        if isinstance(server_python_version, list):
            server_python_version = tuple(server_python_version)
        worker_python_version = tuple(sys.version_info[:3])
        if worker_python_version[:2] != server_python_version[:2]:
            logging.warning(
                f"{server_python_version=} but {worker_python_version=}."
            )

        # Extract some useful paths
        metadiff_file_remote = input_data["metadiff_file_remote"]
        log_path = input_data["log_file_remote"]

        # Execute command
        full_command = input_data["full_command"]
        call_command_wrapper(cmd=full_command, log_path=log_path)

        try:
            with open(metadiff_file_remote) as f:
                out_meta = json.load(f)
            result = (True, out_meta)
        except FileNotFoundError:
            # Command completed, but it produced no metadiff file
            result = (True, None)

    except Exception as e:
        # Exception objects are not serialisable. Here we save the relevant
        # exception contents (type name and formatted traceback) in a
        # serializable dictionary.
        import traceback

        # Skip the first traceback entry, which is this function's own frame.
        traceback_obj = e.__traceback__.tb_next
        traceback_list = traceback.format_exception(
            type(e),
            e,
            traceback_obj,
        )
        traceback_string = "".join(traceback_list)
        exc_proxy = dict(
            exc_type_name=type(e).__name__,
            traceback_string=traceback_string,
        )
        result = (False, exc_proxy)

    # Write output file
    with open(out_fname, "w") as f:
        json.dump(result, f, indent=2)
|