
zarr_utils

Module with custom wrappers of the Zarr API.

open_zarr_group_with_overwrite(path, *, overwrite, logger=None, **open_group_kwargs)

Wrap zarr.open_group and add overwrite argument.

This wrapper sets mode="w" for overwrite=True and mode="w-" for overwrite=False.

The expected behavior is

  • if the group does not exist, create it (regardless of overwrite);
  • if the group already exists and overwrite=True, replace the group with an empty one;
  • if the group already exists and overwrite=False, fail.

From the zarr.open_group docs:

  • mode="r" means read only (must exist);
  • mode="r+" means read/write (must exist);
  • mode="a" means read/write (create if doesn’t exist);
  • mode="w" means create (overwrite if exists);
  • mode="w-" means create (fail if exists).
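
The mapping from overwrite to mode, including the case where the caller also passes mode among the keyword arguments, can be sketched without zarr. The helper name resolve_mode below is hypothetical and is not part of fractal_tasks_core; it only mirrors the selection logic described above, as a minimal illustration.

```python
import logging
from typing import Any, Optional


def resolve_mode(
    overwrite: bool,
    open_group_kwargs: dict[str, Any],
    logger: Optional[logging.Logger] = None,
) -> str:
    """Return the mode that would be passed on to zarr.open_group.

    Hypothetical helper, for illustration only.
    """
    if logger is None:
        logger = logging.getLogger(None)

    # overwrite=True  -> "w"  (create, overwrite if exists)
    # overwrite=False -> "w-" (create, fail if exists)
    new_mode = "w" if overwrite else "w-"

    # A caller-supplied `mode` is dropped in favor of the derived one
    if "mode" in open_group_kwargs:
        mode = open_group_kwargs.pop("mode")
        logger.warning(f"Overriding {mode=} with {new_mode=}")

    return new_mode


kwargs = {"mode": "a"}
selected = resolve_mode(False, kwargs)  # "a" is discarded with a warning
```

Note that a mode passed by the caller never takes effect: only overwrite decides between "w" and "w-".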
PARAMETER DESCRIPTION
path

Store or path to directory in file system or name of zip file (zarr.open_group parameter).

TYPE: Union[str, MutableMapping]

overwrite

Determines the mode parameter of zarr.open_group, which is "w" (if overwrite=True) or "w-" (if overwrite=False).

TYPE: bool

logger

The logger to use (if unset, use logging.getLogger(None), i.e. the root logger).

TYPE: Optional[Logger] DEFAULT: None

open_group_kwargs

Keyword arguments of zarr.open_group.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Group

The zarr group.

RAISES DESCRIPTION
OverwriteNotAllowedError

If overwrite=False and the group already exists.
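
The three expected behaviors and the error translation can be illustrated with a toy model that does not need zarr. Everything in this sketch is a stand-in: the dict-based store, the toy_* function names, the "plate.zarr" path, and the two exception classes (the RuntimeError base of OverwriteNotAllowedError is an assumption). It is not the library's implementation.

```python
class ContainsGroupError(Exception):
    """Stand-in for zarr.errors.ContainsGroupError."""


class OverwriteNotAllowedError(RuntimeError):
    """Stand-in for the wrapper's error (base class is an assumption)."""


def toy_open_group(store: dict, path: str, mode: str) -> dict:
    """Toy model of modes "w" and "w-" on a dict-based store."""
    if path in store and mode == "w-":
        raise ContainsGroupError(path)
    store[path] = {}  # create the group, or replace it with an empty one
    return store[path]


def toy_open_with_overwrite(store: dict, path: str, *, overwrite: bool) -> dict:
    """Mirror the wrapper: pick the mode, then translate the error type."""
    mode = "w" if overwrite else "w-"
    try:
        return toy_open_group(store, path, mode)
    except ContainsGroupError:
        raise OverwriteNotAllowedError(
            f"Cannot create group at {path=} with {overwrite=}. "
            "Hint: try setting overwrite=True."
        )


store: dict = {}

# Case 1: the group does not exist, so it is created
group = toy_open_with_overwrite(store, "plate.zarr", overwrite=False)
group["image"] = "data"

# Case 2: the group exists and overwrite=False, so the call fails
try:
    toy_open_with_overwrite(store, "plate.zarr", overwrite=False)
    failed = False
except OverwriteNotAllowedError:
    failed = True

# Case 3: the group exists and overwrite=True, so it is replaced by an empty one
replaced = toy_open_with_overwrite(store, "plate.zarr", overwrite=True)
```

With the real function, calling code would catch OverwriteNotAllowedError in the same way and decide whether to retry with overwrite=True.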

Source code in fractal_tasks_core/zarr_utils.py
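import logging
from typing import Any, MutableMapping, Optional, Union

import zarr
from zarr.errors import ContainsGroupError, GroupNotFoundError

# `OverwriteNotAllowedError` comes from the enclosing module
# (its definition is not shown in this excerpt).


def open_zarr_group_with_overwrite(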
    path: Union[str, MutableMapping],
    *,
    overwrite: bool,
    logger: Optional[logging.Logger] = None,
    **open_group_kwargs: Any,
) -> zarr.hierarchy.Group:
    """
    Wrap `zarr.open_group` and add `overwrite` argument.

    This wrapper sets `mode="w"` for `overwrite=True` and `mode="w-"` for
    `overwrite=False`.

    The expected behavior is


    * if the group does not exist, create it (regardless of `overwrite`);
    * if the group already exists and `overwrite=True`, replace the group with
      an empty one;
    * if the group already exists and `overwrite=False`, fail.

    From the [`zarr.open_group`
    docs](https://zarr.readthedocs.io/en/stable/api/hierarchy.html#zarr.hierarchy.open_group):

    * `mode="r"` means read only (must exist);
    * `mode="r+"` means read/write (must exist);
    * `mode="a"` means read/write (create if doesn’t exist);
    * `mode="w"` means create (overwrite if exists);
    * `mode="w-"` means create (fail if exists).


    Args:
        path:
            Store or path to directory in file system or name of zip file
            (`zarr.open_group` parameter).
        overwrite:
            Determines the `mode` parameter of `zarr.open_group`, which is
            `"w"` (if `overwrite=True`) or `"w-"` (if `overwrite=False`).
        logger:
            The logger to use (if unset, use `logging.getLogger(None)`, i.e.
            the root logger).
        open_group_kwargs:
            Keyword arguments of `zarr.open_group`.

    Returns:
        The zarr group.

    Raises:
        OverwriteNotAllowedError:
            If `overwrite=False` and the group already exists.
    """

    # Set logger
    if logger is None:
        logger = logging.getLogger(None)

    # Set mode for zarr.open_group
    if overwrite:
        new_mode = "w"
    else:
        new_mode = "w-"

    # Write log about current status
    logger.info(f"Start open_zarr_group_with_overwrite ({overwrite=}).")
    try:
        # Call `zarr.open_group` with `mode="r"`, which fails for missing group
        current_group = zarr.open_group(path, mode="r")
        keys = list(current_group.group_keys())
        logger.info(f"Zarr group {path} already exists, with {keys=}")
    except GroupNotFoundError:
        logger.info(f"Zarr group {path} does not exist yet.")

    # Raise warning if we are overriding an existing value of `mode`
    if "mode" in open_group_kwargs:
        mode = open_group_kwargs.pop("mode")
        logger.warning(
            f"Overriding {mode=} with {new_mode=}, "
            "in open_zarr_group_with_overwrite"
        )

    # Call zarr.open_group
    try:
        return zarr.open_group(path, mode=new_mode, **open_group_kwargs)
    except ContainsGroupError:
        # Re-raise error with custom message and type
        error_msg = (
            f"Cannot create zarr group at {path=} with `{overwrite=}` "
            "(original error: `zarr.errors.ContainsGroupError`).\n"
            "Hint: try setting `overwrite=True`."
        )
        logger.error(error_msg)
        raise OverwriteNotAllowedError(error_msg)