
executor

Custom version of Python's ThreadPoolExecutor.

FractalThreadPoolExecutor

Bases: ThreadPoolExecutor

Custom version of ThreadPoolExecutor that overrides the submit and map methods.

Source code in fractal_server/app/runner/v1/_local/executor.py
class FractalThreadPoolExecutor(ThreadPoolExecutor):
    """
    Custom version of
    [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)
    that overrides the `submit` and `map` methods.
    """

    def submit(
        self,
        *args,
        local_backend_config: Optional[LocalBackendConfig] = None,
        **kwargs,
    ):
        """
        Compared to the `ThreadPoolExecutor` method, here we accept an
        additional keyword argument (`local_backend_config`), which is then
        simply ignored.
        """
        return super().submit(*args, **kwargs)

    def map(
        self,
        fn: Callable,
        *iterables: Sequence[Iterable],
        local_backend_config: Optional[LocalBackendConfig] = None,
    ):
        """
        Custom version of the `Executor.map` method

        The main change with respect to the original `map` method is that
        the list of tasks to be executed is split into chunks, and then
        `super().map` is called (sequentially) on each chunk. The goal of this
        change is to limit parallelism, e.g. due to limited computational
        resources.

        Other changes from the `concurrent.futures` `map` method:

        1. Removed `timeout` argument;
        2. Removed `chunksize`;
        3. All iterators (both inputs and output ones) are transformed into
           lists.

        Args:
            fn: A callable function.
            iterables: The argument iterables (one iterable per argument of
                       `fn`).
            local_backend_config: The backend configuration, needed to extract
                `parallel_tasks_per_job`.
        """

        # Preliminary check
        iterable_lengths = [len(it) for it in iterables]
        if not len(set(iterable_lengths)) == 1:
            raise ValueError("Iterables have different lengths.")

        # Set total number of arguments
        n_elements = len(iterables[0])

        # Set parallel_tasks_per_job
        if local_backend_config is None:
            local_backend_config = get_default_local_backend_config()
        parallel_tasks_per_job = local_backend_config.parallel_tasks_per_job
        if parallel_tasks_per_job is None:
            parallel_tasks_per_job = n_elements

        # Execute tasks, in chunks of size parallel_tasks_per_job
        results = []
        for ind_chunk in range(0, n_elements, parallel_tasks_per_job):
            chunk_iterables = [
                it[ind_chunk : ind_chunk + parallel_tasks_per_job]  # noqa
                for it in iterables
            ]
            map_iter = super().map(fn, *chunk_iterables)
            results.extend(list(map_iter))

        return iter(results)

map(fn, *iterables, local_backend_config=None)

Custom version of the Executor.map method

The main change with respect to the original map method is that the list of tasks to be executed is split into chunks, and then super().map is called (sequentially) on each chunk. The goal of this change is to limit parallelism, e.g. due to limited computational resources.

Other changes from the concurrent.futures map method:

  1. Removed timeout argument;
  2. Removed chunksize;
  3. All iterators (both inputs and output ones) are transformed into lists.

Parameters:

Name                  Type                          Description                                                 Default
fn                    Callable                      A callable function.                                        required
iterables             Sequence[Iterable]            The argument iterables (one iterable per argument of fn).  ()
local_backend_config  Optional[LocalBackendConfig]  The backend configuration, needed to extract                None
                                                    parallel_tasks_per_job.

Source code in fractal_server/app/runner/v1/_local/executor.py
def map(
    self,
    fn: Callable,
    *iterables: Sequence[Iterable],
    local_backend_config: Optional[LocalBackendConfig] = None,
):
    """
    Custom version of the `Executor.map` method

    The main change with respect to the original `map` method is that
    the list of tasks to be executed is split into chunks, and then
    `super().map` is called (sequentially) on each chunk. The goal of this
    change is to limit parallelism, e.g. due to limited computational
    resources.

    Other changes from the `concurrent.futures` `map` method:

    1. Removed `timeout` argument;
    2. Removed `chunksize`;
    3. All iterators (both inputs and output ones) are transformed into
       lists.

    Args:
        fn: A callable function.
        iterables: The argument iterables (one iterable per argument of
                   `fn`).
        local_backend_config: The backend configuration, needed to extract
            `parallel_tasks_per_job`.
    """

    # Preliminary check
    iterable_lengths = [len(it) for it in iterables]
    if not len(set(iterable_lengths)) == 1:
        raise ValueError("Iterables have different lengths.")

    # Set total number of arguments
    n_elements = len(iterables[0])

    # Set parallel_tasks_per_job
    if local_backend_config is None:
        local_backend_config = get_default_local_backend_config()
    parallel_tasks_per_job = local_backend_config.parallel_tasks_per_job
    if parallel_tasks_per_job is None:
        parallel_tasks_per_job = n_elements

    # Execute tasks, in chunks of size parallel_tasks_per_job
    results = []
    for ind_chunk in range(0, n_elements, parallel_tasks_per_job):
        chunk_iterables = [
            it[ind_chunk : ind_chunk + parallel_tasks_per_job]  # noqa
            for it in iterables
        ]
        map_iter = super().map(fn, *chunk_iterables)
        results.extend(list(map_iter))

    return iter(results)
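
The chunking logic above can be sketched as a stand-alone function using only the standard library. This is a minimal illustration of the technique, not the fractal_server API; the name `chunked_map` and its parameters are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor


def chunked_map(fn, *iterables, parallel_tasks_per_job=2, max_workers=4):
    # Check that all input iterables have the same length
    lengths = {len(it) for it in iterables}
    if len(lengths) != 1:
        raise ValueError("Iterables have different lengths.")
    n_elements = lengths.pop()

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Walk through the inputs in slices of size parallel_tasks_per_job;
        # each slice is mapped and fully consumed before the next one starts,
        # so at most parallel_tasks_per_job tasks run per round.
        for start in range(0, n_elements, parallel_tasks_per_job):
            chunks = [
                it[start : start + parallel_tasks_per_job] for it in iterables
            ]
            results.extend(executor.map(fn, *chunks))
    return results


print(chunked_map(lambda x, y: x + y, [1, 2, 3, 4, 5], [10, 20, 30, 40, 50]))
# [11, 22, 33, 44, 55]
```

Because each chunk is consumed before the next `map` call is issued, the effective parallelism is bounded by `parallel_tasks_per_job` even when the pool has more workers available.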

submit(*args, local_backend_config=None, **kwargs)

Compared to the ThreadPoolExecutor method, here we accept an additional keyword argument (local_backend_config), which is then simply ignored.

Source code in fractal_server/app/runner/v1/_local/executor.py
def submit(
    self,
    *args,
    local_backend_config: Optional[LocalBackendConfig] = None,
    **kwargs,
):
    """
    Compared to the `ThreadPoolExecutor` method, here we accept an
    additional keyword argument (`local_backend_config`), which is then
    simply ignored.
    """
    return super().submit(*args, **kwargs)
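
This accept-and-discard pattern can be reproduced in a minimal sketch, assuming nothing beyond the standard library (the class name `IgnoringExecutor` is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor


class IgnoringExecutor(ThreadPoolExecutor):
    def submit(self, *args, local_backend_config=None, **kwargs):
        # Accept the extra keyword for interface compatibility, but do not
        # forward it: the base submit would pass it on to the callable,
        # which typically does not expect it.
        return super().submit(*args, **kwargs)


with IgnoringExecutor(max_workers=2) as pool:
    future = pool.submit(pow, 2, 10, local_backend_config=None)
    print(future.result())  # 1024
```

Swallowing the keyword in the subclass lets callers pass a backend configuration uniformly across executors, while this local backend simply has no use for it in `submit`.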