Skip to content

task

Task

Bases: _TaskBaseV1, SQLModel

Task model

Attributes:

Name Type Description
id Optional[int]

Primary key

command str

Executable command

input_type str

Expected type of input Dataset

output_type str

Expected type of output Dataset

meta Optional[dict[str, Any]]

Additional metadata related to execution (e.g. computational resources)

source str

inherited from _TaskBase

name str

inherited from _TaskBase

args_schema Optional[dict[str, Any]]

JSON schema of task arguments

args_schema_version Optional[str]

Label indicating how the JSON schema of task arguments was generated

Source code in fractal_server/app/models/v1/task.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Task(_TaskBaseV1, SQLModel, table=True):
    """
    Task model

    Attributes:
        id: Primary key
        command: Executable command
        input_type: Expected type of input `Dataset`
        output_type: Expected type of output `Dataset`
        meta:
            Additional metadata related to execution (e.g. computational
            resources)
        source: inherited from `_TaskBase`
        name: inherited from `_TaskBase`
        args_schema: JSON schema of task arguments
        args_schema_version:
            label pointing at how the JSON schema of task arguments was
            generated
    """

    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    command: str
    # Declared with a uniqueness constraint on the column.
    source: str = Field(unique=True)
    input_type: str
    output_type: str
    # Stored in a JSON column; may be None, so readers must not assume a dict.
    meta: Optional[dict[str, Any]] = Field(sa_column=Column(JSON), default={})
    owner: Optional[str] = None
    version: Optional[str] = None
    # Stored in a JSON column; None means no schema is available.
    args_schema: Optional[dict[str, Any]] = Field(
        sa_column=Column(JSON), default=None
    )
    args_schema_version: Optional[str]
    docs_info: Optional[str] = None
    docs_link: Optional[HttpUrl] = None

    @property
    def parallelization_level(self) -> Optional[str]:
        """
        Return `meta["parallelization_level"]`, or `None` when unavailable.

        `None` is returned both when the key is absent from `meta` and when
        `meta` itself is `None` (the field is declared `Optional`).
        """
        # `meta` may be None: fall back to an empty mapping instead of
        # failing with a TypeError on subscripting.
        return (self.meta or {}).get("parallelization_level")

    @property
    def is_parallel(self) -> bool:
        """
        Tell whether a truthy `parallelization_level` is set for this task.
        """
        return bool(self.parallelization_level)

    @property
    def default_args_from_args_schema(self) -> dict[str, Any]:
        """
        Extract default arguments from args_schema

        Returns:
            A dictionary mapping each property name of
            `args_schema["properties"]` to its `"default"` value, skipping
            properties with no default or with a `None` default. An empty
            dictionary is returned if `args_schema` is `None` or if it has
            no `"properties"` key (in the latter case a warning is logged).
        """
        # Return {} if there is no args_schema
        if self.args_schema is None:
            return {}
        # Only the "properties" lookup can raise KeyError, so keep the `try`
        # body limited to it.
        try:
            properties = self.args_schema["properties"]
        except KeyError as e:
            logging.warning(
                "Cannot set default_args from args_schema="
                f"{json.dumps(self.args_schema)}\n"
                f"Original KeyError: {str(e)}"
            )
            return {}
        # Keep only properties with an explicit, non-None default
        return {
            prop_name: prop_schema["default"]
            for prop_name, prop_schema in properties.items()
            if prop_schema.get("default", None) is not None
        }

default_args_from_args_schema: dict[str, Any] property

Extract default arguments from args_schema