Source code for zcollection.merging.time_series

# Copyright (c) 2023 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Merging a time series
=====================
"""
import numpy

from . import period
from .. import dataset
from ..type_hints import NDArray


def _merge_time_series(
    existing_ds: dataset.Dataset,
    inserted_ds: dataset.Dataset,
    axis: str,
    partitioning_dim: str,
) -> dataset.Dataset:
    """Merge two time series together.

    See :func:`merge_time_series` for
    details.
    """
    existing_axis: NDArray = existing_ds.variables[axis].values
    inserted_axis: NDArray = inserted_ds.variables[axis].values
    existing_period = period.Period(existing_axis.min(),
                                    existing_axis.max(),
                                    within=True)
    inserted_period = period.Period(inserted_axis.min(),
                                    inserted_axis.max(),
                                    within=True)

    relation: period.PeriodRelation = inserted_period.get_relation(
        existing_period)

    # The new piece is located before the existing data.
    if relation.is_before():
        return inserted_ds.concat(existing_ds, partitioning_dim)

    # The new piece is located after the existing data.
    if relation.is_after():
        return existing_ds.concat(inserted_ds, partitioning_dim)

    # The new piece replace the old one
    if relation.contains():
        return inserted_ds

    intersection: period.Period = inserted_period.intersection(existing_period)

    # The new piece is located before, but there is an overlap
    # between the two datasets.
    if relation.is_before_overlapping():
        # pylint: disable=comparison-with-callable
        indices = numpy.where(
            # comparison between ndarray and datetime64
            existing_axis > intersection.end())[0]  # type: ignore
        # pylint: enable=comparison-with-callable
        return inserted_ds.concat(
            existing_ds.isel({partitioning_dim: indices}), partitioning_dim)

    # The new piece is located after, but there is an overlap
    # between the two datasets.
    if relation.is_after_overlapping():
        # pylint: disable=comparison-with-callable
        indices = numpy.where(
            # comparison between ndarray and datetime64
            existing_axis < intersection.begin)[0]  # type: ignore
        # pylint: enable=comparison-with-callable
        return existing_ds.isel({
            partitioning_dim: indices
        }).concat(inserted_ds, partitioning_dim)

    assert relation.is_inside()
    # comparison between ndarray and datetime64
    index = numpy.where(existing_axis < intersection.begin)[0]  # type: ignore
    before: dataset.Dataset = existing_ds.isel(
        {partitioning_dim: slice(0, index[-1] + 1, None)})

    # pylint: disable=comparison-with-callable
    # comparison between ndarray and datetime64
    index = numpy.where(existing_axis > intersection.end())[0]  # type: ignore
    # pylint: enable=comparison-with-callable
    after: dataset.Dataset = existing_ds.isel(
        {partitioning_dim: slice(index[0], index[-1] + 1, None)})

    return before.concat((inserted_ds, after), partitioning_dim)


[docs] def merge_time_series( existing_ds: dataset.Dataset, inserted_ds: dataset.Dataset, axis: str, partitioning_dim: str, **kwargs, ) -> dataset.Dataset: """Merge two time series together. Replaces only the intersection between the existing dataset and the new one, but also keeps the existing records if they have not been updated. The following figure illustrates the implemented algorithm. Column ``A`` represents the new data and column ``B``, the data already present. The different cells in the columns represent the hours on the day of the measurements. The merge result is shown in column ``C``. It contains the measurements of the column ``A`` or column ``B`` if column ``A`` does not replace them. .. figure:: ../images/merge_time_series.svg :align: center :width: 50% Args: existing_ds: The existing dataset. inserted_ds: The inserted dataset. axis: The axis to merge on. partitioning_dim: The name of the partitioning dimension. kwargs: tolerance: This parameter sets the tolerance level for detecting data gaps in the inserted axis dataset. If set to ``None``, the algorithm will not check for data gaps in the inserted dataset. Returns: The merged dataset. """ tolerance = kwargs.get('tolerance', None) index: NDArray # Check if the inserted dataset contains data gaps. if tolerance is not None: inserted_axis: NDArray = inserted_ds.variables[axis].values delta: NDArray = numpy.concatenate( (numpy.array([0]), numpy.diff(numpy.roll(inserted_axis, 0)))) index = numpy.concatenate( (numpy.array([0], numpy.int64), numpy.where(delta > tolerance)[0], numpy.array([inserted_axis.size], numpy.int64))) else: index = numpy.array([], dtype=numpy.int64) if index.size > 1: # Split the inserted dataset into several datasets between the data # gaps. for ix in range(len(index) - 1): existing_ds = _merge_time_series( existing_ds, inserted_ds.isel( {partitioning_dim: slice(index[ix], index[ix + 1], None)}), axis, partitioning_dim) return existing_ds return _merge_time_series(existing_ds, inserted_ds, axis, partitioning_dim)