Skip to content

Commit b316a07

Browse files
committed
Break reference cycles and harden async chunk reader

- FieldsAccessor: use weakref to parent NDArray and create NDField instances on access instead of eagerly. Breaks NDArray↔NDField reference cycles so gen-0/gen-1 GC can collect them, reducing thread accumulation on Python 3.14.
- sync_read_chunks: wrap reader thread in try/finally with thread.join() to prevent thread leaks. Propagate exceptions from the async reader instead of silently swallowing them. Detect dead reader thread to avoid infinite wait on empty queue.
- ThreadPoolExecutor: cap max_workers to min(len(arrs), nthreads) instead of os.cpu_count(), avoiding excess threads when there are few operands.

1 parent 7fbedef commit b316a07

2 files changed

Lines changed: 50 additions & 23 deletions

File tree

src/blosc2/lazyexpr.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,7 +1182,8 @@ def get_chunk(arr, info, nchunk):
11821182
async def async_read_chunks(arrs, info, queue):
11831183
loop = asyncio.get_event_loop()
11841184
shape, chunks_ = arrs[0].shape, arrs[0].chunks
1185-
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count() or 1) as executor:
1185+
max_workers = max(1, min(len(arrs), int(getattr(blosc2, "nthreads", 1) or 1)))
1186+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
11861187
my_chunk_iter = range(arrs[0].schunk.nchunks)
11871188
if len(info) == 5:
11881189
if info[-1] is not None:
@@ -1213,20 +1214,38 @@ def async_read_chunks_thread(arrs, info, queue):
12131214
def sync_read_chunks(arrs, info):
12141215
queue_size = 2 # maximum number of chunks in the queue
12151216
queue = Queue(maxsize=queue_size)
1217+
worker_exc = None
1218+
1219+
def _run_async_reader():
1220+
nonlocal worker_exc
1221+
try:
1222+
async_read_chunks_thread(arrs, info, queue)
1223+
except BaseException as exc:
1224+
worker_exc = exc
1225+
queue.put(None)
12161226

12171227
# Start the async file reading in a separate thread
1218-
thread = threading.Thread(target=async_read_chunks_thread, args=(arrs, info, queue))
1228+
thread = threading.Thread(target=_run_async_reader)
12191229
thread.start()
12201230

1221-
# Read the chunks synchronously from the queue
1222-
while True:
1223-
try:
1224-
chunks = queue.get(timeout=1) # Wait for the next chunk
1225-
if chunks is None: # End of chunks
1226-
break
1227-
yield chunks
1228-
except Empty:
1229-
continue
1231+
try:
1232+
# Read the chunks synchronously from the queue
1233+
while True:
1234+
try:
1235+
chunks = queue.get(timeout=1) # Wait for the next chunk
1236+
if chunks is None: # End of chunks
1237+
if worker_exc is not None:
1238+
raise worker_exc
1239+
break
1240+
yield chunks
1241+
except Empty:
1242+
if not thread.is_alive():
1243+
if worker_exc is not None:
1244+
raise worker_exc
1245+
break
1246+
continue
1247+
finally:
1248+
thread.join()
12301249

12311250

12321251
def read_nchunk(arrs, info):

src/blosc2/ndarray.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import builtins
1111
import inspect
1212
import math
13+
import weakref
1314
from abc import abstractmethod
1415
from collections import OrderedDict, namedtuple
1516
from collections.abc import Mapping
@@ -146,23 +147,33 @@ def __getitem__(self, key: Any) -> Any:
146147
class FieldsAccessor(Mapping):
147148
"""Read-only mapping of structured field views."""
148149

149-
def __init__(self, field_views: dict[str, Any]):
150-
self._field_views = field_views
150+
def __init__(self, ndarr, field_names: Sequence[str]):
151+
self._ndarr_ref = weakref.ref(ndarr)
152+
self._field_names = tuple(field_names)
153+
154+
def _ndarr(self):
155+
ndarr = self._ndarr_ref()
156+
if ndarr is None:
157+
raise ReferenceError("owning NDArray has been released")
158+
return ndarr
151159

152160
def __getitem__(self, key: str) -> Any:
153-
return self._field_views[key]
161+
if key not in self._field_names:
162+
raise KeyError(key)
163+
return NDField(self._ndarr(), key)
154164

155165
def __iter__(self) -> Iterator[str]:
156-
return iter(self._field_views)
166+
return iter(self._field_names)
157167

158168
def __len__(self) -> int:
159-
return len(self._field_views)
169+
return len(self._field_names)
160170

161171
def __setitem__(self, key: str, value: object) -> None:
162172
raise TypeError(f'assign through the field view, e.g. array.fields["{key}"][:] = values')
163173

164174
def copy(self) -> dict[str, Any]:
165-
return dict(self._field_views)
175+
ndarr = self._ndarr()
176+
return {field: NDField(ndarr, field) for field in self._field_names}
166177

167178
def __or__(self, other: object) -> dict[str, Any]:
168179
if not isinstance(other, Mapping):
@@ -175,7 +186,7 @@ def __ror__(self, other: object) -> dict[str, Any]:
175186
return dict(other) | self.copy()
176187

177188
def __repr__(self) -> str:
178-
return repr(self._field_views)
189+
return repr(dict(self))
179190

180191

181192
def is_documented_by(original):
@@ -3730,11 +3741,8 @@ def __init__(self, **kwargs):
37303741
base = kwargs.pop("_base", None)
37313742
super().__init__(kwargs["_array"], base=base)
37323743
# Accessor to fields
3733-
field_views = {}
3734-
if self.dtype.fields:
3735-
for field in self.dtype.fields:
3736-
field_views[field] = NDField(self, field)
3737-
self._fields = FieldsAccessor(field_views)
3744+
field_names = tuple(self.dtype.fields) if self.dtype.fields else ()
3745+
self._fields = FieldsAccessor(self, field_names)
37383746

37393747
@property
37403748
def cparams(self) -> blosc2.CParams:

Comments (0)