Skip to content

Commit f51c430

Browse files
committed
New iter_objects for iterating over objects in batch store
1 parent 4eed97c commit f51c430

3 files changed

Lines changed: 53 additions & 27 deletions

File tree

bench/batch_store.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -125,6 +125,13 @@ def main() -> None:
125125
else:
126126
read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, blocksize_max=BLOCKSIZE_MAX)
127127
samples, timings_ns = measure_random_reads(read_store)
128+
t0 = time.perf_counter()
129+
checksum = 0
130+
nobjects = 0
131+
for obj in read_store.iter_objects():
132+
checksum += obj["blue"]
133+
nobjects += 1
134+
iter_time_s = time.perf_counter() - t0
128135

129136
print()
130137
print("BatchStore benchmark")
@@ -138,6 +145,9 @@ def main() -> None:
138145
print(f" mean: {statistics.fmean(timings_ns) / 1_000:.2f} us")
139146
print(f" max: {max(timings_ns) / 1_000:.2f} us")
140147
print(f" min: {min(timings_ns) / 1_000:.2f} us")
148+
print(f"Object iteration via iter_objects(): {iter_time_s:.3f} s")
149+
print(f" per object: {iter_time_s * 1_000_000 / nobjects:.2f} us")
150+
print(f" checksum: {checksum}")
141151
print("Sample reads:")
142152
for timing_ns, batch_index, item_index, value in samples[:5]:
143153
print(f" {timing_ns / 1_000:.2f} us -> read_store[{batch_index}][{item_index}] = {value}")

src/blosc2/batch_store.py

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -418,10 +418,17 @@ def __delitem__(self, index: int | slice) -> None:
418418
def __len__(self) -> int:
419419
return self.schunk.nchunks
420420

421-
def __iter__(self) -> Iterator[Batch]:
421+
def iter_batches(self) -> Iterator[Batch]:
422422
for i in range(len(self)):
423423
yield self[i]
424424

425+
def iter_objects(self) -> Iterator[Any]:
426+
for batch in self.iter_batches():
427+
yield from batch
428+
429+
def __iter__(self) -> Iterator[Batch]:
430+
yield from self.iter_batches()
431+
425432
@property
426433
def meta(self):
427434
return self.schunk.meta
@@ -474,7 +481,7 @@ def info(self) -> InfoReporter:
474481
@property
475482
def info_items(self) -> list:
476483
"""A list of tuples with summary information about this BatchStore."""
477-
batch_sizes = [len(batch) for batch in self]
484+
batch_sizes = [len(batch) for batch in self.iter_batches()]
478485
if batch_sizes:
479486
batch_stats = (
480487
f"mean={statistics.fmean(batch_sizes):.2f}, max={max(batch_sizes)}, min={min(batch_sizes)}"
@@ -515,7 +522,7 @@ def copy(self, **kwargs: Any) -> BatchStore:
515522
if "storage" not in kwargs and len(self.vlmeta) > 0:
516523
for key, value in self.vlmeta.getall().items():
517524
out.vlmeta[key] = value
518-
out.extend(self)
525+
out.extend(self.iter_batches())
519526
return out
520527

521528
def __enter__(self) -> BatchStore:

tests/test_batch_store.py

Lines changed: 33 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -48,9 +48,9 @@ def test_batchstore_roundtrip(contiguous, urlpath):
4848
assert len(barray) == len(BATCHES)
4949
assert barray.blocksize_max is not None
5050
assert 1 <= barray.blocksize_max <= len(BATCHES[0])
51-
assert [batch[:] for batch in barray] == BATCHES
51+
assert [batch[:] for batch in barray.iter_batches()] == BATCHES
5252
assert barray.append([1, 2]) == len(BATCHES) + 1
53-
assert [batch[:] for batch in barray][-1] == [1, 2]
53+
assert [batch[:] for batch in barray.iter_batches()][-1] == [1, 2]
5454

5555
batch0 = barray[0]
5656
assert isinstance(batch0, blosc2.Batch)
@@ -78,13 +78,13 @@ def test_batchstore_roundtrip(contiguous, urlpath):
7878
del expected[2]
7979
del barray[-2]
8080
del expected[-2]
81-
assert [batch[:] for batch in barray] == expected
81+
assert [batch[:] for batch in barray.iter_batches()] == expected
8282

8383
if urlpath is not None:
8484
reopened = blosc2.open(urlpath, mode="r")
8585
assert isinstance(reopened, blosc2.BatchStore)
8686
assert reopened.blocksize_max is None
87-
assert [batch[:] for batch in reopened] == expected
87+
assert [batch[:] for batch in reopened.iter_batches()] == expected
8888
with pytest.raises(ValueError):
8989
reopened.append(["nope"])
9090
with pytest.raises(ValueError):
@@ -105,12 +105,12 @@ def test_batchstore_roundtrip(contiguous, urlpath):
105105
reopened_rw = blosc2.open(urlpath, mode="a")
106106
reopened_rw[0] = ["changed", "batch", 0]
107107
expected[0] = ["changed", "batch", 0]
108-
assert [batch[:] for batch in reopened_rw] == expected
108+
assert [batch[:] for batch in reopened_rw.iter_batches()] == expected
109109

110110
if contiguous:
111111
reopened_mmap = blosc2.open(urlpath, mode="r", mmap_mode="r")
112112
assert isinstance(reopened_mmap, blosc2.BatchStore)
113-
assert [batch[:] for batch in reopened_mmap] == expected
113+
assert [batch[:] for batch in reopened_mmap.iter_batches()] == expected
114114

115115
blosc2.remove_urlpath(urlpath)
116116

@@ -126,11 +126,11 @@ def test_batchstore_from_cframe():
126126

127127
restored = blosc2.from_cframe(barray.to_cframe())
128128
assert isinstance(restored, blosc2.BatchStore)
129-
assert [batch[:] for batch in restored] == expected
129+
assert [batch[:] for batch in restored.iter_batches()] == expected
130130

131131
restored2 = blosc2.from_cframe(barray.to_cframe())
132132
assert isinstance(restored2, blosc2.BatchStore)
133-
assert [batch[:] for batch in restored2] == expected
133+
assert [batch[:] for batch in restored2.iter_batches()] == expected
134134

135135

136136
def test_batchstore_info():
@@ -172,7 +172,7 @@ def test_batchstore_explicit_blocksize_max():
172172
assert barray.blocksize_max == 2
173173
barray.append([1, 2, 3])
174174
barray.append([4])
175-
assert [batch[:] for batch in barray] == [[1, 2, 3], [4]]
175+
assert [batch[:] for batch in barray.iter_batches()] == [[1, 2, 3], [4]]
176176

177177

178178
def test_batchstore_get_vlblock_and_scalar_access():
@@ -226,6 +226,15 @@ def wrapped_get_vlblock(nchunk, nblock):
226226
barray.schunk.get_vlblock = original_get_vlblock
227227

228228

229+
def test_batchstore_iter_objects():
230+
barray = blosc2.BatchStore(blocksize_max=2)
231+
batches = [[1, 2, 3], [4], [5, 6]]
232+
barray.extend(batches)
233+
234+
assert [batch[:] for batch in barray] == batches
235+
assert list(barray.iter_objects()) == [1, 2, 3, 4, 5, 6]
236+
237+
229238
def test_batchstore_respects_explicit_use_dict_and_non_zstd():
230239
barray = blosc2.BatchStore(cparams={"codec": blosc2.Codec.LZ4, "clevel": 5})
231240
assert barray.cparams.codec == blosc2.Codec.LZ4
@@ -287,7 +296,7 @@ def test_batchstore_constructor_kwargs():
287296
barray.extend(BATCHES)
288297

289298
reopened = blosc2.BatchStore(urlpath=urlpath, mode="r", contiguous=True, mmap_mode="r")
290-
assert [batch[:] for batch in reopened] == BATCHES
299+
assert [batch[:] for batch in reopened.iter_batches()] == BATCHES
291300

292301
blosc2.remove_urlpath(urlpath)
293302

@@ -306,21 +315,21 @@ def test_batchstore_list_like_ops(contiguous, urlpath):
306315

307316
barray = blosc2.BatchStore(storage=_storage(contiguous, urlpath))
308317
barray.extend([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
309-
assert [batch[:] for batch in barray] == [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
318+
assert [batch[:] for batch in barray.iter_batches()] == [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
310319
assert barray.pop() == [7, 8, 9]
311320
assert barray.pop(0) == [1, 2, 3]
312-
assert [batch[:] for batch in barray] == [[4, 5, 6]]
321+
assert [batch[:] for batch in barray.iter_batches()] == [[4, 5, 6]]
313322

314323
barray.clear()
315324
assert len(barray) == 0
316-
assert [batch[:] for batch in barray] == []
325+
assert [batch[:] for batch in barray.iter_batches()] == []
317326

318327
barray.extend([["a", "b", "c"], ["d", "e", "f"]])
319-
assert [batch[:] for batch in barray] == [["a", "b", "c"], ["d", "e", "f"]]
328+
assert [batch[:] for batch in barray.iter_batches()] == [["a", "b", "c"], ["d", "e", "f"]]
320329

321330
if urlpath is not None:
322331
reopened = blosc2.open(urlpath, mode="r")
323-
assert [batch[:] for batch in reopened] == [["a", "b", "c"], ["d", "e", "f"]]
332+
assert [batch[:] for batch in reopened.iter_batches()] == [["a", "b", "c"], ["d", "e", "f"]]
324333

325334
blosc2.remove_urlpath(urlpath)
326335

@@ -346,15 +355,15 @@ def test_batchstore_slices(contiguous, urlpath):
346355

347356
barray[2:5] = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]]
348357
expected[2:5] = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]]
349-
assert [batch[:] for batch in barray] == expected
358+
assert [batch[:] for batch in barray.iter_batches()] == expected
350359

351360
barray[1:6:2] = [[100, 101, 102], [103, 104, 105], [106, 107, 108]]
352361
expected[1:6:2] = [[100, 101, 102], [103, 104, 105], [106, 107, 108]]
353-
assert [batch[:] for batch in barray] == expected
362+
assert [batch[:] for batch in barray.iter_batches()] == expected
354363

355364
del barray[::3]
356365
del expected[::3]
357-
assert [batch[:] for batch in barray] == expected
366+
assert [batch[:] for batch in barray.iter_batches()] == expected
358367

359368
if urlpath is not None:
360369
reopened = blosc2.open(urlpath, mode="r")
@@ -392,14 +401,14 @@ def test_batchstore_copy():
392401
copied = original.copy(
393402
urlpath=copy_path, contiguous=False, cparams={"codec": blosc2.Codec.LZ4, "clevel": 5}
394403
)
395-
assert [batch[:] for batch in copied] == [batch[:] for batch in original]
404+
assert [batch[:] for batch in copied.iter_batches()] == [batch[:] for batch in original.iter_batches()]
396405
assert copied.urlpath == copy_path
397406
assert copied.schunk.contiguous is False
398407
assert copied.cparams.codec == blosc2.Codec.LZ4
399408
assert copied.cparams.clevel == 5
400409

401410
inmem = original.copy()
402-
assert [batch[:] for batch in inmem] == [batch[:] for batch in original]
411+
assert [batch[:] for batch in inmem.iter_batches()] == [batch[:] for batch in original.iter_batches()]
403412
assert inmem.urlpath is None
404413

405414
with pytest.raises(ValueError, match="meta should not be passed to copy"):
@@ -434,7 +443,7 @@ def test_batchstore_multithreaded_inner_vl(contiguous, nthreads):
434443
)
435444
barray.extend(batches)
436445

437-
assert [batch[:] for batch in barray] == batches
446+
assert [batch[:] for batch in barray.iter_batches()] == batches
438447
assert [barray[i][:] for i in range(len(barray))] == batches
439448

440449

@@ -453,7 +462,7 @@ def test_batchstore_validation_errors():
453462
blosc2.BatchStore().pop()
454463
barray.extend([[1, 2, 3]])
455464
assert barray.append([2, 3]) == 2
456-
assert [batch[:] for batch in barray] == [[1, 2, 3], [2, 3]]
465+
assert [batch[:] for batch in barray.iter_batches()] == [[1, 2, 3], [2, 3]]
457466
with pytest.raises(NotImplementedError):
458467
barray.pop(slice(0, 1))
459468

@@ -466,7 +475,7 @@ def test_batchstore_in_embed_store():
466475
estore["/batch"] = barray
467476
restored = estore["/batch"]
468477
assert isinstance(restored, blosc2.BatchStore)
469-
assert [batch[:] for batch in restored] == BATCHES
478+
assert [batch[:] for batch in restored.iter_batches()] == BATCHES
470479

471480

472481
def test_batchstore_in_dict_store():
@@ -481,6 +490,6 @@ def test_batchstore_in_dict_store():
481490
with blosc2.DictStore(path, mode="r") as dstore:
482491
restored = dstore["/batch"]
483492
assert isinstance(restored, blosc2.BatchStore)
484-
assert [batch[:] for batch in restored] == BATCHES
493+
assert [batch[:] for batch in restored.iter_batches()] == BATCHES
485494

486495
blosc2.remove_urlpath(path)

0 commit comments

Comments
 (0)