Skip to content

_utils

_customize_and_run_template(template_filename, replacements, script_dir, logger_name, prefix)

Customize one of the template bash scripts.

Parameters:

Name Type Description Default
template_filename str

Filename of the template file (ends with ".sh").

required
replacements list[tuple[str, str]]

Dictionary of replacements.

required
script_dir str

Local folder where the script will be placed.

required
prefix int

Prefix for the script filename.

required
Source code in fractal_server/tasks/v2/local/_utils.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def _customize_and_run_template(
    template_filename: str,
    replacements: list[tuple[str, str]],
    script_dir: str,
    logger_name: str,
    prefix: int,
) -> str:
    """
    Customize one of the template bash scripts.

    Args:
        template_filename: Filename of the template file (ends with ".sh").
        replacements: Dictionary of replacements.
        script_dir: Local folder where the script will be placed.
        prefix: Prefix for the script filename.
    """
    logger = get_logger(logger_name=logger_name)
    logger.debug(f"_customize_and_run_template {template_filename} - START")

    # Prepare name and path of script
    if not template_filename.endswith(".sh"):
        raise ValueError(
            f"Invalid {template_filename=} (it must end with '.sh')."
        )

    script_filename = f"{prefix}{template_filename}"
    script_path_local = Path(script_dir) / script_filename
    # Read template
    customize_template(
        template_name=template_filename,
        replacements=replacements,
        script_path=script_path_local,
    )
    cmd = f"bash {script_path_local}"
    logger.debug(f"Now run '{cmd}' ")
    stdout = execute_command_sync(command=cmd, logger_name=logger_name)
    logger.debug(f"_customize_and_run_template {template_filename} - END")
    return stdout

check_task_files_exist(task_list)

Check that the modules listed in task commands point to existing files.

Parameters:

Name Type Description Default
task_list list[TaskCreateV2]
required
Source code in fractal_server/tasks/v2/local/_utils.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def check_task_files_exist(task_list: list[TaskCreateV2]) -> None:
    """
    Check that the modules listed in task commands point to existing files.

    Args:
        task_list:
    """
    for _task in task_list:
        if _task.command_non_parallel is not None:
            _task_path = _task.command_non_parallel.split()[1]
            if not Path(_task_path).exists():
                raise FileNotFoundError(
                    f"Task `{_task.name}` has `command_non_parallel` "
                    f"pointing to missing file `{_task_path}`."
                )
        if _task.command_parallel is not None:
            _task_path = _task.command_parallel.split()[1]
            if not Path(_task_path).exists():
                raise FileNotFoundError(
                    f"Task `{_task.name}` has `command_parallel` "
                    f"pointing to missing file `{_task_path}`."
                )