Skip to content

base_runner

BaseRunner

Base class for Fractal runners.

Source code in fractal_server/app/runner/executors/base_runner.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class BaseRunner:
    """
    Base class for Fractal runners.
    """

    def submit(
        self,
        base_command: str,
        workflow_task_order: int,
        workflow_task_id: int,
        task_name: str,
        parameters: dict[str, Any],
        history_unit_id: int,
        task_type: TaskType,
        task_files: TaskFiles,
        config: Any,
        user_id: int,
    ) -> tuple[Any, BaseException]:
        """
        Run a single fractal task.

        Args:
            base_command:
            workflow_task_order:
            workflow_task_id:
            task_name:
            parameters: Dictionary of parameters.
            history_unit_id:
                Database ID of the corresponding `HistoryUnit` entry.
            task_type: Task type.
            task_files: `TaskFiles` object.
            config: Runner-specific parameters.
            user_id:
        """
        raise NotImplementedError()

    def multisubmit(
        self,
        base_command: str,
        workflow_task_order: int,
        workflow_task_id: int,
        task_name: str,
        list_parameters: list[dict[str, Any]],
        history_unit_ids: list[int],
        list_task_files: list[TaskFiles],
        task_type: TaskType,
        config: Any,
        user_id: int,
    ) -> tuple[dict[int, Any], dict[int, BaseException]]:
        """
        Run a parallel fractal task.

        Args:
            base_command:
            workflow_task_order:
            workflow_task_id:
            task_name:
            parameters:
                Dictionary of parameters. Must include `zarr_urls` key.
            history_unit_ids:
                Database IDs of the corresponding `HistoryUnit` entries.
            task_type: Task type.
            task_files: `TaskFiles` object.
            config: Runner-specific parameters.
            user_id
        """
        raise NotImplementedError()

    def validate_submit_parameters(
        self,
        parameters: dict[str, Any],
        task_type: TaskType,
    ) -> None:
        """
        Validate parameters for `submit` method

        Args:
            parameters: Parameters dictionary.
            task_type: Task type.s
        """
        logger.info("[validate_submit_parameters] START")
        if task_type not in TASK_TYPES_SUBMIT:
            raise ValueError(f"Invalid {task_type=} for `submit`.")
        if not isinstance(parameters, dict):
            raise ValueError("`parameters` must be a dictionary.")
        if task_type in [
            TaskType.NON_PARALLEL,
            TaskType.COMPOUND,
        ]:
            if "zarr_urls" not in parameters.keys():
                raise ValueError(
                    f"No 'zarr_urls' key in in {list(parameters.keys())}"
                )
        elif task_type in [
            TaskType.CONVERTER_NON_PARALLEL,
            TaskType.CONVERTER_COMPOUND,
        ]:
            if "zarr_urls" in parameters.keys():
                raise ValueError(
                    f"Forbidden 'zarr_urls' key in {list(parameters.keys())}"
                )
        logger.info("[validate_submit_parameters] END")

    def validate_multisubmit_parameters(
        self,
        *,
        task_type: TaskType,
        list_parameters: list[dict[str, Any]],
        list_task_files: list[TaskFiles],
        history_unit_ids: list[int],
    ) -> None:
        """
        Validate parameters for `multisubmit` method

        Args:
            task_type: Task type.
            list_parameters: List of parameters dictionaries.
            list_task_files:
            history_unit_ids:
        """
        if task_type not in TASK_TYPES_MULTISUBMIT:
            raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

        if not isinstance(list_parameters, list):
            raise ValueError("`parameters` must be a list.")

        if len(list_parameters) != len(list_task_files):
            raise ValueError(
                f"{len(list_task_files)=} differs from "
                f"{len(list_parameters)=}."
            )
        if len(history_unit_ids) != len(list_parameters):
            raise ValueError(
                f"{len(history_unit_ids)=} differs from "
                f"{len(list_parameters)=}."
            )

        subfolders = {
            task_file.wftask_subfolder_local for task_file in list_task_files
        }
        if len(subfolders) != 1:
            raise ValueError(f"More than one subfolders: {subfolders}.")

        for single_kwargs in list_parameters:
            if not isinstance(single_kwargs, dict):
                raise ValueError("kwargs itemt must be a dictionary.")
            if "zarr_url" not in single_kwargs.keys():
                raise ValueError(
                    f"No 'zarr_url' key in in {list(single_kwargs.keys())}"
                )
        if task_type == TaskType.PARALLEL:
            zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
            if len(zarr_urls) != len(set(zarr_urls)):
                raise ValueError("Non-unique zarr_urls")

multisubmit(base_command, workflow_task_order, workflow_task_id, task_name, list_parameters, history_unit_ids, list_task_files, task_type, config, user_id)

Run a parallel fractal task.

Parameters:

Name Type Description Default
base_command str
required
workflow_task_order int
required
workflow_task_id int
required
task_name str
required
parameters

Dictionary of parameters. Must include zarr_urls key.

required
history_unit_ids list[int]

Database IDs of the corresponding HistoryUnit entries.

required
task_type TaskType

Task type.

required
task_files

TaskFiles object.

required
config Any

Runner-specific parameters.

required
Source code in fractal_server/app/runner/executors/base_runner.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def multisubmit(
    self,
    base_command: str,
    workflow_task_order: int,
    workflow_task_id: int,
    task_name: str,
    list_parameters: list[dict[str, Any]],
    history_unit_ids: list[int],
    list_task_files: list[TaskFiles],
    task_type: TaskType,
    config: Any,
    user_id: int,
) -> tuple[dict[int, Any], dict[int, BaseException]]:
    """
    Run a parallel fractal task.

    Args:
        base_command:
        workflow_task_order:
        workflow_task_id:
        task_name:
        parameters:
            Dictionary of parameters. Must include `zarr_urls` key.
        history_unit_ids:
            Database IDs of the corresponding `HistoryUnit` entries.
        task_type: Task type.
        task_files: `TaskFiles` object.
        config: Runner-specific parameters.
        user_id
    """
    raise NotImplementedError()

submit(base_command, workflow_task_order, workflow_task_id, task_name, parameters, history_unit_id, task_type, task_files, config, user_id)

Run a single fractal task.

Parameters:

Name Type Description Default
base_command str
required
workflow_task_order int
required
workflow_task_id int
required
task_name str
required
parameters dict[str, Any]

Dictionary of parameters.

required
history_unit_id int

Database ID of the corresponding HistoryUnit entry.

required
task_type TaskType

Task type.

required
task_files TaskFiles

TaskFiles object.

required
config Any

Runner-specific parameters.

required
user_id int
required
Source code in fractal_server/app/runner/executors/base_runner.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def submit(
    self,
    base_command: str,
    workflow_task_order: int,
    workflow_task_id: int,
    task_name: str,
    parameters: dict[str, Any],
    history_unit_id: int,
    task_type: TaskType,
    task_files: TaskFiles,
    config: Any,
    user_id: int,
) -> tuple[Any, BaseException]:
    """
    Run a single fractal task.

    Args:
        base_command:
        workflow_task_order:
        workflow_task_id:
        task_name:
        parameters: Dictionary of parameters.
        history_unit_id:
            Database ID of the corresponding `HistoryUnit` entry.
        task_type: Task type.
        task_files: `TaskFiles` object.
        config: Runner-specific parameters.
        user_id:
    """
    raise NotImplementedError()

validate_multisubmit_parameters(*, task_type, list_parameters, list_task_files, history_unit_ids)

Validate parameters for multisubmit method

Parameters:

Name Type Description Default
task_type TaskType

Task type.

required
list_parameters list[dict[str, Any]]

List of parameters dictionaries.

required
list_task_files list[TaskFiles]
required
history_unit_ids list[int]
required
Source code in fractal_server/app/runner/executors/base_runner.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def validate_multisubmit_parameters(
    self,
    *,
    task_type: TaskType,
    list_parameters: list[dict[str, Any]],
    list_task_files: list[TaskFiles],
    history_unit_ids: list[int],
) -> None:
    """
    Validate parameters for `multisubmit` method

    Args:
        task_type: Task type.
        list_parameters: List of parameters dictionaries.
        list_task_files:
        history_unit_ids:
    """
    if task_type not in TASK_TYPES_MULTISUBMIT:
        raise ValueError(f"Invalid {task_type=} for `multisubmit`.")

    if not isinstance(list_parameters, list):
        raise ValueError("`parameters` must be a list.")

    if len(list_parameters) != len(list_task_files):
        raise ValueError(
            f"{len(list_task_files)=} differs from "
            f"{len(list_parameters)=}."
        )
    if len(history_unit_ids) != len(list_parameters):
        raise ValueError(
            f"{len(history_unit_ids)=} differs from "
            f"{len(list_parameters)=}."
        )

    subfolders = {
        task_file.wftask_subfolder_local for task_file in list_task_files
    }
    if len(subfolders) != 1:
        raise ValueError(f"More than one subfolders: {subfolders}.")

    for single_kwargs in list_parameters:
        if not isinstance(single_kwargs, dict):
            raise ValueError("kwargs itemt must be a dictionary.")
        if "zarr_url" not in single_kwargs.keys():
            raise ValueError(
                f"No 'zarr_url' key in in {list(single_kwargs.keys())}"
            )
    if task_type == TaskType.PARALLEL:
        zarr_urls = [kwargs["zarr_url"] for kwargs in list_parameters]
        if len(zarr_urls) != len(set(zarr_urls)):
            raise ValueError("Non-unique zarr_urls")

validate_submit_parameters(parameters, task_type)

Validate parameters for submit method

Parameters:

Name Type Description Default
parameters dict[str, Any]

Parameters dictionary.

required
task_type TaskType

Task type.s

required
Source code in fractal_server/app/runner/executors/base_runner.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def validate_submit_parameters(
    self,
    parameters: dict[str, Any],
    task_type: TaskType,
) -> None:
    """
    Validate parameters for `submit` method

    Args:
        parameters: Parameters dictionary.
        task_type: Task type.s
    """
    logger.info("[validate_submit_parameters] START")
    if task_type not in TASK_TYPES_SUBMIT:
        raise ValueError(f"Invalid {task_type=} for `submit`.")
    if not isinstance(parameters, dict):
        raise ValueError("`parameters` must be a dictionary.")
    if task_type in [
        TaskType.NON_PARALLEL,
        TaskType.COMPOUND,
    ]:
        if "zarr_urls" not in parameters.keys():
            raise ValueError(
                f"No 'zarr_urls' key in in {list(parameters.keys())}"
            )
    elif task_type in [
        TaskType.CONVERTER_NON_PARALLEL,
        TaskType.CONVERTER_COMPOUND,
    ]:
        if "zarr_urls" in parameters.keys():
            raise ValueError(
                f"Forbidden 'zarr_urls' key in {list(parameters.keys())}"
            )
    logger.info("[validate_submit_parameters] END")