Overview of a Collection
This section outlines the steps required to get started with the main features of a Collection.
from __future__ import annotations
from typing import Iterator
import datetime
import pprint
import dask.distributed
import fsspec
import numpy
import zcollection
import zcollection.tests.data
Initialization of the environment
Before we create our first collection, we will create a dataset to record.
def create_dataset() -> zcollection.Dataset:
"""Create a dataset to record."""
generator: Iterator[zcollection.Dataset] = \
zcollection.tests.data.create_test_dataset_with_fillvalue()
return next(generator)
zds: zcollection.Dataset | None = create_dataset()
assert zds is not None
zds.to_xarray()
We will create the file system that we will use; in this example, an in-memory file system.
fs: fsspec.AbstractFileSystem = fsspec.filesystem('memory')
Finally, we create a local Dask cluster using only threads, in order to work with the file system stored in memory.
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)
Creation of the partitioning
Before creating our collection, we define the partitioning of our dataset. In this example, we will partition the data by month using the variable time.
partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')
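The monthly grouping performed by this handler can be sketched with plain NumPy. This is an illustration of the idea only, not zcollection's internals: each date on the time axis is truncated to its month, and the resulting (year, month) pairs become the partition keys.

```python
import numpy

# Illustrative sketch only (not zcollection's internals): derive
# (year, month) partition keys from a datetime64 axis.
dates = numpy.array(['2000-01-15', '2000-02-03', '2000-02-20'],
                    dtype='datetime64[D]')
months = dates.astype('datetime64[M]')  # truncate each date to its month
keys = [(int(str(m)[:4]), int(str(m)[5:7])) for m in months]
print(keys)  # [(2000, 1), (2000, 2), (2000, 2)]
```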
Finally, we create our collection:
collection: zcollection.Collection = zcollection.create_collection(
'time', zds, partition_handler, '/my_collection', filesystem=fs)
Note
The created collection can be reopened later with the following command:
>>> collection = zcollection.open_collection("/my_collection",
...                                          filesystem=fs)
When the collection is created, a configuration file is written. This file contains all the metadata needed to ensure that all future inserted data will have the same features as the existing data (data consistency).
pprint.pprint(collection.metadata.get_config())
{'attrs': [('attr', 1)],
'block_size_limit': 134217728,
'chunks': (),
'dimensions': ('num_lines', 'num_pixels'),
'variables': ({'attrs': [('attr', 1)],
'compressor': {'blocksize': 0,
'clevel': 5,
'cname': 'lz4',
'id': 'blosc',
'shuffle': 1},
'dimensions': ('num_lines',),
'dtype': '<M8[ns]',
'fill_value': None,
'filters': (),
'name': 'time'},
{'attrs': [('attr', 1)],
'compressor': None,
'dimensions': ('num_lines', 'num_pixels'),
'dtype': '<f8',
'fill_value': 214748.3647,
'filters': ({'astype': '<i4',
'dtype': '<f8',
'id': 'fixedscaleoffset',
'offset': 0,
'scale': 10000},),
'name': 'var1'},
{'attrs': [('attr', 1)],
'compressor': None,
'dimensions': ('num_lines', 'num_pixels'),
'dtype': '<f8',
'fill_value': 214748.3647,
'filters': ({'astype': '<i4',
'dtype': '<f8',
'id': 'fixedscaleoffset',
'offset': 0,
'scale': 10000},),
'name': 'var2'})}
Now that the collection has been created, we can insert new records.
collection.insert(zds)
<generator object Collection.insert.<locals>.<genexpr> at 0x7f6683d1fe20>
Note
When inserting, it’s possible to specify the merge strategy of a partition. By default, the last inserted data overwrites the existing data. Other strategies can be defined, for example, to update existing data (overwrite the updated data while keeping the existing ones). This last strategy allows incrementally updating an existing partition.
>>> import zcollection.merging
>>> collection.insert(
... ds, merge_callable=zcollection.merging.merge_time_series)
If the time series has gaps, it is possible to specify a tolerance for detecting gaps along the inserted dataset's axis, in order to keep the existing data.
>>> collection.insert(
... ds, merge_callable=zcollection.merging.merge_time_series,
... tolerance=numpy.timedelta64(1, 'h'))
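The idea behind the tolerance keyword can be sketched with plain NumPy. This is an illustration only, not zcollection's implementation: consecutive deltas on the time axis that exceed the tolerance are treated as data gaps.

```python
import numpy

# Illustration only (not zcollection's implementation): deltas larger
# than the tolerance mark a gap in the time series.
times = numpy.array(['2000-01-01T00', '2000-01-01T01', '2000-01-01T05'],
                    dtype='datetime64[h]')
tolerance = numpy.timedelta64(1, 'h')
gaps = numpy.diff(times) > tolerance
print(gaps)  # [False  True]: a gap between 01:00 and 05:00
```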
Let’s look at the different partitions thus created.
pprint.pprint(fs.listdir('/my_collection/year=2000'))
[{'name': '/my_collection/year=2000/month=01', 'size': 0, 'type': 'directory'},
{'name': '/my_collection/year=2000/month=02', 'size': 0, 'type': 'directory'},
{'name': '/my_collection/year=2000/month=03', 'size': 0, 'type': 'directory'},
{'name': '/my_collection/year=2000/month=04', 'size': 0, 'type': 'directory'},
{'name': '/my_collection/year=2000/month=05', 'size': 0, 'type': 'directory'},
{'name': '/my_collection/year=2000/month=06', 'size': 0, 'type': 'directory'}]
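The directory names above follow a `field=value` layout. A hypothetical helper (not part of the zcollection API) shows how a partition path can be assembled from its keys, zero-padding the month like the listing:

```python
# Hypothetical helper mirroring the listing above (not part of the
# zcollection API): build a partition path from its keys.
def partition_path(root: str, keys) -> str:
    """Join `field=value` segments, zero-padding the month."""
    parts = [f'{name}={value:02d}' if name == 'month' else f'{name}={value}'
             for name, value in keys]
    return '/'.join([root, *parts])

print(partition_path('/my_collection', (('year', 2000), ('month', 2))))
# /my_collection/year=2000/month=02
```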
This collection is composed of several partitions, but it is always handled as a single data set.
Loading data
To load the dataset, call the load method on the instance. By default, the method loads all partitions stored in the collection.
collection.load(delayed=True)
<zcollection.dataset.Dataset>
Dimensions: ('num_lines: 61', 'num_pixels: 25')
Data variables:
var2 (num_lines, num_pixels) float64: dask.array<chunksize=(11, 25)>
var1 (num_lines, num_pixels) float64: dask.array<chunksize=(11, 25)>
time (num_lines) datetime64[ns]: dask.array<chunksize=(11,)>
Attributes:
attr : 1
Note
By default, the data is loaded as a dask.array. It is possible to load the data as a numpy.ndarray by specifying the parameter delayed=False.
You can also restrict the partitions to be loaded by providing a valid Python expression over the keywords used for partitioning.
collection.load(filters='year == 2000 and month == 2')
<zcollection.dataset.Dataset>
Dimensions: ('num_lines: 9', 'num_pixels: 25')
Data variables:
var2 (num_lines, num_pixels) float64: dask.array<chunksize=(9, 25)>
var1 (num_lines, num_pixels) float64: dask.array<chunksize=(9, 25)>
time (num_lines) datetime64[ns]: dask.array<chunksize=(9,)>
Attributes:
attr : 1
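The way such a keyword expression selects partitions can be sketched in plain Python. This is an illustration only, not zcollection's code: the expression is evaluated against each partition's keys.

```python
# Illustration only (not zcollection's code): evaluate a keyword filter
# expression against each partition's keys to select partitions.
partitions = [{'year': 2000, 'month': month} for month in range(1, 7)]
expression = 'year == 2000 and month == 2'
selected = [keys for keys in partitions if eval(expression, {}, dict(keys))]
print(selected)  # [{'year': 2000, 'month': 2}]
```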
You can also use a callback function to filter partitions with a complex condition.
collection.load(
filters=lambda keys: datetime.date(2000, 2, 15) <= datetime.date(
keys['year'], keys['month'], 1) <= datetime.date(2000, 3, 15))
<zcollection.dataset.Dataset>
Dimensions: ('num_lines: 11', 'num_pixels: 25')
Data variables:
var2 (num_lines, num_pixels) float64: dask.array<chunksize=(11, 25)>
var1 (num_lines, num_pixels) float64: dask.array<chunksize=(11, 25)>
time (num_lines) datetime64[ns]: dask.array<chunksize=(11,)>
Attributes:
attr : 1
Note that the load function may return None if no partition has been selected.
assert collection.load(filters='year == 2002 and month == 2') is None
Editing variables
Note
The functions for modifying collections are not usable if the collection is open in read-only mode.
It’s possible to delete a variable from a collection.
collection.drop_variable('var2')
collection.load()
<zcollection.dataset.Dataset>
Dimensions: ('num_lines: 61', 'num_pixels: 25')
Data variables:
var1 (num_lines, num_pixels) float64: dask.array<chunksize=(11, 25)>
time (num_lines) datetime64[ns]: dask.array<chunksize=(11,)>
Attributes:
attr : 1
The variable used for partitioning cannot be deleted.
try:
collection.drop_variable('time')
except ValueError as exc:
print(exc)
The variable 'time' is part of the partitioning.
The add_variable method allows you to add a new variable to the collection.
collection.add_variable(zds.metadata().variables['var2'])
The newly created variable is initialized with its default value.
zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
data=[[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --],
...,
[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --]],
mask=[[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True],
...,
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True]],
fill_value=214748.3647,
dtype=float64)
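The fully masked output above can be reproduced with plain NumPy. This sketch (not zcollection's code) shows how values equal to the fill value are exposed as masked entries:

```python
import numpy

# Plain-NumPy sketch of the behavior shown above (not zcollection's
# code): values equal to the fill value appear as masked entries.
fill_value = 214748.3647
raw = numpy.full((3, 4), fill_value)
masked = numpy.ma.masked_equal(raw, fill_value)
print(masked.mask.all())  # every value equals the fill value: all masked
```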
Finally, it’s possible to update the existing variables. In this example, we will alter the variable var2 by setting it to 1 anywhere the variable var1 is defined.
def ones(zds) -> dict[str, numpy.ndarray]:
"""Returns a variable with ones everywhere."""
return dict(var2=zds.variables['var1'].values * 0 + 1)
collection.update(ones) # type: ignore[arg-type]
zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
data=[[--, --, --, ..., --, --, --],
[1.0, 1.0, 1.0, ..., 1.0, 1.0, 1.0],
[--, --, --, ..., --, --, --],
...,
[--, --, --, ..., --, --, --],
[1.0, 1.0, 1.0, ..., 1.0, 1.0, 1.0],
[--, --, --, ..., --, --, --]],
mask=[[ True, True, True, ..., True, True, True],
[False, False, False, ..., False, False, False],
[ True, True, True, ..., True, True, True],
...,
[ True, True, True, ..., True, True, True],
[False, False, False, ..., False, False, False],
[ True, True, True, ..., True, True, True]],
fill_value=214748.3647)
Note
The update method supports the delayed parameter. If delayed=True, the function ones is applied to each partition using a Dask array as container for the variable data stored in the provided dataset. This is the default behavior. If delayed=False, the function ones is applied to each partition using a NumPy array as container.
Sometimes it is important to know the values of the neighboring partitions. This can be done using the update method with the depth argument. In this example, we will set the variable var2 to 2 wherever the processed partition is surrounded by a partition on both sides, to -1 if the left partition is missing, and to -2 if the right partition is missing.
Note
partition_info contains information about the target partition: a tuple with the partitioned dimension and the slice selecting the partition in the given dataset. If the start of the slice is 0, it means that the left partition is missing. If the stop of the slice is equal to the length of the given dataset, it means that the right partition is missing.
def twos(ds, partition_info: tuple[str, slice]) -> dict[str, numpy.ndarray]:
"""Returns a variable with twos everywhere if the partition is surrounded
by partitions on both sides, -1 if the left partition is missing and -2 if
the right partition is missing."""
data = numpy.zeros(ds.variables['var1'].shape, dtype='int8')
dim, indices = partition_info
assert dim == 'num_lines'
if indices.start != 0:
data[:] = -1
elif indices.stop != data.shape[0]:
data[:] = -2
else:
data[:] = 2
return dict(var2=data)
collection.update(twos, depth=1) # type: ignore[arg-type]
zds = collection.load()
assert zds is not None
zds.variables['var2'].values
masked_array(
data=[[-2., -2., -2., ..., -2., -2., -2.],
[-2., -2., -2., ..., -2., -2., -2.],
[-2., -2., -2., ..., -2., -2., -2.],
...,
[-1., -1., -1., ..., -1., -1., -1.],
[-1., -1., -1., ..., -1., -1., -1.],
[-1., -1., -1., ..., -1., -1., -1.]],
mask=False,
fill_value=214748.3647)
Map a function over the collection
It’s possible to map a function over the partitions of the collection.
for partition, array in collection.map(lambda ds: ( # type: ignore[arg-type]
ds['var1'].values + ds['var2'].values)).compute():
print(f' * partition = {partition}: mean = {array.mean()}')
* partition = (('year', 2000), ('month', 1)): mean = -1.9995
* partition = (('year', 2000), ('month', 2)): mean = -0.9985
* partition = (('year', 2000), ('month', 3)): mean = -0.9975
* partition = (('year', 2000), ('month', 4)): mean = -0.9964999999999999
* partition = (('year', 2000), ('month', 5)): mean = -0.9955000000000002
* partition = (('year', 2000), ('month', 6)): mean = -0.9945
Note
The map method is lazy. To compute the result, you need to call the method compute on the returned object.
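This deferred behavior is analogous to Python's built-in lazy iterators. The analogy below is plain Python, not zcollection: the mapping object does no work until it is consumed, which plays the role of the compute step.

```python
# Plain-Python analogy of the lazy behavior (not zcollection): nothing
# runs when the mapping object is built, only when it is consumed.
partitions = [1, 2, 3]
lazy = map(lambda value: value * 10, partitions)  # nothing evaluated yet
results = list(lazy)  # the "compute" step materializes the results
print(results)  # [10, 20, 30]
```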
It’s also possible to map a function over the partitions together with a number of neighboring partitions, as with update. To do so, use the map_overlap method.
for partition, array in collection.map_overlap(
lambda ds: ( # type: ignore[arg-type]
ds['var1'].values + ds['var2'].values),
depth=1).compute():
print(f' * partition = {partition}: mean = {array.mean()}')
* partition = (('year', 2000), ('month', 1)): mean = -1.499
* partition = (('year', 2000), ('month', 2)): mean = -1.3318333333333334
* partition = (('year', 2000), ('month', 3)): mean = -0.9975
* partition = (('year', 2000), ('month', 4)): mean = -0.9965
* partition = (('year', 2000), ('month', 5)): mean = -0.9955
* partition = (('year', 2000), ('month', 6)): mean = -0.995
Close the local cluster to avoid printing warning messages in the other examples.
client.close()
cluster.close()
Total running time of the script: (0 minutes 1.939 seconds)