Indexing a Collection.

In this example, we will see how to index a collection.

from typing import Iterator, List, Optional, Tuple, Union
import pathlib
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.indexing
import zcollection.partitioning.tests.data

Initialization of the environment

fs = fsspec.filesystem('memory')
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

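A collection can be indexed. An index records where given values (here, individual half-orbits) are located within the partitions, so that the matching data can be read directly instead of browsing the entire dataset.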
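To picture what such an index holds, here is a toy sketch in plain Python. It is not zcollection's implementation: each record is assumed to be a (key, partition, start, stop) tuple, and the keys and partition names are made up. A lookup then only touches the small table of records and returns the slices to read.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical index record: (key, partition, start, stop), stop exclusive.
Record = Tuple[int, str, int, int]


def build_lookup(
        records: Iterable[Record]) -> Dict[int, List[Tuple[str, slice]]]:
    """Group index records by key, so a lookup is a dictionary access."""
    lookup: Dict[int, List[Tuple[str, slice]]] = defaultdict(list)
    for key, partition, start, stop in records:
        lookup[key].append((partition, slice(start, stop)))
    return dict(lookup)


records = [
    (1, 'month=01', 0, 20),
    (2, 'month=01', 20, 40),
    (1, 'month=02', 0, 20),
]
lookup = build_lookup(records)
# Only these slices have to be read; the rest of the data is never touched.
print(lookup[1])
```

The same key may appear in several partitions, which is why each key maps to a list of (partition, slice) pairs rather than to a single slice.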

Creating the test collection.

For this example, we will index a different dataset. It contains measurements from a fictitious satellite over several half-orbits.

zds: zcollection.Dataset = zcollection.Dataset.from_xarray(
    zcollection.partitioning.tests.data.create_test_sequence(5, 20, 10))
print(zds)
<zcollection.dataset.Dataset>
  Dimensions: ('num_lines: 1000',)
Data variables:
    time         (num_lines)  datetime64[ns]: dask.array<chunksize=(1000,)>
    cycle_number (num_lines)  int64: dask.array<chunksize=(1000,)>
    pass_number  (num_lines)  int64: dask.array<chunksize=(1000,)>
    observation  (num_lines)  float64: dask.array<chunksize=(1000,)>
collection: zcollection.Collection = zcollection.create_collection(
    'time',
    zds,
    zcollection.partitioning.Date(('time', ), 'M'),
    partition_base_dir='/one_other_collection',
    filesystem=fs)
collection.insert(zds, merge_callable=zcollection.merging.merge_time_series)
<generator object Collection.insert.<locals>.<genexpr> at 0x7f66807605e0>

Here we have created a collection partitioned by month.

pprint.pprint(fs.listdir('/one_other_collection/year=2000'))
[{'name': '/one_other_collection/year=2000/month=01',
  'size': 0,
  'type': 'directory'},
 {'name': '/one_other_collection/year=2000/month=02',
  'size': 0,
  'type': 'directory'},
 {'name': '/one_other_collection/year=2000/month=03',
  'size': 0,
  'type': 'directory'}]
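With zcollection.partitioning.Date(('time', ), 'M'), each row is stored in a directory named after its year and month, as the listing shows. The mapping from a timestamp to its directory can be sketched as follows; partition_path is a hypothetical helper that only reproduces the naming visible above, not the library's code.

```python
import datetime


def partition_path(root: str, timestamp: datetime.datetime) -> str:
    """Monthly partition directory that would hold ``timestamp``.

    Hypothetical helper that only mimics the year=YYYY/month=MM naming
    shown in the listing above.
    """
    return f'{root}/year={timestamp.year:04d}/month={timestamp.month:02d}'


root = '/one_other_collection'
# The last hour of January and the first instant of February fall into
# different partitions.
print(partition_path(root, datetime.datetime(2000, 1, 31, 23)))
print(partition_path(root, datetime.datetime(2000, 2, 1)))
```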

Class to implement

The idea is to compute, for each partition visited, the slices of data over which a given quantity remains constant. In our example, we rely on the cycle and pass numbers. The first function we implement detects these constant runs in the two vectors holding the cycle and pass numbers.

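def split_half_orbit(
    cycle_number: numpy.ndarray,
    pass_number: numpy.ndarray,
) -> Iterator[Tuple[int, int]]:
    """Calculate the start and stop indexes of each half-orbit.

    A half-orbit is a run of consecutive rows sharing the same cycle and
    pass numbers.

    Args:
        cycle_number: Cycle numbers.
        pass_number: Pass numbers.
    Returns:
        Iterator of (start, stop) indexes; stop is exclusive.
    """
    assert pass_number.shape == cycle_number.shape
    # A new half-orbit starts wherever the pass or cycle number changes.
    pass_idx = numpy.where(numpy.roll(pass_number, 1) != pass_number)[0]
    cycle_idx = numpy.where(numpy.roll(cycle_number, 1) != cycle_number)[0]

    # Always add the first and last boundaries: numpy.roll compares index 0
    # with the last element, so index 0 is missed when both are equal.
    half_orbit = numpy.unique(
        numpy.concatenate((pass_idx, cycle_idx,
                           numpy.array([0, pass_number.size],
                                       dtype='int64'))))
    del pass_idx, cycle_idx

    yield from zip(half_orbit[:-1], half_orbit[1:])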
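For reference, the same segmentation can be expressed in pure Python with itertools.groupby: consecutive rows that share the same (cycle_number, pass_number) pair form one half-orbit. This split_runs helper is a hypothetical sketch, handy for checking results on small inputs rather than for use on large arrays. The example also covers an edge case: when the first and last rows carry the same pair, a boundary test based only on numpy.roll(x, 1) != x does not flag index 0, so the first run has to be included explicitly.

```python
import itertools
from typing import Iterator, List, Sequence, Tuple


def split_runs(cycle_number: Sequence[int],
               pass_number: Sequence[int]) -> Iterator[Tuple[int, int]]:
    """Yield (start, stop) for each run of constant (cycle, pass) pairs.

    ``stop`` is exclusive, as in the NumPy version.
    """
    assert len(cycle_number) == len(pass_number)
    start = 0
    pairs = zip(cycle_number, pass_number)
    # groupby without a key groups consecutive equal (cycle, pass) tuples.
    for _, group in itertools.groupby(pairs):
        length = sum(1 for _ in group)
        yield start, start + length
        start += length


# The first and last rows share the pair (1, 1): the first run must be kept.
cycles = [1, 1, 1, 1, 1, 1]
passes = [1, 1, 2, 2, 1, 1]
runs: List[Tuple[int, int]] = list(split_runs(cycles, passes))
print(runs)
```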

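Next, we compute these half-orbits for the dataset stored in a partition. The function returns one record per half-orbit (start index, stop index, cycle number, pass number) as a structured NumPy array whose fields are given by dtype.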

def _half_orbit(
    zds: zcollection.Dataset,
    *args,
    dtype: numpy.dtype | None = None,
    **kwargs,
) -> numpy.ndarray:
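    """Return the start and stop indexes of each half-orbit.

    Args:
        zds: Dataset stored in a partition to be indexed.
        dtype: Data type of the index records.
        pass_number: Name of the pass number variable (keyword argument).
            Defaults to "pass_number".
        cycle_number: Name of the cycle number variable (keyword argument).
            Defaults to "cycle_number".
    Returns:
        Structured array holding, for each half-orbit, its start and stop
        indexes followed by its cycle and pass numbers.
    """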
    pass_number_varname = kwargs.pop('pass_number', 'pass_number')
    cycle_number_varname = kwargs.pop('cycle_number', 'cycle_number')
    pass_number = zds.variables[pass_number_varname].values
    cycle_number = zds.variables[cycle_number_varname].values

    generator = ((
        i0,
        i1,
        cycle_number[i0],
        pass_number[i0],
    ) for i0, i1 in split_half_orbit(cycle_number, pass_number))

    return numpy.fromiter(generator, dtype)

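Finally, we implement our indexing class. The base class (zcollection.indexing.Indexer) implements the storage of the index, its update and the associated queries; the subclass only declares the additional columns of the index and supplies the function (_half_orbit) that computes the index records for one partition.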

class HalfOrbitIndexer(zcollection.indexing.Indexer):
    """Index collection by half-orbit."""
    #: Column name of the cycle number.
    CYCLE_NUMBER = 'cycle_number'

    #: Column name of the pass number.
    PASS_NUMBER = 'pass_number'

    def dtype(self, /, **kwargs) -> List[Tuple[str, str]]:
        """Return the columns of the index.

        Returns:
            A list of (name, type) pairs.
        """
        return super().dtype() + [
            (self.CYCLE_NUMBER, 'uint16'),
            (self.PASS_NUMBER, 'uint16'),
        ]

    @classmethod
    def create(
        cls,
        path: Union[pathlib.Path, str],
        zds: zcollection.Collection,
        filesystem: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs,
    ) -> 'HalfOrbitIndexer':
        """Create a new index.

        Args:
            path: The path to the index.
            zds: The collection to be indexed.
            filesystem: The filesystem to use.
        Returns:
            The created index.
        """
        return super()._create(path,
                               zds,
                               meta=dict(attribute=b'value'),
                               filesystem=filesystem)  # type: ignore

    def update(
        self,
        zds: zcollection.Collection,
        partition_size: Optional[int] = None,
        npartitions: Optional[int] = None,
        **kwargs,
    ) -> None:
        """Update the index.

        Args:
            zds: New data stored in the collection to be indexed.
            partition_size: The length of each bag partition.
            npartitions: The number of desired bag partitions.
            cycle_number: The name of the cycle number variable stored in the
                collection. Defaults to "cycle_number".
            pass_number: The name of the pass number variable stored in the
                collection. Defaults to "pass_number".
        """
        super()._update(zds,
                        _half_orbit,
                        partition_size,
                        npartitions,
                        dtype=self.dtype(),
                        **kwargs)
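The division of labour between HalfOrbitIndexer and its base class can be pictured with a toy, in-memory stand-in. Everything here (ToyIndexer, ToyHalfOrbitIndexer, columns, update, query) is hypothetical and far simpler than zcollection.indexing.Indexer: the base class visits each partition, calls the per-partition function supplied by the subclass, and keeps the partition keys next to each record, which is why the index table shown later ends with year and month columns.

```python
from typing import Callable, Dict, List, Sequence, Tuple

# Partition keys, e.g. (('year', 2000), ('month', 1)).
Keys = Tuple[Tuple[str, int], ...]
# Per-partition function: columns of one partition -> index records.
PartitionFunc = Callable[[Dict[str, Sequence[int]]], List[tuple]]


class ToyIndexer:
    """Hypothetical in-memory stand-in for an indexer base class."""

    def __init__(self) -> None:
        # Each entry pairs the partition keys with one index record.
        self.entries: List[Tuple[Keys, Dict[str, int]]] = []

    def columns(self) -> List[str]:
        """Columns shared by every index; subclasses extend the list."""
        return ['start', 'stop']

    def update(self, partitions: Dict[Keys, Dict[str, Sequence[int]]],
               func: PartitionFunc) -> None:
        """Run func on every partition and store the records it returns."""
        for keys, data in partitions.items():
            for values in func(data):
                self.entries.append((keys, dict(zip(self.columns(), values))))

    def query(self, column: str,
              accepted: Sequence[int]) -> List[Tuple[Keys, slice]]:
        """Return (partition keys, slice) for records matching a column."""
        return [(keys, slice(record['start'], record['stop']))
                for keys, record in self.entries
                if record[column] in accepted]


class ToyHalfOrbitIndexer(ToyIndexer):
    """Declares the extra columns and the per-partition function."""

    def columns(self) -> List[str]:
        return super().columns() + ['cycle_number', 'pass_number']

    @staticmethod
    def half_orbits(data: Dict[str, Sequence[int]]) -> List[tuple]:
        cycles, passes = data['cycle_number'], data['pass_number']
        records: List[tuple] = []
        start = 0
        for stop in range(1, len(passes) + 1):
            # Close the current run at the end of the data or on a change.
            if stop == len(passes) or (cycles[stop], passes[stop]) != (
                    cycles[start], passes[start]):
                records.append((start, stop, cycles[start], passes[start]))
                start = stop
        return records


jan = (('year', 2000), ('month', 1))
feb = (('year', 2000), ('month', 2))
toy = ToyHalfOrbitIndexer()
toy.update(
    {
        jan: {'cycle_number': [1, 1, 1, 1], 'pass_number': [1, 1, 2, 2]},
        feb: {'cycle_number': [1, 1], 'pass_number': [2, 3]},
    },
    toy.half_orbits,
)
# Half-orbit (cycle 1, pass 2) straddles the two partitions.
print(toy.query('pass_number', [2]))
```

Keeping the partition keys with every record lets a query return slices that are local to each partition, so a half-orbit split across two months simply yields two slices.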

Using the index

Now we can create our index and fill it.

indexer: HalfOrbitIndexer = HalfOrbitIndexer.create('/index.parquet',
                                                    collection,
                                                    filesystem=fs)
indexer.update(collection)

# Display the contents of the index. Each row describes one half-orbit
# (cycle_number, pass_number) found in one partition: start and stop give its
# row range within that partition (stop is exclusive), and year and month
# identify the partition.
indexer.table.to_pandas()
    start  stop  cycle_number  pass_number  year  month
0       0    20             1            1  2000      1
1      20    40             1            2  2000      1
2      40    60             1            3  2000      1
3      60    80             1            4  2000      1
4      80   100             1            5  2000      1
5     100   120             2            1  2000      1
6     120   140             2            2  2000      1
7     140   160             2            3  2000      1
8     160   180             2            4  2000      1
9     180   200             2            5  2000      1
10    200   220             3            1  2000      1
11    220   240             3            2  2000      1
12    240   260             3            3  2000      1
13    260   280             3            4  2000      1
14    280   300             3            5  2000      1
15    300   320             4            1  2000      1
16    320   340             4            2  2000      1
17    340   360             4            3  2000      1
18    360   372             4            4  2000      1
19      0     8             4            4  2000      2
20      8    28             4            5  2000      2
21     28    48             5            1  2000      2
22     48    68             5            2  2000      2
23     68    88             5            3  2000      2
24     88   108             5            4  2000      2
25    108   128             5            5  2000      2
26    128   148             6            1  2000      2
27    148   168             6            2  2000      2
28    168   188             6            3  2000      2
29    188   208             6            4  2000      2
30    208   228             6            5  2000      2
31    228   248             7            1  2000      2
32    248   268             7            2  2000      2
33    268   288             7            3  2000      2
34    288   308             7            4  2000      2
35    308   328             7            5  2000      2
36    328   348             8            1  2000      2
37      0    20             8            2  2000      3
38     20    40             8            3  2000      3
39     40    60             8            4  2000      3
40     60    80             8            5  2000      3
41     80   100             9            1  2000      3
42    100   120             9            2  2000      3
43    120   140             9            3  2000      3
44    140   160             9            4  2000      3
45    160   180             9            5  2000      3
46    180   200            10            1  2000      3
47    200   220            10            2  2000      3
48    220   240            10            3  2000      3
49    240   260            10            4  2000      3
50    260   280            10            5  2000      3
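The table can be cross-checked by hand. Its rows are consistent with a sequence of 1000 points in which each half-orbit holds 20 points, the pass number runs from 1 to 5 within each cycle, and the three monthly partitions hold 372, 348 and 280 points. Under those assumptions, which are read off the table rather than taken from create_test_sequence, the sketch below rebuilds the index in pure Python. It shows that the half-orbit (cycle 4, pass 4) straddles the January/February boundary (slice 360:372 in January, 0:8 in February), which is why it appears twice, and that passes 1 and 2 cover 400 points, the size of the selection loaded below.

```python
import itertools
from typing import List, Tuple

# Layout inferred from the index table (an assumption, not taken from the
# data generator): 20 points per half-orbit, 5 passes per cycle, and
# monthly partitions holding 372, 348 and 280 points.
POINTS_PER_HALF_ORBIT = 20
PASSES_PER_CYCLE = 5
PARTITION_SIZES = {1: 372, 2: 348, 3: 280}  # month -> number of points


def pair(i: int) -> Tuple[int, int]:
    """(cycle_number, pass_number) of the i-th point of the whole sequence."""
    half_orbit = i // POINTS_PER_HALF_ORBIT
    return (half_orbit // PASSES_PER_CYCLE + 1,
            half_orbit % PASSES_PER_CYCLE + 1)


def build_index() -> List[Tuple[int, int, int, int, int]]:
    """Records (start, stop, cycle, pass, month); start/stop are per month."""
    records = []
    offset = 0  # position of the month's first point in the whole sequence
    for month, size in PARTITION_SIZES.items():
        start = 0
        pairs = (pair(offset + i) for i in range(size))
        for (cycle, pass_), run in itertools.groupby(pairs):
            stop = start + sum(1 for _ in run)
            records.append((start, stop, cycle, pass_, month))
            start = stop
        offset += size
    return records


index = build_index()
print(len(index))
# The half-orbit (cycle 4, pass 4) is split between January and February.
print([record for record in index if record[2:4] == (4, 4)])
```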


The index can now be used to load only part of the collection. Here, we load the half-orbits whose pass number is 1 or 2.

selection: zcollection.Dataset | None = collection.load(
    indexer=indexer.query(dict(pass_number=[1, 2])),
    delayed=False,
)
assert selection is not None
selection.to_xarray()
<xarray.Dataset>
Dimensions:       (num_lines: 400)
Dimensions without coordinates: num_lines
Data variables:
    cycle_number  (num_lines) int64 ...
    observation   (num_lines) float64 ...
    pass_number   (num_lines) int64 ...
    time          (num_lines) datetime64[ns] 2000-01-01 ... 2000-03-19T06:00:00
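Conceptually, loading with an indexer only reads the selected slices from each partition and concatenates them in partition order. The toy function below (load_selection, hypothetical, operating on plain lists rather than zcollection datasets) illustrates that idea; it is not how zcollection.Collection.load is implemented.

```python
from typing import Dict, List, Sequence, Tuple

# Partition keys, e.g. (('year', 2000), ('month', 1)).
Keys = Tuple[Tuple[str, int], ...]


def load_selection(partitions: Dict[Keys, Sequence[float]],
                   selection: List[Tuple[Keys, slice]]) -> List[float]:
    """Read only the selected slices and concatenate them in order."""
    result: List[float] = []
    for keys, rows in selection:
        result.extend(partitions[keys][rows])
    return result


jan = (('year', 2000), ('month', 1))
feb = (('year', 2000), ('month', 2))
partitions = {jan: [0.0, 0.1, 0.2, 0.3], feb: [1.0, 1.1, 1.2]}
selected = load_selection(partitions, [(jan, slice(0, 2)), (feb, slice(1, 3))])
print(selected)
```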


Close the local cluster to avoid printing warning messages in the other examples.

client.close()
cluster.close()
