# Copyright (c) 2023 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Partitioning by date
====================
"""
from __future__ import annotations
from typing import Any, ClassVar, Iterator, Sequence
import datetime
import dask.array.core
import numpy
from . import abc
from ..type_hints import ArrayLike, NDArray
#: Numpy ``datetime64`` unit codes accepted as partitioning resolution, listed
#: from the coarsest (year) to the finest (second).
RESOLUTION = ('Y', 'M', 'D', 'h', 'm', 's')

#: Name of the time component matching each entry of :data:`RESOLUTION`
#: (same order).
UNITS = ('year', 'month', 'day', 'hour', 'minute', 'second')

#: Data type name associated with each entry of :data:`UNITS` (same order):
#: the year is the only component declared wider than 8 bits.
DATA_TYPES = ('uint16', ) + ('uint8', ) * 5

#: Character written right after the value of each time component when the
#: components are assembled into a date string.
SEPARATORS: dict[str, str] = {
    'year': '-',
    'month': '-',
    'day': 'T',
    'hour': ':',
    'minute': ':',
    'second': '.',
}
def _unique(arr: ArrayLike, is_delayed: bool) -> tuple[NDArray, NDArray]:
    """Extract the unique elements of an array and verify their ordering.

    Args:
        arr: Array of elements.
        is_delayed: If True, the array is delayed.

    Returns:
        Tuple of unique elements and their indices.

    Raises:
        ValueError: If the array is not monotonic.
    """
    # Arrays that are not delayed are handled by a single helper performing
    # both the extraction and the monotony check.
    if not is_delayed:
        return abc.unique_and_check_monotony(arr)

    unique_values: NDArray
    positions: NDArray
    unique_values, positions = abc.unique(arr)  # type: ignore[arg-type]

    # `abc.difference` is preferred to `numpy.diff` here for optimization
    # purposes. The values are reinterpreted as 64-bit integers before the
    # consecutive differences are computed.
    steps = abc.difference(unique_values.view(numpy.int64))  # type: ignore
    if not numpy.all(steps >= 0):
        raise ValueError('index is not monotonic')
    return unique_values, positions
class Date(abc.Partitioning):
    """Initialize a partitioning scheme based on dates.

    The partition keys are the time components (see :data:`UNITS`) from the
    year down to the component selected by ``resolution``.

    Args:
        variables: A list of strings representing the variables to be used for
            partitioning. Exactly one variable must be given.
        resolution: Time resolution of the partitioning. Must be in
            :data:`RESOLUTION`.

    Raises:
        ValueError: If the resolution is not in the list of supported
            resolutions or if the partitioning is not requested on exactly
            one variable.

    Example:
        >>> partitioning = Date(variables=("time", ), resolution="Y")
    """
    __slots__ = ('_attrs', '_index', 'resolution')

    #: The ID of the partitioning scheme
    ID: ClassVar[str] = 'Date'

    def __init__(self, variables: Sequence[str], resolution: str) -> None:
        if len(variables) != 1:
            raise ValueError(
                'Partitioning on dates is performed on a single variable.')
        if resolution not in RESOLUTION:
            raise ValueError('resolution must be in: ' + ', '.join(RESOLUTION))

        # Number of time components kept: every unit from the year down to
        # (and including) the requested resolution.
        index: int = RESOLUTION.index(resolution) + 1

        #: The time resolution of the partitioning
        self.resolution: str = resolution
        #: The time parts used for the partitioning. Holds exactly one
        #: attribute name per entry of ``_index``.
        self._attrs: tuple[str, ...] = UNITS[:index]
        #: The indices of the time parts used for the partitioning
        self._index = tuple(range(index))
        super().__init__(variables,
                         tuple(DATA_TYPES[ix] for ix in self._index))

    def _keys(self) -> Sequence[str]:
        """Return the keys of the partitioning scheme.

        Returns:
            The names of the time components used by this instance, from the
            year down to the selected resolution.
        """
        return tuple(UNITS[ix] for ix in self._index)

    # pylint: disable=arguments-differ
    # False positive: the base method is static.
    def _partition(  # type: ignore[override]
        self,
        selection: tuple[tuple[str, Any], ...],
    ) -> tuple[str, ...]:
        """Return the partitioning scheme for the given selection.

        Args:
            selection: Tuple of ``(name, value)`` pairs. Only the value of the
                first pair is read; it must provide ``astype`` like a numpy
                ``datetime64``.

        Returns:
            One ``unit=value`` string per time component of the partitioning,
            the value being zero-padded to at least two digits.
        """
        datetime64: NDArray = selection[0][1]
        # Casting to seconds lets `item()` hand back an object exposing the
        # `year`, `month`, ... attributes read below.
        py_datetime: datetime.datetime = datetime64.astype('M8[s]').item()
        return tuple(
            f'{UNITS[ix]}={getattr(py_datetime, self._attrs[ix]):02d}'
            for ix in self._index)

    # pylint: enable=arguments-differ

    def _split(
        self,
        variables: dict[str, ArrayLike],
    ) -> Iterator[abc.Partition]:
        """Return the partitioning scheme for the given variables.

        Args:
            variables: Mapping from variable name to its values. Only the
                first item is partitioned; every value is inspected to detect
                Dask arrays.

        Returns:
            An iterator of ``(((name, date), ), slice(start, stop))`` items,
            one per distinct date at the resolution of the partitioning.

        Raises:
            TypeError: If the values are not a ``datetime64`` array.
            ValueError: If the dates are not monotonic (raised by
                :func:`_unique`).
        """
        index: NDArray
        indices: NDArray
        name: str
        values: ArrayLike

        # Determine if the variables are handled by Dask.
        is_delayed: bool = any(
            isinstance(value, dask.array.core.Array)
            for value in variables.values())

        name, values = tuple(variables.items())[0]

        if not numpy.issubdtype(values.dtype, numpy.dtype('datetime64')):
            raise TypeError('values must be a datetime64 array')

        # Truncate the dates to the partitioning resolution before looking for
        # the distinct values.
        index, indices = _unique(
            values.astype(f'datetime64[{self.resolution}]'), is_delayed)
        # The total number of values is added to the indices (presumably
        # appended at the end) so that every start has a matching stop.
        indices = abc.concatenate_item(indices, values.size)

        return ((((name, date), ), slice(start, stop, None))
                for date, start, stop in zip(index, indices[:-1], indices[1:]))

    @staticmethod
    def _stringify(partition: tuple[tuple[str, int], ...]) -> str:
        """Return a string representation of the partitioning scheme.

        Args:
            partition: Tuple of ``(unit, value)`` pairs, where ``unit`` is a
                key of :data:`SEPARATORS`.

        Returns:
            The values, zero-padded to at least two digits, each followed by
            the separator of its unit, without the trailing separator.
        """
        string = ''.join(f'{value:02d}' + SEPARATORS[item]
                         for item, value in partition)
        # The separator written after the last component is dangling.
        if string[-1] in SEPARATORS.values():
            string = string[:-1]
        return string

    @staticmethod
    def join(partition_scheme: tuple[tuple[str, int], ...], sep: str) -> str:
        """Join a partitioning scheme.

        Args:
            partition_scheme: The partitioning scheme to be joined.
            sep: The separator to be used.

        Returns:
            The joined partitioning scheme.

        Example:
            >>> partitioning = Date(variables=("time", ), resolution="D")
            >>> partitioning.join((("year", 2020), ("month", 1), ("day", 1)),
            ...                   "/")
            'year=2020/month=01/day=01'
        """
        return sep.join(f'{k}={v:02d}' for k, v in partition_scheme)

    def encode(
        self,
        partition: tuple[tuple[str, int], ...],
    ) -> tuple[Any, ...]:
        """Encode a partitioning scheme.

        Args:
            partition: The partitioning scheme to be encoded.

        Returns:
            The encoded partitioning scheme: a one-element tuple holding the
            ``numpy.datetime64`` built from the partition values.

        Example:
            >>> partitioning = Date(variables=("time", ), resolution="D")
            >>> fields = partitioning.parse("year=2020/month=01/day=01")
            >>> fields
            (("year", 2020), ("month", 1), ("day", 1))
            >>> partitioning.encode(fields)
            (numpy.datetime64('2020-01-01'),)
        """
        return (numpy.datetime64(self._stringify(partition)), )

    def decode(
        self,
        values: tuple[Any, ...],
    ) -> tuple[tuple[str, int], ...]:
        """Decode a partitioning scheme.

        Args:
            values: The partitioning scheme to be decoded. Only the first
                element is read.

        Returns:
            The decoded partitioning scheme.

        Example:
            >>> partitioning = Date(variables=("time", ), resolution="D")
            >>> partitioning.decode((numpy.datetime64('2020-01-01'), ))
            (("year", 2020), ("month", 1), ("day", 1))
        """
        datetime64: NDArray = values[0]
        # Casting to seconds lets `item()` hand back an object exposing the
        # `year`, `month`, ... attributes read below.
        py_datetime: datetime.datetime = datetime64.astype('M8[s]').item()
        return tuple((UNITS[ix], getattr(py_datetime, self._attrs[ix]))
                     for ix in self._index)