Skip to content

job

download_job_logs(project_id, job_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Download the job folder as a zip archive.

Source code in fractal_server/app/routes/api/v2/job.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@router.get(
    "/project/{project_id}/job/{job_id}/download/",
    response_class=StreamingResponse,
)
async def download_job_logs(
    project_id: int,
    job_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> StreamingResponse:
    """
    Stream the job's working directory to the client as a zip attachment.

    The job is looked up via `_get_job_check_owner` using the project ID, the
    job ID and the ID of the current user. The attachment file name is the
    last path component of `job.working_dir` followed by `_archive.zip`.
    """
    job = (
        await _get_job_check_owner(
            project_id=project_id,
            job_id=job_id,
            user_id=user.id,
            db=db,
        )
    )["job"]

    # Name shown to the client in the download dialog.
    archive_name = f"{Path(job.working_dir).name}_archive.zip"
    disposition = f"attachment;filename={archive_name}"

    byte_stream = _zip_folder_to_byte_stream_iterator(folder=job.working_dir)
    return StreamingResponse(
        byte_stream,
        media_type="application/x-zip-compressed",
        headers={"Content-Disposition": disposition},
    )

get_job_list(project_id, user=Depends(current_active_user), log=True, db=Depends(get_async_db)) async

Get the list of jobs for the given project

Source code in fractal_server/app/routes/api/v2/job.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@router.get(
    "/project/{project_id}/job/",
    response_model=list[JobReadV2],
)
async def get_job_list(
    project_id: int,
    user: UserOAuth = Depends(current_active_user),
    log: bool = True,
    db: AsyncSession = Depends(get_async_db),
) -> Optional[list[JobReadV2]]:
    """
    Return every job whose `project_id` matches the given project.

    The project is first fetched through `_get_project_check_owner`. When
    `log` is false, the `log` attribute of each job is replaced with `None`
    before the list is returned.
    """
    project = await _get_project_check_owner(
        project_id=project_id, user_id=user.id, db=db
    )

    query = select(JobV2).where(JobV2.project_id == project.id)
    jobs = (await db.execute(query)).scalars().all()
    # NOTE(review): the session is closed before the jobs are modified below,
    # presumably so that blanking `log` is never written back -- confirm.
    await db.close()

    if log:
        return jobs

    for job in jobs:
        job.log = None
    return jobs

get_user_jobs(user=Depends(current_active_user), log=True, db=Depends(get_async_db)) async

Returns all the jobs of the current user

Source code in fractal_server/app/routes/api/v2/job.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@router.get("/job/", response_model=list[JobReadV2])
async def get_user_jobs(
    user: UserOAuth = Depends(current_active_user),
    log: bool = True,
    db: AsyncSession = Depends(get_async_db),
) -> list[JobReadV2]:
    """
    Return the jobs of all projects whose `user_list` contains the current
    user.

    When `log` is false, the `log` attribute of each job is replaced with
    `None` before the list is returned.
    """
    # Jobs are joined to their project and filtered on project membership.
    belongs_to_user = ProjectV2.user_list.any(UserOAuth.id == user.id)
    query = select(JobV2).join(ProjectV2).where(belongs_to_user)

    jobs = (await db.execute(query)).scalars().all()
    # NOTE(review): the session is closed before the jobs are modified below,
    # presumably so that blanking `log` is never written back -- confirm.
    await db.close()

    if log:
        return jobs

    for job in jobs:
        job.log = None
    return jobs

get_workflow_jobs(project_id, workflow_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Returns all the jobs related to a specific workflow

Source code in fractal_server/app/routes/api/v2/job.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@router.get(
    "/project/{project_id}/workflow/{workflow_id}/job/",
    response_model=list[JobReadV2],
)
async def get_workflow_jobs(
    project_id: int,
    workflow_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[list[JobReadV2]]:
    """
    Return every job whose `workflow_id` matches the given workflow.

    `_get_workflow_check_owner` is awaited first with the project, workflow
    and user IDs; its return value is not used here.
    """
    await _get_workflow_check_owner(
        project_id=project_id, workflow_id=workflow_id, user_id=user.id, db=db
    )

    query = select(JobV2).where(JobV2.workflow_id == workflow_id)
    result = await db.execute(query)
    return result.scalars().all()

read_job(project_id, job_id, show_tmp_logs=False, user=Depends(current_active_user), db=Depends(get_async_db)) async

Return info on an existing job

Source code in fractal_server/app/routes/api/v2/job.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@router.get(
    "/project/{project_id}/job/{job_id}/",
    response_model=JobReadV2,
)
async def read_job(
    project_id: int,
    job_id: int,
    show_tmp_logs: bool = False,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[JobReadV2]:
    """
    Return a single job, optionally with its in-progress log attached.

    The job is looked up via `_get_job_check_owner`. If `show_tmp_logs` is
    true and the job status is `JobStatusTypeV2.SUBMITTED`, the content of
    `WORKFLOW_LOG_FILENAME` inside `job.working_dir` is read into `job.log`.
    A missing log file is ignored and the job is returned unchanged.
    """
    job = (
        await _get_job_check_owner(
            project_id=project_id,
            job_id=job_id,
            user_id=user.id,
            db=db,
        )
    )["job"]
    await db.close()

    # Only submitted jobs get their log replaced, and only on request.
    if not show_tmp_logs or job.status != JobStatusTypeV2.SUBMITTED:
        return job

    log_path = f"{job.working_dir}/{WORKFLOW_LOG_FILENAME}"
    try:
        with open(log_path, "r") as log_file:
            job.log = log_file.read()
    except FileNotFoundError:
        # The log file does not exist; keep whatever `job.log` already holds.
        pass

    return job

stop_job(project_id, job_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Stop execution of a workflow job.

Source code in fractal_server/app/routes/api/v2/job.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@router.get(
    "/project/{project_id}/job/{job_id}/stop/",
    status_code=202,
)
async def stop_job(
    project_id: int,
    job_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Response:
    """
    Request that execution of a workflow job be stopped.

    `_check_shutdown_is_supported()` is called before anything else. The job
    is then looked up via `_get_job_check_owner` and handed to
    `_write_shutdown_file`. The response is an empty body with status 202.
    """
    # Runs first, before any database access.
    _check_shutdown_is_supported()

    job = (
        await _get_job_check_owner(
            project_id=project_id,
            job_id=job_id,
            user_id=user.id,
            db=db,
        )
    )["job"]

    _write_shutdown_file(job=job)

    return Response(status_code=status.HTTP_202_ACCEPTED)