Skip to content

base_runner

BaseRunner

Bases: object

Base class for Fractal runners.

Source code in fractal_server/app/runner/executors/base_runner.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class BaseRunner:
    """
    Base class for Fractal runners.

    In this base class `submit` and `multisubmit` only raise
    `NotImplementedError`, while the `validate_*` methods implement
    argument checks that do not depend on a specific runner.
    """

    def submit(
        self,
        func: callable,
        parameters: dict[str, Any],
        history_unit_id: int,
        task_type: TaskTypeType,
        task_files: TaskFiles,
        config: Any,
        user_id: int,
    ) -> tuple[Any, BaseException]:
        """
        Run a single fractal task.

        Args:
            func: Function to be executed.
            parameters: Dictionary of parameters.
            history_unit_id:
                Database ID of the corresponding `HistoryUnit` entry.
            task_type: Task type.
            task_files: `TaskFiles` object.
            config: Runner-specific parameters.
            user_id: ID of the user (semantics defined by concrete runners).

        Returns:
            A `(result, exception)` pair, as per the return annotation.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def multisubmit(
        self,
        func: callable,
        list_parameters: list[dict[str, Any]],
        history_unit_ids: list[int],
        list_task_files: list[TaskFiles],
        task_type: TaskTypeType,
        config: Any,
        user_id: int,
    ) -> tuple[dict[int, Any], dict[int, BaseException]]:
        """
        Run a parallel fractal task.

        Args:
            func: Function to be executed.
            list_parameters:
                List of parameters dictionaries.
                `validate_multisubmit_parameters` requires each of them
                to include a `zarr_url` key.
            history_unit_ids:
                Database IDs of the corresponding `HistoryUnit` entries.
            list_task_files: List of `TaskFiles` objects.
            task_type: Task type.
            config: Runner-specific parameters.
            user_id: ID of the user (semantics defined by concrete runners).

        Returns:
            A `(results, exceptions)` pair of dictionaries with integer
            keys, as per the return annotation (presumably the index of
            the item in `list_parameters`; confirm with concrete runners).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def validate_submit_parameters(
        self,
        parameters: dict[str, Any],
        task_type: TaskTypeType,
    ) -> None:
        """
        Validate parameters for `submit` method.

        Args:
            parameters: Parameters dictionary.
            task_type: Task type.

        Raises:
            ValueError:
                If `task_type` is not in `TASK_TYPES_SUBMIT`, if
                `parameters` is not a dictionary, if `zarr_urls` is
                missing for a `non_parallel`/`compound` task, or if
                `zarr_urls` is present for a converter task.
        """
        logger.info("[validate_submit_parameters] START")
        if task_type not in TASK_TYPES_SUBMIT:
            raise ValueError(f"Invalid {task_type=} for `submit`.")
        if not isinstance(parameters, dict):
            raise ValueError("`parameters` must be a dictionary.")
        if task_type in ("non_parallel", "compound"):
            # Non-converter tasks operate on existing images.
            if "zarr_urls" not in parameters:
                raise ValueError(
                    f"No 'zarr_urls' key in {list(parameters)}"
                )
        elif task_type in ("converter_non_parallel", "converter_compound"):
            # Converter tasks must not receive a list of images.
            if "zarr_urls" in parameters:
                raise ValueError(
                    f"Forbidden 'zarr_urls' key in {list(parameters)}"
                )
        logger.info("[validate_submit_parameters] END")

    def validate_multisubmit_parameters(
        self,
        *,
        task_type: TaskTypeType,
        list_parameters: list[dict[str, Any]],
        list_task_files: list[TaskFiles],
        history_unit_ids: list[int],
    ) -> None:
        """
        Validate parameters for `multisubmit` method.

        Args:
            task_type: Task type.
            list_parameters: List of parameters dictionaries.
            list_task_files:
                List of `TaskFiles` objects; must have the same length as
                `list_parameters` and share a single
                `wftask_subfolder_local`.
            history_unit_ids:
                List of IDs; must have the same length as
                `list_parameters`.

        Raises:
            ValueError: If any of the checks fails.
        """
        if task_type not in TASK_TYPES_MULTISUBMIT:
            raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

        if not isinstance(list_parameters, list):
            raise ValueError("`list_parameters` must be a list.")

        if len(list_parameters) != len(list_task_files):
            raise ValueError(
                f"{len(list_task_files)=} differs from "
                f"{len(list_parameters)=}."
            )
        if len(history_unit_ids) != len(list_parameters):
            raise ValueError(
                f"{len(history_unit_ids)=} differs from "
                f"{len(list_parameters)=}."
            )

        subfolders = {
            task_file.wftask_subfolder_local for task_file in list_task_files
        }
        # An empty input yields zero subfolders; report it as such rather
        # than as "more than one".
        if not subfolders:
            raise ValueError(
                "No subfolder found: `list_task_files` is empty."
            )
        if len(subfolders) > 1:
            raise ValueError(f"More than one subfolders: {subfolders}.")

        for single_kwargs in list_parameters:
            if not isinstance(single_kwargs, dict):
                raise ValueError("kwargs item must be a dictionary.")
            if "zarr_url" not in single_kwargs:
                raise ValueError(
                    f"No 'zarr_url' key in {list(single_kwargs)}"
                )
        if task_type == "parallel":
            # Uniqueness is only enforced for the "parallel" task type.
            zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
            if len(zarr_urls) != len(set(zarr_urls)):
                raise ValueError("Non-unique zarr_urls")

multisubmit(func, list_parameters, history_unit_ids, list_task_files, task_type, config, user_id)

Run a parallel fractal task.

Parameters:

Name Type Description Default
func callable

Function to be executed.

required
list_parameters list[dict[str, Any]]

List of parameters dictionaries. Each one must include a zarr_url key.

required
history_unit_ids list[int]

Database IDs of the corresponding HistoryUnit entries.

required
task_type TaskTypeType

Task type.

required
list_task_files list[TaskFiles]

List of TaskFiles objects.

required
config Any

Runner-specific parameters.

required
Source code in fractal_server/app/runner/executors/base_runner.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def multisubmit(
    self,
    func: callable,
    list_parameters: list[dict[str, Any]],
    history_unit_ids: list[int],
    list_task_files: list[TaskFiles],
    task_type: TaskTypeType,
    config: Any,
    user_id: int,
) -> tuple[dict[int, Any], dict[int, BaseException]]:
    """
    Run a parallel fractal task.

    Args:
        func: Function to be executed.
        list_parameters:
            List of parameters dictionaries.
            `validate_multisubmit_parameters` requires each of them to
            include a `zarr_url` key.
        history_unit_ids:
            Database IDs of the corresponding `HistoryUnit` entries.
        list_task_files: List of `TaskFiles` objects.
        task_type: Task type.
        config: Runner-specific parameters.
        user_id: ID of the user (semantics defined by concrete runners).

    Returns:
        A `(results, exceptions)` pair of dictionaries with integer keys,
        as per the return annotation (presumably the index of the item in
        `list_parameters`; confirm with concrete runners).

    Raises:
        NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError()

submit(func, parameters, history_unit_id, task_type, task_files, config, user_id)

Run a single fractal task.

Parameters:

Name Type Description Default
func callable

Function to be executed.

required
parameters dict[str, Any]

Dictionary of parameters.

required
history_unit_id int

Database ID of the corresponding HistoryUnit entry.

required
task_type TaskTypeType

Task type.

required
task_files TaskFiles

TaskFiles object.

required
config Any

Runner-specific parameters.

required
user_id int
required
Source code in fractal_server/app/runner/executors/base_runner.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def submit(
    self,
    func: callable,
    parameters: dict[str, Any],
    history_unit_id: int,
    task_type: TaskTypeType,
    task_files: TaskFiles,
    config: Any,
    user_id: int,
) -> tuple[Any, BaseException]:
    """
    Execute one fractal task (single-unit submission).

    This base-class version has no implementation and always raises.

    Args:
        func: The callable to run.
        parameters: Keyword parameters for the task, as a dictionary.
        history_unit_id:
            ID of the `HistoryUnit` database entry tied to this run.
        task_type: Type of the task.
        task_files: The `TaskFiles` object for this run.
        config: Parameters specific to the runner in use.
        user_id: ID of the user (semantics defined by concrete runners).

    Returns:
        A `(result, exception)` pair, as per the return annotation.

    Raises:
        NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError

validate_multisubmit_parameters(*, task_type, list_parameters, list_task_files, history_unit_ids)

Validate parameters for multisubmit method

Parameters:

Name Type Description Default
task_type TaskTypeType

Task type.

required
list_parameters list[dict[str, Any]]

List of parameters dictionaries.

required
list_task_files list[TaskFiles]
required
history_unit_ids list[int]
required
Source code in fractal_server/app/runner/executors/base_runner.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def validate_multisubmit_parameters(
    self,
    *,
    task_type: TaskTypeType,
    list_parameters: list[dict[str, Any]],
    list_task_files: list[TaskFiles],
    history_unit_ids: list[int],
) -> None:
    """
    Validate parameters for `multisubmit` method.

    Args:
        task_type: Task type.
        list_parameters: List of parameters dictionaries.
        list_task_files:
            List of `TaskFiles` objects; must have the same length as
            `list_parameters` and share a single `wftask_subfolder_local`.
        history_unit_ids:
            List of IDs; must have the same length as `list_parameters`.

    Raises:
        ValueError: If any of the checks fails.
    """
    if task_type not in TASK_TYPES_MULTISUBMIT:
        raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

    if not isinstance(list_parameters, list):
        raise ValueError("`list_parameters` must be a list.")

    if len(list_parameters) != len(list_task_files):
        raise ValueError(
            f"{len(list_task_files)=} differs from "
            f"{len(list_parameters)=}."
        )
    if len(history_unit_ids) != len(list_parameters):
        raise ValueError(
            f"{len(history_unit_ids)=} differs from "
            f"{len(list_parameters)=}."
        )

    subfolders = {
        task_file.wftask_subfolder_local for task_file in list_task_files
    }
    # An empty input yields zero subfolders; report it as such rather than
    # as "more than one".
    if not subfolders:
        raise ValueError("No subfolder found: `list_task_files` is empty.")
    if len(subfolders) > 1:
        raise ValueError(f"More than one subfolders: {subfolders}.")

    for single_kwargs in list_parameters:
        if not isinstance(single_kwargs, dict):
            raise ValueError("kwargs item must be a dictionary.")
        if "zarr_url" not in single_kwargs:
            raise ValueError(
                f"No 'zarr_url' key in {list(single_kwargs)}"
            )
    if task_type == "parallel":
        # Uniqueness is only enforced for the "parallel" task type.
        zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
        if len(zarr_urls) != len(set(zarr_urls)):
            raise ValueError("Non-unique zarr_urls")

validate_submit_parameters(parameters, task_type)

Validate parameters for submit method

Parameters:

Name Type Description Default
parameters dict[str, Any]

Parameters dictionary.

required
task_type TaskTypeType

Task type.

required
Source code in fractal_server/app/runner/executors/base_runner.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def validate_submit_parameters(
    self,
    parameters: dict[str, Any],
    task_type: TaskTypeType,
) -> None:
    """
    Validate parameters for `submit` method.

    Args:
        parameters: Parameters dictionary.
        task_type: Task type.

    Raises:
        ValueError:
            If `task_type` is not in `TASK_TYPES_SUBMIT`, if `parameters`
            is not a dictionary, if `zarr_urls` is missing for a
            `non_parallel`/`compound` task, or if `zarr_urls` is present
            for a converter task.
    """
    logger.info("[validate_submit_parameters] START")
    if task_type not in TASK_TYPES_SUBMIT:
        raise ValueError(f"Invalid {task_type=} for `submit`.")
    if not isinstance(parameters, dict):
        raise ValueError("`parameters` must be a dictionary.")
    if task_type in ("non_parallel", "compound"):
        # Non-converter tasks operate on existing images.
        if "zarr_urls" not in parameters:
            raise ValueError(
                f"No 'zarr_urls' key in {list(parameters)}"
            )
    elif task_type in ("converter_non_parallel", "converter_compound"):
        # Converter tasks must not receive a list of images.
        if "zarr_urls" in parameters:
            raise ValueError(
                f"Forbidden 'zarr_urls' key in {list(parameters)}"
            )
    logger.info("[validate_submit_parameters] END")