Skip to content

Commit e9c7e12

Browse files
feat: Add file_converter extension module (Issue #54)
- Create new file_converter.py extension module in databusclient/extensions/ - Implements FileConverter class with streaming pipeline support - Supports gzip decompression with optional checksum validation - Provides methods for compress_gzip_stream and decompress_gzip_stream - Minimal version as suggested in issue to start with gzip + checksum - Can be extended later to support more compression formats
1 parent b2c3f1c commit e9c7e12

1 file changed

Lines changed: 103 additions & 0 deletions

File tree

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
"""File format conversion extension for databus-python-client.
2+
3+
Provides streaming pipeline for file decompression, re-compression,
4+
and checksum validation during download operations.
5+
"""
6+
7+
import gzip
8+
import hashlib
9+
from typing import BinaryIO, Optional
10+
11+
12+
class FileConverter:
    """Handles file format conversion with streaming support.

    All methods operate on binary file-like objects in fixed-size
    chunks, so arbitrarily large files can be processed without
    loading them fully into memory.
    """

    # 8 KiB per read: bounds memory use while keeping per-call
    # overhead negligible.
    CHUNK_SIZE = 8192

    @staticmethod
    def decompress_gzip_stream(
        input_stream: BinaryIO,
        output_stream: BinaryIO,
        validate_checksum: bool = False,
        expected_checksum: Optional[str] = None
    ) -> Optional[str]:
        """Decompress gzip stream with optional checksum validation.

        Args:
            input_stream: Input gzip compressed stream
            output_stream: Output decompressed stream
            validate_checksum: Whether to compute the SHA256 of the
                decompressed data during decompression
            expected_checksum: Expected SHA256 checksum of the
                *compressed* source bytes; when given, the source is
                hashed as it is read and a mismatch raises IOError

        Returns:
            Computed SHA256 hex digest of the decompressed data if
            validate_checksum is True, None otherwise

        Raises:
            IOError: If expected_checksum is provided and does not
                match the SHA256 of the compressed input
        """
        hasher = hashlib.sha256() if validate_checksum else None
        source_hasher = hashlib.sha256() if expected_checksum else None

        # BUG FIX: the original created source_hasher but never fed it
        # any data nor compared it against expected_checksum, so the
        # parameter was silently ignored. Wrap the raw stream so every
        # compressed byte gzip consumes is also hashed, then verify
        # once decompression completes.
        source = input_stream
        if source_hasher is not None:
            class _HashingReader:
                # Minimal file-like shim: gzip only needs read().
                def read(self, size: int = -1,
                         _fp=input_stream, _h=source_hasher) -> bytes:
                    data = _fp.read(size)
                    if data:
                        _h.update(data)
                    return data

            source = _HashingReader()

        with gzip.open(source, 'rb') as gz:
            while True:
                chunk = gz.read(FileConverter.CHUNK_SIZE)
                if not chunk:
                    break
                output_stream.write(chunk)
                if hasher:
                    hasher.update(chunk)

        if source_hasher is not None:
            computed = source_hasher.hexdigest()
            # Case-insensitive compare, consistent with
            # validate_checksum_stream below.
            if computed.lower() != expected_checksum.lower():
                raise IOError(
                    f"Checksum mismatch: expected {expected_checksum}, "
                    f"got {computed}"
                )

        return hasher.hexdigest() if hasher else None

    @staticmethod
    def compress_gzip_stream(
        input_stream: BinaryIO,
        output_stream: BinaryIO
    ) -> None:
        """Compress stream to gzip format.

        Args:
            input_stream: Input uncompressed stream
            output_stream: Output gzip compressed stream
        """
        with gzip.open(output_stream, 'wb') as gz:
            while True:
                chunk = input_stream.read(FileConverter.CHUNK_SIZE)
                if not chunk:
                    break
                gz.write(chunk)

    @staticmethod
    def validate_checksum_stream(
        input_stream: BinaryIO,
        expected_checksum: str
    ) -> bool:
        """Validate SHA256 checksum of a stream.

        Args:
            input_stream: Input stream to validate; must be seekable,
                as it is rewound to the start before hashing
            expected_checksum: Expected SHA256 hex digest

        Returns:
            True if checksum matches

        Raises:
            IOError: If the computed checksum differs from the
                expected one
        """
        hasher = hashlib.sha256()
        input_stream.seek(0)

        while True:
            chunk = input_stream.read(FileConverter.CHUNK_SIZE)
            if not chunk:
                break
            hasher.update(chunk)

        computed = hasher.hexdigest()
        if computed.lower() != expected_checksum.lower():
            raise IOError(
                f"Checksum mismatch: expected {expected_checksum}, "
                f"got {computed}"
            )

        return True

0 commit comments

Comments
 (0)