Skip to content

Commit d21ed20

Browse files
committed
Rename blocksize_max -> max_blocksize. Also, this value is now persisted in the metalayer.
1 parent ab8b495 commit d21ed20

5 files changed

Lines changed: 66 additions & 40 deletions

File tree

bench/batch_store.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def build_store(codec: blosc2.Codec, clevel: int, use_dict: bool, in_mem: bool)
6161
storage = blosc2.Storage(mode="w")
6262
store = blosc2.BatchStore(
6363
storage=storage,
64-
blocksize_max=BLOCKSIZE_MAX,
64+
max_blocksize=BLOCKSIZE_MAX,
6565
cparams={
6666
"codec": codec,
6767
"clevel": clevel,
@@ -79,7 +79,7 @@ def build_store(codec: blosc2.Codec, clevel: int, use_dict: bool, in_mem: bool)
7979
"clevel": clevel,
8080
"use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4),
8181
}
82-
with blosc2.BatchStore(storage=storage, blocksize_max=BLOCKSIZE_MAX, cparams=cparams) as store:
82+
with blosc2.BatchStore(storage=storage, max_blocksize=BLOCKSIZE_MAX, cparams=cparams) as store:
8383
for batch_index in range(NBATCHES):
8484
store.append(make_batch(batch_index))
8585
return None
@@ -123,7 +123,7 @@ def main() -> None:
123123
assert store is not None
124124
read_store = store
125125
else:
126-
read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, blocksize_max=BLOCKSIZE_MAX)
126+
read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX)
127127
samples, timings_ns = measure_random_reads(read_store)
128128
t0 = time.perf_counter()
129129
checksum = 0
@@ -138,7 +138,7 @@ def main() -> None:
138138
print(f" build time: {build_time_s:.3f} s")
139139
print(f" batches: {len(read_store)}")
140140
print(f" objects: {TOTAL_OBJECTS}")
141-
print(f" blocksize_max: {read_store.blocksize_max}")
141+
print(f" max_blocksize: {read_store.max_blocksize}")
142142
print()
143143
print(read_store.info)
144144
print(f"Random scalar reads: {N_RANDOM_READS}")

examples/batch_store.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ def main() -> None:
3636
blosc2.remove_urlpath(URLPATH)
3737

3838
storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True)
39-
with blosc2.BatchStore(storage=storage, blocksize_max=BLOCKSIZE_MAX) as store:
39+
with blosc2.BatchStore(storage=storage, max_blocksize=BLOCKSIZE_MAX) as store:
4040
for batch_index in range(NBATCHES):
4141
store.append(make_batch(batch_index))
4242

4343
total_objects = sum(len(batch) for batch in store)
4444
print("Created BatchStore")
4545
print(f" batches: {len(store)}")
4646
print(f" objects: {total_objects}")
47-
print(f" blocksize_max: {store.blocksize_max}")
47+
print(f" max_blocksize: {store.max_blocksize}")
4848

49-
# Reopen with the same blocksize_max hint so scalar reads can use the
49+
# Reopen with the same max_blocksize hint so scalar reads can use the
5050
# VL-block path instead of decoding the entire batch.
51-
reopened = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, blocksize_max=BLOCKSIZE_MAX)
51+
reopened = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX)
5252

5353
print()
5454
print(reopened.info)

src/blosc2/batch_store.py

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb
1919
from blosc2.info import InfoReporter, format_nbytes_info
2020

21-
_BATCHSTORE_META = {"version": 1, "serializer": "msgpack"}
21+
_BATCHSTORE_META = {"version": 1, "serializer": "msgpack", "max_blocksize": None}
2222

2323

2424
def _check_serialized_size(buffer: bytes) -> None:
@@ -69,9 +69,9 @@ def __getitem__(self, index: int | slice) -> Any | list[Any]:
6969
items = self._decode_items()
7070
index = self._normalize_index(index)
7171
return items[index]
72-
blocksize_max = self._parent.blocksize_max
73-
if blocksize_max is not None:
74-
block_index, item_index = divmod(index, blocksize_max)
72+
max_blocksize = self._parent.max_blocksize
73+
if max_blocksize is not None:
74+
block_index, item_index = divmod(index, max_blocksize)
7575
if block_index >= self._nblocks:
7676
raise IndexError("Batch index out of range")
7777
block = self._get_block(block_index)
@@ -158,6 +158,11 @@ def _attach_schunk(self, schunk: blosc2.SChunk) -> None:
158158
self.schunk = schunk
159159
self.mode = schunk.mode
160160
self.mmap_mode = getattr(schunk, "mmap_mode", None)
161+
try:
162+
batchstore_meta = self.schunk.meta["batchstore"]
163+
except KeyError:
164+
batchstore_meta = {}
165+
self._max_blocksize = batchstore_meta.get("max_blocksize", self._max_blocksize)
161166
self._validate_tag()
162167

163168
def _maybe_open_existing(self, storage: blosc2.Storage) -> bool:
@@ -181,13 +186,13 @@ def _make_storage(self) -> blosc2.Storage:
181186

182187
def __init__(
183188
self,
184-
blocksize_max: int | None = None,
189+
max_blocksize: int | None = None,
185190
_from_schunk: blosc2.SChunk | None = None,
186191
**kwargs: Any,
187192
) -> None:
188-
if blocksize_max is not None and blocksize_max <= 0:
189-
raise ValueError("blocksize_max must be a positive integer")
190-
self._blocksize_max: int | None = blocksize_max
193+
if max_blocksize is not None and max_blocksize <= 0:
194+
raise ValueError("max_blocksize must be a positive integer")
195+
self._max_blocksize: int | None = max_blocksize
191196
if _from_schunk is not None:
192197
if kwargs:
193198
unexpected = ", ".join(sorted(kwargs))
@@ -213,7 +218,7 @@ def __init__(
213218
return
214219

215220
fixed_meta = dict(storage.meta or {})
216-
fixed_meta["batchstore"] = dict(_BATCHSTORE_META)
221+
fixed_meta["batchstore"] = {**_BATCHSTORE_META, "max_blocksize": self._max_blocksize}
217222
storage.meta = fixed_meta
218223
schunk = blosc2.SChunk(chunksize=-1, data=None, cparams=cparams, dparams=dparams, storage=storage)
219224
self._attach_schunk(schunk)
@@ -263,9 +268,29 @@ def _normalize_batch(self, value: object) -> list[Any]:
263268
return values
264269

265270
def _ensure_layout_for_batch(self, batch: list[Any]) -> None:
266-
if self._blocksize_max is None:
271+
if self._max_blocksize is None:
267272
payload_sizes = [len(msgpack_packb(item)) for item in batch]
268-
self._blocksize_max = self._guess_blocksize(payload_sizes)
273+
self._max_blocksize = self._guess_blocksize(payload_sizes)
274+
self._persist_max_blocksize()
275+
276+
def _persist_max_blocksize(self) -> None:
277+
if self._max_blocksize is None or len(self) > 0:
278+
return
279+
storage = self._make_storage()
280+
fixed_meta = dict(storage.meta or {})
281+
fixed_meta["batchstore"] = {
282+
**dict(fixed_meta.get("batchstore", {})),
283+
"max_blocksize": self._max_blocksize,
284+
}
285+
storage.meta = fixed_meta
286+
schunk = blosc2.SChunk(
287+
chunksize=-1,
288+
data=None,
289+
cparams=copy.deepcopy(self.cparams),
290+
dparams=copy.deepcopy(self.dparams),
291+
storage=storage,
292+
)
293+
self._attach_schunk(schunk)
269294

270295
def _guess_blocksize(self, payload_sizes: list[int]) -> int:
271296
if not payload_sizes:
@@ -301,11 +326,11 @@ def _vl_dparams_kwargs(self) -> dict[str, Any]:
301326
return asdict(self.schunk.dparams)
302327

303328
def _compress_batch(self, batch: list[Any]) -> bytes:
304-
if self._blocksize_max is None:
305-
raise RuntimeError("BatchStore blocksize_max is not initialized")
329+
if self._max_blocksize is None:
330+
raise RuntimeError("BatchStore max_blocksize is not initialized")
306331
blocks = [
307-
self._serialize_block(batch[i : i + self._blocksize_max])
308-
for i in range(0, len(batch), self._blocksize_max)
332+
self._serialize_block(batch[i : i + self._max_blocksize])
333+
for i in range(0, len(batch), self._max_blocksize)
309334
]
310335
return blosc2.blosc2_ext.vlcompress(blocks, **self._vl_cparams_kwargs())
311336

@@ -446,8 +471,8 @@ def dparams(self):
446471
return self.schunk.dparams
447472

448473
@property
449-
def blocksize_max(self) -> int | None:
450-
return self._blocksize_max
474+
def max_blocksize(self) -> int | None:
475+
return self._max_blocksize
451476

452477
@property
453478
def typesize(self) -> int:
@@ -492,7 +517,7 @@ def info_items(self) -> list:
492517
("type", f"{self.__class__.__name__}"),
493518
("nbatches", len(self)),
494519
("batch stats", batch_stats),
495-
("blocksize_max", self.blocksize_max),
520+
("max_blocksize", self.max_blocksize),
496521
("nitems", sum(batch_sizes)),
497522
("nbytes", format_nbytes_info(self.nbytes)),
498523
("cbytes", format_nbytes_info(self.cbytes)),
@@ -510,7 +535,7 @@ def copy(self, **kwargs: Any) -> BatchStore:
510535
raise ValueError("meta should not be passed to copy")
511536
kwargs["cparams"] = kwargs.get("cparams", copy.deepcopy(self.cparams))
512537
kwargs["dparams"] = kwargs.get("dparams", copy.deepcopy(self.dparams))
513-
kwargs["blocksize_max"] = kwargs.get("blocksize_max", self.blocksize_max)
538+
kwargs["max_blocksize"] = kwargs.get("max_blocksize", self.max_blocksize)
514539

515540
if "storage" not in kwargs:
516541
kwargs["meta"] = self._copy_meta()

tests/test_batch_store.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def test_batchstore_roundtrip(contiguous, urlpath):
4646
assert barray.append(batch) == i
4747

4848
assert len(barray) == len(BATCHES)
49-
assert barray.blocksize_max is not None
50-
assert 1 <= barray.blocksize_max <= len(BATCHES[0])
49+
assert barray.max_blocksize is not None
50+
assert 1 <= barray.max_blocksize <= len(BATCHES[0])
5151
assert [batch[:] for batch in barray.iter_batches()] == BATCHES
5252
assert barray.append([1, 2]) == len(BATCHES) + 1
5353
assert [batch[:] for batch in barray.iter_batches()][-1] == [1, 2]
@@ -83,7 +83,7 @@ def test_batchstore_roundtrip(contiguous, urlpath):
8383
if urlpath is not None:
8484
reopened = blosc2.open(urlpath, mode="r")
8585
assert isinstance(reopened, blosc2.BatchStore)
86-
assert reopened.blocksize_max is None
86+
assert reopened.max_blocksize == barray.max_blocksize
8787
assert [batch[:] for batch in reopened.iter_batches()] == expected
8888
with pytest.raises(ValueError):
8989
reopened.append(["nope"])
@@ -145,7 +145,7 @@ def test_batchstore_info():
145145
assert items["type"] == "BatchStore"
146146
assert items["nbatches"] == len(BATCHES)
147147
assert items["batch stats"].startswith("mean=")
148-
assert items["blocksize_max"] == barray.blocksize_max
148+
assert items["max_blocksize"] == barray.max_blocksize
149149
assert items["nitems"] == sum(len(batch) for batch in BATCHES)
150150
assert "urlpath" not in items
151151
assert "contiguous" not in items
@@ -158,7 +158,7 @@ def test_batchstore_info():
158158
assert "type" in text
159159
assert "BatchStore" in text
160160
assert "batch stats" in text
161-
assert "blocksize_max" in text
161+
assert "max_blocksize" in text
162162

163163

164164
def test_batchstore_zstd_does_not_use_dict_by_default():
@@ -167,9 +167,9 @@ def test_batchstore_zstd_does_not_use_dict_by_default():
167167
assert barray.cparams.use_dict is False
168168

169169

170-
def test_batchstore_explicit_blocksize_max():
171-
barray = blosc2.BatchStore(blocksize_max=2)
172-
assert barray.blocksize_max == 2
170+
def test_batchstore_explicit_max_blocksize():
171+
barray = blosc2.BatchStore(max_blocksize=2)
172+
assert barray.max_blocksize == 2
173173
barray.append([1, 2, 3])
174174
barray.append([4])
175175
assert [batch[:] for batch in barray.iter_batches()] == [[1, 2, 3], [4]]
@@ -180,10 +180,10 @@ def test_batchstore_get_vlblock_and_scalar_access():
180180
blosc2.remove_urlpath(urlpath)
181181

182182
batch = [0, 1, 2, 3, 4]
183-
barray = blosc2.BatchStore(storage=_storage(True, urlpath), blocksize_max=2)
183+
barray = blosc2.BatchStore(storage=_storage(True, urlpath), max_blocksize=2)
184184
barray.append(batch)
185185

186-
assert barray.blocksize_max == 2
186+
assert barray.max_blocksize == 2
187187
assert msgpack_unpackb(barray.schunk.get_vlblock(0, 0)) == batch[:2]
188188
assert msgpack_unpackb(barray.schunk.get_vlblock(0, 1)) == batch[2:4]
189189
assert msgpack_unpackb(barray.schunk.get_vlblock(0, 2)) == batch[4:]
@@ -194,6 +194,7 @@ def test_batchstore_get_vlblock_and_scalar_access():
194194

195195
reopened = blosc2.open(urlpath, mode="r")
196196
assert isinstance(reopened, blosc2.BatchStore)
197+
assert reopened.max_blocksize == 2
197198
assert reopened[0][0] == 0
198199
assert reopened[0][2] == 2
199200
assert reopened[0][4] == 4
@@ -203,7 +204,7 @@ def test_batchstore_get_vlblock_and_scalar_access():
203204

204205

205206
def test_batchstore_scalar_reads_cache_vlblocks():
206-
barray = blosc2.BatchStore(blocksize_max=2)
207+
barray = blosc2.BatchStore(max_blocksize=2)
207208
barray.append([0, 1, 2, 3, 4])
208209

209210
batch = barray[0]
@@ -227,7 +228,7 @@ def wrapped_get_vlblock(nchunk, nblock):
227228

228229

229230
def test_batchstore_iter_objects():
230-
barray = blosc2.BatchStore(blocksize_max=2)
231+
barray = blosc2.BatchStore(max_blocksize=2)
231232
batches = [[1, 2, 3], [4], [5, 6]]
232233
barray.extend(batches)
233234

tests/test_tree_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ def test_external_batchstore_support(tmp_path):
658658
store_path = tmp_path / "test_batchstore_external.b2d"
659659

660660
with TreeStore(str(store_path), mode="w", threshold=0) as tstore:
661-
bstore = blosc2.BatchStore(blocksize_max=2)
661+
bstore = blosc2.BatchStore(max_blocksize=2)
662662
bstore.extend([[{"id": 1}, {"id": 2}], [{"id": 3}]])
663663
tstore["/data/batchstore"] = bstore
664664

0 commit comments

Comments (0)