"""Wrapper task that runs a napari-workflow on the ROIs of an OME-Zarr image."""
# NOTE: the module-level imports and helpers below assume the
# fractal-tasks-core 1.x package layout; adjust the import paths if your
# version differs.
import logging
from typing import Any

import anndata as ad
import dask.array as da
import napari_workflows
import numpy as np
import pandas as pd
import zarr
from napari_workflows._io_yaml_v1 import load_workflow
from pydantic import validate_call

import fractal_tasks_core
from fractal_tasks_core.channels import get_channel_from_image_zarr
from fractal_tasks_core.labels import prepare_label_group
from fractal_tasks_core.ngff import load_NgffImageMeta
from fractal_tasks_core.pyramids import build_pyramid
from fractal_tasks_core.roi import check_valid_ROI_indices
from fractal_tasks_core.roi import convert_ROI_table_to_indices
from fractal_tasks_core.roi import load_region
from fractal_tasks_core.tables import write_table
from fractal_tasks_core.tasks.io_models import NapariWorkflowsInput
from fractal_tasks_core.tasks.io_models import NapariWorkflowsOutput
from fractal_tasks_core.upscale_array import upscale_array
from fractal_tasks_core.utils import rescale_datasets

__OME_NGFF_VERSION__ = fractal_tasks_core.__OME_NGFF_VERSION__

logger = logging.getLogger(__name__)


class OutOfTaskScopeError(NotImplementedError):
    """Raised when the requested workflow is outside this task's scope."""


@validate_call
def napari_workflows_wrapper(
    *,
    # Fractal parameters
    zarr_url: str,
    # Core parameters
    workflow_file: str,
    input_specs: dict[str, NapariWorkflowsInput],
    output_specs: dict[str, NapariWorkflowsOutput],
    input_ROI_table: str = "FOV_ROI_table",
    level: int = 0,
    # Advanced parameters
    relabeling: bool = True,
    expected_dimensions: int = 3,
    overwrite: bool = True,
):
    """
    Run a napari-workflow on the ROIs of a single OME-NGFF image.

    This task takes images and labels as input and runs a napari-workflow on
    them, which can produce a label image and tables as output.

    Examples of allowed entries for `input_specs` and `output_specs`:

    ```
    input_specs = {
        "in_1": {"type": "image", "channel": {"wavelength_id": "A01_C02"}},
        "in_2": {"type": "image", "channel": {"label": "DAPI"}},
        "in_3": {"type": "label", "label_name": "label_DAPI"},
    }

    output_specs = {
        "out_1": {"type": "label", "label_name": "label_DAPI_new"},
        "out_2": {"type": "dataframe", "table_name": "measurements"},
    }
    ```

    Args:
        zarr_url: Path or URL to the individual OME-Zarr image to be processed
            (standard argument for Fractal tasks, managed by Fractal server).
        workflow_file: Absolute path to the napari-workflows YAML file.
        input_specs: A dictionary of `NapariWorkflowsInput` values.
        output_specs: A dictionary of `NapariWorkflowsOutput` values.
        input_ROI_table: Name of the ROI table over which the task loops to
            apply napari workflows.
            Examples:
            `FOV_ROI_table`
            => loop over the fields of view;
            `organoid_ROI_table`
            => loop over the organoid ROI table (generated by another task);
            `well_ROI_table`
            => process the whole well as one image.
        level: Pyramid level of the image to be used as input for
            napari-workflows. Choose `0` to process at full resolution.
            Levels > 0 are currently only supported for workflows that only
            have intensity images as input and only produce label images as
            output.
        relabeling: If `True`, apply relabeling so that label values are
            unique across all ROIs in the well.
        expected_dimensions: Expected dimensions (either `2` or `3`). Useful
            when loading 2D images that are stored in a 3D array with shape
            `(1, size_x, size_y)` [which is the default way Fractal stores 2D
            images], but you want to make sure the napari workflow gets a 2D
            array to process. Also useful to set to `2` when loading a 2D
            OME-Zarr that is saved as `(size_x, size_y)`.
        overwrite: If `True`, overwrite the task output.
    """
    wf: napari_workflows.Workflow = load_workflow(workflow_file)
    logger.info(f"Loaded workflow from {workflow_file}")

    # Validation of input/output specs
    if not (set(wf.leafs()) <= set(output_specs.keys())):
        msg = f"Some item of {wf.leafs()=} is not part of {output_specs=}."
        logger.warning(msg)
    if not (set(wf.roots()) <= set(input_specs.keys())):
        msg = f"Some item of {wf.roots()=} is not part of {input_specs=}."
        logger.error(msg)
        raise ValueError(msg)
    list_outputs = sorted(output_specs.keys())

    # Characterization of workflow and scope restriction
    input_types = [
        in_params.type for (name, in_params) in input_specs.items()
    ]
    output_types = [
        out_params.type for (name, out_params) in output_specs.items()
    ]
    are_inputs_all_images = set(input_types) == {"image"}
    are_outputs_all_labels = set(output_types) == {"label"}
    are_outputs_all_dataframes = set(output_types) == {"dataframe"}
    is_labeling_workflow = are_inputs_all_images and are_outputs_all_labels
    is_measurement_only_workflow = are_outputs_all_dataframes

    # Level-related constraint
    logger.info(f"This workflow acts at {level=}")
    logger.info(
        f"Is the current workflow a labeling one? {is_labeling_workflow}"
    )
    if level > 0 and not is_labeling_workflow:
        msg = (
            f"{level=}>0 is currently only accepted for labeling workflows, "
            "i.e. those going from image(s) to label(s)."
        )
        logger.error(msg)
        raise OutOfTaskScopeError(msg)

    # Relabeling-related (soft) constraint
    if is_measurement_only_workflow and relabeling:
        logger.warning(
            "This is a measurement-output-only workflow, setting "
            "relabeling=False."
        )
        relabeling = False
    if relabeling:
        max_label_for_relabeling = 0
    label_dtype = np.uint32

    # Read ROI table
    ROI_table = ad.read_zarr(f"{zarr_url}/tables/{input_ROI_table}")

    # Load image metadata
    ngff_image_meta = load_NgffImageMeta(zarr_url)
    num_levels = ngff_image_meta.num_levels
    coarsening_xy = ngff_image_meta.coarsening_xy

    # Read pixel sizes from zattrs file
    full_res_pxl_sizes_zyx = ngff_image_meta.get_pixel_sizes_zyx(level=0)

    # Create list of indices for 3D FOVs spanning the entire Z direction
    list_indices = convert_ROI_table_to_indices(
        ROI_table,
        level=level,
        coarsening_xy=coarsening_xy,
        full_res_pxl_sizes_zyx=full_res_pxl_sizes_zyx,
    )
    check_valid_ROI_indices(list_indices, input_ROI_table)
    num_ROIs = len(list_indices)
    logger.info(
        f"Completed reading ROI table {input_ROI_table},"
        f" found {num_ROIs} ROIs."
    )

    # Input preparation: "image" type
    image_inputs = [
        (name, in_params)
        for (name, in_params) in input_specs.items()
        if in_params.type == "image"
    ]
    input_image_arrays = {}
    if image_inputs:
        img_array = da.from_zarr(f"{zarr_url}/{level}")
        # Loop over image inputs and assign corresponding channel of the image
        for name, params in image_inputs:
            channel = get_channel_from_image_zarr(
                image_zarr_path=zarr_url,
                wavelength_id=params.channel.wavelength_id,
                label=params.channel.label,
            )
            channel_index = channel.index
            input_image_arrays[name] = img_array[channel_index]

            # Handle dimensions
            shape = input_image_arrays[name].shape
            if expected_dimensions == 3 and shape[0] == 1:
                logger.warning(
                    f"Input {name} has shape {shape} "
                    f"but {expected_dimensions=}"
                )
            if expected_dimensions == 2:
                if len(shape) == 2:
                    # We already load the data as a 2D array
                    pass
                elif shape[0] == 1:
                    input_image_arrays[name] = input_image_arrays[name][
                        0, :, :
                    ]
                else:
                    msg = (
                        f"Input {name} has shape {shape} "
                        f"but {expected_dimensions=}"
                    )
                    logger.error(msg)
                    raise ValueError(msg)
            logger.info(f"Prepared input with {name=} and {params=}")
        logger.info(f"{input_image_arrays=}")

    # Input preparation: "label" type
    label_inputs = [
        (name, in_params)
        for (name, in_params) in input_specs.items()
        if in_params.type == "label"
    ]
    if label_inputs:
        # Set target_shape for upscaling labels
        if not image_inputs:
            logger.warning(
                f"{len(label_inputs)=} but num_image_inputs=0. "
                "Label array(s) will not be upscaled."
            )
            upscale_labels = False
        else:
            target_shape = list(input_image_arrays.values())[0].shape
            upscale_labels = True
        # Loop over label inputs and load corresponding (upscaled) image
        input_label_arrays = {}
        for name, params in label_inputs:
            label_name = params.label_name
            label_array_raw = da.from_zarr(
                f"{zarr_url}/labels/{label_name}/{level}"
            )
            input_label_arrays[name] = label_array_raw

            # Handle dimensions
            shape = input_label_arrays[name].shape
            if expected_dimensions == 3 and shape[0] == 1:
                logger.warning(
                    f"Input {name} has shape {shape} "
                    f"but {expected_dimensions=}"
                )
            if expected_dimensions == 2:
                if len(shape) == 2:
                    # We already load the data as a 2D array
                    pass
                elif shape[0] == 1:
                    input_label_arrays[name] = input_label_arrays[name][
                        0, :, :
                    ]
                else:
                    msg = (
                        f"Input {name} has shape {shape} "
                        f"but {expected_dimensions=}"
                    )
                    logger.error(msg)
                    raise ValueError(msg)

            if upscale_labels:
                # Check that dimensionality matches the image
                if len(input_label_arrays[name].shape) != len(target_shape):
                    raise ValueError(
                        f"Label {name} has shape "
                        f"{input_label_arrays[name].shape}. "
                        "But the corresponding image has shape "
                        f"{target_shape}. Those dimensionalities do not "
                        f"match. Is {expected_dimensions=} the correct "
                        "setting?"
                    )
                if expected_dimensions == 3:
                    upscaling_axes = [1, 2]
                else:
                    upscaling_axes = [0, 1]
                input_label_arrays[name] = upscale_array(
                    array=input_label_arrays[name],
                    target_shape=target_shape,
                    axis=upscaling_axes,
                    pad_with_zeros=True,
                )

            logger.info(f"Prepared input with {name=} and {params=}")
        logger.info(f"{input_label_arrays=}")

    # Output preparation: "label" type
    label_outputs = [
        (name, out_params)
        for (name, out_params) in output_specs.items()
        if out_params.type == "label"
    ]
    if label_outputs:
        # Preliminary scope checks
        if len(label_outputs) > 1:
            raise OutOfTaskScopeError(
                "Multiple label outputs would break label-inputs-only "
                f"workflows (found {len(label_outputs)=})."
            )
        if len(label_outputs) > 1 and relabeling:
            raise OutOfTaskScopeError(
                "Multiple label outputs would break relabeling in labeling+"
                f"measurement workflows (found {len(label_outputs)=})."
            )

        # We only support two cases:
        # 1. If there exist some input images, then use the first one to
        #    determine output-label array properties.
        # 2. If there are no input images, but there are input labels, then
        #    (A) re-load the pixel sizes and re-build ROI indices, and (B) use
        #    the first input label to determine output-label array properties.
        if image_inputs:
            reference_array = list(input_image_arrays.values())[0]
        elif label_inputs:
            reference_array = list(input_label_arrays.values())[0]
            # Re-load pixel size, matching to the correct level
            input_label_name = label_inputs[0][1].label_name
            ngff_label_image_meta = load_NgffImageMeta(
                f"{zarr_url}/labels/{input_label_name}"
            )
            full_res_pxl_sizes_zyx = ngff_label_image_meta.get_pixel_sizes_zyx(
                level=0
            )
            # Create list of indices for 3D FOVs spanning the whole Z direction
            list_indices = convert_ROI_table_to_indices(
                ROI_table,
                level=level,
                coarsening_xy=coarsening_xy,
                full_res_pxl_sizes_zyx=full_res_pxl_sizes_zyx,
            )
            check_valid_ROI_indices(list_indices, input_ROI_table)
            num_ROIs = len(list_indices)
            logger.info(
                f"Re-create ROI indices from ROI table {input_ROI_table}, "
                f"using {full_res_pxl_sizes_zyx=}. "
                "This is necessary because label-input-only workflows may "
                "have label inputs that are at a different resolution and "
                "are not upscaled."
            )
        else:
            msg = (
                "Missing image_inputs and label_inputs, we cannot assign"
                " label output properties"
            )
            raise OutOfTaskScopeError(msg)

        # Extract label properties from reference_array, and make sure they
        # are for three dimensions
        label_shape = reference_array.shape
        label_chunksize = reference_array.chunksize
        if len(label_shape) == 2 and len(label_chunksize) == 2:
            if expected_dimensions == 3:
                raise ValueError(
                    f"Something wrong: {label_shape=} but "
                    f"{expected_dimensions=}"
                )
            label_shape = (1, label_shape[0], label_shape[1])
            label_chunksize = (1, label_chunksize[0], label_chunksize[1])
        logger.info(f"{label_shape=}")
        logger.info(f"{label_chunksize=}")

        # Loop over label outputs and (1) set zattrs, (2) create zarr group
        output_label_zarr_groups: dict[str, Any] = {}
        for name, out_params in label_outputs:

            # (1a) Rescale OME-NGFF datasets (relevant for level>0)
            if ngff_image_meta.multiscale.axes[0].name != "c":
                raise ValueError(
                    "Cannot set `remove_channel_axis=True` for multiscale "
                    f"metadata with axes={ngff_image_meta.multiscale.axes}. "
                    'First axis should have name "c".'
                )
            new_datasets = rescale_datasets(
                datasets=[
                    ds.model_dump()
                    for ds in ngff_image_meta.multiscale.datasets
                ],
                coarsening_xy=coarsening_xy,
                reference_level=level,
                remove_channel_axis=True,
            )

            # (1b) Prepare attrs for label group
            label_name = out_params.label_name
            label_attrs = {
                "image-label": {
                    "version": __OME_NGFF_VERSION__,
                    "source": {"image": "../../"},
                },
                "multiscales": [
                    {
                        "name": label_name,
                        "version": __OME_NGFF_VERSION__,
                        "axes": [
                            ax.model_dump()
                            for ax in ngff_image_meta.multiscale.axes
                            if ax.type != "channel"
                        ],
                        "datasets": new_datasets,
                    }
                ],
            }

            # (2) Prepare label group
            image_group = zarr.group(zarr_url)
            label_group = prepare_label_group(
                image_group,
                label_name,
                overwrite=overwrite,
                label_attrs=label_attrs,
                logger=logger,
            )
            logger.info(
                "Helper function `prepare_label_group` returned "
                f"{label_group=}"
            )

            # (3) Create zarr group at level=0
            store = zarr.storage.FSStore(f"{zarr_url}/labels/{label_name}/0")
            mask_zarr = zarr.create(
                shape=label_shape,
                chunks=label_chunksize,
                dtype=label_dtype,
                store=store,
                overwrite=overwrite,
                dimension_separator="/",
            )
            output_label_zarr_groups[name] = mask_zarr
            logger.info(f"Prepared output with {name=} and {out_params=}")
        logger.info(f"{output_label_zarr_groups=}")

    # Output preparation: "dataframe" type
    dataframe_outputs = [
        (name, out_params)
        for (name, out_params) in output_specs.items()
        if out_params.type == "dataframe"
    ]
    output_dataframe_lists: dict[str, list] = {}
    for name, out_params in dataframe_outputs:
        output_dataframe_lists[name] = []
        logger.info(f"Prepared output with {name=} and {out_params=}")
    logger.info(f"{output_dataframe_lists=}")

    # Loop over ROIs and apply the napari workflow to each region
    for i_ROI, indices in enumerate(list_indices):
        s_z, e_z, s_y, e_y, s_x, e_x = indices[:]
        region = (slice(s_z, e_z), slice(s_y, e_y), slice(s_x, e_x))
        logger.info(f"ROI {i_ROI+1}/{num_ROIs}: {region=}")

        # Always re-load napari workflow
        wf = load_workflow(workflow_file)

        # Set inputs
        for input_name in input_specs.keys():
            input_type = input_specs[input_name].type

            if input_type == "image":
                wf.set(
                    input_name,
                    load_region(
                        input_image_arrays[input_name],
                        region,
                        compute=True,
                        return_as_3D=False,
                    ),
                )
            elif input_type == "label":
                wf.set(
                    input_name,
                    load_region(
                        input_label_arrays[input_name],
                        region,
                        compute=True,
                        return_as_3D=False,
                    ),
                )

        # Get outputs
        outputs = wf.get(list_outputs)

        # Iterate first over dataframe outputs (to use the correct
        # max_label_for_relabeling, if needed)
        for ind_output, output_name in enumerate(list_outputs):
            if output_specs[output_name].type != "dataframe":
                continue
            df = outputs[ind_output]
            if relabeling:
                df["label"] += max_label_for_relabeling
                logger.info(
                    f'ROI {i_ROI+1}/{num_ROIs}: Relabeling "{output_name}" '
                    f"dataframe output, with {max_label_for_relabeling=}"
                )

            # Append the new-ROI dataframe to the all-ROIs list
            output_dataframe_lists[output_name].append(df)

        # After all dataframe outputs, iterate over label outputs (which
        # actually can be only 0 or 1)
        for ind_output, output_name in enumerate(list_outputs):
            if output_specs[output_name].type != "label":
                continue
            mask = outputs[ind_output]

            # Check dimensions
            if len(mask.shape) != expected_dimensions:
                msg = (
                    f"Output {output_name} has shape {mask.shape} "
                    f"but {expected_dimensions=}"
                )
                logger.error(msg)
                raise ValueError(msg)
            elif expected_dimensions == 2:
                mask = np.expand_dims(mask, axis=0)

            # Sanity check: issue warning for non-consecutive labels
            unique_labels = np.unique(mask)
            num_unique_labels_in_this_ROI = len(unique_labels)
            if np.min(unique_labels) == 0:
                num_unique_labels_in_this_ROI -= 1
            num_labels_in_this_ROI = int(np.max(mask))
            if num_labels_in_this_ROI != num_unique_labels_in_this_ROI:
                logger.warning(
                    f'ROI {i_ROI+1}/{num_ROIs}: "{output_name}" label output '
                    f"has non-consecutive labels: {num_labels_in_this_ROI=} "
                    f"but {num_unique_labels_in_this_ROI=}"
                )

            if relabeling:
                mask[mask > 0] += max_label_for_relabeling
                logger.info(
                    f'ROI {i_ROI+1}/{num_ROIs}: Relabeling "{output_name}" '
                    f"label output, with {max_label_for_relabeling=}"
                )
                max_label_for_relabeling += num_labels_in_this_ROI
                logger.info(
                    f"ROI {i_ROI+1}/{num_ROIs}: label-number update with "
                    f"{num_labels_in_this_ROI=}; "
                    f"new {max_label_for_relabeling=}"
                )

            da.array(mask).to_zarr(
                url=output_label_zarr_groups[output_name],
                region=region,
                compute=True,
                overwrite=overwrite,
            )
        logger.info(f"ROI {i_ROI+1}/{num_ROIs}: output handling complete")

    # Output handling: "dataframe" type (for each output, concatenate ROI
    # dataframes, clean up, and store in an AnnData table on-disk)
    for name, out_params in dataframe_outputs:
        table_name = out_params.table_name
        # Concatenate all FOV dataframes
        list_dfs = output_dataframe_lists[name]
        if len(list_dfs) == 0:
            measurement_table = ad.AnnData()
        else:
            df_well = pd.concat(list_dfs, axis=0, ignore_index=True)
            # Extract labels and drop them from df_well
            labels = pd.DataFrame(df_well["label"].astype(str))
            df_well.drop(labels=["label"], axis=1, inplace=True)
            # Convert all to float (warning: some would be int, in principle)
            measurement_dtype = np.float32
            df_well = df_well.astype(measurement_dtype)
            df_well.index = df_well.index.map(str)
            # Convert to anndata
            measurement_table = ad.AnnData(df_well, dtype=measurement_dtype)
            measurement_table.obs = labels

        # Write to zarr group
        image_group = zarr.group(zarr_url)
        table_attrs = dict(
            type="feature_table",
            region=dict(path=f"../labels/{out_params.label_name}"),
            instance_key="label",
        )
        write_table(
            image_group,
            table_name,
            measurement_table,
            overwrite=overwrite,
            table_attrs=table_attrs,
        )

    # Output handling: "label" type (for each output, build and write to disk
    # a pyramid of coarser levels)
    for name, out_params in label_outputs:
        label_name = out_params.label_name
        build_pyramid(
            zarrurl=f"{zarr_url}/labels/{label_name}",
            overwrite=overwrite,
            num_levels=num_levels,
            coarsening_xy=coarsening_xy,
            chunksize=label_chunksize,
            aggregation_function=np.max,
        )