Archive datasets flow

archive_datablock(dataset_id, datablock)

Prefect task to move a datablock (.tar file) to the LTS. At most two instances of this task run concurrently.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().LTS_WRITE_TAG])
def archive_datablock(dataset_id: str, datablock: DataBlock):
    """Prefect task to move a datablock (.tar file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    datablocks_operations.archive_datablock(dataset_id, datablock)
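
The concurrency cap comes from the tag attached to the task; with Prefect, a tag-based concurrency limit is configured on the server. A minimal sketch, assuming the tag resolved by ConcurrencyLimits().LTS_WRITE_TAG is "lts-write" (the actual value lives in the backend configuration):

import asyncio

from prefect import get_client


async def set_lts_write_limit(tag: str = "lts-write") -> None:
    # Cap concurrent runs of all tasks carrying this tag to 2,
    # matching the limit described above.
    async with get_client() as client:
        await client.create_concurrency_limit(tag=tag, concurrency_limit=2)


asyncio.run(set_lts_write_limit())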

archive_datablocks_with_storage_protect(dataset_id, datablocks)

Prefect (sub-)flow to move datablocks to the LTS. Implements the copying of data and verification via checksum.

Parameters:

Name       | Type            | Description           | Default
dataset_id | str             | Dataset id            | required
datablocks | List[DataBlock] | Datablocks to archive | required
Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(
    name="archive_datablocks_with_storage_protect",
    log_prints=True,
    flow_run_name=generate_subflow_run_name_job_id_dataset_id,
)
def archive_datablocks_with_storage_protect(dataset_id: str, datablocks: List[DataBlock]):
    """Prefect (sub-)flow to move a datablock to the LTS. Implements the copying of data and verification via checksum.

    Args:
        dataset_id (str): _description_
        datablock (DataBlock): _description_
    """
    tasks = []
    all_tasks = []

    for datablock in datablocks:
        move = archive_datablock.submit(dataset_id=dataset_id, datablock=datablock)  # type: ignore
        all_tasks.append(move)

    wait_for_futures(tasks)

    # this is necessary to propagate the errors of the tasks
    for t in all_tasks:
        t.result()
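
wait_for_futures is a small helper from the flow utilities; its implementation is not reproduced on this page. A minimal sketch of the behaviour it needs, assuming it simply blocks on each submitted future:

from typing import List

from prefect.futures import PrefectFuture


def wait_for_futures(futures: List[PrefectFuture]) -> None:
    # Block until every submitted task run has reached a final state;
    # errors are surfaced later via .result() in the calling flow.
    for future in futures:
        future.wait()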

archive_datasets_flow(job_id, dataset_ids=None)

Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the archival of each dataset as a subflow and reports the overall job status to Scicat.

Parameters:

Name        | Type      | Description                                    | Default
dataset_ids | List[str] | Dataset ids; fetched from the job if not given | None
job_id      | UUID      | Id of the Scicat job                           | required

Raises:

Type      | Description
Exception | Re-raised from failed tasks so the job is reported as failed.
Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(
    name="archive_datasetlist",
    log_prints=True,
    flow_run_name=generate_flow_name_job_id,
    on_failure=[on_job_flow_failure],
    on_cancellation=[on_job_flow_cancellation],
)
def archive_datasets_flow(job_id: UUID, dataset_ids: List[str] | None = None):
    """Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the individual archivals of the single datasets as subflows and reports
    the overall job status to Scicat.

    Args:
        dataset_ids (List[str]): _description_
        job_id (UUID): _description_

    Raises:
        e: _description_
    """
    dataset_ids: List[str] = dataset_ids or []
    access_token = get_scicat_access_token.submit()

    job_update = update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_message=SciCatClient.STATUSMESSAGE.IN_PROGRESS,
        token=access_token,
    )
    job_update.result()

    dataset_ids_future = get_job_datasetlist.submit(job_id=job_id, token=access_token)
    dataset_ids = dataset_ids_future.result()

    for id in dataset_ids:
        archive_single_dataset_flow(dataset_id=id)

    access_token = get_scicat_access_token.submit()

    update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_message=SciCatClient.STATUSMESSAGE.FINISHED_SUCCESSFULLY,
        token=access_token,
    ).result()
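
A hypothetical local invocation of the job flow (in production a flow run is typically created through a Prefect deployment when an archival job is submitted in Scicat; the module path is illustrative):

from uuid import uuid4

from backend.archiver.flows.archive_datasets_flow import archive_datasets_flow

# The dataset list may be omitted; the flow fetches it from the Scicat job.
archive_datasets_flow(job_id=uuid4(), dataset_ids=None)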

create_datablocks_flow(dataset_id)

Prefect (sub-)flow to create datablocks (.tar files) for files of a dataset and register them in Scicat.

Parameters:

Name       | Type | Description | Default
dataset_id | str  | Dataset id  | required

Returns:

Type            | Description
List[DataBlock] | List of created and registered datablocks

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(name="create_datablocks", flow_run_name=generate_subflow_run_name_job_id_dataset_id)
def create_datablocks_flow(dataset_id: str) -> List[DataBlock]:
    """Prefect (sub-)flow to create datablocks (.tar files) for files of a dataset and register them in Scicat.

    Args:
        dataset_id (str): Dataset id

    Returns:
        List[DataBlock]: List of created and registered datablocks
    """

    scicat_token = get_scicat_access_token.submit()

    dataset_update = update_scicat_archival_dataset_lifecycle.submit(
        dataset_id=dataset_id,
        status=SciCatClient.ARCHIVESTATUSMESSAGE.STARTED,
        token=scicat_token,
    )

    orig_datablocks = get_origdatablocks.with_options(
        on_failure=[partial(on_get_origdatablocks_error, dataset_id)]
    ).submit(dataset_id=dataset_id, token=scicat_token, wait_for=[dataset_update])  # type: ignore

    files = download_origdatablocks.submit(dataset_id=dataset_id, origDataBlocks=orig_datablocks)

    tarfiles_future = create_tarfiles.submit(dataset_id, wait_for=[files])
    datablocks_future = create_datablock_entries.submit(dataset_id, orig_datablocks, tarfiles_future)

    # Prefect issue: https://github.com/PrefectHQ/prefect/issues/12028
    # Exceptions are not propagated correctly
    files.result()
    tarfiles_future.result()
    datablocks_future.result()

    scicat_token = get_scicat_access_token.submit(wait_for=[datablocks_future])

    register_future = register_datablocks.submit(
        datablocks=datablocks_future,  # type: ignore
        dataset_id=dataset_id,
        token=scicat_token,
    )

    register_future.result()

    return datablocks_future
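
archive_single_dataset_flow is not reproduced on this page, but the two subflows documented here are the steps it runs per dataset. A hedged sketch of how they could be chained (illustrative only, not the actual implementation):

def archive_one_dataset(dataset_id: str) -> None:
    # Create and register the .tar datablocks, then move them to the LTS.
    datablocks = create_datablocks_flow(dataset_id=dataset_id)
    archive_datablocks_with_storage_protect(dataset_id=dataset_id, datablocks=datablocks)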

on_get_origdatablocks_error(dataset_id, task, task_run, state)

Callback for get_origdatablocks tasks. Reports a user error.

Source code in backend/archiver/flows/archive_datasets_flow.py
def on_get_origdatablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
    """Callback for get_origdatablocks tasks. Reports a user error."""
    scicat_token = get_scicat_access_token()
    report_dataset_user_error(dataset_id, token=scicat_token)
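
Prefect invokes task on_failure hooks with (task, task_run, state) only, so the dataset id has to be bound in advance; this is why create_datablocks_flow above attaches the callback with functools.partial. A minimal illustration (the dataset id is a placeholder):

from functools import partial

dataset_id = "example-dataset-id"  # illustrative
# dataset_id is pre-bound; Prefect supplies task, task_run and state when the hook fires.
on_failure_hook = partial(on_get_origdatablocks_error, dataset_id)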

verify_datablock_in_verification(dataset_id, datablock)

Prefect task to verify a datablock in the LTS against a checksum. Tasks of this type run without concurrency since the LTS only allows limited concurrent access.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset)
def verify_datablock_in_verification(dataset_id: str, datablock: DataBlock) -> None:
    """Prefect Task to verify a datablock in the LTS against a checksum. Task of this type run with no concurrency since the LTS
    does only allow limited concurrent access.
    """
    datablocks_operations.verify_datablock_in_verification(dataset_id=dataset_id, datablock=datablock)
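
The verification itself is implemented in datablocks_operations and not shown on this page. A hedged sketch of the kind of check it performs, assuming the expected checksum of the archived .tar file is known (function and parameter names are illustrative):

import hashlib
from pathlib import Path


def sha256_of(path: Path) -> str:
    # Stream the file so large .tar archives do not need to fit in memory.
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def datablock_is_intact(tar_path: Path, expected_checksum: str) -> bool:
    # Compare the hash of the file retrieved from the LTS with the expected value.
    return sha256_of(tar_path) == expected_checksum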