Skip to content

init_projection_hcs

Task that copies the structure of an OME-NGFF zarr array to a new one.

_get_plate(current_plate_url, proj_plate_url, re_initialize_plate=False)

Get or create an OME-Zarr projection plate.

If the plate already exists, return it. If it does not exist, or if re_initialize_plate is True, create a proj plate and return it.

Source code in fractal_tasks_core/init_projection_hcs.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def _get_plate(
    current_plate_url: str,
    proj_plate_url: str,
    re_initialize_plate: bool = False,
) -> OmeZarrPlate:
    """Return the projection plate, deriving it from the source plate if needed.

    When something already exists at `proj_plate_url` and no
    re-initialization is requested, that plate is opened and returned.
    In every other case a new plate is derived from the plate at
    `current_plate_url`.

    Args:
        current_plate_url: Location of the plate used as the template.
        proj_plate_url: Location of the projection plate.
        re_initialize_plate: If True, always derive a fresh plate; the flag
            is also forwarded as `overwrite` to `derive_plate`.

    Returns:
        The opened (pre-existing) or newly derived projection plate.
    """
    # Fast path: reuse the plate that is already on disk.
    if not re_initialize_plate and Path(proj_plate_url).exists():
        existing_plate = open_ome_zarr_plate(proj_plate_url)
        logger.info(f"Plate already exists: {existing_plate}")
        return existing_plate

    logger.info(f"Creating proj plate: {proj_plate_url}")
    # The plate name is the last path component of the target url.
    new_plate_name = proj_plate_url.rsplit("/", 1)[-1]
    source_plate = open_ome_zarr_plate(current_plate_url)
    derived_plate = source_plate.derive_plate(
        proj_plate_url,
        plate_name=new_plate_name,
        overwrite=re_initialize_plate,
        keep_acquisitions=True,
    )
    logger.info(f"proj plate created: {derived_plate}")
    return derived_plate

_open_well(well_path) cached

Open and return an OME-Zarr well object from the given path.

Source code in fractal_tasks_core/init_projection_hcs.py
19
20
21
22
23
24
25
26
27
28
29
@cache
def _open_well(well_path) -> OmeZarrWell:
    """Open the OME-Zarr well stored at `well_path` in read-only mode.

    The function is wrapped in `cache`, so calling it again with the same
    path hands back the previously opened well object.

    Raises:
        NgioFileNotFoundError: If the well cannot be opened; the original
            error is chained as the cause.
    """
    try:
        return open_ome_zarr_well(well_path, mode="r", cache=True)
    except NgioFileNotFoundError as not_found:
        message = (
            f"Could not open well {well_path}. "
            "Ensure that the path is correct and the file exists."
        )
        raise NgioFileNotFoundError(message) from not_found

init_projection_hcs(*, zarr_urls, zarr_dir, method=DaskProjectionMethod.MIP, output_plate_name=Field(default='{plate_name}_{method}', pattern='^.*\\{plate_name\\}.*$'), overwrite=False, re_initialize_plate=False)

Duplicate the OME-Zarr HCS structure for a set of zarr_urls.

This task only processes the zarr images in the zarr_urls, not all the images in the plate. It copies all the plate & well structure, but none of the image metadata or the actual image data:

  • For each plate, create a new OME-Zarr HCS plate with the attributes for all the images in zarr_urls
  • For each well (in each plate), create a new zarr subgroup with the same attributes as the original one.
PARAMETER DESCRIPTION
zarr_urls

List of paths or URLs to the individual OME-Zarr images to be processed. (standard argument for Fractal tasks, managed by Fractal server).

TYPE: list[str]

zarr_dir

Path of the directory where the new OME-Zarrs will be created. (standard argument for Fractal tasks, managed by Fractal server).

TYPE: str

method

Choose which method to use for intensity projection along the Z axis.

TYPE: DaskProjectionMethod DEFAULT: MIP

output_plate_name

The template for the output plate name. To make sure that the output plate is unique it must contain the placeholder {plate_name}, and it can optionally contain the placeholder {method}.

TYPE: str DEFAULT: Field(default='{plate_name}_{method}', pattern='^.*\\{plate_name\\}.*$')

overwrite

If True, previous projected images with the same "output_plate_name" will be overwritten.

TYPE: bool DEFAULT: False

re_initialize_plate

If True, the projection plate will be re-initialized even if it already exists. If False, the task will incrementally add the projected images to the existing plate if it already exists.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

Setup information required by the Compute Projection (HCS) task.

Source code in fractal_tasks_core/init_projection_hcs.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@validate_call
def init_projection_hcs(
    *,
    # Fractal parameters
    zarr_urls: list[str],
    zarr_dir: str,
    method: DaskProjectionMethod = DaskProjectionMethod.MIP,
    output_plate_name: str = Field(
        default="{plate_name}_{method}",
        pattern=r"^.*\{plate_name\}.*$",
    ),
    # Advanced parameters
    overwrite: bool = False,
    re_initialize_plate: bool = False,
) -> dict[str, Any]:
    """Duplicate the OME-Zarr HCS structure for a set of zarr_urls.

    This task only processes the zarr images in the zarr_urls, not all the
    images in the plate. It copies all the  plate & well structure, but none
    of the image metadata or the actual image data:

    - For each plate, create a new OME-Zarr HCS plate with the attributes for
        all the images in zarr_urls
    - For each well (in each plate), create a new zarr subgroup with the
       same attributes as the original one.

    Args:
        zarr_urls: List of paths or urls to the individual OME-Zarr images to
            be processed.
            (standard argument for Fractal tasks, managed by Fractal server).
        zarr_dir: path of the directory where the new OME-Zarrs will be
            created.
            (standard argument for Fractal tasks, managed by Fractal server).
        method: Choose which method to use for intensity projection along the
            Z axis.
        output_plate_name: The template for the output plate name. To make sure
            that the output plate is unique it must contain the placeholder
            {plate_name}, and it can optionally contain the placeholder {method}.
        overwrite: If True, previous projected images with the same "output_plate_name"
            will be overwritten.
        re_initialize_plate: If True, the projection plate will be re-initialized
            even if it already exists. If False, the task will incrementally add the
            projected images to the existing plate if it already exists.

    Returns:
        Setup information required by the Compute Projection (HCS) task.

    Raises:
        ValueError: If a zarr_url has fewer than four path components and
            therefore cannot be of the form
            `/path/to/plate_name/row/column/image_path`.
        NgioFileExistsError: If a projected image already exists in the
            projection plate and `overwrite` is False.
    """
    parallelization_list = []

    # A dictionary to store the plates and avoid re-initializing them multiple
    # times
    proj_plates: dict[str, OmeZarrPlate] = {}
    # For each projection plate, the set of image paths it already contains,
    # so that the wells are queried only once per plate and the membership
    # test below stays cheap.
    proj_plates_images_paths: dict[str, set[str]] = {}

    # The well cache must be emptied on every exit path, including errors,
    # otherwise well objects would survive into later calls of the task
    # within the same process.
    try:
        # Generate parallelization list
        for zarr_url in zarr_urls:
            url_parts = zarr_url.rstrip("/").split("/")
            # Check if the zarr_url is valid
            if len(url_parts) < 4:
                raise ValueError(
                    f"Invalid zarr_url: {zarr_url}. "
                    "The zarr_url of an image in a plate should be of the form "
                    "`/path/to/plate_name/row/column/image_path`. "
                    "The zarr_url given is too short to be valid."
                )
            *base, plate_name, row, column, image_path = url_parts

            # Join the parent components and the plate name in one go: when
            # the url has no parent directory (`plate.zarr/A/01/0`) this keeps
            # the plate url relative instead of prepending a spurious "/".
            plate_url = "/".join([*base, plate_name])
            plate_name = plate_name.removesuffix(".zarr")
            proj_plate_name = format_template_name(
                output_plate_name,
                plate_name=plate_name,
                method=method.abbreviation,
            )
            # Make sure the proj_plate_name ends with .zarr
            if not proj_plate_name.endswith(".zarr"):
                proj_plate_name = f"{proj_plate_name}.zarr"
            proj_plate_url = f"{zarr_dir}/{proj_plate_name}"

            if proj_plate_url not in proj_plates:
                _proj_plate = _get_plate(
                    current_plate_url=plate_url,
                    proj_plate_url=proj_plate_url,
                    re_initialize_plate=re_initialize_plate,
                )
                proj_plates[proj_plate_url] = _proj_plate
                proj_plates_images_paths[proj_plate_url] = set(
                    _proj_plate.images_paths()
                )

            proj_plate = proj_plates[proj_plate_url]
            proj_plate_images_paths = proj_plates_images_paths[proj_plate_url]
            well_path = f"{plate_url}/{row}/{column}"
            well = _open_well(well_path)
            acquisition_id = well.get_image_acquisition_id(image_path)

            proj_image_path = f"{row}/{column}/{image_path}"

            if proj_image_path in proj_plate_images_paths:
                if not overwrite:
                    raise NgioFileExistsError(
                        f"Image {proj_image_path} already exists in "
                        f"{proj_plate_url}. Set `overwrite=True` "
                        "to overwrite it."
                    )
                logger.info(
                    f"Image {proj_image_path} already exists in {proj_plate_url}. "
                    "Overwriting it."
                )

            else:
                proj_plate.add_image(
                    row=row,
                    column=column,
                    image_path=image_path,
                    acquisition_id=acquisition_id,
                )
                proj_plate_images_paths.add(proj_image_path)

            proj_zarr_url = f"{proj_plate_url}/{proj_image_path}"
            proj_init = InitArgsMIP(
                origin_url=zarr_url,
                method=method,
                # Since we checked for existence above,
                # we can safely set this to True
                overwrite=True,
                new_plate_name=proj_plate_name,
            )
            parallelization_item = {
                "zarr_url": proj_zarr_url,
                "init_args": proj_init.model_dump(mode="json"),
            }
            parallelization_list.append(parallelization_item)
    finally:
        _open_well.cache_clear()

    return {"parallelization_list": parallelization_list}