Skip to content

Commit 69579f4

Browse files
committed
Enabled miniexpr for on-disk operands too
1 parent 9f4a1d5 commit 69579f4

3 files changed

Lines changed: 56 additions & 9 deletions

File tree

src/blosc2/blosc2_ext.pyx

Lines changed: 50 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -378,9 +378,9 @@ cdef extern from "blosc2.h":
378378
int blosc2_schunk_decompress_chunk(blosc2_schunk *schunk, int64_t nchunk, void *dest, int32_t nbytes)
379379

380380
int blosc2_schunk_get_chunk(blosc2_schunk *schunk, int64_t nchunk, uint8_t ** chunk,
381-
c_bool *needs_free)
381+
c_bool *needs_free) nogil
382382
int blosc2_schunk_get_lazychunk(blosc2_schunk *schunk, int64_t nchunk, uint8_t ** chunk,
383-
c_bool *needs_free)
383+
c_bool *needs_free) nogil
384384
int blosc2_schunk_get_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer)
385385
int blosc2_schunk_set_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer)
386386
int blosc2_schunk_get_cparams(blosc2_schunk *schunk, blosc2_cparams** cparams)
@@ -616,6 +616,13 @@ cdef extern from "miniexpr.h":
616616
cdef extern from "miniexpr_numpy.h":
617617
me_dtype me_dtype_from_numpy(int numpy_type_num)
618618

619+
cdef extern from "pythread.h":
620+
ctypedef void* PyThread_type_lock
621+
PyThread_type_lock PyThread_allocate_lock() nogil
622+
int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) nogil
623+
void PyThread_release_lock(PyThread_type_lock lock) nogil
624+
void PyThread_free_lock(PyThread_type_lock lock) nogil
625+
619626

620627
ctypedef struct user_filters_udata:
621628
char* py_func
@@ -666,9 +673,14 @@ cdef _check_comp_length(comp_name, comp_len):
666673

667674

668675
blosc2_init()
676+
cdef PyThread_type_lock chunk_cache_lock = PyThread_allocate_lock()
677+
if chunk_cache_lock == NULL:
678+
raise MemoryError("Could not allocate chunk cache lock")
669679

670680
@atexit.register
671681
def destroy():
682+
if chunk_cache_lock != NULL:
683+
PyThread_free_lock(chunk_cache_lock)
672684
blosc2_destroy()
673685

674686

@@ -1799,6 +1811,11 @@ cdef class SChunk:
17991811
me_data = <me_udata*>self.schunk.storage.cparams.preparams.user_data
18001812
if me_data != NULL:
18011813
if me_data.inputs != NULL:
1814+
for i in range(me_data.ninputs):
1815+
if me_data.inputs[i].chunk_cache.data != NULL:
1816+
free(me_data.inputs[i].chunk_cache.data)
1817+
me_data.inputs[i].chunk_cache.data = NULL
1818+
me_data.inputs[i].chunk_cache.nchunk = -1
18021819
free(me_data.inputs)
18031820
if me_data.miniexpr_handle != NULL: # XXX do we really need the conditional?
18041821
me_free(me_data.miniexpr_handle)
@@ -1897,7 +1914,9 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock,
18971914
cdef int rc
18981915
cdef void** input_buffers = <void**> malloc(udata.ninputs * sizeof(uint8_t*))
18991916
cdef float *buf
1900-
cdef void* src
1917+
cdef uint8_t* src
1918+
cdef uint8_t* chunk
1919+
cdef c_bool needs_free
19011920
cdef int32_t chunk_nbytes, chunk_cbytes, block_nbytes
19021921
cdef int start, blocknitems, expected_blocknitems
19031922
cdef int64_t valid_nitems
@@ -1932,7 +1951,32 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock,
19321951
for j in range(ndarr.blocknitems):
19331952
buf[j] = 1.
19341953
else:
1935-
src = ndarr.sc.data[nchunk]
1954+
if ndarr.sc.storage.urlpath == NULL:
1955+
src = ndarr.sc.data[nchunk]
1956+
else:
1957+
# We need to get the chunk from disk/network
1958+
if ndarr.chunk_cache.nchunk != nchunk:
1959+
PyThread_acquire_lock(chunk_cache_lock, 1)
1960+
if ndarr.chunk_cache.nchunk != nchunk:
1961+
if ndarr.chunk_cache.data != NULL:
1962+
free(ndarr.chunk_cache.data)
1963+
ndarr.chunk_cache.data = NULL
1964+
rc = blosc2_schunk_get_chunk(ndarr.sc, nchunk, &chunk, &needs_free)
1965+
if rc < 0:
1966+
PyThread_release_lock(chunk_cache_lock)
1967+
raise ValueError("miniexpr: error getting chunk")
1968+
if not needs_free:
1969+
src = <uint8_t*> malloc(rc)
1970+
if src == NULL:
1971+
PyThread_release_lock(chunk_cache_lock)
1972+
raise MemoryError("miniexpr: cannot allocate chunk copy")
1973+
memcpy(src, chunk, rc)
1974+
else:
1975+
src = chunk
1976+
ndarr.chunk_cache.data = src
1977+
ndarr.chunk_cache.nchunk = nchunk
1978+
PyThread_release_lock(chunk_cache_lock)
1979+
src = ndarr.chunk_cache.data
19361980
rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, &block_nbytes)
19371981
if rc < 0:
19381982
raise ValueError("miniexpr: error getting cbuffer sizes")
@@ -2888,6 +2932,8 @@ cdef class NDArray:
28882932
cdef b2nd_array_t** inputs_ = <b2nd_array_t**> malloc(ninputs * sizeof(b2nd_array_t*))
28892933
for i, operand in enumerate(operands):
28902934
inputs_[i] = <b2nd_array_t*><uintptr_t>operand.c_array
2935+
inputs_[i].chunk_cache.nchunk = -1
2936+
inputs_[i].chunk_cache.data = NULL
28912937
udata.inputs = inputs_
28922938
udata.ninputs = ninputs
28932939
udata.array = self.array

src/blosc2/lazyexpr.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1290,7 +1290,7 @@ def fast_eval( # noqa: C901
12901290
same_blocks = all(hasattr(op, "blocks") and op.blocks == blocks for op in operands.values())
12911291
if not (same_shape and same_chunks and same_blocks):
12921292
use_miniexpr = False
1293-
if not (all_ndarray and not any_persisted and out is None):
1293+
if not (all_ndarray and out is None):
12941294
use_miniexpr = False
12951295

12961296
if use_miniexpr:
@@ -1982,7 +1982,7 @@ def reduce_slices( # noqa: C901
19821982
del temp
19831983

19841984
# miniexpr reduction path only supported for some cases so far
1985-
if not (where is None and fast_path and all_ndarray and not any_persisted and reduced_shape == ()):
1985+
if not (where is None and fast_path and all_ndarray and reduced_shape == ()):
19861986
use_miniexpr = False
19871987

19881988
# Some reductions are not supported yet in miniexpr

tests/ndarray/test_lazyexpr.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -703,17 +703,18 @@ def test_save_functions(function, dtype_fixture, shape_fixture):
703703
expr_string = f"{function}(na1)"
704704
res_numexpr = ne_evaluate(expr_string)
705705
# Compare the results
706-
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr)
706+
rtol = 1e-6 if dtype_fixture == np.float32 else 1e-15
707+
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol)
707708

708709
expr_string = f"blosc2.{function}(a1)"
709710
expr = eval(expr_string, {"a1": a1, "blosc2": blosc2})
710711
expr.save(urlpath=urlpath_save)
711712
res_lazyexpr = expr.compute()
712-
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr)
713+
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol)
713714

714715
expr = blosc2.open(urlpath_save)
715716
res_lazyexpr = expr.compute()
716-
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr)
717+
np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol)
717718

718719
for urlpath in [urlpath_op, urlpath_save]:
719720
blosc2.remove_urlpath(urlpath)

0 commit comments

Comments
 (0)