Skip to content

status_tools

_postprocess_image_lists(target_images, list_query_url_status)

Source code in fractal_server/images/status_tools.py (lines 48–88)
def _postprocess_image_lists(
    target_images: list[dict[str, Any]],
    list_query_url_status: list[tuple[str, str]],
) -> list[dict[str, Any]]:
    """
    Attach a status to each target image, based on the query results.

    Images whose `zarr_url` appears in `list_query_url_status` are enriched
    (via `_enriched_image`) with the status from the query. All other target
    images are enriched with `HistoryUnitStatusWithUnset.UNSET`. Query rows
    whose `zarr_url` is not part of `target_images` are ignored.

    Args:
        target_images: The images to enrich; each must have a `zarr_url` key.
        list_query_url_status: `(zarr_url, status)` pairs, e.g. the rows
            returned by executing the query from `_prepare_query`.

    Returns:
        The enriched images: first those with a status from the query (in
        query order), then the remaining ones (in a deterministic order,
        following the first appearance of each `zarr_url` in
        `target_images`). Target images sharing a `zarr_url` collapse into a
        single entry.
    """
    t_1 = time.perf_counter()

    # Index target images by zarr_url, for O(1) membership checks and lookups
    zarr_url_to_image = {img["zarr_url"]: img for img in target_images}

    # Select only processed images that are part of the target image set
    list_processed_url_status = [
        (zarr_url, status)
        for zarr_url, status in list_query_url_status
        if zarr_url in zarr_url_to_image
    ]
    set_processed_urls = {zarr_url for zarr_url, _ in list_processed_url_status}

    processed_images_with_status = [
        _enriched_image(
            img=zarr_url_to_image[zarr_url],
            status=status,
        )
        for zarr_url, status in list_processed_url_status
    ]

    # Iterate the dict (insertion-ordered) rather than a set difference, so
    # that the output order does not depend on string-hash randomization.
    non_processed_images_with_status = [
        _enriched_image(
            img=img,
            status=HistoryUnitStatusWithUnset.UNSET,
        )
        for zarr_url, img in zarr_url_to_image.items()
        if zarr_url not in set_processed_urls
    ]
    t_2 = time.perf_counter()
    logger.debug(
        f"[enrich_images_async] post-processing, elapsed={t_2 - t_1:.5f} s"
    )

    return processed_images_with_status + non_processed_images_with_status

_prepare_query(*, dataset_id, workflowtask_id)

Note: the query does not include `.order_by`.

Source code in fractal_server/images/status_tools.py (lines 30–45)
def _prepare_query(
    *,
    dataset_id: int,
    workflowtask_id: int,
) -> Select:
    """
    Build a query selecting `(zarr_url, status)` pairs.

    `HistoryImageCache` rows matching the given dataset and workflow task are
    joined with `HistoryUnit`, keeping only the unit referenced by each row's
    `latest_history_unit_id`.

    Note: the query does not include `.order_by`.
    """
    # Passing several criteria to a single `.where` combines them with AND,
    # exactly like chaining one `.where` call per criterion.
    filters = (
        HistoryImageCache.dataset_id == dataset_id,
        HistoryImageCache.workflowtask_id == workflowtask_id,
        HistoryImageCache.latest_history_unit_id == HistoryUnit.id,
    )
    return (
        select(HistoryImageCache.zarr_url, HistoryUnit.status)
        .join(HistoryUnit)
        .where(*filters)
    )

enrich_images_unsorted_async(*, images, dataset_id, workflowtask_id, db) async

Enrich images with a status-related attribute.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `images` | `list[dict[str, Any]]` | The input image list | required |
| `dataset_id` | `int` | The dataset ID | required |
| `workflowtask_id` | `int` | The workflow-task ID | required |
| `db` | `AsyncSession` | An async db session | required |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | The list of enriched images, not necessarily in the same order as the input. |

Source code in fractal_server/images/status_tools.py (lines 91–133)
async def enrich_images_unsorted_async(
    *,
    images: list[dict[str, Any]],
    dataset_id: int,
    workflowtask_id: int,
    db: AsyncSession,
) -> list[dict[str, Any]]:
    """
    Enrich images with a status-related attribute.

    The `(zarr_url, status)` pairs for the given dataset and workflow task
    are fetched through `db` and then matched against `images`.

    Args:
        images: The input image list
        dataset_id: The dataset ID
        workflowtask_id: The workflow-task ID
        db: An async db session

    Returns:
        The list of enriched images, not necessarily in the same order as
        the input.
    """
    started_at = time.perf_counter()
    logger.info(
        f"[enrich_images_async] START, {dataset_id=}, {workflowtask_id=}"
    )

    # Fetch `(zarr_url, status)` for every processed image, including those
    # outside the target image set; filtering happens in post-processing.
    query = _prepare_query(
        dataset_id=dataset_id,
        workflowtask_id=workflowtask_id,
    )
    result = await db.execute(query)
    url_status_rows = result.all()
    elapsed = time.perf_counter() - started_at
    logger.debug(f"[enrich_images_async] query, elapsed={elapsed:.5f} s")

    return _postprocess_image_lists(
        target_images=images,
        list_query_url_status=url_status_rows,
    )

enrich_images_unsorted_sync(*, images, dataset_id, workflowtask_id)

Enrich images with a status-related attribute.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `images` | `list[dict[str, Any]]` | The input image list | required |
| `dataset_id` | `int` | The dataset ID | required |
| `workflowtask_id` | `int` | The workflow-task ID | required |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | The list of enriched images, not necessarily in the same order as the input. |

Source code in fractal_server/images/status_tools.py (lines 136–179)
def enrich_images_unsorted_sync(
    *,
    images: list[dict[str, Any]],
    dataset_id: int,
    workflowtask_id: int,
) -> list[dict[str, Any]]:
    """
    Enrich images with a status-related attribute.

    Synchronous counterpart of `enrich_images_unsorted_async`. Instead of
    receiving a session, it opens one via `get_sync_db` for the duration of
    the query.

    Args:
        images: The input image list
        dataset_id: The dataset ID
        workflowtask_id: The workflow-task ID

    Returns:
        The list of enriched images, not necessarily in the same order as
        the input.
    """

    t_0 = time.perf_counter()
    # Log labels identify the sync variant (they previously reused the
    # async label, which misattributed these records).
    logger.info(
        f"[enrich_images_sync] START, {dataset_id=}, {workflowtask_id=}"
    )

    # Get `(zarr_url, status)` for _all_ processed images (including those that
    # are not part of the target image set). Rows are fully fetched before the
    # session is closed.
    with next(get_sync_db()) as db:
        res = db.execute(
            _prepare_query(
                dataset_id=dataset_id,
                workflowtask_id=workflowtask_id,
            )
        )
        list_query_url_status = res.all()
    t_1 = time.perf_counter()
    logger.debug(f"[enrich_images_sync] query, elapsed={t_1 - t_0:.5f} s")

    output = _postprocess_image_lists(
        target_images=images,
        list_query_url_status=list_query_url_status,
    )

    return output