
db_tools

bulk_upsert_image_cache_fast(*, list_upsert_objects, db)

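Insert or update many objects into HistoryImageCache and commit.

This function is an optimized version of

for obj in list_upsert_objects:
    db.merge(HistoryImageCache(**obj))
db.commit()

Rather than merging objects one at a time, it sends one PostgreSQL INSERT ... ON CONFLICT DO UPDATE statement per chunk of _CHUNK_SIZE objects and commits after each chunk. When a row with the same (zarr_url, dataset_id, workflowtask_id) already exists, only its latest_history_unit_id is updated.

See the SQLAlchemy documentation on PostgreSQL upserts: https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert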

NOTE: we tried to replace index_elements with constraint="pk_historyimagecache", but it did not work as expected.
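To make the "optimized version" claim concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (SQLite's INSERT ... ON CONFLICT ... DO UPDATE syntax, available since SQLite 3.24, follows PostgreSQL's). It compares a merge-like loop, which looks up each key and then issues one INSERT or UPDATE (Session.merge similarly loads each object by primary key when it is not already in the session), with a single multi-row upsert. The table name history_image_cache, its four columns, the composite primary key on the three conflict columns, and the helper make are all assumptions for illustration; the real HistoryImageCache model may differ.

```python
import sqlite3

# Hypothetical stand-in for the HistoryImageCache table: the three columns
# used as the conflict target form a composite primary key (an assumption).
SCHEMA = """
CREATE TABLE history_image_cache (
    zarr_url TEXT NOT NULL,
    dataset_id INTEGER NOT NULL,
    workflowtask_id INTEGER NOT NULL,
    latest_history_unit_id INTEGER NOT NULL,
    PRIMARY KEY (zarr_url, dataset_id, workflowtask_id)
)
"""
KEY = ("zarr_url", "dataset_id", "workflowtask_id")
COLUMNS = (*KEY, "latest_history_unit_id")
WHERE_KEY = "WHERE zarr_url = ? AND dataset_id = ? AND workflowtask_id = ?"


def upsert_one_by_one(conn, objs):
    """Merge-like loop: look up each key, then INSERT or UPDATE that one row."""
    for obj in objs:
        key = tuple(obj[col] for col in KEY)
        found = conn.execute(
            f"SELECT 1 FROM history_image_cache {WHERE_KEY}", key
        ).fetchone()
        if found:
            conn.execute(
                f"UPDATE history_image_cache SET latest_history_unit_id = ? {WHERE_KEY}",
                (obj["latest_history_unit_id"], *key),
            )
        else:
            conn.execute(
                "INSERT INTO history_image_cache VALUES (?, ?, ?, ?)",
                (*key, obj["latest_history_unit_id"]),
            )
    conn.commit()


def upsert_bulk(conn, objs):
    """One multi-row INSERT ... ON CONFLICT DO UPDATE statement for all objs."""
    if not objs:
        return
    row = "(" + ", ".join(["?"] * len(COLUMNS)) + ")"
    params = [obj[col] for obj in objs for col in COLUMNS]
    conn.execute(
        f"INSERT INTO history_image_cache ({', '.join(COLUMNS)}) "
        f"VALUES {', '.join([row] * len(objs))} "
        "ON CONFLICT (zarr_url, dataset_id, workflowtask_id) "
        "DO UPDATE SET latest_history_unit_id = excluded.latest_history_unit_id",
        params,
    )
    conn.commit()


def make(url, unit_id):
    """Hypothetical helper building one upsert dictionary."""
    return {"zarr_url": url, "dataset_id": 1, "workflowtask_id": 1,
            "latest_history_unit_id": unit_id}


first = [make("/a.zarr", 10), make("/b.zarr", 11)]
second = [make("/a.zarr", 20), make("/c.zarr", 21)]  # updates /a, inserts /c

results = []
for upsert in (upsert_one_by_one, upsert_bulk):
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    upsert(conn, first)
    upsert(conn, second)
    results.append(
        conn.execute("SELECT * FROM history_image_cache ORDER BY zarr_url").fetchall()
    )
    conn.close()

print(results[0] == results[1])  # True
print(results[1])  # [('/a.zarr', 1, 1, 20), ('/b.zarr', 1, 1, 11), ('/c.zarr', 1, 1, 21)]
```

Both strategies end in the same table state; the upsert version lets the database decide between insert and update, so it needs no per-object lookup.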

Parameters:

Name                 Type                  Description                                          Default
list_upsert_objects  list[dict[str, Any]]  List of dictionaries for the objects to be upserted  required
db                   Session               A synchronous database session                       required
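Each dictionary in list_upsert_objects is passed to pg_insert(HistoryImageCache).values(...), so its keys are column names; the statement in the source references zarr_url, dataset_id, workflowtask_id and latest_history_unit_id. One caveat about this input: PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE in which two proposed rows share the same conflict key (error "ON CONFLICT DO UPDATE command cannot affect row a second time"), whereas the merge loop simply lets the last object win. The helper dedupe_upsert_objects below is a hypothetical sketch, not part of fractal-server, showing how a caller could keep only the last object per key before calling the function, which preserves the merge loop's "last wins" behavior.

```python
from typing import Any

# Columns forming the conflict target (index_elements) in the upsert statement.
CONFLICT_KEY = ("zarr_url", "dataset_id", "workflowtask_id")


def dedupe_upsert_objects(
    list_upsert_objects: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """
    Hypothetical helper: keep only the last object for each conflict key,
    placed at the position where that key first appeared.
    """
    by_key: dict[tuple[Any, ...], dict[str, Any]] = {}
    for obj in list_upsert_objects:
        # Reassigning an existing dict key replaces the value but keeps the
        # key's original insertion position.
        by_key[tuple(obj[col] for col in CONFLICT_KEY)] = obj
    return list(by_key.values())


objs = [
    {"zarr_url": "/a.zarr", "dataset_id": 1, "workflowtask_id": 3, "latest_history_unit_id": 7},
    {"zarr_url": "/b.zarr", "dataset_id": 1, "workflowtask_id": 3, "latest_history_unit_id": 8},
    {"zarr_url": "/a.zarr", "dataset_id": 1, "workflowtask_id": 3, "latest_history_unit_id": 9},
]
deduped = dedupe_upsert_objects(objs)
print([(o["zarr_url"], o["latest_history_unit_id"]) for o in deduped])
# [('/a.zarr', 9), ('/b.zarr', 8)]
```

Duplicates that land in different chunks are not a problem, since each chunk is a separate statement and the later one simply updates the row.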
Source code in fractal_server/app/runner/v2/db_tools.py
def bulk_upsert_image_cache_fast(
    *,
    list_upsert_objects: list[dict[str, Any]],
    db: Session,
) -> None:
    """
    Insert or update many objects into `HistoryImageCache` and commit

    This function is an optimized version of

    ```python
    for obj in list_upsert_objects:
        db.merge(HistoryImageCache(**obj))
    db.commit()
    ```

    See docs at
    https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert

    NOTE: we tried to replace `index_elements` with
    `constraint="pk_historyimagecache"`, but it did not work as expected.

    Arguments:
        list_upsert_objects:
            List of dictionaries for objects to be upsert-ed.
        db: A sync database session
    """
    len_list_upsert_objects = len(list_upsert_objects)

    logger.debug(f"[bulk_upsert_image_cache_fast] {len_list_upsert_objects=}.")

    if len_list_upsert_objects == 0:
        return None

    for ind in range(0, len_list_upsert_objects, _CHUNK_SIZE):
        stmt = pg_insert(HistoryImageCache).values(
            list_upsert_objects[ind : ind + _CHUNK_SIZE]
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=[
                HistoryImageCache.zarr_url,
                HistoryImageCache.dataset_id,
                HistoryImageCache.workflowtask_id,
            ],
            set_=dict(
                latest_history_unit_id=stmt.excluded.latest_history_unit_id
            ),
        )
        db.execute(stmt)
        db.commit()
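The loop slices list_upsert_objects into chunks of _CHUNK_SIZE (defined elsewhere in the module; its value is not shown here) and commits after each chunk. This keeps each statement bounded in size regardless of input length, but it also means the call is not atomic: if a later chunk fails, the earlier chunks are already committed. The sketch below reproduces that structure with sqlite3 as a stand-in for PostgreSQL; CHUNK_SIZE = 2, the history_image_cache table, and the forced NOT NULL failure are illustrative assumptions.

```python
import sqlite3
from typing import Any

CHUNK_SIZE = 2  # illustrative only; the real _CHUNK_SIZE is defined elsewhere
COLUMNS = ("zarr_url", "dataset_id", "workflowtask_id", "latest_history_unit_id")


def bulk_upsert_chunked(conn: sqlite3.Connection, objs: list[dict[str, Any]]) -> None:
    """Mirror of the loop above: one upsert statement and one commit per chunk."""
    row = "(" + ", ".join(["?"] * len(COLUMNS)) + ")"
    for ind in range(0, len(objs), CHUNK_SIZE):
        chunk = objs[ind : ind + CHUNK_SIZE]
        params = [obj[col] for obj in chunk for col in COLUMNS]
        conn.execute(
            f"INSERT INTO history_image_cache ({', '.join(COLUMNS)}) "
            f"VALUES {', '.join([row] * len(chunk))} "
            "ON CONFLICT (zarr_url, dataset_id, workflowtask_id) "
            "DO UPDATE SET latest_history_unit_id = excluded.latest_history_unit_id",
            params,
        )
        conn.commit()  # earlier chunks stay committed even if a later one fails


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE history_image_cache ("
    "zarr_url TEXT NOT NULL, dataset_id INTEGER NOT NULL, "
    "workflowtask_id INTEGER NOT NULL, latest_history_unit_id INTEGER NOT NULL, "
    "PRIMARY KEY (zarr_url, dataset_id, workflowtask_id))"
)
objs = [
    {"zarr_url": f"/img{i}.zarr", "dataset_id": 1, "workflowtask_id": 1,
     "latest_history_unit_id": i}
    for i in range(5)
]
objs[4]["latest_history_unit_id"] = None  # NOT NULL violation in the third chunk

try:
    bulk_upsert_chunked(conn, objs)  # chunks: [0, 1], [2, 3], [4]
except sqlite3.IntegrityError:
    conn.rollback()  # discards only the failed, not-yet-committed chunk

n_rows = conn.execute("SELECT COUNT(*) FROM history_image_cache").fetchone()[0]
print(n_rows)  # 4: the first two chunks were already committed
```

Rerunning the same call after fixing the bad object is safe: the four committed rows hit the conflict branch and are updated in place, and only the missing row is inserted.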