Skip to content

task_files

TaskFiles

Group all file paths pertaining to a task

Attributes:

Name Type Description
workflow_dir_local Path

Server-owned directory to store all task-execution-related relevant files. Note: users cannot write directly to this folder.

workflow_dir_remote Path

User-side directory with the same scope as workflow_dir_local, and where a user can write.

subfolder_name str

Name of task-specific subfolder

remote_subfolder Path

Path to user-side task-specific subfolder

task_name str

Name of the task

task_order Optional[int]

Positional order of the task within a workflow.

component Optional[str]

Specific component to run the task for (relevant for tasks to be executed in parallel over many components).

file_prefix str

Prefix for all task-related files.

args Path

Path for input json file.

metadiff Path

Path for output json file with metadata update.

out Path

Path for task-execution stdout.

err Path

Path for task-execution stderr.

Source code in fractal_server/app/runner/task_files.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class TaskFiles:
    """
    Group all file paths pertaining to a task

    Attributes:
        workflow_dir_local:
            Server-owned directory to store all task-execution-related relevant
            files. Note: users cannot write directly to this folder.
        workflow_dir_remote:
            User-side directory with the same scope as `workflow_dir_local`,
            and where a user can write.
        subfolder_name:
            Name of task-specific subfolder
        remote_subfolder:
            Path to user-side task-specific subfolder
        task_name:
            Name of the task
        task_order:
            Positional order of the task within a workflow.
        component:
            Specific component to run the task for (relevant for tasks to be
            executed in parallel over many components).
        file_prefix:
            Prefix for all task-related files.
        args:
            Path for input json file.
        metadiff:
            Path for output json file with metadata update.
        out:
            Path for task-execution stdout.
        err:
            Path for task-execution stderr.
    """

    workflow_dir_local: Path
    workflow_dir_remote: Path
    remote_subfolder: Path
    subfolder_name: str
    task_name: str
    task_order: Optional[int] = None
    component: Optional[str] = None

    file_prefix: str
    file_prefix_with_subfolder: str
    args: Path
    out: Path
    err: Path
    log: Path
    metadiff: Path

    def __init__(
        self,
        workflow_dir_local: Path,
        workflow_dir_remote: Path,
        task_name: str,
        task_order: Optional[int] = None,
        component: Optional[str] = None,
    ):
        self.workflow_dir_local = workflow_dir_local
        self.workflow_dir_remote = workflow_dir_remote
        self.task_order = task_order
        self.task_name = task_name
        self.component = component

        if self.component is not None:
            component_safe = sanitize_string(str(self.component))
            component_safe = f"_par_{component_safe}"
        else:
            component_safe = ""

        if self.task_order is not None:
            order = str(self.task_order)
        else:
            order = "0"
        self.file_prefix = f"{order}{component_safe}"
        self.subfolder_name = task_subfolder_name(
            order=order, task_name=self.task_name
        )
        self.remote_subfolder = self.workflow_dir_remote / self.subfolder_name
        self.args = self.remote_subfolder / f"{self.file_prefix}.args.json"
        self.out = self.remote_subfolder / f"{self.file_prefix}.out"
        self.err = self.remote_subfolder / f"{self.file_prefix}.err"
        self.log = self.remote_subfolder / f"{self.file_prefix}.log"
        self.metadiff = (
            self.remote_subfolder / f"{self.file_prefix}.metadiff.json"
        )

get_task_file_paths(workflow_dir_local, workflow_dir_remote, task_name, task_order=None, component=None)

Return the corrisponding TaskFiles object

Source code in fractal_server/app/runner/task_files.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_task_file_paths(
    workflow_dir_local: Path,
    workflow_dir_remote: Path,
    task_name: str,
    task_order: Optional[int] = None,
    component: Optional[str] = None,
) -> TaskFiles:
    """
    Return the corrisponding TaskFiles object
    """
    return TaskFiles(
        workflow_dir_local=workflow_dir_local,
        workflow_dir_remote=workflow_dir_remote,
        task_name=task_name,
        task_order=task_order,
        component=component,
    )

task_subfolder_name(order, task_name)

Get name of task-specific subfolder.

Parameters:

Name Type Description Default
order Union[int, str]
required
task_name str
required
Source code in fractal_server/app/runner/task_files.py
 8
 9
10
11
12
13
14
15
16
17
def task_subfolder_name(order: Union[int, str], task_name: str) -> str:
    """
    Get name of task-specific subfolder.

    Args:
        order:
        task_name:
    """
    task_name_slug = sanitize_string(task_name)
    return f"{order}_{task_name_slug}"