Skip to content

handle_failed_job

Helper functions to handle Dataset history.

mark_last_wftask_as_failed(dataset_id, logger_name)

Edit dataset history, by marking last item as failed.

Parameters:

Name Type Description Default
dataset_id int

The ID of the DatasetV2 object whose history is edited.

required
logger_name str

A logger name.

required
Source code in fractal_server/app/runner/v2/handle_failed_job.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def mark_last_wftask_as_failed(
    dataset_id: int,
    logger_name: str,
) -> None:
    """
    Edit dataset history, by marking last item as failed.

    The last history item is only modified when its current status is
    `SUBMITTED`. In every other case (dataset not found, empty history,
    unexpected status) a warning is logged and nothing is written to the
    database.

    Args:
        dataset_id: ID of the `DatasetV2` object whose history is edited.
        logger_name: A logger name.
    """

    logger = logging.getLogger(logger_name)
    with next(get_sync_db()) as db:
        db_dataset = db.get(DatasetV2, dataset_id)
        # `db.get` yields `None` for an unknown primary key; without this
        # guard the `.history` access below would raise `AttributeError`.
        if db_dataset is None:
            logger.warning(
                f"Dataset with {dataset_id=} not found. "
                "Cannot mark last history item as failed. Skip."
            )
            return
        # Truthiness check covers both an empty list and a null history.
        if not db_dataset.history:
            logger.warning(
                f"History for {dataset_id=} is empty. Likely reason: the job "
                "failed before its first task was marked as SUBMITTED. "
                "Continue."
            )
            return
        last_item = db_dataset.history[-1]
        workflowtask_id = last_item["workflowtask"]["id"]
        last_item_status = last_item["status"]
        if last_item_status != WorkflowTaskStatusTypeV2.SUBMITTED:
            logger.warning(
                "Unexpected branch: "
                f"Last history item, for {workflowtask_id=}, "
                f"has status {last_item_status}. Skip."
            )
            return
        logger.info(f"Setting history item for {workflowtask_id=} to failed.")
        last_item["status"] = WorkflowTaskStatusTypeV2.FAILED
        # The history item is mutated in place, so the attribute is flagged
        # explicitly to make sure the change is included in the commit.
        flag_modified(db_dataset, "history")
        db.merge(db_dataset)
        db.commit()