Skip to content

_local_config

Submodule to handle the local-backend configuration for a WorkflowTask

LocalBackendConfig

Bases: BaseModel

Specifications of the local-backend configuration

Attributes:

Name Type Description
parallel_tasks_per_job Optional[int]

Maximum number of tasks to be run in parallel as part of a call to FractalProcessPoolExecutor.map; if None, then all tasks will start at the same time.

Source code in fractal_server/app/runner/v2/_local_experimental/_local_config.py
26
27
28
29
30
31
32
33
34
35
36
37
class LocalBackendConfig(BaseModel, extra=Extra.forbid):
    """
    Specifications of the local-backend configuration

    Attributes:
        parallel_tasks_per_job:
            Maximum number of tasks to be run in parallel as part of a call to
            `FractalProcessPoolExecutor.map`; if `None`, then all tasks will
            start at the same time.
    """

    parallel_tasks_per_job: Optional[int]

LocalBackendConfigError

Bases: ValueError

Local-backend configuration error

Source code in fractal_server/app/runner/v2/_local_experimental/_local_config.py
18
19
20
21
22
23
class LocalBackendConfigError(ValueError):
    """
    Local-backend configuration error
    """

    pass

get_default_local_backend_config()

Return a default LocalBackendConfig configuration object

Source code in fractal_server/app/runner/v2/_local_experimental/_local_config.py
40
41
42
43
44
def get_default_local_backend_config():
    """
    Return a default `LocalBackendConfig` configuration object
    """
    return LocalBackendConfig(parallel_tasks_per_job=None)

get_local_backend_config(wftask, which_type, config_path=None)

Prepare a LocalBackendConfig configuration object

The sources for parallel_tasks_per_job attributes, starting from the highest-priority one, are

  1. Properties in wftask.meta_parallel or wftask.meta_non_parallel (depending on which_type);
  2. The general content of the local-backend configuration file;
  3. The default value (None).

Parameters:

Name Type Description Default
wftask WorkflowTaskV2

WorkflowTaskV2 for which the backend configuration should be prepared.

required
config_path Optional[Path]

Path of local-backend configuration file; if None, use FRACTAL_LOCAL_CONFIG_FILE variable from settings.

None

Returns:

Type Description
LocalBackendConfig

A local-backend configuration object

Source code in fractal_server/app/runner/v2/_local_experimental/_local_config.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get_local_backend_config(
    wftask: WorkflowTaskV2,
    which_type: Literal["non_parallel", "parallel"],
    config_path: Optional[Path] = None,
) -> LocalBackendConfig:
    """
    Prepare a `LocalBackendConfig` configuration object

    The sources for `parallel_tasks_per_job` attributes, starting from the
    highest-priority one, are

    1. Properties in `wftask.meta_parallel` or `wftask.meta_non_parallel`
       (depending on `which_type`);
    2. The general content of the local-backend configuration file;
    3. The default value (`None`).

    Arguments:
        wftask:
            WorkflowTaskV2 for which the backend configuration should
            be prepared.
        config_path:
            Path of local-backend configuration file; if `None`, use
            `FRACTAL_LOCAL_CONFIG_FILE` variable from settings.

    Returns:
        A local-backend configuration object
    """

    key = "parallel_tasks_per_job"
    default_value = None

    if which_type == "non_parallel":
        wftask_meta = wftask.meta_non_parallel
    elif which_type == "parallel":
        wftask_meta = wftask.meta_parallel
    else:
        raise ValueError(
            "`get_local_backend_config` received an invalid argument"
            f" {which_type=}."
        )

    if wftask_meta and key in wftask_meta:
        parallel_tasks_per_job = wftask_meta[key]
    else:
        if not config_path:
            settings = Inject(get_settings)
            config_path = settings.FRACTAL_LOCAL_CONFIG_FILE
        if config_path is None:
            parallel_tasks_per_job = default_value
        else:
            with config_path.open("r") as f:
                env = json.load(f)
            try:
                _ = LocalBackendConfig(**env)
            except ValidationError as e:
                raise LocalBackendConfigError(
                    f"Error while loading {config_path=}. "
                    f"Original error:\n{str(e)}"
                )

            parallel_tasks_per_job = env.get(key, default_value)
    return LocalBackendConfig(parallel_tasks_per_job=parallel_tasks_per_job)