
handle_failed_job

Helper functions to handle Dataset history.

assemble_filters_failed_job(job)

Assemble DatasetV2.filters for a failed workflow-execution.

Assemble new value of filters based on the last successful task, i.e. based on the content of the temporary FILTERS_FILENAME file. If the file is missing, return None.

Parameters:

    job (JobV2, required): The failed JobV2 object.

Returns:

    Optional[dict[str, Any]]: The new value of dataset.filters, or None if FILTERS_FILENAME is missing.

Source code in fractal_server/app/runner/v2/handle_failed_job.py
def assemble_filters_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
    """
    Assemble `DatasetV2.filters` for a failed workflow-execution.

    Assemble new value of `filters` based on the last successful task, i.e.
    based on the content of the temporary `FILTERS_FILENAME` file. If the file
    is missing, return `None`.

    Args:
        job:
            The failed `JobV2` object.

    Returns:
        The new value of `dataset.filters`, or `None` if `FILTERS_FILENAME`
        is missing.
    """
    tmp_file = Path(job.working_dir) / FILTERS_FILENAME
    try:
        with tmp_file.open("r") as f:
            new_filters = json.load(f)
        return new_filters
    except FileNotFoundError:
        return None
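
Usage sketch (illustrative only, not part of the module): a failure handler can call this helper and overwrite dataset.filters only when the temporary file was found in job.working_dir. The objects job and dataset are assumed to be already in scope.

from fractal_server.app.runner.v2.handle_failed_job import (
    assemble_filters_failed_job,
)

# `job` is the failed JobV2 and `dataset` its DatasetV2.
new_filters = assemble_filters_failed_job(job)
if new_filters is not None:
    # FILTERS_FILENAME existed: keep the filters of the last successful task.
    dataset.filters = new_filters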

assemble_history_failed_job(job, dataset, workflow, logger_name=None, failed_wftask=None)

Assemble history after a workflow-execution job fails.

Parameters:

    job (JobV2, required): The failed JobV2 object.
    dataset (DatasetV2, required): The DatasetV2 object associated to job.
    workflow (WorkflowV2, required): The WorkflowV2 object associated to job.
    logger_name (Optional[str], default None): A logger name.
    failed_wftask (Optional[WorkflowTaskV2], default None): If set, append it to history during step 3; if None, infer it by comparing the job task list and the one in HISTORY_FILENAME.

Returns:

    list[dict[str, Any]]: The new value of history, to be merged into dataset.meta.

Source code in fractal_server/app/runner/v2/handle_failed_job.py
def assemble_history_failed_job(
    job: JobV2,
    dataset: DatasetV2,
    workflow: WorkflowV2,
    logger_name: Optional[str] = None,
    failed_wftask: Optional[WorkflowTaskV2] = None,
) -> list[dict[str, Any]]:
    """
    Assemble `history` after a workflow-execution job fails.

    Args:
        job:
            The failed `JobV2` object.
        dataset:
            The `DatasetV2` object associated to `job`.
        workflow:
            The `WorkflowV2` object associated to `job`.
        logger_name: A logger name.
        failed_wftask:
            If set, append it to `history` during step 3; if `None`, infer
            it by comparing the job task list and the one in
            `HISTORY_FILENAME`.

    Returns:
        The new value of `history`, to be merged into
        `dataset.meta`.
    """

    logger = logging.getLogger(logger_name)

    # The final value of the history attribute should include up to three
    # parts, coming from: the database, the temporary file, the failed-task
    # information.

    # Part 1: Read existing history from DB
    new_history = dataset.history

    # Part 2: Extend history based on temporary-file contents
    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
    try:
        with tmp_history_file.open("r") as f:
            tmp_file_history = json.load(f)
            new_history.extend(tmp_file_history)
    except FileNotFoundError:
        tmp_file_history = []

    # Part 3/A: Identify failed task, if needed
    if failed_wftask is None:
        job_wftasks = workflow.task_list[
            job.first_task_index : (job.last_task_index + 1)  # noqa
        ]
        tmp_file_wftasks = [
            history_item["workflowtask"] for history_item in tmp_file_history
        ]
        if len(job_wftasks) <= len(tmp_file_wftasks):
            n_tasks_job = len(job_wftasks)
            n_tasks_tmp = len(tmp_file_wftasks)
            logger.error(
                "Cannot identify the failed task based on job task list "
                f"(length {n_tasks_job}) and temporary-file task list "
                f"(length {n_tasks_tmp})."
            )
            logger.error("Failed task not appended to history.")
        else:
            failed_wftask = job_wftasks[len(tmp_file_wftasks)]

    # Part 3/B: Append failed task to history
    if failed_wftask is not None:
        failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
        failed_wftask_dump["task"] = failed_wftask.task.model_dump()
        new_history_item = dict(
            workflowtask=failed_wftask_dump,
            status=WorkflowTaskStatusTypeV2.FAILED,
            parallelization=dict(),  # FIXME: re-include parallelization
        )
        new_history.append(new_history_item)

    return new_history
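
Usage sketch (illustrative; the actual fractal_server failure path may differ): the recomputed history replaces dataset.history, and the entry appended in step 3/B for the failed task has the shape shown in the comments.

from fractal_server.app.runner.v2.handle_failed_job import (
    assemble_history_failed_job,
)

new_history = assemble_history_failed_job(
    job=job,            # the failed JobV2
    dataset=dataset,    # the DatasetV2 associated to the job
    workflow=workflow,  # the WorkflowV2 associated to the job
    logger_name="failed-job-handler",  # hypothetical logger name
)
dataset.history = new_history
# If a failed task could be identified, the last entry looks like:
# {
#     "workflowtask": {...},  # dump of the failed WorkflowTaskV2, including its task
#     "status": WorkflowTaskStatusTypeV2.FAILED,
#     "parallelization": {},
# }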

assemble_images_failed_job(job)

Assemble DatasetV2.images for a failed workflow-execution.

Assemble new value of images based on the last successful task, i.e. based on the content of the temporary IMAGES_FILENAME file. If the file is missing, return None.

Parameters:

    job (JobV2, required): The failed JobV2 object.

Returns:

    Optional[dict[str, Any]]: The new value of dataset.images, or None if IMAGES_FILENAME is missing.

Source code in fractal_server/app/runner/v2/handle_failed_job.py
def assemble_images_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
    """
    Assemble `DatasetV2.images` for a failed workflow-execution.

    Assemble new value of `images` based on the last successful task, i.e.
    based on the content of the temporary `IMAGES_FILENAME` file. If the file
    is missing, return `None`.

    Args:
        job:
            The failed `JobV2` object.

    Returns:
        The new value of `dataset.images`, or `None` if `IMAGES_FILENAME`
        is missing.
    """
    tmp_file = Path(job.working_dir) / IMAGES_FILENAME
    try:
        with tmp_file.open("r") as f:
            new_images = json.load(f)
        return new_images
    except FileNotFoundError:
        return None
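
Taken together, the three helpers rebuild the dataset state after a failed job. A combined sketch, assuming the caller already holds the failed job, its dataset, and its workflow (the actual fractal_server failure handler may differ):

from fractal_server.app.runner.v2.handle_failed_job import (
    assemble_filters_failed_job,
    assemble_history_failed_job,
    assemble_images_failed_job,
)

# History can always be recomputed (it falls back to the DB value).
dataset.history = assemble_history_failed_job(job, dataset, workflow)

# Filters and images are only updated if their temporary files exist.
new_filters = assemble_filters_failed_job(job)
if new_filters is not None:
    dataset.filters = new_filters

new_images = assemble_images_failed_job(job)
if new_images is not None:
    dataset.images = new_images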