Skip to content

endpoint_operations

create_package_dir_pip(*, task_pkg, create=True)

Create venv folder for a task package and return corresponding Path object

Source code in fractal_server/tasks/v1/endpoint_operations.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def create_package_dir_pip(
    *,
    task_pkg: _TaskCollectPipV1,
    create: bool = True,
) -> Path:
    """
    Build (and optionally create) the venv folder path of a task package

    The folder is located at
    `FRACTAL_TASKS_DIR / FRACTAL_PUBLIC_TASK_SUBDIR / <name><version>`, where
    `<name>` is the package name after `normalize_package_name` and
    `<version>` is `task_pkg.package_version`.

    Args:
        task_pkg: Task package; its `package` and `package_version`
            attributes are used to build the folder name.
        create: If `True`, create the folder (together with missing parent
            folders). `mkdir` is called with `exist_ok=False`, so an
            already-existing folder is treated as an error.

    Returns:
        The path of the venv folder.

    Raises:
        ValueError: If `task_pkg.package_version` is `None`.
    """
    settings = Inject(get_settings)

    pkg_version = task_pkg.package_version
    if pkg_version is None:
        raise ValueError(
            f"Cannot create venv folder for package `{task_pkg.package}` "
            "with `version=None`."
        )

    # Folder name is the normalized package name immediately followed by the
    # version string (no separator).
    folder_name = f"{normalize_package_name(task_pkg.package)}{pkg_version}"
    tasks_root = settings.FRACTAL_TASKS_DIR / FRACTAL_PUBLIC_TASK_SUBDIR
    target_dir = tasks_root / folder_name

    if not create:
        return target_dir

    target_dir.mkdir(parents=True, exist_ok=False)
    return target_dir

download_package(*, task_pkg, dest) async

Download package to destination

Source code in fractal_server/tasks/v1/endpoint_operations.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def download_package(
    *,
    task_pkg: _TaskCollectPipV1,
    dest: Union[str, Path],
) -> Path:
    """
    Download package to destination

    Runs `pip download --no-deps` through the interpreter returned by
    `get_python_interpreter_v1` for `task_pkg.python_version`, then extracts
    the downloaded file path from the first output line containing `Saved`.

    Args:
        task_pkg: Task package; `package`, `package_version` and
            `python_version` are used to build the pip command. When
            `package_version` is falsy, no version pin is added.
        dest: Destination folder, passed to pip via `-d`.

    Returns:
        Path of the downloaded package file, as reported by pip.

    Raises:
        RuntimeError: If no line of the command output contains `Saved`,
            so that the downloaded file cannot be identified.
    """
    interpreter = get_python_interpreter_v1(version=task_pkg.python_version)
    pip = f"{interpreter} -m pip"
    version = (
        f"=={task_pkg.package_version}" if task_pkg.package_version else ""
    )
    package_and_version = f"{task_pkg.package}{version}"
    # NOTE(review): the command is assembled by plain string interpolation;
    # whether `dest` or package names containing spaces are handled depends
    # on how `execute_command_async` tokenizes the command - confirm there.
    cmd = f"{pip} download --no-deps {package_and_version} -d {dest}"
    stdout = await execute_command_async(command=cmd, cwd=Path("."))

    # Use a default for `next`: a bare StopIteration escaping a coroutine is
    # turned by Python into an uninformative
    # `RuntimeError: coroutine raised StopIteration`.
    pkg_file = next(
        (
            line.split()[-1]
            for line in stdout.split("\n")
            if "Saved" in line
        ),
        None,
    )
    if pkg_file is None:
        raise RuntimeError(
            f"Could not find the downloaded file in the output of `{cmd}` "
            f"(no line containing 'Saved'). Command output:\n{stdout}"
        )
    return Path(pkg_file)

inspect_package(path, logger_name=None)

Inspect task package to extract version, name and manifest

Note that this only works with wheel files, which have a well-defined dist-info section. If we need to generalize to tar.gz archives, we would need to go and look for PKG-INFO.

Note: package name is normalized via normalize_package_name.

Parameters:

Name Type Description Default
path Path

Path the path in which the package is saved

required

Returns:

Name Type Description
version_manifest dict

A dictionary containing version, the version of the

dict

package, and manifest, the Fractal manifest object relative to the

dict

tasks.

Source code in fractal_server/tasks/v1/endpoint_operations.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def inspect_package(path: Path, logger_name: Optional[str] = None) -> dict:
    """
    Extract name, version and Fractal manifest from a wheel file

    Only wheel files are supported, since they have a well-defined
    `*.dist-info/METADATA` entry. Supporting tar.gz archives would require
    looking for `PKG-INFO` instead.

    Args:
        path: Location of the wheel file to inspect.
        logger_name: Name used to obtain the logger via `get_logger`; also
            forwarded to `_load_manifest_from_wheel`.

    Returns:
        A dictionary with keys `pkg_name` (the package name, normalized via
        `normalize_package_name`), `pkg_version` (the package version) and
        `pkg_manifest` (the object returned by `_load_manifest_from_wheel`).

    Raises:
        ValueError: If `path` does not end with `.whl`.
    """

    logger = get_logger(logger_name)

    is_wheel = path.as_posix().endswith(".whl")
    if not is_wheel:
        raise ValueError(
            f"Only wheel packages are supported, given {path.as_posix()}."
        )

    with ZipFile(path) as wheel:
        archive_members = wheel.namelist()

        # Step 1: task manifest (read and validated by the helper)
        logger.debug(f"Now reading manifest for {path.as_posix()}")
        pkg_manifest = _load_manifest_from_wheel(
            path, wheel, logger_name=logger_name
        )
        logger.debug("Manifest read correctly.")

        # Step 2: package name and version, from *.dist-info/METADATA
        logger.debug(
            f"Now reading package name and version for {path.as_posix()}"
        )
        metadata_member = next(
            member
            for member in archive_members
            if "dist-info/METADATA" in member
        )
        with wheel.open(metadata_member) as metadata_fd:
            metadata_lines = metadata_fd.read().decode("utf-8").splitlines()

        def _last_token(prefix: str) -> str:
            # Last whitespace-separated token of the first line that starts
            # with `prefix`
            return next(
                entry.split()[-1]
                for entry in metadata_lines
                if entry.startswith(prefix)
            )

        raw_name = _last_token("Name")
        pkg_version = _last_token("Version")
        logger.debug("Package name and version read correctly.")

    return {
        "pkg_name": normalize_package_name(raw_name),
        "pkg_version": pkg_version,
        "pkg_manifest": pkg_manifest,
    }