|
5 | 5 | # SPDX-License-Identifier: BSD-3-Clause |
6 | 6 | ####################################################################### |
7 | 7 |
|
8 | | -# CTable indexing: create, query, rebuild, and drop indexes on columns. |
| 8 | +# CTable indexing with mixed dtypes, persistent sidecars, and a packed .b2z bundle. |
9 | 9 |
|
10 | 10 | import shutil |
11 | 11 | import tempfile |
|
19 | 19 | class Measurement: |
20 | 20 | sensor_id: int = blosc2.field(blosc2.int32()) |
21 | 21 | temperature: float = blosc2.field(blosc2.float64()) |
22 | | - region: int = blosc2.field(blosc2.int32()) |
| 22 | + region: str = blosc2.field(blosc2.string(max_length=12), default="") |
| 23 | + active: bool = blosc2.field(blosc2.bool(), default=True) |
| 24 | + status: str = blosc2.field(blosc2.string(max_length=12), default="") |
23 | 25 |
|
24 | 26 |
|
25 | | -# ------------------------------------------------------------------------- |
26 | | -# In-memory table |
27 | | -# ------------------------------------------------------------------------- |
| 27 | +def load_rows(table: blosc2.CTable, nrows: int = 240) -> None: |
| 28 | + regions = ["north", "south", "east", "west"] |
| 29 | + for i in range(nrows): |
| 30 | + region = regions[i % len(regions)] |
| 31 | + active = (i % 7) not in (0, 6) |
| 32 | + status = "alert" if i % 23 == 0 else ("warm" if i % 11 == 0 else "ok") |
| 33 | + table.append([i, 12.5 + (i % 40) * 0.35, region, active, status]) |
28 | 34 |
|
29 | | -t = blosc2.CTable(Measurement) |
30 | | -for i in range(200): |
31 | | - t.append([i, 15.0 + (i % 30) * 0.5, i % 4]) |
32 | 35 |
|
33 | | -# Create a bucket index on 'sensor_id' (default kind). |
34 | | -idx = t.create_index("sensor_id") |
35 | | -print("Index created:", idx) |
36 | | -print("Stale?", idx.stale) |
| 36 | +bundle_path = Path("indexed_measurements.b2z").resolve() |
| 37 | +workspace = Path(tempfile.mkdtemp()) |
| 38 | +table_path = workspace / "indexed_measurements.b2d" |
37 | 39 |
|
38 | | -# The where() call automatically uses the index when possible. |
39 | | -result = t.where(t["sensor_id"] > 180) |
40 | | -print("Rows with sensor_id > 180 (via index):", len(result)) |
41 | | -print("sensor_ids:", sorted(int(v) for v in result["sensor_id"].to_numpy())) |
| 40 | +pt = None |
| 41 | +packed = None |
42 | 42 |
|
43 | | -# List all indexes on the table. |
44 | | -print("\nAll indexes:", t.indexes) |
45 | | - |
46 | | -# After mutating the table the index is marked stale and where() falls |
47 | | -# back to a full scan automatically — results are still correct. |
48 | | -t.append([999, 42.0, 2]) |
49 | | -idx = t.index("sensor_id") |
50 | | -print("\nAfter append, stale?", idx.stale) |
51 | | - |
52 | | -result_stale = t.where(t["sensor_id"] == 999) |
53 | | -print("Row with sensor_id=999 (scan fallback):", len(result_stale)) |
54 | | - |
55 | | -# Rebuild the index to make it current again. |
56 | | -idx = t.rebuild_index("sensor_id") |
57 | | -print("\nAfter rebuild, stale?", idx.stale) |
58 | | - |
59 | | -result_rebuilt = t.where(t["sensor_id"] == 999) |
60 | | -print("Row with sensor_id=999 (via rebuilt index):", len(result_rebuilt)) |
61 | | - |
62 | | -# Drop the index. |
63 | | -t.drop_index("sensor_id") |
64 | | -print("\nIndexes after drop:", t.indexes) |
| 43 | +try: |
| 44 | + pt = blosc2.CTable(Measurement, urlpath=str(table_path), mode="w") |
| 45 | + load_rows(pt) |
| 46 | + |
| 47 | + # Create a couple of indexes on columns with different dtypes. |
| 48 | + idx_sensor = pt.create_index("sensor_id", kind=blosc2.IndexKind.FULL) |
| 49 | + idx_active = pt.create_index("active") |
| 50 | + print("Indexes created:", pt.indexes) |
| 51 | + print("sensor_id stale?", idx_sensor.stale) |
| 52 | + print("active stale?", idx_active.stale) |
| 53 | + |
| 54 | + # Queries can combine indexed and non-indexed predicates. |
| 55 | + recent_active = pt.where((pt["sensor_id"] >= 180) & pt["active"] & (pt["region"] == "north")) |
| 56 | + print("\nLive rows with sensor_id >= 180, active=True, region='north':", len(recent_active)) |
| 57 | + print("sensor_ids:", recent_active["sensor_id"].to_numpy().tolist()) |
| 58 | + print("statuses:", recent_active["status"].to_numpy().tolist()) |
| 59 | + |
| 60 | + # Close the table, pack the TreeStore into a single .b2z file, and reopen it. |
| 61 | + del pt |
| 62 | + pt = None |
65 | 63 |
|
66 | | -# ------------------------------------------------------------------------- |
67 | | -# Persistent table |
68 | | -# ------------------------------------------------------------------------- |
| 64 | + if bundle_path.exists(): |
| 65 | + bundle_path.unlink() |
69 | 66 |
|
70 | | -tmpdir = Path(tempfile.mkdtemp()) |
71 | | -path = str(tmpdir / "measurements.b2d") |
| 67 | + store = blosc2.TreeStore(str(table_path), mode="r") |
| 68 | + try: |
| 69 | + packed_path = store.to_b2z(filename=str(bundle_path), overwrite=True) |
| 70 | + finally: |
| 71 | + store.close() |
72 | 72 |
|
73 | | -try: |
74 | | - pt = blosc2.CTable(Measurement, urlpath=path, mode="w") |
75 | | - for i in range(200): |
76 | | - pt.append([i, 15.0 + (i % 30) * 0.5, i % 4]) |
| 73 | + print(f"\nPacked bundle created at: {packed_path}") |
77 | 74 |
|
78 | | - pidx = pt.create_index("sensor_id") |
79 | | - print("\nPersistent index created:", pidx) |
| 75 | + packed = blosc2.open(str(bundle_path), mode="r") |
| 76 | + print("Reopened object type:", type(packed).__name__) |
| 77 | + print("Indexes after reopen from .b2z:", packed.indexes) |
80 | 78 |
|
81 | | - # Sidecar files are stored under <table.b2d>/_indexes/sensor_id/ |
82 | | - index_dir = Path(path) / "_indexes" / "sensor_id" |
83 | | - sidecars = list(index_dir.glob("**/*.b2nd")) |
84 | | - print(f"Sidecar files: {len(sidecars)}") |
| 79 | + # Query directly against the .b2z bundle; no unpack step is needed. |
| 80 | + warm_active = packed.where(packed["active"] & (packed["status"] == "warm") & (packed["sensor_id"] > 100)) |
| 81 | + print("\nRows from .b2z with active=True, status='warm', sensor_id > 100:", len(warm_active)) |
| 82 | + print("sensor_ids:", warm_active["sensor_id"].to_numpy().tolist()) |
| 83 | + print("regions:", warm_active["region"].to_numpy().tolist()) |
85 | 84 |
|
86 | | - result_p = pt.where(pt["sensor_id"] > 190) |
87 | | - print("Rows sensor_id > 190:", len(result_p)) |
88 | | - |
89 | | - # Close and reopen — the index catalog is persisted. |
90 | | - del pt |
91 | | - pt2 = blosc2.open(path) |
92 | | - print("\nAfter reopen, indexes:", pt2.indexes) |
93 | | - result_p2 = pt2.where(pt2["sensor_id"] > 190) |
94 | | - print("Rows sensor_id > 190 (after reopen):", len(result_p2)) |
| 85 | + print("\nThe packed file is kept on disk.") |
| 86 | + print(f"Inspect it later with: f = blosc2.open({bundle_path.name!r}, mode='r')") |
| 87 | + print("Then call: f.info()") |
| 88 | + print("For a quick check of the available info entry point, print: f.info") |
95 | 89 |
|
96 | 90 | finally: |
97 | | - shutil.rmtree(tmpdir, ignore_errors=True) |
| 91 | + if packed is not None: |
| 92 | + del packed |
| 93 | + if pt is not None: |
| 94 | + del pt |
| 95 | + shutil.rmtree(workspace, ignore_errors=True) |
0 commit comments