20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def create_manifest(
    *,
    raw_package_name: str,
    task_list_path: str,
    fractal_server_2_13: bool = False,
) -> dict[str, Any]:
    """
    Create the package manifest based on a `task_list.py` module

    Arguments:
        raw_package_name:
            The name of the package. Note that this name must be importable
            (after normalization).
        task_list_path:
            Relative path to the `task_list.py` module, with respect to the
            package root (example `dev.task_list`).
        fractal_server_2_13:
            If set, produce a manifest which is compatible with
            fractal-server<2.14 (that is, exclude tasks' `type`s from the
            manifest and forbid converter types).

    Returns:
        Task-package manifest.

    Raises:
        ValueError:
            If `task_list_path` contains a `/` or ends with `.py`, or if
            `fractal_server_2_13` is set and a task has type
            `converter_compound` or `converter_non_parallel`.
        AttributeError:
            If the task-list module does not define `TASK_LIST`.
    """

    # Preliminary validation: the path must be in dotted-module form, since
    # it is appended to the package name and passed to `import_module`.
    if "/" in task_list_path or task_list_path.endswith(".py"):
        raise ValueError(
            f"Invalid {task_list_path=} (valid example: `dev.task_list`)."
        )

    # Normalize package name
    package_name = normalize_package_name(raw_package_name)

    logging.info(f"Start generating a new manifest for {package_name}")

    # Prepare an empty manifest
    manifest = dict(
        manifest_version=MANIFEST_VERSION,
        task_list=[],
        has_args_schemas=True,
        args_schema_version=ARGS_SCHEMA_VERSION,
        authors=None,
    )

    # Import the task-list module
    task_list_module = import_module(f"{package_name}.{task_list_path}")

    # Load TASK_LIST (mandatory: a missing attribute raises AttributeError)
    TASK_LIST: list[_BaseTask] = getattr(task_list_module, "TASK_LIST")

    # Load INPUT_MODELS (optional, defaults to an empty list)
    try:
        INPUT_MODELS = getattr(task_list_module, "INPUT_MODELS")
    except AttributeError:
        INPUT_MODELS = []
        logging.warning(
            "No `INPUT_MODELS` found in task_list module. Setting it to `[]`."
        )

    # Load AUTHORS (optional, manifest keeps `authors=None` when missing)
    try:
        manifest["authors"] = getattr(task_list_module, "AUTHORS")
    except AttributeError:
        logging.warning("No `AUTHORS` found in task_list module.")

    # Load DOCS_LINK (optional, no `docs_link` is set on tasks when missing)
    try:
        DOCS_LINK = getattr(task_list_module, "DOCS_LINK")
    except AttributeError:
        DOCS_LINK = None
        logging.warning("No `DOCS_LINK` found in task_list module.")

    # Task types that cannot be represented in a fractal-server<2.14 manifest
    legacy_forbidden_types = (
        "converter_compound",
        "converter_non_parallel",
    )

    # Properties copied verbatim from the task object when they are not None
    # (the order here fixes the key order in the resulting dictionary)
    copied_properties = (
        "executable_non_parallel",
        "executable_parallel",
        "meta_non_parallel",
        "meta_parallel",
    )

    # Loop over TASK_LIST, and append the proper task dictionaries
    # to manifest["task_list"]
    for task_obj in TASK_LIST:
        # Fail before doing any work on a task that cannot be exported
        if fractal_server_2_13 and task_obj.type in legacy_forbidden_types:
            raise ValueError(
                f"Invalid task type {task_obj.type} "
                f"(with {fractal_server_2_13=})."
            )

        # Convert Pydantic object to dictionary
        task_dict = task_obj.model_dump(
            exclude={
                "meta_init",
                "executable_init",
                "meta",
                "executable",
            },
            exclude_unset=True,
        )

        if fractal_server_2_13:
            # Defensive: `exclude_unset=True` keeps explicitly-set fields, so
            # make sure `type` never reaches a fractal-server<2.14 manifest.
            # NOTE(review): no-op if `type` is not a model field - confirm.
            task_dict.pop("type", None)
        else:
            task_dict["type"] = task_obj.type

        # Copy some properties from `task_obj` to `task_dict`
        for property_name in copied_properties:
            property_value = getattr(task_obj, property_name)
            if property_value is not None:
                task_dict[property_name] = property_value

        # Autogenerate JSON Schemas for non-parallel/parallel task arguments
        for kind in ["non_parallel", "parallel"]:
            executable = task_dict.get(f"executable_{kind}")
            if executable is not None:
                logging.info(f"[{executable}] START")
                schema = create_schema_for_single_task(
                    executable,
                    package=package_name,
                    pydantic_models=INPUT_MODELS,
                )

                validate_arguments(
                    task_type=task_obj.type,
                    schema=schema,
                    executable_kind=kind,
                )

                logging.info(f"[{executable}] END (new schema)")
                task_dict[f"args_schema_{kind}"] = schema

        # Compute and set `docs_info`: autogenerate it when the task does not
        # provide one, or resolve it from a file for a `file:`-prefixed value
        docs_info = task_dict.get("docs_info")
        if docs_info is None:
            docs_info = create_docs_info(
                executable_non_parallel=task_obj.executable_non_parallel,
                executable_parallel=task_obj.executable_parallel,
                package=package_name,
            )
        elif docs_info.startswith("file:"):
            docs_info = read_docs_info_from_file(
                docs_info=docs_info,
                task_list_path=task_list_module.__file__,
            )
        if docs_info is not None:
            task_dict["docs_info"] = docs_info

        # Set `docs_link`
        if DOCS_LINK is not None:
            task_dict["docs_link"] = DOCS_LINK

        # Append task
        manifest["task_list"].append(task_dict)

    return manifest
|