
# Overview of a Collection

This section outlines the steps required to get started with the main features
of a ``Collection``.


In [None]:
from __future__ import annotations

from typing import Iterator
import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

## Initialization of the environment

Before we create our first collection, we will create a dataset to record.



In [None]:
def create_dataset() -> zcollection.Dataset:
    """Create a dataset to record."""
    generator: Iterator[zcollection.Dataset] = \
        zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)


zds: zcollection.Dataset | None = create_dataset()
assert zds is not None
zds.to_xarray()

Next, we create the file system that will store the collection. In this
example, it is an in-memory file system.



In [None]:
fs: fsspec.AbstractFileSystem = fsspec.filesystem('memory')

Finally, we create a local Dask cluster that uses only threads, so that the
workers can access the file system stored in memory.
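
The reason for preferring threads can be illustrated without Dask. The sketch
below uses a plain dictionary, ``store``, as a hypothetical stand-in for an
in-memory file system (it is not how fsspec is implemented). Worker threads
share the memory of the process that started them, so what they write is
visible to the caller, whereas separate worker processes would each work on
their own copy.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for an in-memory file system: path -> content.
store: dict[str, bytes] = {}


def write(path: str) -> None:
    """Write a small payload under ``path`` in the shared store."""
    store[path] = path.encode()


# Threads run inside the current process, so they all see the same ``store``.
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(write, ['/a', '/b']))

print(sorted(store))
```
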



In [None]:
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

## Creation of the partitioning

Before creating our collection, we define the partitioning of our dataset. In
this example, we will partition the data by ``month`` using the variable
``time``.
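
To make the idea of monthly partitioning concrete, here is a minimal sketch
that does not use zcollection at all. The helpers ``month_key`` and
``split_by_month`` are hypothetical names introduced for illustration: they
group sample indices by the ``(year, month)`` of their timestamp, which is
roughly what partitioning by month on ``time`` amounts to.

```python
from __future__ import annotations

import collections
import datetime


def month_key(timestamp: datetime.datetime) -> tuple[int, int]:
    """Return the (year, month) pair identifying the partition of a sample."""
    return timestamp.year, timestamp.month


def split_by_month(
        times: list[datetime.datetime]) -> dict[tuple[int, int], list[int]]:
    """Group the indices of ``times`` by the month they fall in."""
    groups: dict[tuple[int, int], list[int]] = collections.defaultdict(list)
    for index, timestamp in enumerate(times):
        groups[month_key(timestamp)].append(index)
    return dict(groups)


# Seven samples, ten days apart, starting on 2000-01-01.
times = [
    datetime.datetime(2000, 1, 1) + datetime.timedelta(days=10 * step)
    for step in range(7)
]
partitions = split_by_month(times)
for (year, month), indices in partitions.items():
    print(f'year={year}, month={month}: samples {indices}')
```

Each group of indices would end up in its own partition; the real handler
works on the values of the ``time`` variable of the dataset.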



In [None]:
partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

Finally, we create our collection:



In [None]:
collection: zcollection.Collection = zcollection.create_collection(
    'time', zds, partition_handler, '/my_collection', filesystem=fs)

<div class="alert alert-info"><h4>Note</h4><p>Once created, the collection can be reopened using the following command ::

       >>> collection = zcollection.open_collection("/my_collection",
       ...                                          filesystem=fs)</p></div>

Creating the collection writes a configuration file. This file contains the
metadata needed to ensure that all data inserted later has the same features
as the existing data (data consistency).



In [None]:
pprint.pprint(collection.metadata.get_config())

Now that the collection has been created, we can insert new records.



In [None]:
collection.insert(zds)

<div class="alert alert-info"><h4>Note</h4><p>When inserting, it's possible to specify the `merge strategy of a
    partition <merging_datasets>`. By default, the last inserted data
    overwrites the existing data. Other strategies can be defined, for example,
    to update existing data (overwriting the updated data while keeping the
    rest of the existing data). This last strategy allows an existing
    partition to be updated incrementally. ::

        >>> import zcollection.merging
        >>> collection.insert(
        ...     ds, merge_callable=zcollection.merging.merge_time_series)

    If the time series has data gaps, it is possible to specify a tolerance
    for detecting gaps along the axis of the inserted dataset, so that the
    existing data falling in those gaps is kept. ::

        >>> collection.insert(
        ...     ds, merge_callable=zcollection.merging.merge_time_series,
        ...     tolerance=numpy.timedelta64(1, 'h'))</p></div>
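
The difference between the two strategies described in the note can be
sketched on plain lists of ``(time, value)`` records. This is a deliberately
simplified illustration, not the implementation of
``zcollection.merging.merge_time_series``: the functions ``overwrite`` and
``merge_by_time_span`` are hypothetical, and the ``tolerance`` handling of
gaps is not modelled.

```python
from __future__ import annotations


def overwrite(existing: list[tuple[int, str]],
              inserted: list[tuple[int, str]]) -> list[tuple[int, str]]:
    """Replace the whole partition with the inserted records."""
    return list(inserted)


def merge_by_time_span(
        existing: list[tuple[int, str]],
        inserted: list[tuple[int, str]]) -> list[tuple[int, str]]:
    """Keep the existing records outside the period covered by ``inserted``.

    Both lists are assumed to be sorted by time.
    """
    if not inserted:
        return list(existing)
    first, last = inserted[0][0], inserted[-1][0]
    kept = [record for record in existing if not first <= record[0] <= last]
    return sorted(kept + list(inserted), key=lambda record: record[0])


existing = [(0, 'old'), (1, 'old'), (2, 'old'), (3, 'old')]
inserted = [(2, 'new'), (3, 'new'), (4, 'new')]

replaced = overwrite(existing, inserted)
merged = merge_by_time_span(existing, inserted)
print(replaced)
print(merged)
```

With the second strategy, the records at times 0 and 1 survive the insertion,
which is what makes incremental updates of a partition possible.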


Let's look at the different partitions thus created.



In [None]:
pprint.pprint(fs.listdir('/my_collection/year=2000'))

This collection is composed of several partitions, but it is always handled
as a single dataset.

## Loading data

To load the dataset, call the method
:py:meth:`load<zcollection.collection.Collection.load>` on the instance. By
default, the method loads all partitions stored in the collection.



In [None]:
collection.load(delayed=True)

<div class="alert alert-info"><h4>Note</h4><p>By default, the data is loaded as a :py:class:`dask.array<da.Array>`. It is
  possible to load the data as a :py:class:`numpy.ndarray` by specifying the
  parameter ``delayed=False``.</p></div>

You can also select the partitions to load by writing a valid Python
expression that uses the partitioning keys.
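
As a rough sketch of how such an expression can select partitions, the
hypothetical helper ``select_partitions`` below evaluates the expression once
per partition, with the partitioning keys exposed as variables. It only
illustrates the principle; zcollection's own evaluation may differ.

```python
from __future__ import annotations


def select_partitions(partitions: list[dict[str, int]],
                      expression: str) -> list[dict[str, int]]:
    """Return the partitions whose keys satisfy ``expression``."""
    code = compile(expression, '<filter>', 'eval')
    # Builtins are disabled: only the partitioning keys are visible to the
    # expression.
    return [
        keys for keys in partitions
        if eval(code, {'__builtins__': {}}, dict(keys))
    ]


partitions = [{'year': 2000, 'month': month} for month in (1, 2, 3)]
selected = select_partitions(partitions, 'year == 2000 and month == 2')
nothing = select_partitions(partitions, 'year == 2002 and month == 2')
print(selected, nothing)
```

``eval`` should only ever be given trusted expressions, even with builtins
disabled.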



In [None]:
collection.load(filters='year == 2000 and month == 2')

You can also use a callback function to filter partitions with a complex
condition.



In [None]:
collection.load(
    filters=lambda keys: datetime.date(2000, 2, 15) <= datetime.date(
        keys['year'], keys['month'], 1) <= datetime.date(2000, 3, 15))

Note that the :py:meth:`load<zcollection.collection.Collection.load>`
method returns ``None`` if no partition has been selected.



In [None]:
assert collection.load(filters='year == 2002 and month == 2') is None

## Editing variables

<div class="alert alert-info"><h4>Note</h4><p>The functions for modifying collections are not usable if the collection
    was :py:meth:`opened<zcollection.open_collection>` in read-only mode.</p></div>

It's possible to delete a variable from a collection.



In [None]:
collection.drop_variable('var2')
collection.load()

The variable used for partitioning cannot be deleted.



In [None]:
try:
    collection.drop_variable('time')
except ValueError as exc:
    print(exc)

The :py:meth:`add_variable<zcollection.collection.Collection.add_variable>`
method allows you to add a new variable to the collection.



In [None]:
collection.add_variable(zds.metadata().variables['var2'])

The newly created variable is initialized with its default value.



In [None]:
zds = collection.load()
assert zds is not None
zds.variables['var2'].values

Finally, it's possible to
:py:meth:`update<zcollection.collection.Collection.update>` the existing
variables.

In this example, we will alter the variable ``var2`` by setting it to 1
wherever the variable ``var1`` is defined.
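
One way to obtain "1 wherever the input is defined" is to use arithmetic that
propagates undefined values: multiplying by zero and adding one turns every
defined value into 1 and leaves undefined values untouched. The sketch below
assumes, for illustration only, that undefined samples are represented by
NaN; masked NumPy arrays propagate their mask through the same arithmetic.

```python
import math

# Hypothetical samples of ``var1``: NaN marks an undefined value.
var1 = [0.5, math.nan, 2.0, math.nan]

# NaN * 0 + 1 is still NaN, whereas any finite value becomes 1.0.
var2 = [value * 0 + 1 for value in var1]

print(var2)
```
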



In [None]:
def ones(zds) -> dict[str, numpy.ndarray]:
    """Return ``var2`` set to one wherever ``var1`` is defined."""
    return dict(var2=zds.variables['var1'].values * 0 + 1)


collection.update(ones)  # type: ignore[arg-type]

zds = collection.load()
assert zds is not None
zds.variables['var2'].values

<div class="alert alert-info"><h4>Note</h4><p>The method :py:meth:`update<zcollection.collection.Collection.update>`
  supports the ``delayed`` parameter. If ``delayed=True``, the function
  ``ones`` is applied to each partition using a Dask array as container
  for the variables data stored in the provided dataset. This is the default
  behavior. If ``delayed=False``, the function ``ones`` is applied to each
  partition using a Numpy array as container.</p></div>

Sometimes it is important to know the values of the neighboring partitions.
This can be done using the
:py:meth:`update<zcollection.collection.Collection.update>` method with the
``depth`` argument. In this example, we will set the variable ``var2`` to 2
wherever the processed partition has a neighboring partition on both sides, -1
if the left partition is missing and -2 if the right partition is missing.

<div class="alert alert-info"><h4>Note</h4><p>``partition_info`` contains information about the target partition: a tuple
  with the partitioned dimension and the slice to select the partition. If the
  start of the slice is 0, it means that the left partition is missing. If the
  stop of the slice is equal to the length of the given dataset, it means that
  the right partition is missing.</p></div>
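
The slice convention described in the note can be reproduced with a few lines
of plain Python. The helper ``window_slice`` is hypothetical and is not part
of zcollection: given the number of samples in each partition, it computes
where one partition sits inside the concatenation of itself and its
neighbors.

```python
from __future__ import annotations


def window_slice(lengths: list[int], index: int,
                 depth: int = 1) -> tuple[slice, int]:
    """Locate partition ``index`` inside its window of neighbors.

    Returns the slice selecting the partition and the window's total length.
    """
    first = max(0, index - depth)
    last = min(len(lengths), index + depth + 1)
    start = sum(lengths[first:index])
    stop = start + lengths[index]
    return slice(start, stop), sum(lengths[first:last])


lengths = [3, 4, 5]  # hypothetical number of samples in three partitions
for index in range(len(lengths)):
    indices, total = window_slice(lengths, index)
    left = 'missing' if indices.start == 0 else 'present'
    right = 'missing' if indices.stop == total else 'present'
    print(f'partition {index}: {indices}, left {left}, right {right}')
```

Only the middle partition has a start greater than 0 and a stop smaller than
the window length, so it is the only one with neighbors on both sides.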



In [None]:
def twos(ds, partition_info: tuple[str, slice]) -> dict[str, numpy.ndarray]:
    """Returns a variable with twos everywhere if the partition is surrounded
    by partitions on both sides, -1 if the left partition is missing and -2 if
    the right partition is missing."""
    data = numpy.zeros(ds.variables['var1'].shape, dtype='int8')
    dim, indices = partition_info
    assert dim == 'num_lines'
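    if indices.start == 0:
        # The slice starts at 0: there is no partition on the left.
        data[:] = -1
    elif indices.stop == data.shape[0]:
        # The slice ends with the dataset: no partition on the right.
        data[:] = -2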
    else:
        data[:] = 2
    return dict(var2=data)


collection.update(twos, depth=1)  # type: ignore[arg-type]

zds = collection.load()
assert zds is not None
zds.variables['var2'].values

## Map a function over the collection

It's possible to map a function over the partitions of the collection.



In [None]:
for partition, array in collection.map(lambda ds: (  # type: ignore[arg-type]
        ds['var1'].values + ds['var2'].values)).compute():
    print(f' * partition = {partition}: mean = {array.mean()}')

<div class="alert alert-info"><h4>Note</h4><p>The :py:meth:`map<zcollection.collection.Collection.map>` method is
    lazy. To compute the result, you need to call the method ``compute``
    on the returned object.</p></div>

It's also possible to map a function over the partitions together with a
number of neighboring partitions, as with the
:py:meth:`update<zcollection.collection.Collection.update>` method. To do so,
use the :py:meth:`map_overlap<zcollection.collection.Collection.map_overlap>`
method.
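
Conceptually, mapping with overlap means that the function sees each
partition extended by its neighbors. The sketch below shows this on plain
lists; ``map_with_neighbors`` and the partition names are hypothetical, and
unlike the collection method the sketch computes its results eagerly rather
than lazily.

```python
from __future__ import annotations

import statistics
from typing import Callable


def map_with_neighbors(partitions: dict[str, list[float]],
                       func: Callable[[list[float]], float],
                       depth: int = 1) -> list[tuple[str, float]]:
    """Apply ``func`` to each partition extended by ``depth`` neighbors."""
    names = list(partitions)
    results = []
    for index, name in enumerate(names):
        # Names of the partitions in the window centered on ``name``.
        window = names[max(0, index - depth):index + depth + 1]
        data = [value for item in window for value in partitions[item]]
        results.append((name, func(data)))
    return results


partitions = {
    'month=1': [1.0, 2.0],
    'month=2': [3.0, 4.0],
    'month=3': [5.0, 6.0],
}
results = map_with_neighbors(partitions, statistics.fmean)
for name, mean in results:
    print(f' * partition = {name}: mean = {mean}')
```
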



In [None]:
for partition, array in collection.map_overlap(
        lambda ds: (  # type: ignore[arg-type]
            ds['var1'].values + ds['var2'].values),
        depth=1).compute():
    print(f' * partition = {partition}: mean = {array.mean()}')

Close the local cluster to avoid printing warning messages in the other
examples.



In [None]:
client.close()
cluster.close()