Skip to content

Commit c44031d

Browse files
committed
Implement CTable indexing follow-ups
Fix CTable index lifecycle for schema mutations by removing index catalog entries and sidecars when indexed columns are dropped, and rebuilding indexes under the new name after column renames. Improve indexed CTable filtering so where() can expose multiple usable column indexes to the planner for conjunctive predicates, and raise a clear error for malformed table-owned index metadata instead of silently falling back to scans. Add regression coverage for indexed column rename/drop behavior, multi-column indexed conjunctions, and malformed catalog entries. Wire the CTable indexing tutorial into the docs toctree.
1 parent 2ff3140 commit c44031d

File tree

4 files changed

+199
-97
lines changed

4 files changed

+199
-97
lines changed

doc/getting_started/tutorials.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ Tutorials
2020
tutorials/12.batcharray
2121
tutorials/13.containers
2222
tutorials/14.indexing-arrays
23+
tutorials/15.indexing-ctables

src/blosc2/ctable.py

Lines changed: 134 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2022,6 +2022,13 @@ def drop_column(self, name: str) -> None:
20222022
if len(self.col_names) == 1:
20232023
raise ValueError("Cannot drop the last column.")
20242024

2025+
catalog = self._storage.load_index_catalog()
2026+
if name in catalog:
2027+
descriptor = catalog.pop(name)
2028+
self._validate_index_descriptor(name, descriptor)
2029+
self._drop_index_descriptor(name, descriptor)
2030+
self._storage.save_index_catalog(catalog)
2031+
20252032
if isinstance(self._storage, FileTableStorage):
20262033
self._storage.delete_column(name)
20272034

@@ -2060,6 +2067,15 @@ def rename_column(self, old: str, new: str) -> None:
20602067
raise ValueError(f"Column {new!r} already exists.")
20612068
_validate_column_name(new)
20622069

2070+
catalog = self._storage.load_index_catalog()
2071+
rebuild_kwargs = None
2072+
if old in catalog:
2073+
descriptor = catalog.pop(old)
2074+
self._validate_index_descriptor(old, descriptor)
2075+
rebuild_kwargs = self._index_create_kwargs_from_descriptor(descriptor)
2076+
self._drop_index_descriptor(old, descriptor)
2077+
self._storage.save_index_catalog(catalog)
2078+
20632079
if isinstance(self._storage, FileTableStorage):
20642080
self._cols[new] = self._storage.rename_column(old, new)
20652081
else:
@@ -2088,6 +2104,8 @@ def rename_column(self, old: str, new: str) -> None:
20882104
)
20892105
if isinstance(self._storage, FileTableStorage):
20902106
self._storage.save_schema(schema_to_dict(self._schema))
2107+
if rebuild_kwargs is not None:
2108+
self.create_index(new, **rebuild_kwargs)
20912109

20922110
# ------------------------------------------------------------------
20932111
# Column access
@@ -2366,6 +2384,73 @@ def _mark_all_indexes_stale(self) -> None:
23662384
if changed:
23672385
root._storage.save_index_catalog(catalog)
23682386

2387+
@staticmethod
2388+
def _validate_index_descriptor(col_name: str, descriptor: dict) -> None:
2389+
"""Raise ValueError when an index catalog entry is malformed."""
2390+
if not isinstance(descriptor, dict):
2391+
raise ValueError(f"Malformed index metadata for column {col_name!r}: descriptor must be a dict.")
2392+
token = descriptor.get("token")
2393+
if not isinstance(token, str) or not token:
2394+
raise ValueError(f"Malformed index metadata for column {col_name!r}: missing token.")
2395+
kind = descriptor.get("kind")
2396+
if kind not in {"summary", "bucket", "partial", "full"}:
2397+
raise ValueError(f"Malformed index metadata for column {col_name!r}: invalid kind {kind!r}.")
2398+
if kind == "bucket" and not isinstance(descriptor.get("bucket"), dict):
2399+
raise ValueError(f"Malformed index metadata for column {col_name!r}: missing bucket payload.")
2400+
if kind == "partial" and not isinstance(descriptor.get("partial"), dict):
2401+
raise ValueError(f"Malformed index metadata for column {col_name!r}: missing partial payload.")
2402+
if kind == "full" and not isinstance(descriptor.get("full"), dict):
2403+
raise ValueError(f"Malformed index metadata for column {col_name!r}: missing full payload.")
2404+
2405+
def _drop_index_descriptor(self, col_name: str, descriptor: dict) -> None:
2406+
"""Delete sidecars/cache for a catalog descriptor without touching the column mapping."""
2407+
from pathlib import Path
2408+
2409+
from blosc2.indexing import (
2410+
_IN_MEMORY_INDEXES,
2411+
_PERSISTENT_INDEXES,
2412+
_array_key,
2413+
_clear_cached_data,
2414+
_drop_descriptor_sidecars,
2415+
_is_persistent_array,
2416+
)
2417+
2418+
col_arr = self._cols.get(col_name)
2419+
token = descriptor["token"]
2420+
2421+
if col_arr is not None:
2422+
_clear_cached_data(col_arr, token)
2423+
2424+
if col_arr is not None and _is_persistent_array(col_arr):
2425+
arr_key = _array_key(col_arr)
2426+
store = _PERSISTENT_INDEXES.get(arr_key)
2427+
if store is not None:
2428+
store["indexes"].pop(token, None)
2429+
elif col_arr is not None:
2430+
store = _IN_MEMORY_INDEXES.get(id(col_arr))
2431+
if store is not None:
2432+
store["indexes"].pop(token, None)
2433+
2434+
_drop_descriptor_sidecars(descriptor)
2435+
2436+
anchor = self._storage.index_anchor_path(col_name)
2437+
if anchor is not None:
2438+
proxy_key = ("persistent", str(Path(anchor).resolve()))
2439+
_PERSISTENT_INDEXES.pop(proxy_key, None)
2440+
with contextlib.suppress(OSError):
2441+
os.rmdir(os.path.dirname(anchor))
2442+
2443+
def _index_create_kwargs_from_descriptor(self, descriptor: dict) -> dict[str, Any]:
2444+
"""Return create_index kwargs that rebuild an existing descriptor."""
2445+
build = "ooc" if bool(descriptor.get("ooc", False)) else "memory"
2446+
return {
2447+
"kind": descriptor["kind"],
2448+
"optlevel": int(descriptor.get("optlevel", 5)),
2449+
"name": descriptor.get("name") or None,
2450+
"build": build,
2451+
"cparams": descriptor.get("cparams"),
2452+
}
2453+
23692454
def _build_index_persistent(
23702455
self,
23712456
col_name: str,
@@ -2622,48 +2707,13 @@ def drop_index(self, col_name: str) -> None:
26222707
if self.base is not None:
26232708
raise ValueError("Cannot drop an index from a view.")
26242709

2625-
from pathlib import Path
2626-
2627-
from blosc2.indexing import (
2628-
_IN_MEMORY_INDEXES,
2629-
_PERSISTENT_INDEXES,
2630-
_array_key,
2631-
_clear_cached_data,
2632-
_drop_descriptor_sidecars,
2633-
_is_persistent_array,
2634-
)
2635-
from blosc2.indexing import (
2636-
drop_index as _ix_drop_index,
2637-
)
2638-
26392710
catalog = self._storage.load_index_catalog()
26402711
if col_name not in catalog:
26412712
raise KeyError(f"No index found for column {col_name!r}.")
26422713

26432714
descriptor = catalog.pop(col_name)
2644-
token = descriptor["token"]
2645-
col_arr = self._cols[col_name]
2646-
2647-
if _is_persistent_array(col_arr):
2648-
_drop_descriptor_sidecars(descriptor)
2649-
_clear_cached_data(col_arr, token)
2650-
arr_key = _array_key(col_arr)
2651-
store = _PERSISTENT_INDEXES.get(arr_key)
2652-
if store is not None:
2653-
store["indexes"].pop(token, None)
2654-
anchor = self._storage.index_anchor_path(col_name)
2655-
proxy_key = ("persistent", str(Path(anchor).resolve()))
2656-
_PERSISTENT_INDEXES.pop(proxy_key, None)
2657-
index_dir = os.path.dirname(anchor)
2658-
with contextlib.suppress(OSError):
2659-
os.rmdir(index_dir)
2660-
else:
2661-
with contextlib.suppress(Exception):
2662-
_ix_drop_index(col_arr, field=None)
2663-
store = _IN_MEMORY_INDEXES.get(id(col_arr))
2664-
if store is not None:
2665-
store["indexes"].pop(token, None)
2666-
2715+
self._validate_index_descriptor(col_name, descriptor)
2716+
self._drop_index_descriptor(col_name, descriptor)
26672717
self._storage.save_index_catalog(catalog)
26682718

26692719
def rebuild_index(self, col_name: str) -> CTableIndex:
@@ -2694,22 +2744,11 @@ def rebuild_index(self, col_name: str) -> CTableIndex:
26942744
raise KeyError(f"No index found for column {col_name!r}.")
26952745

26962746
old_desc = catalog[col_name]
2697-
old_kind = old_desc["kind"]
2698-
old_optlevel = int(old_desc.get("optlevel", 5))
2699-
old_name = old_desc.get("name") or None
2700-
old_ooc = bool(old_desc.get("ooc", False))
2701-
old_cparams = old_desc.get("cparams")
2702-
build_str = "ooc" if old_ooc else "memory"
2747+
self._validate_index_descriptor(col_name, old_desc)
2748+
create_kwargs = self._index_create_kwargs_from_descriptor(old_desc)
27032749

27042750
self.drop_index(col_name)
2705-
return self.create_index(
2706-
col_name,
2707-
kind=old_kind,
2708-
optlevel=old_optlevel,
2709-
name=old_name,
2710-
build=build_str,
2711-
cparams=old_cparams,
2712-
)
2751+
return self.create_index(col_name, **create_kwargs)
27132752

27142753
def compact_index(self, col_name: str) -> CTableIndex:
27152754
"""Compact the index for *col_name*, merging any incremental append runs.
@@ -2814,16 +2853,23 @@ def indexes(self) -> list[CTableIndex]:
28142853
return [CTableIndex(self, col_name, desc) for col_name, desc in catalog.items()]
28152854

28162855
@staticmethod
2817-
def _find_indexed_column(root_cols, catalog, operands):
2818-
"""Return ``(col_name, col_arr)`` for the first operand NDArray that maps
2819-
to a root-table column with a live (non-stale) catalog entry, or ``(None, None)``."""
2856+
def _find_indexed_columns(root_cols, catalog, operands):
2857+
"""Return live indexed columns referenced by *operands* in expression order."""
2858+
indexed = []
2859+
seen = set()
28202860
for operand in operands.values():
28212861
if not isinstance(operand, blosc2.NDArray):
28222862
continue
28232863
for col_name, col_arr in root_cols.items():
2824-
if col_arr is operand and col_name in catalog and not catalog[col_name].get("stale", False):
2825-
return col_name, col_arr
2826-
return None, None
2864+
if col_arr is not operand or col_name in seen or col_name not in catalog:
2865+
continue
2866+
descriptor = catalog[col_name]
2867+
CTable._validate_index_descriptor(col_name, descriptor)
2868+
if descriptor.get("stale", False):
2869+
continue
2870+
indexed.append((col_name, col_arr, descriptor))
2871+
seen.add(col_name)
2872+
return indexed
28272873

28282874
def _try_index_where(self, expr_result: blosc2.LazyExpr) -> np.ndarray | None:
28292875
"""Attempt to resolve *expr_result* via a column index.
@@ -2851,51 +2897,47 @@ def _try_index_where(self, expr_result: blosc2.LazyExpr) -> np.ndarray | None:
28512897
expression = expr_result.expression
28522898
operands = dict(expr_result.operands)
28532899

2854-
# Find the first operand NDArray that maps to a root-table column with a live index.
2855-
primary_col_name, primary_col_arr = self._find_indexed_column(root._cols, catalog, operands)
2856-
2857-
if primary_col_arr is None:
2900+
indexed_columns = self._find_indexed_columns(root._cols, catalog, operands)
2901+
if not indexed_columns:
28582902
return None
28592903

2860-
# Inject the catalog descriptor into the appropriate index store so that
2861-
# plan_query can find it via _load_store / _descriptor_for_target.
2862-
descriptor = catalog[primary_col_name]
2863-
arr_key = _array_key(primary_col_arr)
2864-
if _is_persistent_array(primary_col_arr):
2865-
store = _PERSISTENT_INDEXES.get(arr_key) or _default_index_store()
2866-
store["indexes"][descriptor["token"]] = descriptor
2867-
_PERSISTENT_INDEXES[arr_key] = store
2868-
else:
2869-
store = _IN_MEMORY_INDEXES.get(id(primary_col_arr)) or _default_index_store()
2870-
store["indexes"][descriptor["token"]] = descriptor
2871-
_IN_MEMORY_INDEXES[id(primary_col_arr)] = store
2904+
primary_col_name, primary_col_arr, _ = indexed_columns[0]
2905+
2906+
# Inject every usable table-owned descriptor so plan_query can combine them.
2907+
for _col_name, col_arr, descriptor in indexed_columns:
2908+
arr_key = _array_key(col_arr)
2909+
if _is_persistent_array(col_arr):
2910+
store = _PERSISTENT_INDEXES.get(arr_key) or _default_index_store()
2911+
store["indexes"][descriptor["token"]] = descriptor
2912+
_PERSISTENT_INDEXES[arr_key] = store
2913+
else:
2914+
store = _IN_MEMORY_INDEXES.get(id(col_arr)) or _default_index_store()
2915+
store["indexes"][descriptor["token"]] = descriptor
2916+
_IN_MEMORY_INDEXES[id(col_arr)] = store
28722917

28732918
where_dict = {"_where_x": primary_col_arr}
28742919
merged_operands = {**operands, "_where_x": primary_col_arr}
28752920

2876-
try:
2877-
plan = plan_query(expression, merged_operands, where_dict)
2878-
if not plan.usable:
2879-
return None
2921+
plan = plan_query(expression, merged_operands, where_dict)
2922+
if not plan.usable:
2923+
return None
28802924

2881-
if plan.exact_positions is not None:
2882-
return np.asarray(plan.exact_positions, dtype=np.int64)
2925+
if plan.exact_positions is not None:
2926+
return np.asarray(plan.exact_positions, dtype=np.int64)
28832927

2884-
if plan.bucket_masks is not None:
2885-
_, positions = evaluate_bucket_query(
2886-
expression, merged_operands, {}, where_dict, plan, return_positions=True
2887-
)
2888-
return np.asarray(positions, dtype=np.int64)
2928+
if plan.bucket_masks is not None:
2929+
_, positions = evaluate_bucket_query(
2930+
expression, merged_operands, {}, where_dict, plan, return_positions=True
2931+
)
2932+
return np.asarray(positions, dtype=np.int64)
28892933

2890-
if plan.candidate_units is not None and plan.segment_len is not None:
2891-
_, positions = evaluate_segment_query(
2892-
expression, merged_operands, {}, where_dict, plan, return_positions=True
2893-
)
2894-
return np.asarray(positions, dtype=np.int64)
2934+
if plan.candidate_units is not None and plan.segment_len is not None:
2935+
_, positions = evaluate_segment_query(
2936+
expression, merged_operands, {}, where_dict, plan, return_positions=True
2937+
)
2938+
return np.asarray(positions, dtype=np.int64)
28952939

2896-
return None
2897-
except Exception:
2898-
return None
2940+
return None
28992941

29002942
def info(self) -> None:
29012943
"""Print a concise summary of the CTable."""

src/blosc2/schunk.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1706,6 +1706,14 @@ def _open_treestore_root_object(store, urlpath, mode):
17061706
return store
17071707

17081708

1709+
def _finalize_special_open(special, urlpath, mode):
1710+
if special is None:
1711+
return None
1712+
if isinstance(special, blosc2.TreeStore):
1713+
return _open_treestore_root_object(special, urlpath, mode)
1714+
return special
1715+
1716+
17091717
def open(
17101718
urlpath: str | pathlib.Path | blosc2.URLPath, mode: str = "a", offset: int = 0, **kwargs: dict
17111719
) -> (
@@ -1829,9 +1837,8 @@ def open(
18291837
# more expensive store probing when that fails.
18301838
if urlpath.endswith((".b2d", ".b2z", ".b2e")):
18311839
special = _open_special_store(urlpath, mode, offset, **kwargs)
1840+
special = _finalize_special_open(special, urlpath, mode)
18321841
if special is not None:
1833-
if isinstance(special, blosc2.TreeStore):
1834-
return _open_treestore_root_object(special, urlpath, mode)
18351842
return special
18361843

18371844
regular_exc = None
@@ -1845,11 +1852,12 @@ def open(
18451852
return process_opened_object(res)
18461853

18471854
resolved_urlpath = _resolve_store_alias(urlpath)
1848-
special_path = resolved_urlpath if resolved_urlpath != urlpath or not os.path.exists(urlpath) else urlpath
1855+
special_path = (
1856+
resolved_urlpath if resolved_urlpath != urlpath or not os.path.exists(urlpath) else urlpath
1857+
)
18491858
special = _open_special_store(special_path, mode, offset, **kwargs)
1859+
special = _finalize_special_open(special, special_path, mode)
18501860
if special is not None:
1851-
if isinstance(special, blosc2.TreeStore):
1852-
return _open_treestore_root_object(special, special_path, mode)
18531861
return special
18541862

18551863
if regular_exc is not None:

0 commit comments

Comments (0)