Skip to content

Commit 115985e

Browse files
committed
Break reference cycles and purge stale caches in ctable layer
- _RowIndexer: use a weakref to the parent CTable to break reference cycles, allowing CTable instances to be collected without gen-2 GC.
- indexing.py: stop caching writable sidecar handles in the process-wide _SIDECAR_HANDLE_CACHE after construction — they kept NDArray objects alive indefinitely across tests on macOS/Python 3.14.
- indexing.py: add _purge_stale_persistent_caches() to evict index cache entries whose backing paths no longer exist. Call it in _load_store, _open_query_cache_store, _open_sidecar_handle, and _gather_mmap_source.
- ref.py: open urlpath operands in mode='r' instead of mode='a'; persistent recipe operands only need read access.
- test_ctable_indexing.py: add tests verifying that sidecar handles are not cached after index creation, that CTables release without explicit GC, and that stale persistent caches are purged correctly.
1 parent 7defba4 commit 115985e

4 files changed

Lines changed: 130 additions & 6 deletions

File tree

src/blosc2/ctable.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import os
1717
import pprint
1818
import shutil
19+
import weakref
1920
from collections.abc import Iterable
2021
from dataclasses import MISSING
2122
from textwrap import TextWrapper
@@ -448,10 +449,13 @@ def _find_physical_index(arr: blosc2.NDArray, logical_key: int) -> int:
448449

449450
class _RowIndexer:
450451
def __init__(self, table):
451-
self._table = table
452+
self._table_ref = weakref.ref(table)
452453

453454
def __getitem__(self, item):
454-
return self._table._run_row_logic(item)
455+
table = self._table_ref()
456+
if table is None:
457+
raise ReferenceError("owning CTable has been released")
458+
return table._run_row_logic(item)
455459

456460

457461
class _Row:

src/blosc2/indexing.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,48 @@ def _cleanup_in_memory_store(key: int) -> None:
106106
_hot_cache_clear(scope=("memory", key))
107107

108108

109+
def _persistent_cache_path_exists(path: str | int) -> bool:
110+
if not isinstance(path, str):
111+
return False
112+
path_obj = Path(path)
113+
return path_obj.exists() or path_obj.parent.exists()
114+
115+
116+
def _purge_stale_persistent_caches() -> None:
117+
stale_scopes = {
118+
key for key in _PERSISTENT_INDEXES if key[0] == "persistent" and not _persistent_cache_path_exists(key[1])
119+
}
120+
for key in stale_scopes:
121+
_PERSISTENT_INDEXES.pop(key, None)
122+
123+
stale_data_keys = {
124+
key for key in _DATA_CACHE if key[0][0] == "persistent" and not _persistent_cache_path_exists(key[0][1])
125+
}
126+
stale_scopes.update(key[0] for key in stale_data_keys)
127+
for key in stale_data_keys:
128+
_DATA_CACHE.pop(key, None)
129+
130+
stale_handle_keys = {
131+
key
132+
for key in _SIDECAR_HANDLE_CACHE
133+
if key[0][0] == "persistent" and not _persistent_cache_path_exists(key[0][1])
134+
}
135+
stale_scopes.update(key[0] for key in stale_handle_keys)
136+
for key in stale_handle_keys:
137+
_SIDECAR_HANDLE_CACHE.pop(key, None)
138+
139+
stale_query_paths = [path for path in _QUERY_CACHE_STORE_HANDLES if not Path(path).exists()]
140+
for path in stale_query_paths:
141+
_QUERY_CACHE_STORE_HANDLES.pop(path, None)
142+
143+
stale_gather_paths = [path for path in _GATHER_MMAP_HANDLES if not Path(path).exists()]
144+
for path in stale_gather_paths:
145+
_GATHER_MMAP_HANDLES.pop(path, None)
146+
147+
for scope in stale_scopes:
148+
_hot_cache_clear(scope=scope)
149+
150+
109151
@dataclass(slots=True)
110152
class IndexPlan:
111153
usable: bool
@@ -286,6 +328,7 @@ def _resolve_full_index_tmpdir(array: blosc2.NDArray, tmpdir: str | None) -> str
286328

287329

288330
def _load_store(array: blosc2.NDArray) -> dict:
331+
_purge_stale_persistent_caches()
289332
if _is_persistent_array(array):
290333
key = _array_key(array)
291334
cached = _PERSISTENT_INDEXES.get(key)
@@ -405,6 +448,7 @@ def _open_query_cache_store(array: blosc2.NDArray, *, create: bool = False):
405448
Returns ``None`` if the array is not persistent. When *create* is True the
406449
store is created if it does not yet exist.
407450
"""
451+
_purge_stale_persistent_caches()
408452
if not _is_persistent_array(array):
409453
return None
410454
path = _query_cache_payload_path(array)
@@ -920,6 +964,7 @@ def _invalidate_sidecar_cache_entries(array: blosc2.NDArray, token: str, categor
920964

921965

922966
def _open_sidecar_handle(array: blosc2.NDArray, token: str, category: str, name: str, path: str | None):
967+
_purge_stale_persistent_caches()
923968
cache_key = _sidecar_handle_cache_key(array, token, category, name)
924969
cached = _SIDECAR_HANDLE_CACHE.get(cache_key)
925970
if cached is not None:
@@ -1108,8 +1153,11 @@ def _store_array_sidecar(
11081153
kwargs["blocks"] = blocks
11091154
if cparams is not None:
11101155
kwargs["cparams"] = cparams
1156+
# Do not retain writable persistent handles in the process-wide cache.
1157+
# They keep native resources alive after index construction and can
1158+
# accumulate badly across tests on macOS/Python 3.14.
11111159
handle = blosc2.asarray(data, **kwargs)
1112-
_SIDECAR_HANDLE_CACHE[handle_cache_key] = handle
1160+
del handle
11131161
_DATA_CACHE.pop(cache_key, None)
11141162
else:
11151163
path = None
@@ -1152,10 +1200,9 @@ def _create_persistent_sidecar_handle(
11521200
kwargs["cparams"] = cparams
11531201
if length == 0:
11541202
handle = blosc2.asarray(np.empty(0, dtype=dtype), **kwargs)
1155-
_SIDECAR_HANDLE_CACHE[_sidecar_handle_cache_key(array, token, category, name)] = handle
1203+
del handle
11561204
return None, {"path": path, "dtype": dtype.descr if dtype.fields else dtype.str}
11571205
handle = blosc2.empty((length,), dtype=dtype, **kwargs)
1158-
_SIDECAR_HANDLE_CACHE[_sidecar_handle_cache_key(array, token, category, name)] = handle
11591206
return handle, {"path": path, "dtype": dtype.descr if dtype.fields else dtype.str}
11601207

11611208

@@ -4874,6 +4921,7 @@ def _gather_mmap_source(where_x):
48744921
urlpath = getattr(where_x, "urlpath", None)
48754922
if not _supports_block_reads(where_x) or urlpath is None:
48764923
return where_x
4924+
_purge_stale_persistent_caches()
48774925
urlpath = str(urlpath)
48784926
handle = _GATHER_MMAP_HANDLES.get(urlpath)
48794927
if handle is None:

src/blosc2/ref.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ def open(self):
118118
import blosc2
119119

120120
if self.kind == "urlpath":
121-
return blosc2.open(self.urlpath, mode="a")
121+
# Structured refs are used to reopen operands for persisted recipes.
122+
# Read-only access avoids allocating unnecessary writable state.
123+
return blosc2.open(self.urlpath, mode="r")
122124
if self.kind == "dictstore_key":
123125
return blosc2.DictStore(self.urlpath, mode="r")[self.key]
124126
if self.kind == "c2array":

tests/ctable/test_ctable_indexing.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import dataclasses
1111
import shutil
1212
import tempfile
13+
import weakref
1314
from pathlib import Path
1415

1516
import numpy as np
@@ -223,6 +224,33 @@ def test_create_index_persistent(tmpdir):
223224
assert sidecars, "No sidecar .b2nd files found"
224225

225226

227+
def test_create_index_persistent_does_not_cache_sidecar_handles(tmpdir):
228+
import blosc2.indexing as indexing
229+
230+
path = str(tmpdir / "table.b2d")
231+
t = _make_table(50, persistent_path=path)
232+
t.create_index("id", kind=blosc2.IndexKind.FULL)
233+
234+
cached = [
235+
key
236+
for key in indexing._SIDECAR_HANDLE_CACHE
237+
if key[0][0] == "persistent" and str(tmpdir) in key[0][1]
238+
]
239+
assert cached == []
240+
241+
242+
def test_persistent_ctable_releases_immediately_without_gc(tmpdir):
243+
path = str(tmpdir / "table.b2d")
244+
245+
def build_table():
246+
t = _make_table(50, persistent_path=path)
247+
t.create_index("id", kind=blosc2.IndexKind.FULL)
248+
return weakref.ref(t)
249+
250+
table_ref = build_table()
251+
assert table_ref() is None
252+
253+
226254
def test_catalog_survives_reopen(tmpdir):
227255
path = str(tmpdir / "table.b2d")
228256
t = _make_table(30, persistent_path=path)
@@ -250,6 +278,28 @@ def test_where_with_index_matches_scan_persistent(tmpdir):
250278
assert ids_idx == ids_scan
251279

252280

281+
def test_persistent_index_drop_releases_sidecars_without_gc(tmpdir):
282+
import gc
283+
284+
def run_query_and_drop():
285+
path = str(tmpdir / "table.b2d")
286+
t = _make_table(200, persistent_path=path)
287+
t.create_index("id")
288+
result = t.where(t["id"] > 150)
289+
ids = sorted(int(v) for v in result["id"].to_numpy())
290+
assert ids == list(range(151, 200))
291+
t.drop_index("id")
292+
293+
run_query_and_drop()
294+
295+
sidecars = [
296+
obj
297+
for obj in gc.get_objects()
298+
if isinstance(obj, blosc2.NDArray) and obj.urlpath and str(tmpdir) in obj.urlpath and "__index__" in obj.urlpath
299+
]
300+
assert sidecars == []
301+
302+
253303
def test_drop_index_persistent(tmpdir):
254304
path = str(tmpdir / "table.b2d")
255305
t = _make_table(30, persistent_path=path)
@@ -465,3 +515,23 @@ def test_indexed_ctable_b2z_double_open_append_no_corruption(tmp_path):
465515
assert t2.nrows == 50
466516
assert len(t2.indexes) == 1
467517
del t2
518+
519+
520+
def test_indexing_purges_stale_persistent_caches():
521+
import blosc2.indexing as indexing
522+
523+
with tempfile.TemporaryDirectory() as tmpdir:
524+
path = str(Path(tmpdir) / "table.b2d")
525+
t = _make_table(50, persistent_path=path)
526+
t.create_index("id")
527+
_ = t.where(t["id"] > 10)
528+
t.close()
529+
530+
assert any(tmpdir in key[1] for key in indexing._PERSISTENT_INDEXES if key[0] == "persistent")
531+
532+
indexing._purge_stale_persistent_caches()
533+
534+
assert all(tmpdir not in key[1] for key in indexing._PERSISTENT_INDEXES if key[0] == "persistent")
535+
assert all(tmpdir not in key[0][1] for key in indexing._SIDECAR_HANDLE_CACHE if key[0][0] == "persistent")
536+
assert all(tmpdir not in path for path in indexing._QUERY_CACHE_STORE_HANDLES)
537+
assert all(tmpdir not in path for path in indexing._GATHER_MMAP_HANDLES)

0 commit comments

Comments
 (0)