Skip to content

Commit 9d093a9

Browse files
committed
Check RELEASEGIL in SChunk.__init__, append_data, set_slice, and to_cframe
set_releasegil(True) correctly releases the GIL for compress2(), decompress2(), insert_data(), and update_data(), but had no effect on several other SChunk methods that perform CPU-bound compression. This meant the primary serialization path (pack_tensor / pack_array2) held the GIL for its entire duration. Apply the same pattern already used by insert_data/update_data: split blosc2_schunk_append_buffer into blosc2_compress_ctx (inside `with nogil:` when RELEASEGIL is set) followed by blosc2_schunk_append_chunk (with GIL held). For to_cframe, declare blosc2_schunk_to_buffer as nogil-safe and wrap the call with the RELEASEGIL check. Benchmark result (8 Python threads, blosc2 nthreads=1): Before: pack_tensor speedup=0.9x (no scaling) After: pack_tensor speedup=5.4x (matches compress2's 5.7x)
1 parent 836e989 commit 9d093a9

1 file changed

Lines changed: 75 additions & 8 deletions

File tree

src/blosc2/blosc2_ext.pyx

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ cdef extern from "blosc2.h":
377377
blosc2_schunk *blosc2_schunk_open_offset(const char* urlpath, int64_t offset)
378378
blosc2_schunk* blosc2_schunk_open_offset_udio(const char* urlpath, int64_t offset, const blosc2_io *udio)
379379

380-
int64_t blosc2_schunk_to_buffer(blosc2_schunk* schunk, uint8_t** cframe, c_bool* needs_free)
380+
int64_t blosc2_schunk_to_buffer(blosc2_schunk* schunk, uint8_t** cframe, c_bool* needs_free) nogil
381381
void blosc2_schunk_avoid_cframe_free(blosc2_schunk *schunk, c_bool avoid_cframe_free)
382382
int64_t blosc2_schunk_to_file(blosc2_schunk* schunk, const char* urlpath)
383383
int64_t blosc2_schunk_free(blosc2_schunk *schunk)
@@ -1476,6 +1476,10 @@ cdef class SChunk:
14761476
cdef int64_t index
14771477
cdef Py_buffer buf
14781478
cdef uint8_t *buf_ptr
1479+
cdef int comp_size
1480+
cdef int32_t csize
1481+
cdef uint8_t* chunk
1482+
cdef int32_t len_chunk
14791483
if data is not None and len(data) > 0:
14801484
PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE)
14811485
buf_ptr = <uint8_t *> buf.buf
@@ -1486,8 +1490,27 @@ cdef class SChunk:
14861490
if i == (nchunks - 1):
14871491
len_chunk = len_data - i * chunksize
14881492
index = i * chunksize
1489-
nchunks_ = blosc2_schunk_append_buffer(self.schunk, buf_ptr + index, len_chunk)
1493+
csize = <int32_t> (len_chunk + BLOSC2_MAX_OVERHEAD)
1494+
chunk = <uint8_t*> malloc(csize)
1495+
self.schunk.current_nchunk = i
1496+
if RELEASEGIL:
1497+
with nogil:
1498+
comp_size = blosc2_compress_ctx(self.schunk.cctx, buf_ptr + index, len_chunk, chunk, csize)
1499+
else:
1500+
comp_size = blosc2_compress_ctx(self.schunk.cctx, buf_ptr + index, len_chunk, chunk, csize)
1501+
if comp_size < 0:
1502+
free(chunk)
1503+
PyBuffer_Release(&buf)
1504+
raise RuntimeError("Could not compress the data")
1505+
elif comp_size == 0:
1506+
free(chunk)
1507+
PyBuffer_Release(&buf)
1508+
raise RuntimeError("The result could not fit")
1509+
chunk = <uint8_t*> realloc(chunk, comp_size)
1510+
_check_comp_length('chunk', comp_size)
1511+
nchunks_ = blosc2_schunk_append_chunk(self.schunk, chunk, False)
14901512
if nchunks_ != (i + 1):
1513+
free(chunk)
14911514
PyBuffer_Release(&buf)
14921515
raise RuntimeError("An error occurred while appending the chunks")
14931516
PyBuffer_Release(&buf)
@@ -1622,10 +1645,28 @@ cdef class SChunk:
16221645
def append_data(self, data):
16231646
cdef Py_buffer buf
16241647
PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE)
1625-
rc = blosc2_schunk_append_buffer(self.schunk, buf.buf, <int32_t> buf.len)
1648+
cdef int size
1649+
cdef int32_t len_chunk = <int32_t> (buf.len + BLOSC2_MAX_OVERHEAD)
1650+
cdef uint8_t* chunk = <uint8_t*> malloc(len_chunk)
1651+
self.schunk.current_nchunk = self.schunk.nchunks
1652+
if RELEASEGIL:
1653+
with nogil:
1654+
size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, <int32_t> buf.len, chunk, len_chunk)
1655+
else:
1656+
size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, <int32_t> buf.len, chunk, len_chunk)
16261657
PyBuffer_Release(&buf)
1658+
if size < 0:
1659+
free(chunk)
1660+
raise RuntimeError("Could not compress the data")
1661+
elif size == 0:
1662+
free(chunk)
1663+
raise RuntimeError("The result could not fit")
1664+
chunk = <uint8_t*> realloc(chunk, size)
1665+
_check_comp_length('chunk', size)
1666+
rc = blosc2_schunk_append_chunk(self.schunk, chunk, False)
16271667
if rc < 0:
1628-
raise RuntimeError("Could not append the buffer")
1668+
free(chunk)
1669+
raise RuntimeError("Could not append the chunk")
16291670
return rc
16301671

16311672
def fill_special(self, nitems, special_value, value):
@@ -1899,6 +1940,10 @@ cdef class SChunk:
18991940
cdef int64_t data_start
19001941
cdef uint8_t *data
19011942
cdef uint8_t *chunk
1943+
cdef int32_t alloc_len
1944+
cdef int32_t chunk_nbytes
1945+
cdef int32_t chunksize
1946+
cdef int comp_rc
19021947
if buf.len < nbytes:
19031948
raise ValueError("Not enough data for writing the slice")
19041949

@@ -1922,9 +1967,13 @@ cdef class SChunk:
19221967
data_start = self.schunk.nbytes - (self.schunk.nchunks - 1) * self.schunk.chunksize
19231968
memcpy(data + data_start, buf_ptr + buf_pos, nbytes_copy)
19241969
chunk = <uint8_t *> malloc(chunk_nbytes + BLOSC2_MAX_OVERHEAD)
1925-
rc = blosc2_compress_ctx(self.schunk.cctx, data, chunk_nbytes, chunk, chunk_nbytes + BLOSC2_MAX_OVERHEAD)
1970+
if RELEASEGIL:
1971+
with nogil:
1972+
comp_rc = blosc2_compress_ctx(self.schunk.cctx, data, chunk_nbytes, chunk, chunk_nbytes + BLOSC2_MAX_OVERHEAD)
1973+
else:
1974+
comp_rc = blosc2_compress_ctx(self.schunk.cctx, data, chunk_nbytes, chunk, chunk_nbytes + BLOSC2_MAX_OVERHEAD)
19261975
free(data)
1927-
if rc < 0:
1976+
if comp_rc < 0:
19281977
free(chunk)
19291978
raise RuntimeError("Error while compressing the data")
19301979
rc = blosc2_schunk_update_chunk(self.schunk, self.schunk.nchunks - 1, chunk, True)
@@ -1942,8 +1991,21 @@ cdef class SChunk:
19421991
chunksize = self.schunk.chunksize
19431992
else:
19441993
chunksize = (stop * self.schunk.typesize) % self.schunk.chunksize
1945-
rc = blosc2_schunk_append_buffer(self.schunk, buf_ptr + buf_pos, chunksize)
1994+
alloc_len = <int32_t> (chunksize + BLOSC2_MAX_OVERHEAD)
1995+
chunk = <uint8_t*> malloc(alloc_len)
1996+
self.schunk.current_nchunk = self.schunk.nchunks
1997+
if RELEASEGIL:
1998+
with nogil:
1999+
comp_rc = blosc2_compress_ctx(self.schunk.cctx, buf_ptr + buf_pos, chunksize, chunk, alloc_len)
2000+
else:
2001+
comp_rc = blosc2_compress_ctx(self.schunk.cctx, buf_ptr + buf_pos, chunksize, chunk, alloc_len)
2002+
if comp_rc < 0:
2003+
free(chunk)
2004+
raise RuntimeError("Error while compressing the chunk")
2005+
chunk = <uint8_t*> realloc(chunk, comp_rc)
2006+
rc = blosc2_schunk_append_chunk(self.schunk, chunk, False)
19462007
if rc < 0:
2008+
free(chunk)
19472009
raise RuntimeError("Error while appending the chunk")
19482010
buf_pos += chunksize
19492011
else:
@@ -1955,7 +2017,12 @@ cdef class SChunk:
19552017
def to_cframe(self):
19562018
cdef c_bool needs_free
19572019
cdef uint8_t *cframe
1958-
cframe_len = blosc2_schunk_to_buffer(self.schunk, &cframe, &needs_free)
2020+
cdef int64_t cframe_len
2021+
if RELEASEGIL:
2022+
with nogil:
2023+
cframe_len = blosc2_schunk_to_buffer(self.schunk, &cframe, &needs_free)
2024+
else:
2025+
cframe_len = blosc2_schunk_to_buffer(self.schunk, &cframe, &needs_free)
19592026
if cframe_len < 0:
19602027
raise RuntimeError("Error while getting the cframe")
19612028
out = PyBytes_FromStringAndSize(<char*>cframe, cframe_len)

0 commit comments

Comments
 (0)