executor

Custom version of Python's ProcessPoolExecutor.

FractalProcessPoolExecutor

Bases: ProcessPoolExecutor

Source code in fractal_server/app/runner/v2/_local_experimental/executor.py
class FractalProcessPoolExecutor(ProcessPoolExecutor):

    shutdown_file: Path
    interval: float
    _shutdown: bool
    _shutdown_file_thread: threading.Thread

    def __init__(
        self, shutdown_file: Path, interval: float = 1.0, *args, **kwargs
    ):
        super().__init__(*args, **kwargs, mp_context=mp.get_context("spawn"))
        self.shutdown_file = Path(shutdown_file)
        self.interval = float(interval)
        logger.debug(
            f"Start monitoring {shutdown_file} every {interval} seconds"
        )
        self._shutdown = False
        self._shutdown_file_thread = threading.Thread(
            target=self._run, daemon=True
        )
        self._shutdown_file_thread.start()

    def _run(self):
        """
        Running on '_shutdown_file_thread'.
        """
        while True:
            if self.shutdown_file.exists() or self._shutdown:
                try:
                    self._terminate_processes()
                except Exception as e:
                    logger.error(
                        "Terminate processes failed. "
                        f"Original error: {str(e)}."
                    )
                finally:
                    return
            time.sleep(self.interval)

    def _terminate_processes(self):
        """
        Running on '_shutdown_file_thread'.
        """

        logger.info("Start terminating FractalProcessPoolExecutor processes.")
        # We use 'psutil' in order to easily access the PIDs of the children.
        if self._processes is not None:
            for pid in self._processes.keys():
                parent = psutil.Process(pid)
                children = parent.children(recursive=True)
                for child in children:
                    child.kill()
                parent.kill()
                logger.info(f"Process {pid} and its children terminated.")
        logger.info("FractalProcessPoolExecutor processes terminated.")

    def shutdown(self, *args, **kwargs) -> None:
        self._shutdown = True
        self._shutdown_file_thread.join()
        return super().shutdown(*args, **kwargs)

    def submit(
        self,
        *args,
        local_backend_config: Optional[LocalBackendConfig] = None,
        **kwargs,
    ):
        """
        Compared to the `ProcessPoolExecutor` method, here we accept an
        additional keyword argument (`local_backend_config`), which is then
        simply ignored.
        """
        return super().submit(*args, **kwargs)

    def map(
        self,
        fn: Callable,
        *iterables: Sequence[Iterable],
        local_backend_config: Optional[LocalBackendConfig] = None,
    ):
        """
        Custom version of the `Executor.map` method

        The main change with respect to the original `map` method is that
        the list of tasks to be executed is split into chunks, and then
        `super().map` is called (sequentially) on each chunk. The goal of this
        change is to limit parallelism, e.g. due to limited computational
        resources.

        Other changes from the `concurrent.futures` `map` method:

        1. Removed `timeout` argument;
        2. Removed `chunksize`;
        3. All iterators (both input and output) are transformed into
           lists.

        Args:
            fn: A callable function.
            iterables: The argument iterables (one iterable per argument of
                       `fn`).
            local_backend_config: The backend configuration, needed to extract
                                  `parallel_tasks_per_job`.
        """
        # Preliminary check
        iterable_lengths = [len(it) for it in iterables]
        if not len(set(iterable_lengths)) == 1:
            raise ValueError("Iterables have different lengths.")
        # Set total number of arguments
        n_elements = len(iterables[0])

        # Set parallel_tasks_per_job
        if local_backend_config is None:
            local_backend_config = get_default_local_backend_config()
        parallel_tasks_per_job = local_backend_config.parallel_tasks_per_job
        if parallel_tasks_per_job is None:
            parallel_tasks_per_job = n_elements

        # Execute tasks, in chunks of size parallel_tasks_per_job
        results = []
        for ind_chunk in range(0, n_elements, parallel_tasks_per_job):
            chunk_iterables = [
                it[ind_chunk : ind_chunk + parallel_tasks_per_job]  # noqa
                for it in iterables
            ]
            map_iter = super().map(fn, *chunk_iterables)

            try:
                results.extend(list(map_iter))
            except BrokenProcessPool as e:
                raise JobExecutionError(info=e.args[0])

        return iter(results)
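
A minimal usage sketch of the class (the shutdown-file path and the square helper below are illustrative assumptions, not part of the module):

from pathlib import Path

from fractal_server.app.runner.v2._local_experimental.executor import (
    FractalProcessPoolExecutor,
)


def square(x):
    return x * x


if __name__ == "__main__":  # guard required by the "spawn" start method
    # Hypothetical shutdown-file location; any writable path works.
    shutdown_file = Path("/tmp/fractal_shutdown")
    with FractalProcessPoolExecutor(
        shutdown_file=shutdown_file, interval=0.5, max_workers=2
    ) as executor:
        future = executor.submit(square, 3)
        print(future.result())  # 9
    # Exiting the context manager calls shutdown(), which stops the
    # monitoring thread (terminating the worker processes) and then runs
    # ProcessPoolExecutor.shutdown().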

_run()

Running on '_shutdown_file_thread'.

Source code in fractal_server/app/runner/v2/_local_experimental/executor.py
def _run(self):
    """
    Running on '_shutdown_file_thread'.
    """
    while True:
        if self.shutdown_file.exists() or self._shutdown:
            try:
                self._terminate_processes()
            except Exception as e:
                logger.error(
                    "Terminate processes failed. "
                    f"Original error: {str(e)}."
                )
            finally:
                return
        time.sleep(self.interval)

_terminate_processes()

Running on '_shutdown_file_thread'.

Source code in fractal_server/app/runner/v2/_local_experimental/executor.py
def _terminate_processes(self):
    """
    Running on '_shutdown_file_thread'.
    """

    logger.info("Start terminating FractalProcessPoolExecutor processes.")
    # We use 'psutil' in order to easily access the PIDs of the children.
    if self._processes is not None:
        for pid in self._processes.keys():
            parent = psutil.Process(pid)
            children = parent.children(recursive=True)
            for child in children:
                child.kill()
            parent.kill()
            logger.info(f"Process {pid} and its children terminated.")
    logger.info("FractalProcessPoolExecutor processes terminated.")

map(fn, *iterables, local_backend_config=None)

Custom version of the Executor.map method.

The main change with respect to the original map method is that the list of tasks to be executed is split into chunks, and then super().map is called (sequentially) on each chunk. The goal of this change is to limit parallelism, e.g. due to limited computational resources.

Other changes from the concurrent.futures map method:

  1. Removed timeout argument;
  2. Removed chunksize;
  3. All iterators (both input and output) are transformed into lists.

Parameters:

fn (Callable, required):
    A callable function.
iterables (Sequence[Iterable], default ()):
    The argument iterables (one iterable per argument of fn).
local_backend_config (Optional[LocalBackendConfig], default None):
    The backend configuration, needed to extract parallel_tasks_per_job.
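
For instance (a sketch: the import path for LocalBackendConfig and its keyword constructor are assumptions; map only reads its parallel_tasks_per_job attribute):

from pathlib import Path

from fractal_server.app.runner.v2._local_experimental.executor import (
    FractalProcessPoolExecutor,
)
# Assumption: LocalBackendConfig is importable from the local-backend
# modules and accepts `parallel_tasks_per_job` as a keyword argument.


def double(x):
    return 2 * x


if __name__ == "__main__":
    config = LocalBackendConfig(parallel_tasks_per_job=2)
    with FractalProcessPoolExecutor(
        shutdown_file=Path("/tmp/fractal_shutdown")  # hypothetical path
    ) as executor:
        # Five inputs run as three sequential chunks of at most two
        # parallel tasks each: [0, 1], then [2, 3], then [4]. Inputs must
        # be sequences (they are len()-ed and sliced), not bare iterators.
        results = list(
            executor.map(double, [0, 1, 2, 3, 4], local_backend_config=config)
        )
    print(results)  # [0, 2, 4, 6, 8]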

Source code in fractal_server/app/runner/v2/_local_experimental/executor.py
def map(
    self,
    fn: Callable,
    *iterables: Sequence[Iterable],
    local_backend_config: Optional[LocalBackendConfig] = None,
):
    """
    Custom version of the `Executor.map` method

    The main change with respect to the original `map` method is that
    the list of tasks to be executed is split into chunks, and then
    `super().map` is called (sequentially) on each chunk. The goal of this
    change is to limit parallelism, e.g. due to limited computational
    resources.

    Other changes from the `concurrent.futures` `map` method:

    1. Removed `timeout` argument;
    2. Removed `chunksize`;
    3. All iterators (both input and output) are transformed into
       lists.

    Args:
        fn: A callable function.
        iterables: The argument iterables (one iterable per argument of
                   `fn`).
        local_backend_config: The backend configuration, needed to extract
                              `parallel_tasks_per_job`.
    """
    # Preliminary check
    iterable_lengths = [len(it) for it in iterables]
    if not len(set(iterable_lengths)) == 1:
        raise ValueError("Iterables have different lengths.")
    # Set total number of arguments
    n_elements = len(iterables[0])

    # Set parallel_tasks_per_job
    if local_backend_config is None:
        local_backend_config = get_default_local_backend_config()
    parallel_tasks_per_job = local_backend_config.parallel_tasks_per_job
    if parallel_tasks_per_job is None:
        parallel_tasks_per_job = n_elements

    # Execute tasks, in chunks of size parallel_tasks_per_job
    results = []
    for ind_chunk in range(0, n_elements, parallel_tasks_per_job):
        chunk_iterables = [
            it[ind_chunk : ind_chunk + parallel_tasks_per_job]  # noqa
            for it in iterables
        ]
        map_iter = super().map(fn, *chunk_iterables)

        try:
            results.extend(list(map_iter))
        except BrokenProcessPool as e:
            raise JobExecutionError(info=e.args[0])

    return iter(results)

submit(*args, local_backend_config=None, **kwargs)

Compared to the ProcessPoolExecutor method, here we accept an additional keyword argument (local_backend_config), which is then simply ignored.

Source code in fractal_server/app/runner/v2/_local_experimental/executor.py
def submit(
    self,
    *args,
    local_backend_config: Optional[LocalBackendConfig] = None,
    **kwargs,
):
    """
    Compared to the `ProcessPoolExecutor` method, here we accept an
    additional keyword argument (`local_backend_config`), which is then
    simply ignored.
    """
    return super().submit(*args, **kwargs)
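
A short sketch of the relaxed signature (the noop helper is illustrative; the extra keyword is presumably accepted for interface compatibility with other backends):

from pathlib import Path

from fractal_server.app.runner.v2._local_experimental.executor import (
    FractalProcessPoolExecutor,
)


def noop(x):
    return x


if __name__ == "__main__":
    with FractalProcessPoolExecutor(
        shutdown_file=Path("/tmp/fractal_shutdown")  # hypothetical path
    ) as executor:
        # `local_backend_config` is accepted and then dropped before the
        # call is forwarded to ProcessPoolExecutor.submit().
        future = executor.submit(noop, 42, local_backend_config=None)
        print(future.result())  # 42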