# Source code for zcollection.merging

# Copyright (c) 2023 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Handle merging of datasets of a partition.
==========================================
"""
from __future__ import annotations

from typing import Protocol
import random
import shutil

import fsspec
import fsspec.implementations.local
import zarr.storage

from .. import dataset, storage, sync
from .time_series import merge_time_series

__all__ = (
    'MergeCallable',
    'perform',
    'merge_time_series',
)

#: Characters used to build the random suffix of temporary directory names.
CHARACTERS = ('abcdefghijklmnopqrstuvwxyz'
              '0123456789'
              '_')


#: pylint: disable=too-few-public-methods,duplicate-code
class MergeCallable(Protocol):
    """Protocol to merge datasets stored in a partition.

    A merge callable is a function that accepts, in this order, an existing
    dataset present in a partition, a new dataset to merge, the axis to merge
    on and the partitioning dimension. It returns the merged dataset.
    """

    def __call__(
        self,
        existing_ds: dataset.Dataset,
        inserted_ds: dataset.Dataset,
        axis: str,
        partitioning_dim: str,
        **kwargs,
    ) -> dataset.Dataset:  # pylint: disable=duplicate-code
        """Call the merge function.

        Args:
            existing_ds: The existing dataset.
            inserted_ds: The inserted dataset.
            axis: The axis to merge on.
            partitioning_dim: The partitioning dimension.
            **kwargs: Additional keyword arguments.

        Returns:
            The merged dataset.
        """
        # pylint: disable=unnecessary-ellipsis
        # Ellipsis is necessary to make the function signature match the
        # protocol.
        ...  # pragma: no cover
# pylint: enable=unnecessary-ellipsis
#: pylint: enable=too-few-public-methods,duplicate-code


def _rename(
    fs: fsspec.AbstractFileSystem,
    source: str,
    dest: str,
) -> None:
    """Rename a directory on a file system, replacing any existing target.

    Args:
        fs: The file system.
        source: The source directory.
        dest: The destination directory. If it already exists, it is removed
            before the rename; a missing destination is not an error.
    """
    if isinstance(fs, fsspec.implementations.local.LocalFileSystem):
        # fspec implementation of the local file system, copy the source
        # directory to the destination directory and remove the source
        # directory. This is not efficient. So we use the shutil
        # implementation to rename the directory.
        shutil.rmtree(dest, ignore_errors=True)
        shutil.move(source, dest)
        return
    # Mirror the local branch (``ignore_errors=True``): fsspec's ``rm``
    # raises FileNotFoundError on a missing path, so only remove an
    # existing destination.
    if fs.exists(dest):
        fs.rm(dest, recursive=True)
    fs.mv(source, dest, recursive=True)


def _update_fs(
    dirname: str,
    zds: dataset.Dataset,
    fs: fsspec.AbstractFileSystem,
    *,
    synchronizer: sync.Sync | None = None,
) -> None:
    """Updates a dataset stored in a partition.

    The dataset is first written to a temporary sibling directory, which then
    replaces ``dirname``. If writing fails, the temporary directory is
    removed (when it exists) and the original exception is re-raised.

    Args:
        dirname: The name of the partition.
        zds: The dataset to update.
        fs: The file system that the partition is stored on.
        synchronizer: The instance handling access to critical resources.
    """
    # Name of the temporary directory: the partition name followed by a
    # random 10-character suffix.
    temp: str = dirname + '.' + ''.join(random.choices(CHARACTERS, k=10))
    try:
        # Initializing Zarr group. This is inside the ``try`` so that a
        # partially created temporary directory is also cleaned up.
        zarr.storage.init_group(store=fs.get_mapper(temp))
        # Writing new data. The synchronization is done by the caller.
        storage.write_zarr_group(zds, temp, fs, synchronizer or sync.NoSync())
    except Exception:
        # The "write_zarr_group" method throws the exception if all scheduled
        # tasks are finished. So here we can delete the temporary directory.
        # It may not exist if the failure happened early; removing a missing
        # path would raise and hide the original error.
        if fs.exists(temp):
            fs.rm(temp, recursive=True)
        raise
    # Replace the existing entry on the file system.
    _rename(fs, temp, dirname)
def perform(
    ds_inserted: dataset.Dataset,
    dirname: str,
    axis: str,
    fs: fsspec.AbstractFileSystem,
    partitioning_dim: str,
    *,
    delayed: bool = True,
    merge_callable: MergeCallable | None = None,
    synchronizer: sync.Sync | None = None,
    **kwargs,
) -> None:
    """Merges a new dataset with an existing partition.

    Args:
        ds_inserted: The dataset to merge.
        dirname: The name of the partition.
        axis: The axis to merge on.
        fs: The file system on which the partition is stored.
        partitioning_dim: The partitioning dimension.
        delayed: If True, the existing dataset is loaded lazily. Defaults to
            True.
        merge_callable: The merge callable. If None, the inserted dataset
            overwrites the existing dataset stored in the partition (which is
            then not opened at all). Defaults to None.
        synchronizer: The instance handling access to critical resources.
            Defaults to None.
        **kwargs: Additional keyword arguments are passed through to the merge
            callable.
    """
    if merge_callable is None:
        # No merge strategy: the inserted dataset replaces the partition.
        zds: dataset.Dataset = ds_inserted
    else:
        existing_ds: dataset.Dataset = storage.open_zarr_group(
            dirname, fs, delayed=delayed)
        zds = merge_callable(existing_ds, ds_inserted, axis, partitioning_dim,
                             **kwargs)
    _update_fs(dirname, zds, fs, synchronizer=synchronizer)