dataset

create_dataset(project_id, dataset, user=Depends(current_active_user), db=Depends(get_async_db)) async

Add a new dataset to the current project

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.post(
    "/project/{project_id}/dataset/",
    response_model=DatasetReadV1,
    status_code=status.HTTP_201_CREATED,
)
async def create_dataset(
    project_id: int,
    dataset: DatasetCreateV1,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[DatasetReadV1]:
    """
    Add a new dataset to the current project
    """
    _raise_if_v1_is_read_only()
    await _get_project_check_owner(
        project_id=project_id, user_id=user.id, db=db
    )
    db_dataset = Dataset(project_id=project_id, **dataset.dict())
    db.add(db_dataset)
    await db.commit()
    await db.refresh(db_dataset)
    await db.close()

    return db_dataset
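
A minimal client-side sketch of calling this endpoint with httpx. The base URL, token, and the payload field names (`name`, `type`, `read_only`) are illustrative assumptions about `DatasetCreateV1`, not values taken from this page.

import httpx

BASE_URL = "http://localhost:8000/api/v1"  # hypothetical deployment URL
TOKEN = "..."  # hypothetical access token

# Payload mirroring DatasetCreateV1; field names are assumed here.
payload = {"name": "my-dataset", "type": "zarr", "read_only": False}
response = httpx.post(
    f"{BASE_URL}/project/1/dataset/",
    json=payload,
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()  # expect 201 Created
print(response.json())  # DatasetReadV1 body, including the new dataset id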

create_resource(project_id, dataset_id, resource, user=Depends(current_active_user), db=Depends(get_async_db)) async

Add a resource to an existing dataset

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.post(
    "/project/{project_id}/dataset/{dataset_id}/resource/",
    response_model=ResourceReadV1,
    status_code=status.HTTP_201_CREATED,
)
async def create_resource(
    project_id: int,
    dataset_id: int,
    resource: ResourceCreateV1,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[ResourceReadV1]:
    """
    Add a resource to an existing dataset
    """
    _raise_if_v1_is_read_only()
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]
    db_resource = Resource(dataset_id=dataset.id, **resource.dict())
    db.add(db_resource)
    await db.commit()
    await db.refresh(db_resource)
    await db.close()
    return db_resource
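
As above, a hedged httpx sketch; the `path` field is an assumption about the shape of `ResourceCreateV1` (a resource points to a filesystem location), not confirmed by this page.

import httpx

BASE_URL = "http://localhost:8000/api/v1"  # hypothetical
TOKEN = "..."  # hypothetical

response = httpx.post(
    f"{BASE_URL}/project/1/dataset/7/resource/",
    json={"path": "/data/plate_23.zarr"},  # assumed ResourceCreateV1 shape
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()  # expect 201 Created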

delete_dataset(project_id, dataset_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Delete a dataset associated with the current project

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.delete(
    "/project/{project_id}/dataset/{dataset_id}/",
    status_code=204,
)
async def delete_dataset(
    project_id: int,
    dataset_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Response:
    """
    Delete a dataset associated with the current project
    """
    _raise_if_v1_is_read_only()
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]

    # Fail if there are submitted jobs linked to the current dataset.
    stm = _get_submitted_jobs_statement().where(
        or_(
            ApplyWorkflow.input_dataset_id == dataset_id,
            ApplyWorkflow.output_dataset_id == dataset_id,
        )
    )
    res = await db.execute(stm)
    jobs = res.scalars().all()
    if jobs:
        string_ids = str([job.id for job in jobs])[1:-1]
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Cannot delete dataset {dataset.id} because it "
                f"is linked to active job(s) {string_ids}."
            ),
        )

    # Cascade operations: set foreign keys to NULL for jobs related to the
    # current dataset
    # input_dataset
    stm = select(ApplyWorkflow).where(
        ApplyWorkflow.input_dataset_id == dataset_id
    )
    res = await db.execute(stm)
    jobs = res.scalars().all()
    for job in jobs:
        job.input_dataset_id = None
        await db.merge(job)
    await db.commit()
    # output_dataset
    stm = select(ApplyWorkflow).where(
        ApplyWorkflow.output_dataset_id == dataset_id
    )
    res = await db.execute(stm)
    jobs = res.scalars().all()
    for job in jobs:
        job.output_dataset_id = None
        await db.merge(job)
    await db.commit()

    # Delete dataset
    await db.delete(dataset)
    await db.commit()

    return Response(status_code=status.HTTP_204_NO_CONTENT)
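
Deletion is refused with a 422 while submitted jobs still reference the dataset; on success, related jobs survive but their dataset foreign keys are set to NULL. A hedged client sketch of handling both outcomes (URL and token are assumptions):

import httpx

BASE_URL = "http://localhost:8000/api/v1"  # hypothetical
TOKEN = "..."  # hypothetical

resp = httpx.delete(
    f"{BASE_URL}/project/1/dataset/7/",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
if resp.status_code == 422:
    # Dataset is linked to active (submitted) jobs; retry after they finish.
    print(resp.json()["detail"])
else:
    assert resp.status_code == 204  # deleted; job foreign keys were nulled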

delete_resource(project_id, dataset_id, resource_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Delete a resource of a dataset

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.delete(
    "/project/{project_id}/dataset/{dataset_id}/resource/{resource_id}/",
    status_code=204,
)
async def delete_resource(
    project_id: int,
    dataset_id: int,
    resource_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Response:
    """
    Delete a resource of a dataset
    """
    _raise_if_v1_is_read_only()
    # Get the dataset DB entry
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]
    resource = await db.get(Resource, resource_id)
    if not resource or resource.dataset_id != dataset.id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Resource does not exist or does not belong to dataset",
        )
    await db.delete(resource)
    await db.commit()
    await db.close()
    return Response(status_code=status.HTTP_204_NO_CONTENT)

export_history_as_workflow(project_id, dataset_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Extract a reproducible workflow from the dataset history.

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get(
    "/project/{project_id}/dataset/{dataset_id}/export_history/",
    response_model=WorkflowExportV1,
)
async def export_history_as_workflow(
    project_id: int,
    dataset_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[WorkflowExportV1]:
    """
    Extract a reproducible workflow from the dataset history.
    """
    # Get the dataset DB entry
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]

    # Check whether there exists a submitted job such that
    # `job.output_dataset_id==dataset_id`.
    # If at least one such job exists, then this endpoint will fail.
    # We do not support the use case of exporting a reproducible workflow when
    # job execution is in progress; this may change in the future.
    stm = _get_submitted_jobs_statement().where(
        ApplyWorkflow.output_dataset_id == dataset_id
    )
    res = await db.execute(stm)
    jobs = res.scalars().all()
    if jobs:
        string_ids = str([job.id for job in jobs])[1:-1]
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Cannot export history because dataset {dataset.id} "
                f"is linked to active job(s) {string_ids}."
            ),
        )

    # If no such job exists, continue with the endpoint. Note that this
    # means that the history in the DB is up-to-date.

    # Read history from DB
    history = dataset.history

    # Construct reproducible workflow
    task_list = []
    for history_item in history:
        wftask = history_item["workflowtask"]
        wftask_status = history_item["status"]
        if wftask_status == "done":
            task_list.append(WorkflowTaskExportV1(**wftask))

    def _slugify_dataset_name(_name: str) -> str:
        _new_name = _name
        for char in (" ", ".", "/", "\\"):
            _new_name = _new_name.replace(char, "_")
        return _new_name

    name = f"history_{_slugify_dataset_name(dataset.name)}"

    workflow = WorkflowExportV1(name=name, task_list=task_list)
    return workflow
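
Only `WorkflowTask`s whose history status is "done" make it into the exported task list. The slugification helper is self-contained and easy to illustrate; the input below is a made-up example:

def _slugify_dataset_name(_name: str) -> str:
    # Same replacement logic as in the endpoint above.
    _new_name = _name
    for char in (" ", ".", "/", "\\"):
        _new_name = _new_name.replace(char, "_")
    return _new_name

assert _slugify_dataset_name("plate 23.v2/raw") == "plate_23_v2_raw"
# The exported workflow is then named "history_plate_23_v2_raw".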

get_resource_list(project_id, dataset_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Get resources from a dataset

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get(
    "/project/{project_id}/dataset/{dataset_id}/resource/",
    response_model=list[ResourceReadV1],
)
async def get_resource_list(
    project_id: int,
    dataset_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[list[ResourceReadV1]]:
    """
    Get resources from a dataset
    """
    await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    stm = select(Resource).where(Resource.dataset_id == dataset_id)
    res = await db.execute(stm)
    resource_list = res.scalars().all()
    await db.close()
    return resource_list

get_user_datasets(history=True, user=Depends(current_active_user), db=Depends(get_async_db)) async

Return all datasets of the current user

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get("/dataset/", response_model=list[DatasetReadV1])
async def get_user_datasets(
    history: bool = True,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> list[DatasetReadV1]:
    """
    Return all datasets of the current user
    """
    stm = select(Dataset)
    stm = stm.join(Project).where(
        Project.user_list.any(UserOAuth.id == user.id)
    )
    res = await db.execute(stm)
    dataset_list = res.scalars().all()
    await db.close()
    if not history:
        for ds in dataset_list:
            setattr(ds, "history", [])
    return dataset_list
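
Setting `history=false` strips the potentially large per-dataset history from the response. A hedged httpx sketch (URL and token are assumptions):

import httpx

BASE_URL = "http://localhost:8000/api/v1"  # hypothetical
TOKEN = "..."  # hypothetical

resp = httpx.get(
    f"{BASE_URL}/dataset/",
    params={"history": False},  # serialized as ?history=false
    headers={"Authorization": f"Bearer {TOKEN}"},
)
datasets = resp.json()  # list of DatasetReadV1; each history is []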

get_workflowtask_status(project_id, dataset_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Extract the status of all WorkflowTasks that ran on a given Dataset.

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get(
    "/project/{project_id}/dataset/{dataset_id}/status/",
    response_model=DatasetStatusReadV1,
)
async def get_workflowtask_status(
    project_id: int,
    dataset_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[DatasetStatusReadV1]:
    """
    Extract the status of all `WorkflowTask`s that ran on a given `Dataset`.
    """
    # Get the dataset DB entry
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]

    # Check whether there exists a job such that
    # 1. `job.output_dataset_id == dataset_id`, and
    # 2. `job.status` is either submitted or running.
    # If one such job exists, it will be used later. If there are multiple
    # jobs, raise an error.
    # Note: see
    # https://sqlmodel.tiangolo.com/tutorial/where/#type-annotations-and-errors
    # regarding the type-ignore in this code block
    stm = _get_submitted_jobs_statement().where(
        ApplyWorkflow.output_dataset_id == dataset_id
    )
    res = await db.execute(stm)
    running_jobs = res.scalars().all()
    if len(running_jobs) == 0:
        running_job = None
    elif len(running_jobs) == 1:
        running_job = running_jobs[0]
    else:
        string_ids = str([job.id for job in running_jobs])[1:-1]
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Cannot get WorkflowTask statuses as dataset {dataset.id} "
                f"is linked to multiple active jobs: {string_ids}"
            ),
        )

    # Initialize empty dictionary for workflowtasks status
    workflow_tasks_status_dict: dict = {}

    # Lowest priority: read status from DB, which corresponds to jobs that are
    # not running
    history = dataset.history
    for history_item in history:
        wftask_id = history_item["workflowtask"]["id"]
        wftask_status = history_item["status"]
        workflow_tasks_status_dict[wftask_id] = wftask_status

    # If a job is running, then gather more up-to-date information
    if running_job is not None:
        # Get the workflow DB entry
        running_workflow = await _get_workflow_check_owner(
            project_id=project_id,
            workflow_id=running_job.workflow_id,
            user_id=user.id,
            db=db,
        )
        # Mid priority: set all WorkflowTasks that are part of the running
        # job to "submitted"
        start = running_job.first_task_index
        end = running_job.last_task_index + 1
        for wftask in running_workflow.task_list[start:end]:
            workflow_tasks_status_dict[wftask.id] = "submitted"

        # Highest priority: read status updates coming from the running-job
        # temporary file. Note: this file only contains information on
        # WorkflowTasks that completed successfully
        tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
        try:
            with tmp_file.open("r") as f:
                history = json.load(f)
        except FileNotFoundError:
            history = []
        except JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="History file does not include a valid JSON.",
            )

        for history_item in history:
            wftask_id = history_item["workflowtask"]["id"]
            wftask_status = history_item["status"]
            workflow_tasks_status_dict[wftask_id] = wftask_status

    response_body = DatasetStatusReadV1(status=workflow_tasks_status_dict)
    return response_body
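
Stripped of DB and file access, the three-level priority scheme reduces to successive dict updates, where later updates win. A toy illustration with made-up ids and statuses:

# Lowest priority: statuses stored in the DB (jobs that are not running).
from_db = {1: "done", 2: "failed"}
# Mid priority: tasks within the running job's index range.
running_range = {2: "submitted", 3: "submitted"}
# Highest priority: updates from the running job's temporary history file.
from_tmp_file = {2: "done"}

status_dict: dict[int, str] = {}
status_dict.update(from_db)
status_dict.update(running_range)
status_dict.update(from_tmp_file)
assert status_dict == {1: "done", 2: "done", 3: "submitted"}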

read_dataset(project_id, dataset_id, user=Depends(current_active_user), db=Depends(get_async_db)) async

Get info on a dataset associated with the current project

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get(
    "/project/{project_id}/dataset/{dataset_id}/",
    response_model=DatasetReadV1,
)
async def read_dataset(
    project_id: int,
    dataset_id: int,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[DatasetReadV1]:
    """
    Get info on a dataset associated with the current project
    """
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]
    await db.close()
    return dataset

read_dataset_list(project_id, history=True, user=Depends(current_active_user), db=Depends(get_async_db)) async

Get the dataset list for a given project

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.get(
    "/project/{project_id}/dataset/",
    response_model=list[DatasetReadV1],
)
async def read_dataset_list(
    project_id: int,
    history: bool = True,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[list[DatasetReadV1]]:
    """
    Get the dataset list for a given project
    """
    # Access control
    project = await _get_project_check_owner(
        project_id=project_id, user_id=user.id, db=db
    )
    # Find datasets of the current project. Note: this select/where approach
    # has much better scaling than refreshing all elements of
    # `project.dataset_list` - ref
    # https://github.com/fractal-analytics-platform/fractal-server/pull/1082#issuecomment-1856676097.
    stm = select(Dataset).where(Dataset.project_id == project.id)
    res = await db.execute(stm)
    dataset_list = res.scalars().all()
    await db.close()
    if not history:
        for ds in dataset_list:
            setattr(ds, "history", [])
    return dataset_list

update_dataset(project_id, dataset_id, dataset_update, user=Depends(current_active_user), db=Depends(get_async_db)) async

Edit a dataset associated with the current project

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.patch(
    "/project/{project_id}/dataset/{dataset_id}/",
    response_model=DatasetReadV1,
)
async def update_dataset(
    project_id: int,
    dataset_id: int,
    dataset_update: DatasetUpdateV1,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[DatasetReadV1]:
    """
    Edit a dataset associated with the current project
    """
    _raise_if_v1_is_read_only()
    if dataset_update.history is not None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Cannot modify dataset history.",
        )

    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    db_dataset = output["dataset"]

    for key, value in dataset_update.dict(exclude_unset=True).items():
        setattr(db_dataset, key, value)

    await db.commit()
    await db.refresh(db_dataset)
    await db.close()
    return db_dataset
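
Because the endpoint applies `dataset_update.dict(exclude_unset=True)`, the PATCH is partial: fields omitted from the request body keep their stored values. A hedged httpx sketch (URL and token are assumptions):

import httpx

BASE_URL = "http://localhost:8000/api/v1"  # hypothetical
TOKEN = "..."  # hypothetical

resp = httpx.patch(
    f"{BASE_URL}/project/1/dataset/7/",
    json={"name": "renamed-dataset"},  # only `name` changes
    headers={"Authorization": f"Bearer {TOKEN}"},
)
resp.raise_for_status()
print(resp.json()["name"])  # "renamed-dataset"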

update_resource(project_id, dataset_id, resource_id, resource_update, user=Depends(current_active_user), db=Depends(get_async_db)) async

Edit a resource of a dataset

Source code in fractal_server/app/routes/api/v1/dataset.py
@router.patch(
    "/project/{project_id}/dataset/{dataset_id}/resource/{resource_id}/",
    response_model=ResourceReadV1,
)
async def update_resource(
    project_id: int,
    dataset_id: int,
    resource_id: int,
    resource_update: ResourceUpdateV1,
    user: UserOAuth = Depends(current_active_user),
    db: AsyncSession = Depends(get_async_db),
) -> Optional[ResourceReadV1]:
    """
    Edit a resource of a dataset
    """
    _raise_if_v1_is_read_only()
    output = await _get_dataset_check_owner(
        project_id=project_id,
        dataset_id=dataset_id,
        user_id=user.id,
        db=db,
    )
    dataset = output["dataset"]
    orig_resource = await db.get(Resource, resource_id)

    if orig_resource not in dataset.resource_list:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=(
                f"Resource {resource_id} is not part of "
                f"dataset {dataset_id}"
            ),
        )

    for key, value in resource_update.dict(exclude_unset=True).items():
        setattr(orig_resource, key, value)
    await db.commit()
    await db.refresh(orig_resource)
    await db.close()
    return orig_resource