
Archive datasets flow

archive_datasets_flow(job_id, dataset_ids=None)

Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the archival of each dataset as a subflow and reports the overall job status to Scicat.

Parameters:

Name         Type        Description                                       Default
job_id       UUID        ID of the corresponding job in Scicat.            required
dataset_ids  List[str]   IDs of the datasets to archive; the dataset       None
                         list for the job is fetched from Scicat.

Raises:

Type        Description
Exception   Re-raised from failed status updates or dataset archival subflows.

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(
    name="archive_datasetlist",
    log_prints=True,
    flow_run_name=generate_flow_name_job_id,
    on_failure=[on_job_flow_failure],
    on_cancellation=[on_job_flow_cancellation],
)
def archive_datasets_flow(job_id: UUID, dataset_ids: List[str] | None = None):
    """Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the individual archivals of the single datasets as subflows and reports
    the overall job status to Scicat.

    Args:
        dataset_ids (List[str]): _description_
        job_id (UUID): _description_

    Raises:
        e: _description_
    """
    dataset_ids: List[str] = dataset_ids or []
    access_token = get_scicat_access_token.submit()

    job_update = update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_code=SciCatClient.JOBSTATUSCODE.IN_PROGRESS,
        status_message=SciCatClient.JOBSTATUSMESSAGE.JOB_IN_PROGRESS,
        token=access_token,
    )
    job_update.result()

    dataset_ids_future = get_job_datasetlist.submit(job_id=job_id, token=access_token)
    dataset_ids = dataset_ids_future.result()

    for dataset_id in dataset_ids:
        archive_single_dataset_flow(dataset_id=dataset_id)

    access_token = get_scicat_access_token.submit()

    update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_code=SciCatClient.JOBSTATUSCODE.FINISHED_SUCCESSFULLY,
        status_message=SciCatClient.JOBSTATUSMESSAGE.JOB_FINISHED,
        token=access_token,
    ).result()
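
A minimal usage sketch (the import path and job ID below are illustrative; in practice the flow is triggered for an existing Scicat job):

from uuid import UUID

from backend.archiver.flows.archive_datasets_flow import archive_datasets_flow

# Hypothetical invocation: the dataset list is fetched from the Scicat job,
# so dataset_ids can be omitted.
archive_datasets_flow(job_id=UUID("00000000-0000-0000-0000-000000000000"))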

check_free_space_in_LTS()

Prefect task to wait for free space in the LTS. Periodically checks whether the condition for enough free space is fulfilled. Only one of these tasks runs at a time; the others are only scheduled once this task has finished, i.e. once there is enough space.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(tags=[ConcurrencyLimits().LTS_FREE_TAG])
def check_free_space_in_LTS():
    """Prefect task to wait for free space in the LTS. Checks periodically if the condition for enough
    free space is fulfilled. Only one of these task runs at time; the others are only scheduled once this task
    has finished, i.e. there is enough space.
    """
    asyncio.run(wait_for_free_space())
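
The actual space check is implemented in wait_for_free_space, defined elsewhere in the archiver package. A minimal sketch of such a polling coroutine, with an assumed mount point, threshold, and poll interval, might look like this:

import asyncio
import shutil

async def wait_for_free_space_sketch(path: str = "/lts",
                                     required_bytes: int = 10 * 1024**4,
                                     poll_interval_s: int = 60) -> None:
    # Poll the filesystem until enough space is free. All names and
    # defaults here are illustrative, not the archiver's configuration.
    while shutil.disk_usage(path).free < required_bytes:
        await asyncio.sleep(poll_interval_s)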

copy_datablock_from_LTS(dataset_id, datablock)

Prefect task to copy a datablock (.tar.gz file) back from the LTS for verification. Concurrency of this task is limited to 2 instances at the same time.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset,
      tags=[ConcurrencyLimits().LTS_READ_TAG],
      retries=5,
      retry_delay_seconds=[60, 120, 240, 480, 960])
def copy_datablock_from_LTS(dataset_id: str, datablock: DataBlock):
    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    datablocks_operations.copy_file_from_LTS(dataset_id, datablock)
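
The retry schedule doubles the delay from 60 s up to 960 s to tolerate transient LTS read failures. The LTS_READ_TAG tag only throttles execution if a matching tag-based concurrency limit is registered with Prefect; assuming the stated limit of 2, it could be set up as below (the tag string is an assumption, the real value comes from ConcurrencyLimits):

import asyncio
from prefect.client.orchestration import get_client

async def setup_lts_read_limit() -> None:
    # Register a tag-based concurrency limit of 2 for the (hypothetical)
    # LTS read tag so that at most two copies run at the same time.
    async with get_client() as client:
        await client.create_concurrency_limit(tag="lts-read", concurrency_limit=2)

asyncio.run(setup_lts_read_limit())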

create_datablocks_flow(dataset_id)

Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in Scicat.

Parameters:

Name         Type   Description   Default
dataset_id   str    Dataset id    required

Returns:

Type              Description
List[DataBlock]   List of created and registered datablocks

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(name="create_datablocks", flow_run_name=generate_subflow_run_name_job_id_dataset_id)
def create_datablocks_flow(dataset_id: str) -> List[DataBlock]:
    """Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in Scicat.

    Args:
        dataset_id (str): Dataset id

    Returns:
        List[DataBlock]: List of created and registered datablocks
    """

    scicat_token = get_scicat_access_token.submit()

    dataset_update = update_scicat_archival_dataset_lifecycle.submit(
        dataset_id=dataset_id,
        status=SciCatClient.ARCHIVESTATUSMESSAGE.STARTED,
        token=scicat_token,
    )

    orig_datablocks = get_origdatablocks.with_options(
        on_failure=[partial(on_get_origdatablocks_error, dataset_id)]
    ).submit(dataset_id=dataset_id, token=scicat_token, wait_for=[dataset_update])  # type: ignore

    files = download_origdatablocks.submit(dataset_id=dataset_id, origDataBlocks=orig_datablocks)

    tarfiles_future = create_tarfiles.submit(dataset_id, wait_for=[files])
    datablocks_future = create_datablock_entries.submit(dataset_id, orig_datablocks, tarfiles_future)

    # Prefect issue: https://github.com/PrefectHQ/prefect/issues/12028
    # Exceptions are not propagated correctly
    files.result()
    tarfiles_future.result()
    datablocks_future.result()

    scicat_token = get_scicat_access_token.submit(wait_for=[datablocks_future])

    register_future = register_datablocks.submit(
        datablocks=datablocks_future,  # type: ignore
        dataset_id=dataset_id,
        token=scicat_token,
    )

    register_future.result()

    return datablocks_future
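
The flow relies on Prefect's submit/wait_for pattern for dependencies and on calling .result() on every future to surface task exceptions (see the linked Prefect issue). A self-contained sketch of that pattern:

from prefect import flow, task

@task
def produce() -> int:
    return 1

@task
def consume(x: int) -> int:
    return x + 1

@flow
def pipeline() -> int:
    a = produce.submit()
    # wait_for adds an explicit dependency edge on top of the data dependency
    b = consume.submit(a, wait_for=[a])
    # .result() blocks and re-raises any exception from the task run
    return b.result()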

move_data_to_LTS(dataset_id, datablock)

Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances at the same time.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().LTS_WRITE_TAG])
def move_data_to_LTS(dataset_id: str, datablock: DataBlock):
    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    datablocks_operations.move_data_to_LTS(dataset_id, datablock)

move_datablocks_to_lts_flow(dataset_id, datablocks)

Prefect (sub-)flow to move datablocks to the LTS. Implements the copying of data and verification via checksum.

Parameters:

Name         Type              Description                                    Default
dataset_id   str               ID of the dataset the datablocks belong to.    required
datablocks   List[DataBlock]   Datablocks to move to the LTS.                 required
Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(
    name="move_datablocks_to_lts",
    log_prints=True,
    flow_run_name=generate_subflow_run_name_job_id_dataset_id,
)
def move_datablocks_to_lts_flow(dataset_id: str, datablocks: List[DataBlock]):
    """Prefect (sub-)flow to move a datablock to the LTS. Implements the copying of data and verification via checksum.

    Args:
        dataset_id (str): _description_
        datablock (DataBlock): _description_
    """
    tasks = []
    all_tasks = []

    for datablock in datablocks:

        checksum = calculate_checksum.submit(dataset_id=dataset_id, datablock=datablock)
        free_space = check_free_space_in_LTS.submit(wait_for=[checksum])

        move = move_data_to_LTS.submit(dataset_id=dataset_id, datablock=datablock, wait_for=[free_space])  # type: ignore

        getLogger().info(f"Wait {Variables().ARCHIVER_LTS_WAIT_BEFORE_VERIFY_S}s before verifying datablock")
        sleep = sleep_for.submit(Variables().ARCHIVER_LTS_WAIT_BEFORE_VERIFY_S, wait_for=[move])

        copy = copy_datablock_from_LTS.submit(dataset_id=dataset_id, datablock=datablock, wait_for=[sleep])

        checksum_verification = verify_checksum.submit(
            dataset_id=dataset_id, datablock=datablock, checksum=checksum, wait_for=[copy]
        )

        w = verify_datablock_in_verification.submit(
            dataset_id=dataset_id, datablock=datablock, wait_for=[checksum_verification]
        )  # type: ignore
        tasks.append(w)

        all_tasks.append(free_space)
        all_tasks.append(checksum)
        all_tasks.append(move)
        all_tasks.append(sleep)
        all_tasks.append(copy)
        all_tasks.append(checksum_verification)
        all_tasks.append(w)

    wait_for_futures(tasks)

    # this is necessary to propagate the errors of the tasks
    for t in all_tasks:
        t.result()
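
The checksum tasks themselves are implemented in datablocks_operations. A streaming hash routine along these lines (the function name and the choice of SHA-256 are assumptions) could back calculate_checksum and verify_checksum:

import hashlib

def sha256_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    # Stream the file in 1 MiB chunks so large .tar.gz datablocks
    # are hashed without loading them fully into memory.
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()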

on_get_origdatablocks_error(dataset_id, task, task_run, state)

Callback for get_origdatablocks tasks. Reports a user error.

Source code in backend/archiver/flows/archive_datasets_flow.py
def on_get_origdatablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
    """Callback for get_origdatablocks tasks. Reports a user error."""
    scicat_token = get_scicat_access_token()
    report_dataset_user_error(dataset_id, token=scicat_token)

sleep_for(time_in_seconds)

Sleeps for a given amount of time. Required to wait for the LTS to update its internal state. Needs to be blocking as it must prevent the following task from running.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_sleep_for_task_name)
def sleep_for(time_in_seconds: int):
    """ Sleeps for a given amount of time. Required to wait for the LTS to update its internal state.
        Needs to be blocking as it should prevent the following task to run.
    """
    time.sleep(time_in_seconds)

verify_datablock_in_verification(dataset_id, datablock)

Prefect task to verify a datablock in the LTS against a checksum. Tasks of this type run without concurrency since the LTS only allows limited concurrent access.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset)
def verify_datablock_in_verification(dataset_id: str, datablock: DataBlock) -> None:
    """Prefect Task to verify a datablock in the LTS against a checksum. Task of this type run with no concurrency since the LTS
    does only allow limited concurrent access.
    """
    datablocks_operations.verify_datablock_in_verification(dataset_id=dataset_id, datablock=datablock)