Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a75604a
feat: define `PreparedWrite` and `SupportsChunkPacking` data structures
d-v-b Apr 7, 2026
a072c31
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 7, 2026
47a407f
feat: new codec pipeline that uses sync path
d-v-b Apr 7, 2026
3c27e49
feat: complete second codecpipeline
d-v-b Apr 8, 2026
9b834a4
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 8, 2026
c731cf2
fix: handle rectilinear chunks
d-v-b Apr 8, 2026
9e25150
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Apr 8, 2026
ae0580c
fixup
d-v-b Apr 9, 2026
863cf8f
chore: make phased pipeline the default
d-v-b Apr 9, 2026
053f2ee
fix: fixup
d-v-b Apr 9, 2026
3d5cdf8
Merge branch 'perf/prepared-write-v2' of github.com:d-v-b/zarr-python…
d-v-b Apr 9, 2026
ba393d3
Merge branch 'main' of github.com:zarr-developers/zarr-python into pe…
d-v-b Apr 9, 2026
cfe9539
fix: wire up prototype in setitem
d-v-b Apr 9, 2026
e82da5b
Merge branch 'perf/prepared-write-v2' of github.com:d-v-b/zarr-python…
d-v-b Apr 9, 2026
0b2512b
refactor: define chunklayout class
d-v-b Apr 9, 2026
a18b20f
Merge branch 'perf/prepared-write-v2' of github.com:d-v-b/zarr-python…
d-v-b Apr 9, 2026
5fb28b9
perf: only fetch the chunks we need
d-v-b Apr 9, 2026
8330cde
Merge branch 'perf/prepared-write-v2' of github.com:d-v-b/zarr-python…
d-v-b Apr 9, 2026
f62e16b
Merge branch 'main' into perf/prepared-write-v2-bench
d-v-b Apr 13, 2026
f3a4bbb
Merge branch 'main' into perf/prepared-write-v2-bench
d-v-b Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Protocol, TypeGuard, runtime_checkable

from typing_extensions import ReadOnly, TypedDict
Expand All @@ -18,7 +19,7 @@
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
from zarr.core.indexing import SelectorTuple
from zarr.core.indexing import ChunkProjection, SelectorTuple
from zarr.core.metadata import ArrayMetadata
from zarr.core.metadata.v3 import ChunkGridMetadata

Expand All @@ -33,6 +34,8 @@
"CodecOutput",
"CodecPipeline",
"GetResult",
"PreparedWrite",
"SupportsChunkCodec",
"SupportsSyncCodec",
]

Expand Down Expand Up @@ -82,6 +85,25 @@ def _decode_sync(self, chunk_data: CO, chunk_spec: ArraySpec) -> CI: ...
def _encode_sync(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None: ...


class SupportsChunkCodec(Protocol):
    """Structural interface for synchronous whole-chunk decoding/encoding.

    ``ChunkTransform`` satisfies this protocol. Supplying ``chunk_shape``
    lets callers decode or encode chunks whose shapes vary (e.g. in
    rectilinear grids) without rebuilding the transform each time.
    """

    # Array spec describing the chunks this codec handles by default.
    array_spec: ArraySpec

    def decode_chunk(
        self, chunk_bytes: Buffer, chunk_shape: tuple[int, ...] | None = None
    ) -> NDBuffer:
        """Decode ``chunk_bytes`` into an in-memory chunk array."""
        ...

    def encode_chunk(
        self, chunk_array: NDBuffer, chunk_shape: tuple[int, ...] | None = None
    ) -> Buffer | None:
        """Encode ``chunk_array`` into bytes; the signature permits ``None``."""
        ...


class BaseCodec[CI: CodecInput, CO: CodecOutput](Metadata):
"""Generic base class for codecs.

Expand Down Expand Up @@ -207,6 +229,37 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
"""Base class for array-to-array codecs."""


@dataclass
class PreparedWrite:
    """State handed from the read phase to the write phase of a chunk update.

    Produced by `prepare_write_sync` / `prepare_write` and consumed by
    `finalize_write_sync` / `finalize_write`. The compute phase runs in
    between: walk `indexer`, decode the matching entry of `chunk_dict`,
    merge in the new data, re-encode, and write the result back into
    `chunk_dict`.

    Attributes
    ----------
    chunk_dict : dict[tuple[int, ...], Buffer | None]
        Encoded bytes per inner chunk, keyed by chunk coordinates. A
        regular array has the single entry `{(0,): <bytes>}`; a sharded
        array has one entry per inner chunk of the shard, including the
        chunks this write leaves alone (those pass through unchanged).
        `None` marks a chunk that was absent from storage.
    indexer : list[ChunkProjection]
        The inner chunks to modify — a subset of `chunk_dict`'s keys;
        untouched chunks do not appear here. For each entry,
        `chunk_coords` names the `chunk_dict` key, `chunk_selection` is
        the region inside that inner chunk, and `out_selection` is the
        matching region of the source value array.
    """

    chunk_dict: dict[tuple[int, ...], Buffer | None]
    indexer: list[ChunkProjection]


class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
"""Base class for array-to-bytes codecs."""

Expand Down
37 changes: 20 additions & 17 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class V2Codec(ArrayBytesCodec):

is_fixed_size = False

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
cdata = chunk_bytes.as_array_like()
# decompress
if self.compressor:
chunk = await asyncio.to_thread(self.compressor.decode, cdata)
chunk = self.compressor.decode(cdata)
else:
chunk = cdata

# apply filters
if self.filters:
for f in reversed(self.filters):
chunk = await asyncio.to_thread(f.decode, chunk)
chunk = f.decode(chunk)

# view as numpy array with correct dtype
chunk = ensure_ndarray_like(chunk)
Expand All @@ -48,20 +48,9 @@ async def _decode_single(
try:
chunk = chunk.view(chunk_spec.dtype.to_native_dtype())
except TypeError:
# this will happen if the dtype of the chunk
# does not match the dtype of the array spec i.g. if
# the dtype of the chunk_spec is a string dtype, but the chunk
# is an object array. In this case, we need to convert the object
# array to the correct dtype.

chunk = np.array(chunk).astype(chunk_spec.dtype.to_native_dtype())

elif chunk.dtype != object:
# If we end up here, someone must have hacked around with the filters.
# We cannot deal with object arrays unless there is an object
# codec in the filter chain, i.e., a filter that converts from object
# array to something else during encoding, and converts back to object
# array during decoding.
raise RuntimeError("cannot read object array without object codec")

# ensure correct chunk shape
Expand All @@ -70,7 +59,7 @@ async def _decode_single(

return get_ndbuffer_class().from_ndarray_like(chunk)

async def _encode_single(
def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -83,18 +72,32 @@ async def _encode_single(
# apply filters
if self.filters:
for f in self.filters:
chunk = await asyncio.to_thread(f.encode, chunk)
chunk = f.encode(chunk)
# check object encoding
if ensure_ndarray_like(chunk).dtype == object:
raise RuntimeError("cannot write object array without object codec")

# compress
if self.compressor:
cdata = await asyncio.to_thread(self.compressor.encode, chunk)
cdata = self.compressor.encode(chunk)
else:
cdata = chunk
cdata = ensure_bytes(cdata)
return chunk_spec.prototype.buffer.from_bytes(cdata)

async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Async entry point: run the synchronous decode path in a worker thread."""
    decoded = await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
    return decoded

async def _encode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> Buffer | None:
    """Async entry point: run the synchronous encode path in a worker thread."""
    encoded = await asyncio.to_thread(self._encode_sync, chunk_array, chunk_spec)
    return encoded

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
50 changes: 30 additions & 20 deletions src/zarr/codecs/numcodecs/_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
if TYPE_CHECKING:
from zarr.abc.numcodec import Numcodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
from zarr.core.buffer import Buffer, NDBuffer

CODEC_PREFIX = "numcodecs."

Expand Down Expand Up @@ -132,53 +132,63 @@ class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
    """Decompress ``chunk_data`` synchronously via the wrapped numcodecs codec.

    The stale pre-refactor async ``_decode_single`` (which dispatched the same
    call through ``asyncio.to_thread``) was interleaved here by a bad merge;
    the async wrapper is defined separately, so only the sync path remains.
    """
    # as_numpy_array_wrapper bridges Buffer <-> numpy using chunk_spec.prototype
    # (helper defined elsewhere in zarr — confirm exact contract there).
    return as_numpy_array_wrapper(self._codec.decode, chunk_data, chunk_spec.prototype)

def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
    """Compress ``chunk_data`` synchronously via the wrapped numcodecs codec.

    Merge residue removed: a stale ``_encode(self, chunk_data, prototype)``
    signature and duplicate return statements referencing the undefined
    name ``prototype`` shadowed the intended ``chunk_spec.prototype`` form.
    """
    encoded = self._codec.encode(chunk_data.as_array_like())
    if isinstance(encoded, np.ndarray):  # Required for checksum codecs
        return chunk_spec.prototype.buffer.from_bytes(encoded.tobytes())
    return chunk_spec.prototype.buffer.from_bytes(encoded)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
    """Async entry point: offload the synchronous decode to a worker thread."""
    result = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
    return result

async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
    """Async entry point: offload the synchronous encode to a worker thread.

    Merge residue removed: a stale first ``return`` invoked the renamed
    (now nonexistent) ``self._encode`` with ``chunk_spec.prototype``,
    making the correct ``_encode_sync`` dispatch unreachable.
    """
    return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Decode an array chunk synchronously via the wrapped numcodecs codec.

    Merge residue removed: the stale async ``_decode_single`` header and
    its ``await asyncio.to_thread`` assignment were interleaved with the
    sync lines, leaving duplicate definitions/assignments.
    """
    chunk_ndarray = chunk_data.as_ndarray_like()
    out = self._codec.decode(chunk_ndarray)
    # Restore the expected chunk shape before wrapping in an NDBuffer.
    return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Encode an array chunk synchronously via the wrapped numcodecs codec.

    Merge residue removed: the stale async ``_encode_single`` header and
    its ``await asyncio.to_thread`` assignment duplicated the sync lines.
    """
    chunk_ndarray = chunk_data.as_ndarray_like()
    out = self._codec.encode(chunk_ndarray)
    return chunk_spec.prototype.nd_buffer.from_ndarray_like(out)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Async entry point: offload the synchronous decode to a worker thread."""
    result = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
    return result

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Async entry point: offload the synchronous encode to a worker thread."""
    result = await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
    return result


class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Decode encoded bytes into an array chunk, synchronously.

    Merge residue removed: the stale async ``_decode_single`` header and
    its ``await asyncio.to_thread`` assignment duplicated the sync lines.
    """
    chunk_bytes = chunk_data.to_bytes()
    out = self._codec.decode(chunk_bytes)
    # Restore the expected chunk shape before wrapping in an NDBuffer.
    return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
    """Encode an array chunk to bytes, synchronously.

    Merge residue removed: the stale async ``_encode_single`` header and
    its ``await asyncio.to_thread`` assignment duplicated the sync lines.
    """
    chunk_ndarray = chunk_data.as_ndarray_like()
    out = self._codec.encode(chunk_ndarray)
    return chunk_spec.prototype.buffer.from_bytes(out)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
    """Async entry point: offload the synchronous decode to a worker thread."""
    result = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
    return result

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
    """Async entry point: offload the synchronous encode to a worker thread."""
    result = await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
    return result


# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
Expand Down
Loading
Loading