Skip to content

_slurm_ssh

Slurm Backend

This backend runs Fractal workflows on a SLURM cluster using Clusterfutures Executor objects.

process_workflow(*, workflow, dataset, workflow_dir_local, job_id, workflow_dir_remote=None, first_task_index=None, last_task_index=None, logger_name, job_attribute_filters, job_type_filters, fractal_ssh, worker_init=None, user_id, **kwargs)

Process workflow (SLURM backend public interface)

Source code in fractal_server/app/runner/v2/_slurm_ssh.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def process_workflow(
    *,
    workflow: WorkflowV2,
    dataset: DatasetV2,
    workflow_dir_local: Path,
    job_id: int,
    workflow_dir_remote: Optional[Path] = None,
    first_task_index: Optional[int] = None,
    last_task_index: Optional[int] = None,
    logger_name: str,
    job_attribute_filters: AttributeFiltersType,
    job_type_filters: dict[str, bool],
    fractal_ssh: FractalSSH,
    worker_init: Optional[str] = None,
    user_id: int,
    **kwargs,  # not used
) -> None:
    """
    Process workflow (SLURM backend public interface)

    Creates the remote job folder over SSH, opens a `SlurmSSHRunner` and
    hands the selected slice of the workflow task list to
    `execute_tasks_v2`.

    Args:
        workflow: Workflow whose `task_list` is (partially) executed.
        dataset: Forwarded unchanged to `execute_tasks_v2`.
        workflow_dir_local: Used as the runner's `root_dir_local` and
            forwarded to `execute_tasks_v2`.
        job_id: Forwarded unchanged to `execute_tasks_v2`.
        workflow_dir_remote: Folder created through `fractal_ssh.mkdir`,
            used as the runner's `root_dir_remote` and forwarded to
            `execute_tasks_v2`.
            NOTE(review): the default `None` is stringified to "None"
            before being passed to `mkdir` -- confirm that callers always
            provide a real path.
        first_task_index: Index of the first task to run; resolved by
            `set_start_and_last_task_index`.
        last_task_index: Index of the last task to run (inclusive, see the
            slice below); resolved by `set_start_and_last_task_index`.
        logger_name: Forwarded unchanged to `execute_tasks_v2`.
        job_attribute_filters: Forwarded unchanged to `execute_tasks_v2`.
        job_type_filters: Forwarded unchanged to `execute_tasks_v2`.
        fractal_ssh: SSH object used to create the remote folder and given
            to the runner.
        worker_init: Optional newline-separated string; it is split into
            lines and passed to the runner as `common_script_lines`.
        user_id: Forwarded unchanged to `execute_tasks_v2`.
        **kwargs: Accepted but not used.

    Raises:
        JobExecutionError: If the remote folder cannot be created via SSH.
            The original exception is attached as `__cause__`.
    """

    # Set values of first_task_index and last_task_index
    num_tasks = len(workflow.task_list)
    first_task_index, last_task_index = set_start_and_last_task_index(
        num_tasks,
        first_task_index=first_task_index,
        last_task_index=last_task_index,
    )

    # The runner receives the init script as a list of lines
    if isinstance(worker_init, str):
        worker_init = worker_init.split("\n")

    # Create main remote folder
    remote_dir = str(workflow_dir_remote)
    try:
        fractal_ssh.mkdir(folder=remote_dir)
        logger.info(f"Created {remote_dir} via SSH.")
    except Exception as e:
        error_msg = (
            f"Could not create {remote_dir} via SSH.\n"
            f"Original error: {str(e)}."
        )
        logger.error(error_msg)
        # Chain explicitly so that the SSH failure is kept as the cause
        raise JobExecutionError(info=error_msg) from e

    with SlurmSSHRunner(
        fractal_ssh=fractal_ssh,
        root_dir_local=workflow_dir_local,
        root_dir_remote=workflow_dir_remote,
        common_script_lines=worker_init,
    ) as runner:
        execute_tasks_v2(
            # `last_task_index` is inclusive, hence the `+ 1`
            wf_task_list=workflow.task_list[
                first_task_index : (last_task_index + 1)
            ],
            dataset=dataset,
            job_id=job_id,
            runner=runner,
            workflow_dir_local=workflow_dir_local,
            workflow_dir_remote=workflow_dir_remote,
            logger_name=logger_name,
            get_runner_config=get_slurm_config,
            job_attribute_filters=job_attribute_filters,
            job_type_filters=job_type_filters,
            user_id=user_id,
        )