Skip to content

Commit 503a75e

Browse files
authored
feat: add flush for batch writing (#180)
1 parent 2deea4e commit 503a75e

4 files changed

Lines changed: 183 additions & 62 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
compression for Flight queries:
1313
- `disable_grpc_compression` parameter in `InfluxDBClient3` constructor
1414
- `INFLUX_DISABLE_GRPC_COMPRESSION` environment variable support in `from_env()`
15+
1. [#180](https://github.com/InfluxCommunity/influxdb3-python/pull/180): Add `flush()` method to `InfluxDBClient3`:
16+
- Allows flushing the write buffer without closing the client when using batching mode.
17+
- Enables applications to ensure data is written before querying, while keeping the client open for further operations.
1518

1619
### Bug Fixes
1720

influxdb_client_3/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,19 @@ def get_server_version(self) -> str:
660660

661661
return version
662662

663+
def flush(self):
    """
    Push any pending, buffered points to InfluxDB while keeping the client usable.

    In batching write mode this forces the write buffer to be sent to the
    server immediately; the client stays open afterwards, so further writes
    (and queries against the freshly written data) can follow.

    In synchronous write mode nothing is ever buffered, so the call has no
    effect.
    """
    self._write_api.flush()
675+
663676
def close(self):
664677
"""Close the client and clean up resources."""
665678
self._write_api.close()

influxdb_client_3/write_client/client/write_api.py

Lines changed: 81 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -287,31 +287,9 @@ def __init__(self,
287287
self._retry_callback = kwargs.get('retry_callback', None)
288288

289289
if self._write_options.write_type is WriteType.batching:
290-
# Define Subject that listen incoming data and produces writes into InfluxDB
291-
self._subject = Subject()
292-
293-
self._disposable = self._subject.pipe(
294-
# Split incoming data to windows by batch_size or flush_interval
295-
ops.window_with_time_or_count(count=write_options.batch_size,
296-
timespan=timedelta(milliseconds=write_options.flush_interval)),
297-
# Map window into groups defined by 'organization', 'bucket' and 'precision'
298-
ops.flat_map(lambda window: window.pipe(
299-
# Group window by 'organization', 'bucket' and 'precision'
300-
ops.group_by(lambda batch_item: batch_item.key),
301-
# Create batch (concatenation line protocols by \n)
302-
ops.map(lambda group: group.pipe(
303-
ops.to_iterable(),
304-
ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))),
305-
ops.merge_all())),
306-
# Write data into InfluxDB (possibility to retry if its fail)
307-
ops.filter(lambda batch: batch.size > 0),
308-
ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())),
309-
ops.merge_all()) \
310-
.subscribe(self._on_next, self._on_error, self._on_complete)
311-
290+
self._subject, self._disposable = self._create_batching_pipeline()
312291
else:
313-
self._subject = None
314-
self._disposable = None
292+
self._subject, self._disposable = None, None
315293

316294
if self._write_options.write_type is WriteType.asynchronous:
317295
message = """The 'WriteType.asynchronous' is deprecated and will be removed in future major version.
@@ -426,14 +404,88 @@ def write_payload(payload):
426404
return results[0]
427405
return results
428406

407+
def _create_batching_pipeline(self) -> tuple[Subject[Any], rx.abc.DisposableBase]:
    """
    Build the rx pipeline that collects incoming points and writes them in batches.

    Points pushed into the returned subject are windowed by ``batch_size`` or
    ``flush_interval``, grouped by (organization, bucket, precision),
    concatenated into one line-protocol body per group, and handed to
    ``_to_response`` for writing with a jittered retry delay.

    Returns a ``(subject, disposable)`` pair: push data through the subject;
    dispose the subscription via the disposable when shutting down.
    """
    # Define Subject that listens for incoming data and produces writes into InfluxDB
    subject = Subject()

    disposable = subject.pipe(
        # Split incoming data to windows by batch_size or flush_interval
        ops.window_with_time_or_count(count=self._write_options.batch_size,
                                      timespan=timedelta(milliseconds=self._write_options.flush_interval)),
        # Map window into groups defined by 'organization', 'bucket' and 'precision'
        ops.flat_map(lambda window: window.pipe(  # type: ignore
            # Group window by 'organization', 'bucket' and 'precision'
            ops.group_by(lambda batch_item: batch_item.key),  # type: ignore
            # Create batch (concatenation of line protocols by \n)
            ops.map(lambda group: group.pipe(  # type: ignore
                ops.to_iterable(),
                ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))),  # type: ignore
            ops.merge_all())),
        # Write data into InfluxDB (with the possibility to retry if it fails)
        ops.filter(lambda batch: batch.size > 0),
        ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())),
        ops.merge_all()) \
        .subscribe(self._on_next, self._on_error, self._on_complete)

    return subject, disposable
432+
429433
def flush(self):
    """
    Send all buffered data points to InfluxDB without closing the client.

    In batching mode the current pipeline is shut down — which drains the
    buffer to the server — and a fresh pipeline is created so subsequent
    writes keep working.

    In synchronous or asynchronous mode nothing is buffered, so the call
    does nothing.
    """
    if self._write_options.write_type is WriteType.batching:
        # Draining is achieved by tearing the existing pipeline down...
        self.close()
        # ...and standing a new one up for continued use.
        self._subject, self._disposable = self._create_batching_pipeline()
433451

434452
def close(self):
    """
    Flush remaining data and dispose of the batching buffer.

    Completes and disposes the rx subject (completion drains any buffered
    batch through the pipeline), then waits — up to ``max_close_wait``
    milliseconds — for the subscription to finish writing before releasing
    it. Safe to call more than once: later calls return immediately.
    """
    if self._subject is None:
        return  # Already closed (or batching never enabled): nothing to do

    # Completing the subject pushes the final (partial) window through the
    # pipeline; disposing stops it from accepting further input.
    self._subject.on_completed()
    self._subject.dispose()
    self._subject = None

    # NOTE: the original used a bare triple-quoted string here, which is an
    # executed expression statement, not a comment — replaced with comments.
    #
    # We impose a maximum wait time to ensure that we do not cause a
    # deadlock if the background thread has exited abnormally.
    # Each iteration waits 100 ms, but sleep() expects seconds, so the
    # configured maximum (milliseconds) is converted to seconds.
    max_wait_time = self._write_options.max_close_wait / 1000
    waited = 0
    sleep_period = 0.1

    # Wait for in-flight writes to finish before dropping the handle.
    # Guard against _disposable being None so a half-initialized state
    # cannot raise AttributeError here.
    if self._disposable is not None:
        while not self._disposable.is_disposed:
            sleep(sleep_period)
            waited += sleep_period

            # Have we reached the upper limit?
            if waited >= max_wait_time:
                logger.warning(
                    "Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing",
                    max_wait_time
                )
                break

    if self._disposable:
        self._disposable = None
437489

438490
def __enter__(self):
439491
"""
@@ -452,40 +504,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
452504

453505
def __del__(self):
    """Close WriteApi when the instance is garbage collected."""
    # Delegate to close(), which is a no-op when already closed, so an
    # explicit close() followed by later garbage collection is safe.
    self.close()
489508

490509
def _write_batching(self, bucket, org, data,
491510
precision=None,

tests/test_flush.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Tests for the flush() method in InfluxDBClient3 and WriteApi."""
2+
import unittest
3+
from unittest.mock import MagicMock, patch
4+
5+
from influxdb_client_3 import InfluxDBClient3, WriteOptions, write_client_options, WriteType
6+
7+
8+
class TestFlushMethod(unittest.TestCase):
    """Test cases for the flush() method."""

    def test_flush_sends_buffered_data_and_allows_continued_writes(self):
        """Test that flush() sends pending data and allows continued writes."""
        write_count = 0

        def success_callback(conf, data):
            # Counts completed batch writes; incremented once per flushed batch.
            nonlocal write_count
            write_count += 1

        # batch_size and flush_interval are deliberately large so that no
        # automatic flush can fire during the test — only flush() may
        # trigger the writes.
        write_options = WriteOptions(
            write_type=WriteType.batching,
            batch_size=1000,
            flush_interval=60_000,
            max_close_wait=5_000
        )

        wc_opts = write_client_options(
            success_callback=success_callback,
            write_options=write_options
        )

        # Intercept the HTTP layer so no real server is needed.
        with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write') as mock_post:
            mock_post.return_value = MagicMock()

            client = InfluxDBClient3(
                host="http://localhost:8086",
                token="my-token",
                database="my-db",
                write_client_options=wc_opts
            )

            try:
                # Write data, flush, write more, flush again
                for i in range(5):
                    client.write(f"test,tag=value field={i}i")
                client.flush()

                for i in range(5):
                    client.write(f"test,tag=value field={i}i")
                client.flush()

                # Both batches should have been flushed
                self.assertEqual(2, write_count)
                self.assertEqual(2, mock_post.call_count)

                # Verify that all 10 data points (5 per batch) were sent
                for call in mock_post.call_args_list:
                    args, kwargs = call
                    # body may be passed positionally or by keyword depending
                    # on the internal call style.
                    body = kwargs.get('body') or args[3]
                    if isinstance(body, bytes):
                        body = body.decode('utf-8')
                    for i in range(5):
                        self.assertIn(f"test,tag=value field={i}i", body)
            finally:
                client.close()

    def test_flush_is_safe_in_synchronous_mode_and_after_close(self):
        """Test that flush() doesn't crash in sync mode or after close."""
        # Test synchronous mode
        sync_opts = write_client_options(write_options=WriteOptions(write_type=WriteType.synchronous))
        with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write'):
            client = InfluxDBClient3(host="http://localhost:8086", token="t", database="db",
                                     write_client_options=sync_opts)
            client.flush()  # Should not raise
            client.close()

        # Test flush after close in batching mode
        batch_opts = write_client_options(write_options=WriteOptions(write_type=WriteType.batching))
        with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write'):
            client = InfluxDBClient3(host="http://localhost:8086", token="t", database="db",
                                     write_client_options=batch_opts)
            client.close()
            client.flush()  # Should not raise
83+
84+
85+
# Allow running this test module directly: python tests/test_flush.py
if __name__ == '__main__':
    unittest.main()

0 commit comments

Comments
 (0)