
Datablocks

calculate_md5_checksum(filename, chunksize=2 ** 20)

Calculate an md5 hash of a file

Parameters:

    filename (Path, required): absolute or relative path to the file
    chunksize (int, default 2 ** 20): chunk size to read while computing the hash (1024*1024 bytes)

Returns:

    str: MD5 hash as a hex string

Source code in backend/archiver/utils/datablocks.py
def calculate_md5_checksum(filename: Path, chunksize: int = 2**20) -> str:
    """Calculate an md5 hash of a file

    Args:
        filename (Path): absolute or relative path to file
        chunksize (int, optional): chunk size to read while calculating the hash. Defaults to 2**20 (1 MiB).

    Returns:
        str: hash as str
    """
    import hashlib

    m = hashlib.md5()
    with open(filename, "rb") as f:
        while chunk := f.read(chunksize):
            m.update(chunk)
    return m.hexdigest()
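
Example: a minimal usage sketch, assuming the module is importable as archiver.utils.datablocks (an assumption based on the source location) and using a placeholder file path.

from pathlib import Path

# assumed import path, derived from backend/archiver/utils/datablocks.py
from archiver.utils.datablocks import calculate_md5_checksum

# hash a local file; reads it in 1 MiB chunks (the default chunksize of 2**20 bytes)
checksum = calculate_md5_checksum(Path("some_file.dat"))
print(checksum)  # hex digest, e.g. 'd41d8cd98f00b204e9800998ecf8427e' for an empty file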

copy_file_to_folder(src_file, dst_folder)

Copies a file to a destination folder (the destination is created if it does not exist).

Parameters:

    src_file (Path, required): source file
    dst_folder (Path, required): destination folder; created by rsync --mkpath if it does not exist

Raises:

    SystemError: if the source is missing, the destination is a file, or the copy fails

Source code in backend/archiver/utils/datablocks.py
@log
def copy_file_to_folder(src_file: Path, dst_folder: Path):
    """Copies a file to a destination folder (does not need to exist)

    Args:
        src_file (Path): Source file
        dst_folder (Path): destination folder; created if it does not exist

    Raises:
        SystemError: raises if operation fails
    """
    if not src_file.exists() or not src_file.is_file():
        raise SystemError(f"Source file {src_file} is not a file or does not exist")
    if dst_folder.is_file():
        raise SystemError(f"Destination folder {dst_folder} is not a folder")

    getLogger().info(f"Start Copy operation. src:{src_file}, dst{dst_folder}")

    with subprocess.Popen(
        ["rsync",
         "-rcvh",  # r: recursive, c: checksum, v: verbose, h: human readable format
         "--stats",  # file transfer stats
         "--no-perms",  # don't preserve the file permissions of the source files
         "--no-owner",  # don't preserve the owner
         "--no-group",  # don't preserve the group ownership
         "--mkpath", str(src_file), str(dst_folder)],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    ) as popen:
        for line in popen.stdout:
            getLogger().info(line)

        popen.stdout.close()
        return_code = popen.wait()
        getLogger().info(f"Finished with return code : {return_code}")

        expected_dst_file = dst_folder / src_file.name

    if not expected_dst_file.exists():
        raise SystemError(f"Copying did not produce file {expected_dst_file}")

create_datablock_entries(dataset_id, folder, origDataBlocks, tar_infos, progress_callback=None)

Create datablock entries compliant with the schema provided by SciCat.

Parameters:

    dataset_id (str, required): dataset identifier
    folder (Path, required): folder containing the created tar files
    origDataBlocks (List[OrigDataBlock], required): original data blocks the files belong to
    tar_infos (List[ArchiveInfo], required): archive information for the created tar files
    progress_callback (Callable[[float], None], optional): called with the fraction of files processed; defaults to None

Returns:

    List[DataBlock]: datablock entries for the created tar files

Source code in backend/archiver/utils/datablocks.py
@log
def create_datablock_entries(
    dataset_id: str,
    folder: Path,
    origDataBlocks: List[OrigDataBlock],
    tar_infos: List[ArchiveInfo],
    progress_callback: Callable[[float], None] = None
) -> List[DataBlock]:
    """Create datablock entries compliant with schema provided by scicat

    Args:
        dataset_id (str): Dataset identifier
        folder (Path): _description_
        origDataBlocks (List[OrigDataBlock]): _description_
        tarballs (List[Path]): _description_

    Returns:
        List[DataBlock]: _description_
    """

    version = 1.0

    total_file_count = 0
    for b in origDataBlocks:
        total_file_count += len(b.dataFileList)

    file_count = 0

    datablocks: List[DataBlock] = []

    for idx, tar in enumerate(tar_infos):
        # TODO: is it necessary to use any datablock information?
        o = origDataBlocks[0]

        data_file_list: List[DataFile] = []

        tar_path = folder / tar.path

        tarball = tarfile.open(tar_path)

        def create_datafile_list_entry(tar_info: tarfile.TarInfo) -> DataFile:
            checksum = calculate_md5_checksum(
                StoragePaths.scratch_archival_raw_files_folder(dataset_id) / tar_info.path
            )

            return DataFile(
                path=tar_info.path,
                size=tar_info.size,
                chk=checksum,
                uid=str(tar_info.uid),
                gid=str(tar_info.gid),
                perm=str(tar_info.mode),
                time=str(datetime.datetime.now(datetime.UTC).isoformat()),
            )

        with ThreadPoolExecutor(max_workers=Variables().ARCHIVER_NUM_WORKERS) as executor:
            future_to_key = {executor.submit(create_datafile_list_entry, tar_info): tar_info for tar_info in tarball.getmembers()}

            for future in as_completed(future_to_key):
                exception = future.exception()

                if not exception:
                    data_file_list.append(future.result())
                    file_count += 1
                    if progress_callback:
                        progress_callback(file_count / total_file_count)
                else:
                    raise exception

        datablocks.append(
            DataBlock(
                archiveId=str(StoragePaths.relative_datablocks_folder(dataset_id) / tar_path.name),
                size=tar.unpackedSize,
                packedSize=tar.packedSize,
                chkAlg="md5",
                version=str(version),
                dataFileList=data_file_list,
                rawDatasetId=o.rawdatasetId,
                derivedDatasetId=o.derivedDatasetId,
            )
        )

    return datablocks
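
Example: a sketch of a call made after the tar files have been created. The origDataBlocks list is assumed to have been fetched from SciCat and tar_infos to come from create_tarfiles; the identifiers, paths and the report_progress helper are illustrative only.

from pathlib import Path

def report_progress(fraction: float) -> None:
    # illustrative progress sink; a real caller might report job progress instead
    print(f"datablock entries: {fraction:.0%} of files processed")

# orig_data_blocks (List[OrigDataBlock]) and tar_infos (List[ArchiveInfo]) are assumed
# to be available from earlier steps and are not constructed here.
datablocks = create_datablock_entries(
    dataset_id="12.345/my-dataset",            # placeholder dataset identifier
    folder=Path("/tmp/archiving/datablocks"),  # placeholder folder containing the tar files
    origDataBlocks=orig_data_blocks,
    tar_infos=tar_infos,
    progress_callback=report_progress,
)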

create_tarfiles(dataset_id, src_folder, dst_folder, target_size, progress_callback=None)

Create datablocks, i.e. .tar.gz files, from all files in a folder. Folder structures are preserved and symlinks are not resolved. The created tar files are named after the dataset they belong to.

Parameters:

    dataset_id (str, required): dataset identifier
    src_folder (Path, required): source folder to find files to create tars from
    dst_folder (Path, required): destination folder to write the tar files to
    target_size (int, required): target size of each tar file, measured as the unpacked size of the contained files
    progress_callback (Callable[[float], None], optional): called with the fraction of files processed; defaults to None

Returns:

    List[ArchiveInfo]: archive information (path, packed and unpacked size, file count) for each created tar file

Source code in backend/archiver/utils/datablocks.py
@log_debug
def create_tarfiles(
    dataset_id: str,
    src_folder: Path,
    dst_folder: Path,
    target_size: int,
    progress_callback: Callable[[float], None] = None
) -> List[ArchiveInfo]:
    """Create datablocks, i.e. .tar.gz files, from all files in a folder. Folder structures are kept and symlnks not resolved.
    The created tar files will be named according to the dataset they belong to.

    Args:
        dataset_id (str): dataset identifier
        src_folder (Path): source folder to find files to create tars from
        dst_folder (Path): destination folder to write the tar files to
        target_size (int, optional): Target size of the tar file. This is the unpacked size of the files.

    Returns:
        List[Path]: _description_
    """

    # TODO: corner case: target size < file size
    tarballs: List[ArchiveInfo] = []
    tar_name = dataset_id.replace("/", "-")

    if not any(Path(src_folder).iterdir()):
        raise SystemError(f"Empty folder {src_folder} found.")

    total_file_count = count_files(src_folder)
    current_file_count = 0

    def create_tar(idx: int, files: List) -> ArchiveInfo:
        current_tar_info = ArchiveInfo(
            unpackedSize=0,
            packedSize=0,
            path=Path(dst_folder / Path(f"{tar_name}_{idx}.tar.gz")),
            fileCount=len(files)
        )
        current_tarfile: tarfile.TarFile = tarfile.open(current_tar_info.path, "w")
        for relative_file_path in files:
            full_path = src_folder.joinpath(relative_file_path)
            current_tar_info.unpackedSize += full_path.stat().st_size
            current_tarfile.add(name=full_path, arcname=relative_file_path)

        current_tarfile.close()
        current_tar_info.packedSize = current_tar_info.path.stat().st_size
        return current_tar_info

    with ThreadPoolExecutor(max_workers=Variables().ARCHIVER_NUM_WORKERS) as executor:
        future_to_key = {executor.submit(create_tar, idx, files): (idx, files)
                         for (idx, files) in partition_files_flat(src_folder, target_size)}
        for future in as_completed(future_to_key):
            exception = future.exception()

            if not exception:
                archive_info = future.result()
                tarballs.append(archive_info)
                if progress_callback:
                    current_file_count += archive_info.fileCount
                    progress_callback(current_file_count / total_file_count)
            else:
                raise exception

    return tarballs
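
Example: a usage sketch with placeholder paths, assuming the dataset files have already been collected in src_folder.

from pathlib import Path

# assumed import path
from archiver.utils.datablocks import create_tarfiles

src = Path("/tmp/archiving/raw/12.345-my-dataset")         # placeholder: folder with the dataset files
dst = Path("/tmp/archiving/datablocks/12.345-my-dataset")  # placeholder: output folder for the tar files
dst.mkdir(parents=True, exist_ok=True)

# aim for roughly 1 GiB of unpacked data per tar file
tar_infos = create_tarfiles(
    dataset_id="12.345/my-dataset",
    src_folder=src,
    dst_folder=dst,
    target_size=1024**3,
    progress_callback=lambda fraction: print(f"{fraction:.0%} of files packed"),
)
for info in tar_infos:
    print(info.path, info.packedSize, info.unpackedSize)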

download_object_from_s3(client, bucket, folder, object_name, target_path)

Download an object from S3 storage.

Parameters:

    client (S3Storage, required): S3 client used for the download
    bucket (Bucket, required): bucket to look for the file in
    folder (Path, required): S3 prefix of the object
    object_name (str, required): object name, without prefix
    target_path (Path, required): absolute or relative path of the file to be created
Source code in backend/archiver/utils/datablocks.py
@log_debug
def download_object_from_s3(
    client: S3Storage, bucket: Bucket, folder: Path, object_name: str, target_path: Path
):
    """Download an object from S3 storage.

    Args:
        client (S3Storage): S3 client used for the download
        bucket (Bucket): Bucket to look for file
        folder (Path): s3 prefix for object
        object_name (str): object name, no prefix
        target_path (Path): absolute or relative path for the file to be created
    """
    client.fget_object(
        bucket=bucket,
        folder=str(folder),
        object_name=object_name,
        target_path=target_path,
    )
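
Example: a sketch of a single-object download. The client (S3Storage) and bucket (Bucket) objects are assumed to have been created elsewhere; their construction is not part of this module. Prefix, object name and target path are placeholders.

from pathlib import Path

# client: S3Storage and bucket: Bucket are assumed to exist already
download_object_from_s3(
    client=client,
    bucket=bucket,
    folder=Path("12.345/my-dataset"),           # placeholder S3 prefix
    object_name="12.345-my-dataset_0.tar.gz",   # placeholder object name, no prefix
    target_path=Path("/tmp/retrieval/12.345-my-dataset_0.tar.gz"),
)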

download_objects_from_s3(client, prefix, bucket, destination_folder, progress_callback)

Download objects from S3 storage to a folder.

Parameters:

    client (S3Storage, required): S3 client used for the download
    prefix (Path, required): S3 prefix
    bucket (Bucket, required): S3 bucket
    destination_folder (Path, required): target folder; created if it does not exist
    progress_callback (required): callback reporting download progress, passed through to the S3 client

Raises:

    SystemError: if no files are found in the bucket under the given prefix

Returns:

    List[Path]: list of paths of the created files

Source code in backend/archiver/utils/datablocks.py
@log
def download_objects_from_s3(
    client: S3Storage, prefix: Path, bucket: Bucket, destination_folder: Path, progress_callback
) -> List[Path]:
    """Download objects form s3 storage to folder

    Args:
        prefix (Path): S3 prefix
        bucket (Bucket): s3 bucket
        destination_folder (Path): Target folder. Will be created if it does not exist.

    Returns:
        List[Path]: List of paths of created files
    """
    destination_folder.mkdir(parents=True, exist_ok=True)

    files = client.download_objects(minio_prefix=prefix, bucket=bucket,
                                    destination_folder=destination_folder, progress_callback=progress_callback)

    if len(files) == 0:
        raise SystemError(f"No files found in bucket {bucket.name} at {prefix}")

    return files
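
Example: a sketch of downloading all objects under a prefix. client and bucket are assumed to exist already; the exact callback signature is defined by S3Storage.download_objects, so a permissive helper is used here.

from pathlib import Path

def report_progress(*args) -> None:
    # the exact callback signature is defined by S3Storage.download_objects (not shown here)
    print("download progress:", *args)

files = download_objects_from_s3(
    client=client,
    prefix=Path("12.345/my-dataset"),               # placeholder prefix
    bucket=bucket,
    destination_folder=Path("/tmp/archiving/raw"),  # created if it does not exist
    progress_callback=report_progress,
)
print(f"downloaded {len(files)} files")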

list_datablocks(client, prefix, bucket)

List all objects in an S3 bucket under a given prefix.

Parameters:

    client (S3Storage, required): S3 client used to list the objects
    prefix (Path, required): prefix of the objects to be listed
    bucket (Bucket, required): S3 bucket

Returns:

    List[S3Storage.ListedObject]: the listed objects

Source code in backend/archiver/utils/datablocks.py
@log
def list_datablocks(client: S3Storage, prefix: Path, bucket: Bucket) -> List[S3Storage.ListedObject]:
    """List all objects in s3 bucket and path

    Args:
        minio_prefix (Path): prefix for files to be listed
        bucket (Bucket): s3 bucket

    Returns:
        _type_: Iterator to objects
    """
    return client.list_objects(bucket, str(prefix))
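
Example: a sketch listing the stored datablocks of a dataset; client and bucket are assumed to exist already and the prefix is a placeholder.

from pathlib import Path

for obj in list_datablocks(client, Path("12.345/my-dataset"), bucket):
    # each entry is an S3Storage.ListedObject describing one stored object
    print(obj)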

partition_files_flat(folder, target_size_bytes)

Partitions the files in a folder into groups such that the combined size of the files in each group is at most target_size_bytes. The folder is walked recursively (os.walk) and file paths are yielded relative to folder.

Parameters:

    folder (Path, required): folder whose files are partitioned
    target_size_bytes (int, required): maximum combined size of each group of files

Yields:

    Tuple[int, List[Path]]: the partition index and the list of file paths (relative to folder) in that partition

Source code in backend/archiver/utils/datablocks.py
def partition_files_flat(folder: Path, target_size_bytes: int) -> Generator[List[Path], None, None]:
    """Partitions files in folder into groups such that all the files in a group combined
    have a target_size_bytes size at maximum. Folders are not treated recursively

    Args:
        folder (Path): Folder to partition files in
        target_size_bytes (int): maximum size of grouped files

    Yields:
        Generator[List[Path], None, None]: List of paths with maximum size
    """

    if not folder.is_dir():
        return

    part: List[Path] = []
    size = 0
    idx = 0
    for dirpath, dirnames, filenames in os.walk(folder):
        for filename in filenames:
            filepath = Path(os.path.join(dirpath, filename))
            if size + os.path.getsize(filepath) > target_size_bytes:
                yield (idx, part)
                part = []
                size = 0
                idx = idx + 1
            part.append(filepath.relative_to(folder))
            size = size + os.path.getsize(filepath)

    yield (idx, part)
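
Example: a sketch iterating over the yielded (index, files) tuples, with a placeholder folder and a 100 MiB target size.

from pathlib import Path

# assumed import path
from archiver.utils.datablocks import partition_files_flat

folder = Path("/tmp/archiving/raw/12.345-my-dataset")  # placeholder
for idx, files in partition_files_flat(folder, target_size_bytes=100 * 1024**2):
    # paths are yielded relative to `folder`
    total = sum((folder / f).stat().st_size for f in files)
    print(f"partition {idx}: {len(files)} files, {total} bytes")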

sufficient_free_space_on_lts()

Checks whether the configured LTS storage has at least the configured percentage of free space.

Returns:

    bool: True if the free-space condition is satisfied

Source code in backend/archiver/utils/datablocks.py
@log
def sufficient_free_space_on_lts():
    """Checks for free space on configured LTS storage with respect to configured free space percentage.

    Returns:
        boolean: condition of eneough free space satisfied
    """

    path = Variables().LTS_STORAGE_ROOT
    stat = shutil.disk_usage(path)
    free_percentage = 100.0 * stat.free / stat.total
    getLogger().info(
        f"LTS free space:{free_percentage:.2}%, expected: {Variables().LTS_FREE_SPACE_PERCENTAGE:.2}%"
    )
    return free_percentage >= Variables().LTS_FREE_SPACE_PERCENTAGE
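
Example: the same free-space computation shown standalone for a hypothetical mount point; the real path and threshold come from Variables().LTS_STORAGE_ROOT and Variables().LTS_FREE_SPACE_PERCENTAGE.

import shutil

# hypothetical mount point standing in for Variables().LTS_STORAGE_ROOT
stat = shutil.disk_usage("/mnt/lts")
free_percentage = 100.0 * stat.free / stat.total
print(f"LTS free space: {free_percentage:.2f}%")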

wait_for_file_accessible(file, timeout_s=360) async

Waits until the file is readable, polling every 30 seconds.

Returns:

    bool: True once the file is accessible

Raises:

    SystemError: if the file is not accessible within timeout_s seconds

Source code in backend/archiver/utils/datablocks.py
@log
async def wait_for_file_accessible(file: Path, timeout_s=360):
    """
    Returns:
    """
    total_time_waited_s = 0
    while not os.access(path=file, mode=os.R_OK):
        seconds_to_wait = 30
        getLogger().info(f"File {file} currently not available. Trying again in {seconds_to_wait} seconds.")
        await asyncio.sleep(seconds_to_wait)
        total_time_waited_s += seconds_to_wait
        if total_time_waited_s > timeout_s:
            raise SystemError(f"File f{file} was not accessible within {timeout_s} seconds")

    getLogger().info(f"File {file} accessible.")

    return True
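
Example: an asyncio usage sketch waiting for a file on the LTS share to become readable; the path is a placeholder.

import asyncio
from pathlib import Path

# assumed import path
from archiver.utils.datablocks import wait_for_file_accessible

async def main() -> None:
    # polls every 30 seconds, raises SystemError after 10 minutes
    await wait_for_file_accessible(Path("/mnt/lts/12.345/my-dataset_0.tar.gz"), timeout_s=600)

asyncio.run(main())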

wait_for_free_space() async

Asynchronously waits until there is enough free space on the LTS storage, checking at fixed intervals.

TODO: add exponential backoff for the waiting time

Returns:

    bool: True once there is enough free space

Source code in backend/archiver/utils/datablocks.py
@log
async def wait_for_free_space():
    """Asynchronous wait until there is enough free space. Waits in linear intervals to check for free space

        TODO: add exponential backoff for waiting time

    Returns:
        boolean: Returns True once there is enough free space
    """
    while not sufficient_free_space_on_lts():
        seconds_to_wait = 30
        getLogger().info(f"Not enough free space. Waiting for {seconds_to_wait}s")
        await asyncio.sleep(seconds_to_wait)

    return True
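
Example: an asyncio usage sketch that blocks an archiving step until enough free space is available.

import asyncio

# assumed import path
from archiver.utils.datablocks import wait_for_free_space

async def archive_when_space_is_available() -> None:
    await wait_for_free_space()
    # enough free space on the LTS at this point; continue with the archiving step here

asyncio.run(archive_when_space_is_available())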