Skip to content

_local_config

Submodule to handle the local-backend configuration for a WorkflowTask

LocalBackendConfig

Bases: BaseModel

Specifications of the local-backend configuration

Attributes:

Name Type Description
parallel_tasks_per_job Optional[int]

Maximum number of tasks to be run in parallel as part of a call to FractalThreadPoolExecutor.map; if None, then all tasks will start at the same time.

Source code in fractal_server/app/runner/v1/_local/_local_config.py
35
36
37
38
39
40
41
42
43
44
45
46
class LocalBackendConfig(BaseModel, extra=Extra.forbid):
    """
    Specifications of the local-backend configuration

    Unknown fields are rejected (`extra=Extra.forbid`).

    Attributes:
        parallel_tasks_per_job:
            Maximum number of tasks to be run in parallel as part of a call to
            `FractalThreadPoolExecutor.map`; if `None`, then all tasks will
            start at the same time.
    """

    # Explicit `None` default, so that the field may be omitted regardless of
    # how the installed pydantic version treats a bare `Optional[int]`
    # annotation (implicit default in v1, required field in v2).
    parallel_tasks_per_job: Optional[int] = None

LocalBackendConfigError

Bases: ValueError

Local-backend configuration error

Source code in fractal_server/app/runner/v1/_local/_local_config.py
27
28
29
30
31
32
class LocalBackendConfigError(ValueError):
    """
    Error in the local-backend configuration

    Subclass of `ValueError`, with no additional behavior.
    """

get_default_local_backend_config()

Return a default LocalBackendConfig configuration object

Source code in fractal_server/app/runner/v1/_local/_local_config.py
49
50
51
52
53
def get_default_local_backend_config():
    """
    Build the default `LocalBackendConfig` configuration object

    The returned object has `parallel_tasks_per_job` set to `None`.
    """
    default_config = LocalBackendConfig(parallel_tasks_per_job=None)
    return default_config

get_local_backend_config(wftask, config_path=None)

Prepare a LocalBackendConfig configuration object

The sources for the parallel_tasks_per_job attribute, in decreasing order of priority, are:

  1. Properties in wftask.meta;
  2. The general content of the local-backend configuration file;
  3. The default value (None).

Parameters:

Name Type Description Default
wftask WorkflowTask

WorkflowTask (V1) for which the backend configuration should be prepared.

required
config_path Optional[Path]

Path of local-backend configuration file; if None, use FRACTAL_LOCAL_CONFIG_FILE variable from settings.

None

Returns:

Type Description
LocalBackendConfig

A local-backend configuration object

Source code in fractal_server/app/runner/v1/_local/_local_config.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_local_backend_config(
    wftask: WorkflowTask,
    config_path: Optional[Path] = None,
) -> LocalBackendConfig:
    """
    Prepare a `LocalBackendConfig` configuration object

    The sources for the `parallel_tasks_per_job` attribute, starting from the
    highest-priority one, are

    1. Properties in `wftask.meta`;
    2. The general content of the local-backend configuration file;
    3. The default value (`None`).

    Arguments:
        wftask:
            WorkflowTask (V1) for which the backend configuration should
            be prepared.
        config_path:
            Path of local-backend configuration file; if `None`, use
            `FRACTAL_LOCAL_CONFIG_FILE` variable from settings.

    Returns:
        A local-backend configuration object

    Raises:
        LocalBackendConfigError:
            If the content of the configuration file does not validate as a
            `LocalBackendConfig`.
    """

    key = "parallel_tasks_per_job"

    # 1. A value set on the task itself wins over any other source.
    if wftask.meta and key in wftask.meta:
        return LocalBackendConfig(parallel_tasks_per_job=wftask.meta[key])

    # 2. Otherwise look for a configuration file, falling back to the
    #    path defined in the settings when none is given explicitly.
    if not config_path:
        settings = Inject(get_settings)
        config_path = settings.FRACTAL_LOCAL_CONFIG_FILE

    # 3. No configuration file available: use the default value.
    if config_path is None:
        return LocalBackendConfig(parallel_tasks_per_job=None)

    with config_path.open("r") as f:
        env = json.load(f)

    # Validate the whole file content (which also rejects unknown keys, as
    # per the model configuration) before reading the value out of it.
    try:
        LocalBackendConfig(**env)
    except ValidationError as e:
        # Chain explicitly, so that the validation error is recorded as the
        # direct cause of the configuration error.
        raise LocalBackendConfigError(
            f"Error while loading {config_path=}. "
            f"Original error:\n{str(e)}"
        ) from e

    return LocalBackendConfig(parallel_tasks_per_job=env.get(key, None))