Skip to content

Commit e5146f3

Browse files
Andrewtangtangdpkp
authored andcommitted
Fix zstd multi-frame decompression failure (#2717)
1 parent 29faf60 commit e5146f3

2 files changed

Lines changed: 13 additions & 4 deletions

File tree

kafka/codec.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,5 @@ def zstd_encode(payload):
327327
def zstd_decode(payload):
328328
if not zstd:
329329
raise NotImplementedError("Zstd codec is not available")
330-
try:
331-
return zstd.ZstdDecompressor().decompress(payload)
332-
except zstd.ZstdError:
333-
return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
330+
with zstd.ZstdDecompressor().stream_reader(io.BytesIO(payload), read_across_frames=True) as reader:
331+
return reader.read()

test/test_codec.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,14 @@ def test_zstd():
124124
b1 = random_string(100).encode('utf-8')
125125
b2 = zstd_decode(zstd_encode(b1))
126126
assert b1 == b2
127+
128+
129+
@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
130+
def test_zstd_multi_frame():
131+
"""Test that zstd_decode handles multiple concatenated zstd frames."""
132+
frame1_data = b'some payload data ' * 100
133+
frame2_data = b'another frame of data ' * 100
134+
# Concatenate two independently compressed zstd frames
135+
multi_frame_payload = zstd_encode(frame1_data) + zstd_encode(frame2_data)
136+
result = zstd_decode(multi_frame_payload)
137+
assert result == frame1_data + frame2_data

0 commit comments

Comments
 (0)