Overview of a Collection#

This section outlines the steps required to get started with the main features of a Collection.

from __future__ import annotations

from typing import Iterator
import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

Initialization of the environment#

Before we create our first collection, we will create a dataset to record.

def create_dataset() -> zcollection.Dataset:
    """Create a dataset to record."""
    generator: Iterator[zcollection.Dataset] = \
        zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)


zds: zcollection.Dataset | None = create_dataset()
assert zds is not None
zds.to_xarray()
<xarray.Dataset>
Dimensions:  (num_lines: 61, num_pixels: 25)
Dimensions without coordinates: num_lines, num_pixels
Data variables:
    time     (num_lines) datetime64[ns] dask.array<chunksize=(61,), meta=np.ndarray>
    var1     (num_lines, num_pixels) float64 dask.array<chunksize=(61, 25), meta=np.ndarray>
    var2     (num_lines, num_pixels) float64 dask.array<chunksize=(61, 25), meta=np.ndarray>
Attributes:
    attr:     1


We will now create the file system that we will use; in this example, an in-memory file system.
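
The line below is a minimal sketch of this step, assuming fsspec's built-in memory backend; the resulting fs handle is the one used in the rest of this example.

fs = fsspec.filesystem('memory')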

Finally, we create a local Dask cluster using only threads so that it can work with the file system stored in memory.

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

Creation of the partitioning#

Before creating our collection, we define the partitioning of our dataset. In this example, we will partition the data by month using the variable time.

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

Finally, we create our collection:

collection: zcollection.Collection = zcollection.create_collection(
    'time', zds, partition_handler, '/my_collection', filesystem=fs)

Note

The collection created can be accessed later using the following command:

>>> collection = zcollection.open_collection("/my_collection",
...                                          filesystem=fs)

When the collection has been created, a configuration file is created. This file contains all the metadata to ensure that all future inserted data will have the same features as the existing data (data consistency).

pprint.pprint(collection.metadata.get_config())
{'attrs': [('attr', 1)],
 'block_size_limit': 134217728,
 'chunks': (),
 'dimensions': ('num_lines', 'num_pixels'),
 'variables': ({'attrs': [('attr', 1)],
                'compressor': {'blocksize': 0,
                               'clevel': 5,
                               'cname': 'lz4',
                               'id': 'blosc',
                               'shuffle': 1},
                'dimensions': ('num_lines',),
                'dtype': '<M8[ns]',
                'fill_value': None,
                'filters': (),
                'name': 'time'},
               {'attrs': [('attr', 1)],
                'compressor': None,
                'dimensions': ('num_lines', 'num_pixels'),
                'dtype': '<f8',
                'fill_value': 214748.3647,
                'filters': ({'astype': '<i4',
                             'dtype': '<f8',
                             'id': 'fixedscaleoffset',
                             'offset': 0,
                             'scale': 10000},),
                'name': 'var1'},
               {'attrs': [('attr', 1)],
                'compressor': None,
                'dimensions': ('num_lines', 'num_pixels'),
                'dtype': '<f8',
                'fill_value': 214748.3647,
                'filters': ({'astype': '<i4',
                             'dtype': '<f8',
                             'id': 'fixedscaleoffset',
                             'offset': 0,
                             'scale': 10000},),
                'name': 'var2'})}

Now that the collection has been created, we can insert new records.

collection.insert(zds)
<generator object Collection.insert.<locals>.<genexpr> at 0x7f6683d1fe20>

Note

When inserting, it is possible to specify the merge strategy for a partition. By default, the last inserted data overwrites the existing data. Other strategies can be defined, for example, to update existing data (overwriting the updated values while keeping the existing ones). This strategy allows an existing partition to be updated incrementally.

>>> import zcollection.merging
>>> collection.insert(
...     ds, merge_callable=zcollection.merging.merge_time_series)

If the time series has data gaps, it is possible to specify a tolerance for detecting gaps along the inserted dataset's axis, in order to keep the existing data.

>>> collection.insert(
...     ds, merge_callable=zcollection.merging.merge_time_series,
...     tolerance=numpy.timedelta64(1, 'h'))

Let’s look at the different partitions thus created.

pprint.pprint(fs.listdir('/my_collection/year=2000'))
[{'name': '/my_collection/year=2000/month=01', 'size': 0, 'type': 'directory'},
 {'name': '/my_collection/year=2000/month=02', 'size': 0, 'type': 'directory'},
 {'name': '/my_collection/year=2000/month=03', 'size': 0, 'type': 'directory'},
 {'name': '/my_collection/year=2000/month=04', 'size': 0, 'type': 'directory'},
 {'name': '/my_collection/year=2000/month=05', 'size': 0, 'type': 'directory'},
 {'name': '/my_collection/year=2000/month=06', 'size': 0, 'type': 'directory'}]

This collection is composed of several partitions, but it is always handled as a single dataset.

Loading data#

To load the dataset, call the load method on the collection instance. By default, this method loads all partitions stored in the collection.

collection.load(delayed=True)
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 61', 'num_pixels: 25')
Data variables:
    var2    (num_lines, num_pixels)  float64: dask.array<chunksize=(11, 25)>
    var1    (num_lines, num_pixels)  float64: dask.array<chunksize=(11, 25)>
    time    (num_lines)  datetime64[ns]: dask.array<chunksize=(11,)>
  Attributes:
    attr   : 1

Note

By default, the data is loaded as a dask.array. It is possible to load the data as a numpy.ndarray by specifying the parameter delayed=False.
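
A minimal illustration of this option, using the delayed keyword described above:

>>> collection.load(delayed=False)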

You can also restrict the partitions to be considered by filtering on the keywords used for partitioning, expressed as a valid Python expression.

collection.load(filters='year == 2000 and month == 2')
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 9', 'num_pixels: 25')
Data variables:
    var2    (num_lines, num_pixels)  float64: dask.array<chunksize=(9, 25)>
    var1    (num_lines, num_pixels)  float64: dask.array<chunksize=(9, 25)>
    time    (num_lines)  datetime64[ns]: dask.array<chunksize=(9,)>
  Attributes:
    attr   : 1

You can also use a callback function to filter partitions with a more complex condition.

collection.load(
    filters=lambda keys: datetime.date(2000, 2, 15) <= datetime.date(
        keys['year'], keys['month'], 1) <= datetime.date(2000, 3, 15))
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 11', 'num_pixels: 25')
Data variables:
    var2    (num_lines, num_pixels)  float64: dask.array<chunksize=(11, 25)>
    var1    (num_lines, num_pixels)  float64: dask.array<chunksize=(11, 25)>
    time    (num_lines)  datetime64[ns]: dask.array<chunksize=(11,)>
  Attributes:
    attr   : 1

Note that the load function may return None if no partition has been selected.

assert collection.load(filters='year == 2002 and month == 2') is None

Editing variables#

Note

The functions for modifying collections are not usable if the collection is open in read-only mode.
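
As a sketch, a collection can be opened explicitly in read-only mode; this assumes the mode parameter of open_collection accepts 'r':

>>> readonly = zcollection.open_collection("/my_collection", mode="r",
...                                         filesystem=fs)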

It’s possible to delete a variable from a collection.

collection.drop_variable('var2')
collection.load()
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 61', 'num_pixels: 25')
Data variables:
    var1    (num_lines, num_pixels)  float64: dask.array<chunksize=(11, 25)>
    time    (num_lines)  datetime64[ns]: dask.array<chunksize=(11,)>
  Attributes:
    attr   : 1

The variable used for partitioning cannot be deleted.

try:
    collection.drop_variable('time')
except ValueError as exc:
    print(exc)
The variable 'time' is part of the partitioning.

The add_variable method allows you to add a new variable to the collection.

collection.add_variable(zds.metadata().variables['var2'])

The newly created variable is initialized with its default value.

zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
  data=[[--, --, --, ..., --, --, --],
        [--, --, --, ..., --, --, --],
        [--, --, --, ..., --, --, --],
        ...,
        [--, --, --, ..., --, --, --],
        [--, --, --, ..., --, --, --],
        [--, --, --, ..., --, --, --]],
  mask=[[ True,  True,  True, ...,  True,  True,  True],
        [ True,  True,  True, ...,  True,  True,  True],
        [ True,  True,  True, ...,  True,  True,  True],
        ...,
        [ True,  True,  True, ...,  True,  True,  True],
        [ True,  True,  True, ...,  True,  True,  True],
        [ True,  True,  True, ...,  True,  True,  True]],
  fill_value=214748.3647,
  dtype=float64)

Finally, it is possible to update existing variables.

In this example, we will alter the variable var2 by setting it to 1 wherever the variable var1 is defined.

def ones(zds: zcollection.Dataset) -> dict[str, numpy.ndarray]:
    """Return var2 set to one wherever var1 is defined."""
    return dict(var2=zds.variables['var1'].values * 0 + 1)


collection.update(ones)  # type: ignore[arg-type]

zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
  data=[[--, --, --, ..., --, --, --],
        [1.0, 1.0, 1.0, ..., 1.0, 1.0, 1.0],
        [--, --, --, ..., --, --, --],
        ...,
        [--, --, --, ..., --, --, --],
        [1.0, 1.0, 1.0, ..., 1.0, 1.0, 1.0],
        [--, --, --, ..., --, --, --]],
  mask=[[ True,  True,  True, ...,  True,  True,  True],
        [False, False, False, ..., False, False, False],
        [ True,  True,  True, ...,  True,  True,  True],
        ...,
        [ True,  True,  True, ...,  True,  True,  True],
        [False, False, False, ..., False, False, False],
        [ True,  True,  True, ...,  True,  True,  True]],
  fill_value=214748.3647)

Note

The update method supports the delayed parameter. If delayed=True (the default), the function ones is applied to each partition with the variables of the provided dataset held in Dask arrays. If delayed=False, the function is applied to each partition with the variables held in NumPy arrays.
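
A small illustration of the eager variant, using the delayed keyword described above:

>>> collection.update(ones, delayed=False)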

Sometimes it is important to know the values of the neighboring partitions. This can be done using the update method with the depth argument. In this example, we will set the variable var2 to 2 wherever the processed partition is surrounded by partitions on both sides, to -1 if the left partition is missing, and to -2 if the right partition is missing.

Note

partition_info contains information about the target partition: a tuple with the partitioned dimension and the slice to select the partition. If the start of the slice is 0, it means that the left partition is missing. If the stop of the slice is equal to the length of the given dataset, it means that the right partition is missing.

def twos(ds, partition_info: tuple[str, slice]) -> dict[str, numpy.ndarray]:
    """Returns a variable with twos everywhere if the partition is surrounded
    by partitions on both sides, -1 if the left partition is missing and -2 if
    the right partition is missing."""
    data = numpy.zeros(ds.variables['var1'].shape, dtype='int8')
    dim, indices = partition_info
    assert dim == 'num_lines'
    if indices.start == 0:
        # The left partition is missing: the target starts the dataset.
        data[:] = -1
    elif indices.stop == data.shape[0]:
        # The right partition is missing: the target ends the dataset.
        data[:] = -2
    else:
        data[:] = 2
    return dict(var2=data)


collection.update(twos, depth=1)  # type: ignore[arg-type]

zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
  data=[[-1., -1., -1., ..., -1., -1., -1.],
        [-1., -1., -1., ..., -1., -1., -1.],
        [-1., -1., -1., ..., -1., -1., -1.],
        ...,
        [-2., -2., -2., ..., -2., -2., -2.],
        [-2., -2., -2., ..., -2., -2., -2.],
        [-2., -2., -2., ..., -2., -2., -2.]],
  mask=False,
  fill_value=214748.3647)

Map a function over the collection#

It’s possible to map a function over the partitions of the collection.

for partition, array in collection.map(lambda ds: (  # type: ignore[arg-type]
        ds['var1'].values + ds['var2'].values)).compute():
    print(f' * partition = {partition}: mean = {array.mean()}')
* partition = (('year', 2000), ('month', 1)): mean = -1.9995
* partition = (('year', 2000), ('month', 2)): mean = -0.9985
* partition = (('year', 2000), ('month', 3)): mean = -0.9975
* partition = (('year', 2000), ('month', 4)): mean = -0.9964999999999999
* partition = (('year', 2000), ('month', 5)): mean = -0.9955000000000002
* partition = (('year', 2000), ('month', 6)): mean = -0.9945

Note

The map method is lazy. To compute the result, you need to call the method compute on the returned object.
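
A small sketch of this pattern: the object returned by map holds the deferred computation and is only evaluated when compute is called.

>>> futures = collection.map(
...     lambda ds: ds['var1'].values + ds['var2'].values)
>>> results = futures.compute()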

It's also possible to map a function over the partitions with a number of neighboring partitions, as with update. To do so, use the map_overlap method.

for partition, array in collection.map_overlap(
        lambda ds: (  # type: ignore[arg-type]
            ds['var1'].values + ds['var2'].values),
        depth=1).compute():
    print(f' * partition = {partition}: mean = {array.mean()}')
* partition = (('year', 2000), ('month', 1)): mean = -1.499
* partition = (('year', 2000), ('month', 2)): mean = -1.3318333333333334
* partition = (('year', 2000), ('month', 3)): mean = -0.9975
* partition = (('year', 2000), ('month', 4)): mean = -0.9965
* partition = (('year', 2000), ('month', 5)): mean = -0.9955
* partition = (('year', 2000), ('month', 6)): mean = -0.995

Close the local cluster to avoid printing warning messages in the other examples.

client.close()
cluster.close()

Total running time of the script: (0 minutes 1.939 seconds)
