class CollectionRequestData(BaseModel):
    """
    Validate form data _and_ wheel file.

    The `origin` field is always overwritten by `validate_data`, based on
    whether `file` was provided: `PYPI` when there is no file, `WHEELFILE`
    otherwise.
    """

    task_collect: TaskCollectPipV2
    file: Optional[UploadFile] = None
    origin: TaskGroupV2OriginEnum

    @model_validator(mode="before")
    @classmethod
    def validate_data(cls, values):
        """
        Cross-validate `file` against `task_collect` and set `origin`.

        Rules enforced:

        * without `file`, `task_collect.package` is required;
        * with `file`, neither `task_collect.package` nor
          `task_collect.package_version` may be set, and the filename must
          not contain any of the `FORBIDDEN_CHAR_WHEEL` characters.

        `task_collect` is read through attribute access, i.e. it is expected
        to be an already-built object exposing `package` and
        `package_version`.

        Raises:
            ValueError: If any of the rules above is violated, if
                `task_collect` is missing, or if the uploaded file has no
                filename.
        """
        file = values.get("file")
        task_collect = values.get("task_collect")

        # Fail with a ValueError (not an AttributeError on `None`) so that
        # the problem is reported as a regular validation failure.
        if task_collect is None:
            raise ValueError("`task_collect` is required.")

        package = task_collect.package
        package_version = task_collect.package_version

        if file is None:
            if package is None:
                raise ValueError(
                    "When no `file` is provided, `package` is required."
                )
            values["origin"] = TaskGroupV2OriginEnum.PYPI
            return values

        if package is not None:
            raise ValueError(
                "Cannot set `package` when `file` is provided "
                f"(given package='{package}')."
            )
        if package_version is not None:
            raise ValueError(
                "Cannot set `package_version` when `file` is "
                f"provided (given package_version='{package_version}')."
            )
        values["origin"] = TaskGroupV2OriginEnum.WHEELFILE

        # A missing filename would make the membership test below fail with
        # a TypeError; reject it explicitly instead.
        filename = file.filename
        if filename is None:
            raise ValueError("Wheel file must have a filename.")

        if any(char in filename for char in FORBIDDEN_CHAR_WHEEL):
            raise ValueError(
                "Wheel filename has forbidden characters, "
                f"{FORBIDDEN_CHAR_WHEEL}"
            )

        return values
@router.post(
    "/collect/pip/",
    response_model=TaskGroupActivityV2Read,
)
async def collect_tasks_pip(
    request: Request,
    response: Response,
    background_tasks: BackgroundTasks,
    request_data: CollectionRequestData = Depends(parse_request_data),
    private: bool = False,
    user_group_id: Optional[int] = None,
    user: UserOAuth = Depends(current_active_verified_user),
    db: AsyncSession = Depends(get_async_db),
) -> TaskGroupActivityV2Read:
    """
    Task-collection endpoint

    Builds the attributes of a new `TaskGroupV2` from the validated request
    data (either an uploaded wheel file or a PyPI package), runs a series of
    validation, database and on-disk checks, stores the task group together
    with a `PENDING` `TaskGroupActivityV2`, and schedules the actual
    collection (`collect_ssh` or `collect_local`) as a background task.

    Arguments:
        request: Incoming request; used to reach
            `request.app.state.fractal_ssh_list` for the `slurm_ssh` backend.
        response: Outgoing response; its status code is set to 202.
        background_tasks: Registry where the collection task is scheduled.
        request_data: Validated form data and optional wheel file.
        private: Forwarded to `_get_valid_user_group_id`.
        user_group_id: Forwarded to `_get_valid_user_group_id`; the value
            returned by that helper is stored on the task group.
        user: Current active and verified user.
        db: Async database session.

    Returns:
        The newly created task-group activity.

    Raises:
        HTTPException: With status 422 if the Python version is not
            available, the wheel filename is invalid, the task-group
            attributes fail validation, another task group already uses the
            same path, or (for non-SSH backends) the target folder already
            exists. Helper functions called here may raise as well.
    """
    # Get settings
    settings = Inject(get_settings)

    # Get some validated request data
    task_collect = request_data.task_collect

    # Initialize task-group attributes
    task_group_attrs = dict(
        user_id=user.id,
        origin=request_data.origin,
    )

    # Set/check python version
    if task_collect.python_version is None:
        task_group_attrs[
            "python_version"
        ] = settings.FRACTAL_TASKS_PYTHON_DEFAULT_VERSION
    else:
        task_group_attrs["python_version"] = task_collect.python_version
    try:
        get_python_interpreter_v2(
            python_version=task_group_attrs["python_version"]
        )
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Python version {task_group_attrs['python_version']} is "
                "not available for Fractal task collection."
            ),
        )

    # Set pip_extras
    if task_collect.package_extras is not None:
        task_group_attrs["pip_extras"] = task_collect.package_extras

    # Set pinned_package_versions
    if task_collect.pinned_package_versions is not None:
        task_group_attrs[
            "pinned_package_versions"
        ] = task_collect.pinned_package_versions

    # Initialize wheel_file as None (it stays None for PyPI collections)
    wheel_file = None

    # Set pkg_name, version and wheel_file
    if request_data.origin == TaskGroupV2OriginEnum.WHEELFILE:
        # Read the filename outside of the `try` block, so that it is
        # always bound when the error message below is built.
        wheel_filename = request_data.file.filename
        try:
            wheel_info = _parse_wheel_filename(wheel_filename)
            wheel_file_content = await request_data.file.read()
            wheel_file = WheelFile(
                filename=wheel_filename,
                contents=wheel_file_content,
            )
        except ValueError as e:
            # NOTE: `detail` must be a plain string (no trailing comma
            # inside the parentheses, which would turn it into a tuple).
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=(
                    f"Invalid wheel-file name {wheel_filename}. "
                    f"Original error: {str(e)}"
                ),
            ) from e
        task_group_attrs["pkg_name"] = normalize_package_name(
            wheel_info["distribution"]
        )
        task_group_attrs["version"] = wheel_info["version"]
    elif request_data.origin == TaskGroupV2OriginEnum.PYPI:
        pkg_name = task_collect.package
        task_group_attrs["pkg_name"] = normalize_package_name(pkg_name)
        latest_version = await get_package_version_from_pypi(
            task_collect.package,
            task_collect.package_version,
        )
        task_group_attrs["version"] = latest_version

    # Validate query parameters related to user-group ownership
    user_group_id = await _get_valid_user_group_id(
        user_group_id=user_group_id,
        private=private,
        user_id=user.id,
        db=db,
    )

    # Set user_group_id
    task_group_attrs["user_group_id"] = user_group_id

    # Validate user settings (backend-specific)
    user_settings = await validate_user_settings(
        user=user, backend=settings.FRACTAL_RUNNER_BACKEND, db=db
    )

    # Set path and venv_path
    if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
        base_tasks_path = user_settings.ssh_tasks_dir
    else:
        base_tasks_path = settings.FRACTAL_TASKS_DIR.as_posix()
    task_group_path = (
        Path(base_tasks_path)
        / str(user.id)
        / task_group_attrs["pkg_name"]
        / task_group_attrs["version"]
    ).as_posix()
    task_group_attrs["path"] = task_group_path
    task_group_attrs["venv_path"] = Path(task_group_path, "venv").as_posix()

    # Validate TaskGroupV2 attributes
    try:
        TaskGroupCreateV2Strict(**task_group_attrs)
    except ValidationError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid task-group object. Original error: {e}",
        ) from e

    # Database checks

    # Verify non-duplication constraints
    await _verify_non_duplication_user_constraint(
        user_id=user.id,
        pkg_name=task_group_attrs["pkg_name"],
        version=task_group_attrs["version"],
        db=db,
    )
    await _verify_non_duplication_group_constraint(
        user_group_id=task_group_attrs["user_group_id"],
        pkg_name=task_group_attrs["pkg_name"],
        version=task_group_attrs["version"],
        db=db,
    )

    # Verify that task-group path is unique
    stm = select(TaskGroupV2).where(TaskGroupV2.path == task_group_path)
    res = await db.execute(stm)
    # The loop body raises unconditionally, i.e. the first conflicting
    # task group (if any) is reported and nothing happens otherwise.
    for conflicting_task_group in res.scalars().all():
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Another task-group already has path={task_group_path}.\n"
                f"{conflicting_task_group=}"
            ),
        )

    # On-disk checks

    if settings.FRACTAL_RUNNER_BACKEND != "slurm_ssh":
        # Verify that folder does not exist (for local collection)
        if Path(task_group_path).exists():
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"{task_group_path} already exists.",
            )

    # Create TaskGroupV2 object
    task_group = TaskGroupV2(**task_group_attrs)
    db.add(task_group)
    await db.commit()
    await db.refresh(task_group)
    db.expunge(task_group)

    # All checks are OK, proceed with task collection
    task_group_activity = TaskGroupActivityV2(
        user_id=task_group.user_id,
        taskgroupv2_id=task_group.id,
        status=TaskGroupActivityStatusV2.PENDING,
        action=TaskGroupActivityActionV2.COLLECT,
        pkg_name=task_group.pkg_name,
        version=task_group.version,
    )
    db.add(task_group_activity)
    await db.commit()
    await db.refresh(task_group_activity)
    logger = set_logger(logger_name="collect_tasks_pip")

    # END of SSH/non-SSH common part

    if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
        # SSH task collection
        # Use appropriate FractalSSH object
        ssh_credentials = dict(
            user=user_settings.ssh_username,
            host=user_settings.ssh_host,
            key_path=user_settings.ssh_private_key_path,
        )
        fractal_ssh_list = request.app.state.fractal_ssh_list
        fractal_ssh = fractal_ssh_list.get(**ssh_credentials)

        background_tasks.add_task(
            collect_ssh,
            task_group_id=task_group.id,
            task_group_activity_id=task_group_activity.id,
            fractal_ssh=fractal_ssh,
            tasks_base_dir=user_settings.ssh_tasks_dir,
            wheel_file=wheel_file,
        )

    else:
        # Local task collection
        background_tasks.add_task(
            collect_local,
            task_group_id=task_group.id,
            task_group_activity_id=task_group_activity.id,
            wheel_file=wheel_file,
        )
    logger.debug(
        "Task-collection endpoint: start background collection "
        "and return task_group_activity"
    )
    reset_logger_handlers(logger)
    response.status_code = status.HTTP_202_ACCEPTED
    return task_group_activity
def parse_request_data(
    package: Optional[str] = Form(None),
    package_version: Optional[str] = Form(None),
    package_extras: Optional[str] = Form(None),
    python_version: Optional[str] = Form(None),
    pinned_package_versions: Optional[str] = Form(None),
    file: Optional[UploadFile] = File(None),
) -> CollectionRequestData:
    """
    Build a `CollectionRequestData` from form fields and an optional file.

    `pinned_package_versions`, when non-empty, is decoded from a JSON
    string before being handed to `TaskCollectPipV2`. The resulting
    `TaskCollectPipV2` object and `file` are then validated together
    through `CollectionRequestData`.

    Raises:
        HTTPException: With status 422 if the JSON decoding fails or if
            either model raises a `ValidationError`.
    """
    try:
        # Decode the JSON-encoded pins; an empty or missing value maps to None
        if pinned_package_versions:
            decoded_pins = json.loads(pinned_package_versions)
        else:
            decoded_pins = None

        # Validate and coerce the plain form fields
        form_fields = dict(
            package=package,
            package_version=package_version,
            package_extras=package_extras,
            python_version=python_version,
            pinned_package_versions=decoded_pins,
        )
        validated_form = TaskCollectPipV2(**form_fields)

        # Cross-validate the form fields against the uploaded file
        return CollectionRequestData(
            task_collect=validated_form,
            file=file,
        )
    except (ValidationError, json.JSONDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid request-body\n{str(e)}",
        )