45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 | @router.post(
"/collect/pip/",
response_model=StateRead,
responses={
201: dict(
description=(
"Task collection successfully started in the background"
)
),
200: dict(
description=(
"Package already collected. Returning info on already "
"available tasks"
)
),
},
)
async def collect_tasks_pip(
task_collect: TaskCollectPipV1,
background_tasks: BackgroundTasks,
response: Response,
user: UserOAuth = Depends(current_active_verified_user),
db: AsyncSession = Depends(get_async_db),
) -> StateRead: # State[TaskCollectStatus]
"""
Task collection endpoint
Trigger the creation of a dedicated virtual environment, the installation
of a package and the collection of tasks as advertised in the manifest.
"""
_raise_if_v1_is_read_only()
logger = set_logger(logger_name="collect_tasks_pip")
# Validate payload as _TaskCollectPip, which has more strict checks than
# TaskCollectPip
try:
task_pkg = _TaskCollectPip(**task_collect.dict(exclude_unset=True))
except ValidationError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid task-collection object. Original error: {e}",
)
with TemporaryDirectory() as tmpdir:
try:
# Copy or download the package wheel file to tmpdir
if task_pkg.is_local_package:
shell_copy(task_pkg.package_path.as_posix(), tmpdir)
pkg_path = Path(tmpdir) / task_pkg.package_path.name
else:
pkg_path = await download_package(
task_pkg=task_pkg, dest=tmpdir
)
# Read package info from wheel file, and override the ones coming
# from the request body
pkg_info = inspect_package(pkg_path)
task_pkg.package_name = pkg_info["pkg_name"]
task_pkg.package_version = pkg_info["pkg_version"]
task_pkg.package_manifest = pkg_info["pkg_manifest"]
task_pkg.check()
except Exception as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid package or manifest. Original error: {e}",
)
try:
venv_path = create_package_dir_pip(task_pkg=task_pkg)
except FileExistsError:
venv_path = create_package_dir_pip(task_pkg=task_pkg, create=False)
try:
task_collect_status = get_collection_data(venv_path)
for task in task_collect_status.task_list:
db_task = await db.get(Task, task.id)
if (
(not db_task)
or db_task.source != task.source
or db_task.name != task.name
):
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Folder already exists, "
f"but task {task.id} does not exists or it does "
f"not have the expected source ({task.source}) or "
f"name ({task.name})."
),
)
except FileNotFoundError as e:
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Possible reason: another "
"collection of the same package is in progress. "
f"Original error: {e}"
),
)
except ValidationError as e:
await db.close()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Possible reason: an old version "
"of the same package has already been collected. "
f"Original error: {e}"
),
)
task_collect_status.info = "Already installed"
state = State(data=task_collect_status.sanitised_dict())
response.status_code == status.HTTP_200_OK
await db.close()
return state
settings = Inject(get_settings)
# Check that tasks are not already in the DB
for new_task in task_pkg.package_manifest.task_list:
new_task_name_slug = slugify_task_name_for_source_v1(new_task.name)
new_task_source = f"{task_pkg.package_source}:{new_task_name_slug}"
stm = select(Task).where(Task.source == new_task_source)
res = await db.execute(stm)
if res.scalars().all():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"Cannot collect package. Task with source "
f'"{new_task_source}" already exists in the database.'
),
)
# All checks are OK, proceed with task collection
full_venv_path = venv_path.relative_to(settings.FRACTAL_TASKS_DIR)
collection_status = TaskCollectStatusV1(
status="pending", venv_path=full_venv_path, package=task_pkg.package
)
# Create State object (after casting venv_path to string)
collection_status_dict = collection_status.dict()
collection_status_dict["venv_path"] = str(collection_status.venv_path)
state = State(data=collection_status_dict)
db.add(state)
await db.commit()
await db.refresh(state)
background_tasks.add_task(
background_collect_pip,
state_id=state.id,
venv_path=venv_path,
task_pkg=task_pkg,
)
logger.debug(
"Task-collection endpoint: start background collection "
"and return state"
)
close_logger(logger)
info = (
"Collecting tasks in the background. "
f"GET /task/collect/{state.id} to query collection status"
)
state.data["info"] = info
response.status_code = status.HTTP_201_CREATED
await db.close()
return state
|