-
-
Notifications
You must be signed in to change notification settings - Fork 402
Expand file tree
/
Copy pathv1.py
More file actions
202 lines (140 loc) · 6.36 KB
/
v1.py
File metadata and controls
202 lines (140 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Codec interface definitions (v1).
This module defines the abstract interfaces for zarr codecs.
External codec implementations should subclass ``ArrayArrayCodec``,
``ArrayBytesCodec``, or ``BytesBytesCodec`` from this module.
The ``Buffer`` and ``NDBuffer`` types here are protocols — they define
the structural interface that zarr's concrete buffer types implement.
Codec authors should type against these protocols, not zarr's concrete
buffer classes.
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, ClassVar, Protocol, Self, runtime_checkable

if TYPE_CHECKING:
    from collections.abc import Iterable

    import numpy as np
    import numpy.typing as npt
    from zarr_interfaces.data_type.v1 import JSON, TBaseDType, TBaseScalar, ZDType
# ---------------------------------------------------------------------------
# Buffer protocols
# ---------------------------------------------------------------------------
class Buffer(Protocol):
    """Structural type for a flat, contiguous, bytes-like block of memory.

    An object satisfies this protocol when it reports a length and can be
    sliced into another ``Buffer``.
    """

    def __len__(self) -> int:
        """Return the buffer's length."""

    def __getitem__(self, key: slice) -> Buffer:
        """Return the sub-buffer selected by the slice ``key``."""
class NDBuffer(Protocol):
    """Structural type for an N-dimensional array buffer.

    The members cover dtype/shape introspection, conversion to and from an
    ndarray-like object, transposition, and item get/set.
    """

    @property
    def dtype(self) -> np.dtype[np.generic]:
        """NumPy dtype of the buffer."""

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the buffer as a tuple of ints."""

    def as_ndarray_like(self) -> npt.NDArray[np.generic]:
        """Return the buffer's data as an ndarray-like object."""

    @classmethod
    def from_ndarray_like(cls, data: npt.NDArray[np.generic]) -> NDBuffer:
        """Build a buffer from the ndarray-like ``data``."""

    def transpose(self, axes: tuple[int, ...]) -> NDBuffer:
        """Return a buffer transposed according to ``axes``."""

    def __getitem__(self, key: object) -> NDBuffer:
        """Return the buffer selected by ``key``."""

    def __setitem__(self, key: object, value: object) -> None:
        """Assign ``value`` to the selection identified by ``key``."""
# ---------------------------------------------------------------------------
# ArraySpec protocol
# ---------------------------------------------------------------------------
class ArraySpec(Protocol):
    """Structural type describing the metadata of a single chunk.

    All members are read-only properties.
    """

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the chunk as a tuple of ints."""

    @property
    def dtype(self) -> ZDType[TBaseDType, TBaseScalar]:
        """Data type of the chunk, expressed as a ``ZDType``."""

    @property
    def fill_value(self) -> object:
        """Fill value associated with the chunk."""

    @property
    def ndim(self) -> int:
        """Number of dimensions of the chunk."""
# ---------------------------------------------------------------------------
# Codec input/output type aliases
# ---------------------------------------------------------------------------
# Upper bound for the ``CI`` (decoded-side) type parameter of the codec classes.
type CodecInput = NDBuffer | Buffer
# Upper bound for the ``CO`` (encoded-side) type parameter of the codec classes.
type CodecOutput = NDBuffer | Buffer
# ---------------------------------------------------------------------------
# Sync codec protocol
# ---------------------------------------------------------------------------
@runtime_checkable
class SupportsSyncCodec[CI: CodecInput, CO: CodecOutput](Protocol):
"""Protocol for codecs that support synchronous encode/decode.
The type parameters mirror ``BaseCodec``: ``CI`` is the decoded type
and ``CO`` is the encoded type.
"""
def _decode_sync(self, chunk_data: CO, chunk_spec: ArraySpec) -> CI: ...
def _encode_sync(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None: ...
# ---------------------------------------------------------------------------
# Codec ABCs
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class BaseCodec[CI: CodecInput, CO: CodecOutput](ABC):
"""Generic base class for codecs.
Subclass ``ArrayArrayCodec``, ``ArrayBytesCodec``, or
``BytesBytesCodec`` instead of this class directly.
"""
is_fixed_size: ClassVar[bool]
@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
"""Create an instance from a JSON dictionary."""
return cls(**data) # type: ignore[arg-type]
def to_dict(self) -> dict[str, JSON]:
"""Serialize this codec to a JSON dictionary."""
raise NotImplementedError
@abstractmethod
def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
"""Return the encoded byte length for a given input byte length."""
...
def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
"""Return the chunk spec after encoding by this codec.
Override this for codecs that change shape, dtype, or fill value.
"""
return chunk_spec
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
"""Fill in codec parameters that can be inferred from array metadata."""
return self
def validate(
self,
*,
shape: tuple[int, ...],
dtype: ZDType[TBaseDType, TBaseScalar],
chunk_grid: object,
) -> None:
"""Validate that this codec is compatible with the array metadata.
The default implementation does nothing. Override to add checks.
"""
async def _decode_single(self, chunk_data: CO, chunk_spec: ArraySpec) -> CI:
"""Decode a single chunk. Override this or ``_decode_sync``."""
raise NotImplementedError
async def decode(
self,
chunks_and_specs: Iterable[tuple[CO | None, ArraySpec]],
) -> Iterable[CI | None]:
"""Decode a batch of chunks."""
results: list[CI | None] = []
for chunk_data, chunk_spec in chunks_and_specs:
if chunk_data is not None:
results.append(await self._decode_single(chunk_data, chunk_spec))
else:
results.append(None)
return results
async def _encode_single(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None:
"""Encode a single chunk. Override this or ``_encode_sync``."""
raise NotImplementedError
async def encode(
self,
chunks_and_specs: Iterable[tuple[CI | None, ArraySpec]],
) -> Iterable[CO | None]:
"""Encode a batch of chunks."""
results: list[CO | None] = []
for chunk_data, chunk_spec in chunks_and_specs:
if chunk_data is not None:
results.append(await self._encode_single(chunk_data, chunk_spec))
else:
results.append(None)
return results
class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
    """Base class for array-to-array codecs (e.g. transpose, scale_offset).

    Both the decoded side and the encoded side are typed as ``NDBuffer``.
    """
class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
    """Base class for array-to-bytes codecs (e.g. bytes, sharding).

    The decoded side is typed as ``NDBuffer`` and the encoded side as ``Buffer``.
    """
class BytesBytesCodec(BaseCodec[Buffer, Buffer]):
    """Base class for bytes-to-bytes codecs (e.g. gzip, zstd).

    Both the decoded side and the encoded side are typed as ``Buffer``.
    """