
handle_failed_job

Helper functions to handle Dataset history.

assemble_history_failed_job(job, output_dataset, workflow, logger, failed_wftask=None)

Assemble history after a workflow-execution job fails.

Parameters:

job (ApplyWorkflow, required):
    The failed ApplyWorkflow object.
output_dataset (Dataset, required):
    The output_dataset associated with job.
workflow (Workflow, required):
    The workflow associated with job.
logger (Logger, required):
    A logger instance.
failed_wftask (Optional[WorkflowTask], default: None):
    If set, append it to history during step 3; if None, infer it by
    comparing the job task list and the one in tmp_metadata_file.

Returns:

list[dict[str, Any]]:
    The new value of history, to be merged into output_dataset.meta.
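
The entry appended for the failed task (step 3/B in the source below) is a plain dictionary; a purely illustrative example of its shape, with made-up field values, is:

    # Illustrative only: one history entry as appended for the failed task.
    # The real "workflowtask" dump carries the full WorkflowTask fields plus
    # the nested Task dump; the values below are made up.
    failed_entry = {
        "workflowtask": {
            "id": 7,                      # hypothetical WorkflowTask id
            "task": {"name": "task-c"},   # hypothetical nested Task dump
        },
        "status": WorkflowTaskStatusTypeV1.FAILED,
        "parallelization": {"parallelization_level": None},
    }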

Source code in fractal_server/app/runner/v1/handle_failed_job.py
def assemble_history_failed_job(
    job: ApplyWorkflow,
    output_dataset: Dataset,
    workflow: Workflow,
    logger: logging.Logger,
    failed_wftask: Optional[WorkflowTask] = None,
) -> list[dict[str, Any]]:
    """
    Assemble `history` after a workflow-execution job fails.

    Args:
        job:
            The failed `ApplyWorkflow` object.
        output_dataset:
            The `output_dataset` associated to `job`.
        workflow:
            The `workflow` associated to `job`.
        logger: A logger instance.
        failed_wftask:
            If set, append it to `history` during step 3; if `None`, infer
            it by comparing the job task list and the one in
            `tmp_metadata_file`.

    Returns:
        The new value of `history`, to be merged into
        `output_dataset.meta`.
    """

    # The final value of the history attribute should include up to three
    # parts, coming from: the database, the temporary file, the failed-task
    # information.

    # Part 1: Read existing history from DB
    new_history = output_dataset.history

    # Part 2: Extend history based on tmp_metadata_file
    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
    try:
        with tmp_history_file.open("r") as f:
            tmp_file_history = json.load(f)
            new_history.extend(tmp_file_history)
    except FileNotFoundError:
        tmp_file_history = []

    # Part 3/A: Identify failed task, if needed
    if failed_wftask is None:
        job_wftasks = workflow.task_list[
            job.first_task_index : (job.last_task_index + 1)  # noqa
        ]
        tmp_file_wftasks = [
            history_item["workflowtask"] for history_item in tmp_file_history
        ]
        if len(job_wftasks) <= len(tmp_file_wftasks):
            n_tasks_job = len(job_wftasks)
            n_tasks_tmp = len(tmp_file_wftasks)
            logger.error(
                "Cannot identify the failed task based on job task list "
                f"(length {n_tasks_job}) and temporary-file task list "
                f"(length {n_tasks_tmp})."
            )
            logger.error("Failed task not appended to history.")
        else:
            failed_wftask = job_wftasks[len(tmp_file_wftasks)]

    # Part 3/B: Append failed task to history
    if failed_wftask is not None:
        failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
        failed_wftask_dump["task"] = failed_wftask.task.model_dump()
        new_history_item = dict(
            workflowtask=failed_wftask_dump,
            status=WorkflowTaskStatusTypeV1.FAILED,
            parallelization=dict(
                parallelization_level=failed_wftask.parallelization_level,
            ),
        )
        new_history.append(new_history_item)

    return new_history
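
The inference in step 3/A relies only on list lengths: the temporary history file records one entry per task that completed successfully, so the failed task is the first job task without a matching entry. A minimal standalone sketch of that logic, with plain strings standing in for the WorkflowTask objects:

    # Standalone sketch of the step-3/A inference; task names are made up and
    # plain strings stand in for WorkflowTask objects.
    from typing import Optional


    def infer_failed_task(
        job_wftasks: list[str],
        tmp_file_history: list[dict],
    ) -> Optional[str]:
        # One history entry exists per successfully completed task, so the
        # failed task sits right after them in the job task list.
        if len(job_wftasks) <= len(tmp_file_history):
            return None  # cannot identify the failed task
        return job_wftasks[len(tmp_file_history)]


    # Example: three tasks in the job, two completed before the failure.
    tasks = ["task-a", "task-b", "task-c"]
    history = [{"workflowtask": "task-a"}, {"workflowtask": "task-b"}]
    assert infer_failed_task(tasks, history) == "task-c"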

assemble_meta_failed_job(job, output_dataset)

Assemble Dataset.meta (history excluded) for a failed workflow-execution.

Assemble new value of output_dataset.meta based on the last successful task, i.e. based on the content of the temporary METADATA_FILENAME file.

Parameters:

job (ApplyWorkflow, required):
    The failed ApplyWorkflow object.
output_dataset (Dataset, required):
    The output_dataset associated with job.
Returns:

dict[str, Any]:
    The new value of output_dataset.meta, apart from its history key.

Source code in fractal_server/app/runner/v1/handle_failed_job.py
def assemble_meta_failed_job(
    job: ApplyWorkflow,
    output_dataset: Dataset,
) -> dict[str, Any]:
    """
    Assemble `Dataset.meta` (history excluded) for a failed workflow-execution.

    Assemble new value of `output_dataset.meta` based on the last successful
    task, i.e. based on the content of the temporary `METADATA_FILENAME` file.

    Args:
        job:
            The failed `ApplyWorkflow` object.
        output_dataset:
            The `output_dataset` associated to `job`.

    Returns:
        The new value of `output_dataset.meta`, apart from its `history` key.
    """

    new_meta = deepcopy(output_dataset.meta)
    metadata_file = Path(job.working_dir) / METADATA_FILENAME
    try:
        with metadata_file.open("r") as f:
            metadata_update = json.load(f)
        for key, value in metadata_update.items():
            new_meta[key] = value
    except FileNotFoundError:
        pass

    return new_meta
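
The merge above is a plain key-by-key overwrite: whatever the last successful task wrote to the temporary METADATA_FILENAME replaces the corresponding keys in the dataset's existing meta, and all other keys are left untouched. A small self-contained sketch of that behavior, with made-up metadata keys:

    # Self-contained sketch of the key-by-key merge performed above;
    # the metadata keys and values are made up.
    from copy import deepcopy

    existing_meta = {"plate": "plate_1", "num_levels": 5}
    metadata_update = {"num_levels": 6, "coarsening_xy": 2}  # as read from METADATA_FILENAME

    new_meta = deepcopy(existing_meta)
    for key, value in metadata_update.items():
        new_meta[key] = value

    assert new_meta == {"plate": "plate_1", "num_levels": 6, "coarsening_xy": 2}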