Skip to content

base_runner

TASK_TYPES_MULTISUBMIT = [TaskType.COMPOUND, TaskType.CONVERTER_COMPOUND, TaskType.PARALLEL] module-attribute

List of valid task types for BaseRunner.multisubmit.

TASK_TYPES_SUBMIT = [TaskType.COMPOUND, TaskType.CONVERTER_COMPOUND, TaskType.NON_PARALLEL, TaskType.CONVERTER_NON_PARALLEL] module-attribute

List of valid task types for BaseRunner.submit.

BaseRunner

Base class for Fractal runners.

Source code in fractal_server/runner/executors/base_runner.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class BaseRunner:
    """
    Base class for Fractal runners.
    """

    shared_config: JobRunnerConfigLocal | JobRunnerConfigSLURM

    executor_error_log: str | None = None

    def submit(
        self,
        *,
        base_command: str,
        workflow_task_order: int,
        workflow_task_id: int,
        task_name: str,
        parameters: dict[str, Any],
        history_unit_id: int,
        task_type: SubmitTaskType,
        task_files: TaskFiles,
        user_id: int,
        config: Any,
    ) -> tuple[Any, BaseException | None]:
        """
        Run a single fractal task.

        Args:
            base_command:
            workflow_task_order:
            workflow_task_id:
            task_name:
            parameters: Dictionary of parameters.
            history_unit_id:
                Database ID of the corresponding `HistoryUnit` entry.
            task_type: Task type.
            task_files: `TaskFiles` object.
            config: Runner-specific parameters.
            user_id:
        """
        raise NotImplementedError()

    def multisubmit(
        self,
        *,
        base_command: str,
        workflow_task_order: int,
        workflow_task_id: int,
        task_name: str,
        list_parameters: list[dict],
        history_unit_ids: list[int],
        list_task_files: list[TaskFiles],
        task_type: MultisubmitTaskType,
        config: Any,
        user_id: int,
    ) -> tuple[dict[int, Any], dict[int, BaseException]]:
        """
        Run a parallel fractal task.

        Note: `list_parameters`, `list_task_files` and `history_unit_ids`
        have the same size. For parallel tasks, this is also the number of
        input images, while for compound tasks these can differ.

        Args:
            base_command:
            workflow_task_order:
            workflow_task_id:
            task_name:
            list_parameters:
                List of dictionaries of parameters (each one must include
                `zarr_urls` key).
            history_unit_ids:
                Database IDs of the corresponding `HistoryUnit` entries.
            list_task_files: `TaskFiles` objects.
            task_type: Task type.
            config: Runner-specific parameters.
            user_id:
        """
        raise NotImplementedError()

    def validate_submit_parameters(
        self,
        parameters: dict[str, Any],
        task_type: SubmitTaskType | MultisubmitTaskType,
    ) -> None:
        """
        Validate parameters for `submit` method

        Args:
            parameters: Parameters dictionary.
            task_type: Task type.
        """
        logger.info("[validate_submit_parameters] START")
        if task_type not in TASK_TYPES_SUBMIT:
            raise ValueError(f"Invalid {task_type=} for `submit`.")
        if not isinstance(parameters, dict):
            raise ValueError("`parameters` must be a dictionary.")
        if task_type in [
            TaskType.NON_PARALLEL,
            TaskType.COMPOUND,
        ]:
            if "zarr_urls" not in parameters.keys():
                raise ValueError(
                    f"No 'zarr_urls' key in in {list(parameters.keys())}"
                )
        elif task_type in [
            TaskType.CONVERTER_NON_PARALLEL,
            TaskType.CONVERTER_COMPOUND,
        ]:
            if "zarr_urls" in parameters.keys():
                raise ValueError(
                    f"Forbidden 'zarr_urls' key in {list(parameters.keys())}"
                )
        logger.info("[validate_submit_parameters] END")

    def validate_multisubmit_parameters(
        self,
        *,
        task_type: MultisubmitTaskType,
        list_parameters: list[dict[str, Any]],
        list_task_files: list[TaskFiles],
        history_unit_ids: list[int],
    ) -> None:
        """
        Validate parameters for `multisubmit` method

        Args:
            task_type: Task type.
            list_parameters: List of parameters dictionaries.
            list_task_files:
            history_unit_ids:
        """
        if task_type not in TASK_TYPES_MULTISUBMIT:
            raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

        if not isinstance(list_parameters, list):
            raise ValueError("`parameters` must be a list.")

        if len(list_parameters) != len(list_task_files):
            raise ValueError(
                f"{len(list_task_files)=} differs from "
                f"{len(list_parameters)=}."
            )
        if len(history_unit_ids) != len(list_parameters):
            raise ValueError(
                f"{len(history_unit_ids)=} differs from "
                f"{len(list_parameters)=}."
            )

        subfolders = {
            task_file.wftask_subfolder_local for task_file in list_task_files
        }
        if len(subfolders) != 1:
            raise ValueError(f"More than one subfolders: {subfolders}.")

        for single_kwargs in list_parameters:
            if not isinstance(single_kwargs, dict):
                raise ValueError("kwargs itemt must be a dictionary.")
            if "zarr_url" not in single_kwargs.keys():
                raise ValueError(
                    f"No 'zarr_url' key in in {list(single_kwargs.keys())}"
                )
        if task_type == TaskType.PARALLEL:
            zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
            if len(zarr_urls) != len(set(zarr_urls)):
                raise ValueError("Non-unique zarr_urls")

multisubmit(*, base_command, workflow_task_order, workflow_task_id, task_name, list_parameters, history_unit_ids, list_task_files, task_type, config, user_id)

Run a parallel fractal task.

Note: list_parameters, list_task_files and history_unit_ids have the same size. For parallel tasks, this is also the number of input images, while for compound tasks these can differ.

PARAMETER DESCRIPTION
base_command

TYPE: str

workflow_task_order

TYPE: int

workflow_task_id

TYPE: int

task_name

TYPE: str

list_parameters

List of dictionaries of parameters (each one must include zarr_urls key).

TYPE: list[dict]

history_unit_ids

Database IDs of the corresponding HistoryUnit entries.

TYPE: list[int]

list_task_files

TaskFiles objects.

TYPE: list[TaskFiles]

task_type

Task type.

TYPE: MultisubmitTaskType

config

Runner-specific parameters.

TYPE: Any

user_id

TYPE: int

Source code in fractal_server/runner/executors/base_runner.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def multisubmit(
    self,
    *,
    base_command: str,
    workflow_task_order: int,
    workflow_task_id: int,
    task_name: str,
    list_parameters: list[dict],
    history_unit_ids: list[int],
    list_task_files: list[TaskFiles],
    task_type: MultisubmitTaskType,
    config: Any,
    user_id: int,
) -> tuple[dict[int, Any], dict[int, BaseException]]:
    """
    Run a parallel fractal task.

    Note: `list_parameters`, `list_task_files` and `history_unit_ids`
    have the same size. For parallel tasks, this is also the number of
    input images, while for compound tasks these can differ.

    Args:
        base_command:
        workflow_task_order:
        workflow_task_id:
        task_name:
        list_parameters:
            List of dictionaries of parameters (each one must include
            `zarr_urls` key).
        history_unit_ids:
            Database IDs of the corresponding `HistoryUnit` entries.
        list_task_files: `TaskFiles` objects.
        task_type: Task type.
        config: Runner-specific parameters.
        user_id:
    """
    raise NotImplementedError()

submit(*, base_command, workflow_task_order, workflow_task_id, task_name, parameters, history_unit_id, task_type, task_files, user_id, config)

Run a single fractal task.

PARAMETER DESCRIPTION
base_command

TYPE: str

workflow_task_order

TYPE: int

workflow_task_id

TYPE: int

task_name

TYPE: str

parameters

Dictionary of parameters.

TYPE: dict[str, Any]

history_unit_id

Database ID of the corresponding HistoryUnit entry.

TYPE: int

task_type

Task type.

TYPE: SubmitTaskType

task_files

TaskFiles object.

TYPE: TaskFiles

config

Runner-specific parameters.

TYPE: Any

user_id

TYPE: int

Source code in fractal_server/runner/executors/base_runner.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def submit(
    self,
    *,
    base_command: str,
    workflow_task_order: int,
    workflow_task_id: int,
    task_name: str,
    parameters: dict[str, Any],
    history_unit_id: int,
    task_type: SubmitTaskType,
    task_files: TaskFiles,
    user_id: int,
    config: Any,
) -> tuple[Any, BaseException | None]:
    """
    Run a single fractal task.

    Args:
        base_command:
        workflow_task_order:
        workflow_task_id:
        task_name:
        parameters: Dictionary of parameters.
        history_unit_id:
            Database ID of the corresponding `HistoryUnit` entry.
        task_type: Task type.
        task_files: `TaskFiles` object.
        config: Runner-specific parameters.
        user_id:
    """
    raise NotImplementedError()

validate_multisubmit_parameters(*, task_type, list_parameters, list_task_files, history_unit_ids)

Validate parameters for multisubmit method

PARAMETER DESCRIPTION
task_type

Task type.

TYPE: MultisubmitTaskType

list_parameters

List of parameters dictionaries.

TYPE: list[dict[str, Any]]

list_task_files

TYPE: list[TaskFiles]

history_unit_ids

TYPE: list[int]

Source code in fractal_server/runner/executors/base_runner.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def validate_multisubmit_parameters(
    self,
    *,
    task_type: MultisubmitTaskType,
    list_parameters: list[dict[str, Any]],
    list_task_files: list[TaskFiles],
    history_unit_ids: list[int],
) -> None:
    """
    Validate parameters for `multisubmit` method

    Args:
        task_type: Task type.
        list_parameters: List of parameters dictionaries.
        list_task_files:
        history_unit_ids:
    """
    if task_type not in TASK_TYPES_MULTISUBMIT:
        raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

    if not isinstance(list_parameters, list):
        raise ValueError("`parameters` must be a list.")

    if len(list_parameters) != len(list_task_files):
        raise ValueError(
            f"{len(list_task_files)=} differs from "
            f"{len(list_parameters)=}."
        )
    if len(history_unit_ids) != len(list_parameters):
        raise ValueError(
            f"{len(history_unit_ids)=} differs from "
            f"{len(list_parameters)=}."
        )

    subfolders = {
        task_file.wftask_subfolder_local for task_file in list_task_files
    }
    if len(subfolders) != 1:
        raise ValueError(f"More than one subfolders: {subfolders}.")

    for single_kwargs in list_parameters:
        if not isinstance(single_kwargs, dict):
            raise ValueError("kwargs itemt must be a dictionary.")
        if "zarr_url" not in single_kwargs.keys():
            raise ValueError(
                f"No 'zarr_url' key in in {list(single_kwargs.keys())}"
            )
    if task_type == TaskType.PARALLEL:
        zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
        if len(zarr_urls) != len(set(zarr_urls)):
            raise ValueError("Non-unique zarr_urls")

validate_submit_parameters(parameters, task_type)

Validate parameters for submit method

PARAMETER DESCRIPTION
parameters

Parameters dictionary.

TYPE: dict[str, Any]

task_type

Task type.

TYPE: SubmitTaskType | MultisubmitTaskType

Source code in fractal_server/runner/executors/base_runner.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def validate_submit_parameters(
    self,
    parameters: dict[str, Any],
    task_type: SubmitTaskType | MultisubmitTaskType,
) -> None:
    """
    Validate parameters for `submit` method

    Args:
        parameters: Parameters dictionary.
        task_type: Task type.
    """
    logger.info("[validate_submit_parameters] START")
    if task_type not in TASK_TYPES_SUBMIT:
        raise ValueError(f"Invalid {task_type=} for `submit`.")
    if not isinstance(parameters, dict):
        raise ValueError("`parameters` must be a dictionary.")
    if task_type in [
        TaskType.NON_PARALLEL,
        TaskType.COMPOUND,
    ]:
        if "zarr_urls" not in parameters.keys():
            raise ValueError(
                f"No 'zarr_urls' key in in {list(parameters.keys())}"
            )
    elif task_type in [
        TaskType.CONVERTER_NON_PARALLEL,
        TaskType.CONVERTER_COMPOUND,
    ]:
        if "zarr_urls" in parameters.keys():
            raise ValueError(
                f"Forbidden 'zarr_urls' key in {list(parameters.keys())}"
            )
    logger.info("[validate_submit_parameters] END")

MultisubmitTaskType

Bases: StrEnum

Valid task types for BaseRunner.multisubmit.

ATTRIBUTE DESCRIPTION
PARALLEL

Parallel task.

COMPOUND

Compound task.

CONVERTER_COMPOUND

Compound converter task.

Source code in fractal_server/runner/executors/base_runner.py
28
29
30
31
32
33
34
35
36
37
38
39
40
class MultisubmitTaskType(StrEnum):
    """
    Valid task types for `BaseRunner.multisubmit`.

    Attributes:
        PARALLEL: Parallel task.
        COMPOUND: Compound task.
        CONVERTER_COMPOUND: Compound converter task.
    """

    PARALLEL = TaskType.PARALLEL
    COMPOUND = TaskType.COMPOUND
    CONVERTER_COMPOUND = TaskType.CONVERTER_COMPOUND

SubmitTaskType

Bases: StrEnum

Valid task types for BaseRunner.submit.

ATTRIBUTE DESCRIPTION
NON_PARALLEL

Non-parallel task.

COMPOUND

Compound task.

CONVERTER_NON_PARALLEL

Non-parallel converter task.

CONVERTER_COMPOUND

Compound converter task.

Source code in fractal_server/runner/executors/base_runner.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class SubmitTaskType(StrEnum):
    """
    Valid task types for `BaseRunner.submit`.

    Attributes:
        NON_PARALLEL: Non-parallel task.
        COMPOUND: Compound task.
        CONVERTER_NON_PARALLEL: Non-parallel converter task.
        CONVERTER_COMPOUND: Compound converter task.
    """

    NON_PARALLEL = TaskType.NON_PARALLEL
    COMPOUND = TaskType.COMPOUND
    CONVERTER_NON_PARALLEL = TaskType.CONVERTER_NON_PARALLEL
    CONVERTER_COMPOUND = TaskType.CONVERTER_COMPOUND