From 2569efa562957d422a2bcbd331a488e9a913d775 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 19 Jun 2026 15:36:54 +0200 Subject: [PATCH 1/3] fix: add flush timeout --- .../steadfast-baroness-vainamoinen.md | 5 ++++ posthog/__init__.py | 8 +++++-- posthog/client.py | 24 +++++++++++++++++-- posthog/test/test_client.py | 15 ++++++++++++ posthog/test/test_module.py | 6 +++++ 5 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 .sampo/changesets/steadfast-baroness-vainamoinen.md diff --git a/.sampo/changesets/steadfast-baroness-vainamoinen.md b/.sampo/changesets/steadfast-baroness-vainamoinen.md new file mode 100644 index 00000000..28641662 --- /dev/null +++ b/.sampo/changesets/steadfast-baroness-vainamoinen.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: patch +--- + +Add a default timeout for flushing queued events. diff --git a/posthog/__init__.py b/posthog/__init__.py index 9f0b4893..e645fe7b 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -1026,10 +1026,14 @@ def load_feature_flags(): return _proxy("load_feature_flags") -def flush() -> None: +def flush(timeout_seconds: Optional[float] = 10) -> None: """ Tell the client to flush all queued events. + Args: + timeout_seconds: Maximum seconds to wait for the queue to flush. + Defaults to 10 seconds. Pass ``None`` to wait indefinitely. + Examples: ```python from posthog import flush @@ -1039,7 +1043,7 @@ def flush() -> None: Category: Client management """ - _proxy("flush") + _proxy("flush", timeout_seconds=timeout_seconds) def join() -> None: diff --git a/posthog/client.py b/posthog/client.py index 800cc577..d67afa32 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -5,6 +5,7 @@ import os import sys import threading +import time import warnings import weakref from datetime import datetime, timedelta, timezone @@ -1424,10 +1425,14 @@ def _enqueue(self, msg, disable_geoip): self.log.warning("analytics-python queue is full") return None - def flush(self) -> None: + def flush(self, timeout_seconds: Optional[float] = 10) -> None: """ Force a flush from the internal queue to the server. Do not use directly, call `shutdown()` instead. + Args: + timeout_seconds: Maximum seconds to wait for the queue to flush. + Defaults to 10 seconds. Pass ``None`` to wait indefinitely. + Examples: ```python posthog.capture('event_name') @@ -1436,7 +1441,22 @@ def flush(self) -> None: """ queue = self.queue size = queue.qsize() - queue.join() + if timeout_seconds is None: + queue.join() + else: + deadline = time.monotonic() + timeout_seconds + with queue.all_tasks_done: + while queue.unfinished_tasks: + remaining = deadline - time.monotonic() + if remaining <= 0: + self.log.warning( + "flush timed out after %s seconds with %s items pending.", + timeout_seconds, + queue.unfinished_tasks, + ) + return + queue.all_tasks_done.wait(remaining) + # Note that this message may not be precise, because of threading. self.log.debug("successfully flushed about %s items.", size) diff --git a/posthog/test/test_client.py b/posthog/test/test_client.py index e9d7054c..66cc5181 100644 --- a/posthog/test/test_client.py +++ b/posthog/test/test_client.py @@ -141,6 +141,21 @@ def test_client_flag_helpers_return_defaults_on_api_error(self, patch_flags): def test_empty_flush(self): self.client.flush() + def test_flush_timeout_returns_when_queue_does_not_drain(self): + client = Client(FAKE_TEST_API_KEY, send=False) + client.queue.put({"event": "stuck"}) + + start = time.monotonic() + with self.assertLogs("posthog", level="WARNING") as logs: + client.flush(timeout_seconds=0.01) + + self.assertLess(time.monotonic() - start, 1) + self.assertFalse(client.queue.empty()) + self.assertIn("flush timed out", logs.output[0]) + + client.queue.get_nowait() + client.queue.task_done() + def test_basic_capture(self): with mock.patch("posthog.client.batch_post") as mock_post: client = Client(FAKE_TEST_API_KEY, on_error=self.set_fail, sync_mode=True) diff --git a/posthog/test/test_module.py b/posthog/test/test_module.py index b9dea8c5..ec3031c4 100644 --- a/posthog/test/test_module.py +++ b/posthog/test/test_module.py @@ -35,6 +35,12 @@ def test_alias(self): def test_flush(self): self.posthog.flush() + def test_module_flush_forwards_timeout(self): + with mock.patch.object(posthog, "_proxy") as proxy: + posthog.flush(timeout_seconds=1.5) + + proxy.assert_called_once_with("flush", timeout_seconds=1.5) + class TestModuleLevelSetup(unittest.TestCase): def setUp(self): From 7d3a956a35c4ee0eee7a6606cae90a675d2c4cb2 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 19 Jun 2026 15:43:13 +0200 Subject: [PATCH 2/3] address pr review feedback --- .../steadfast-baroness-vainamoinen.md | 2 +- posthog/client.py | 36 ++++++++++--------- posthog/test/test_client.py | 27 +++++++++++++- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/.sampo/changesets/steadfast-baroness-vainamoinen.md b/.sampo/changesets/steadfast-baroness-vainamoinen.md index 28641662..0325e552 100644 --- a/.sampo/changesets/steadfast-baroness-vainamoinen.md +++ b/.sampo/changesets/steadfast-baroness-vainamoinen.md @@ -1,5 +1,5 @@ --- -pypi/posthog: patch +pypi/posthog: minor --- Add a default timeout for flushing queued events. diff --git a/posthog/client.py b/posthog/client.py index d67afa32..1622b67d 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -1441,21 +1441,25 @@ def flush(self, timeout_seconds: Optional[float] = 10) -> None: """ queue = self.queue size = queue.qsize() - if timeout_seconds is None: - queue.join() - else: - deadline = time.monotonic() + timeout_seconds - with queue.all_tasks_done: - while queue.unfinished_tasks: - remaining = deadline - time.monotonic() - if remaining <= 0: - self.log.warning( - "flush timed out after %s seconds with %s items pending.", - timeout_seconds, - queue.unfinished_tasks, - ) - return - queue.all_tasks_done.wait(remaining) + try: + if timeout_seconds is None: + queue.join() + else: + deadline = time.monotonic() + timeout_seconds + with queue.all_tasks_done: + while queue.unfinished_tasks: + remaining = deadline - time.monotonic() + if remaining <= 0: + self.log.warning( + "flush timed out after %s seconds with %s items pending.", + timeout_seconds, + queue.unfinished_tasks, + ) + return + queue.all_tasks_done.wait(remaining) + except Exception as e: + self.log.exception("error flushing queue: %s", e) + return # Note that this message may not be precise, because of threading. self.log.debug("successfully flushed about %s items.", size) @@ -1493,7 +1497,7 @@ def shutdown(self) -> None: posthog.shutdown() ``` """ - self.flush() + self.flush(timeout_seconds=None) self.join() if self.exception_capture: diff --git a/posthog/test/test_client.py b/posthog/test/test_client.py index 66cc5181..aeb17741 100644 --- a/posthog/test/test_client.py +++ b/posthog/test/test_client.py @@ -142,7 +142,7 @@ def test_empty_flush(self): self.client.flush() def test_flush_timeout_returns_when_queue_does_not_drain(self): - client = Client(FAKE_TEST_API_KEY, send=False) + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) client.queue.put({"event": "stuck"}) start = time.monotonic() @@ -156,6 +156,23 @@ def test_flush_timeout_returns_when_queue_does_not_drain(self): client.queue.get_nowait() client.queue.task_done() + def test_flush_logs_and_returns_on_unexpected_error(self): + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) + client.queue.put({"event": "stuck"}) + + with mock.patch.object( + client.queue.all_tasks_done, + "wait", + side_effect=RuntimeError("boom"), + ): + with self.assertLogs("posthog", level="ERROR") as logs: + client.flush(timeout_seconds=1) + + self.assertIn("error flushing queue", logs.output[0]) + + client.queue.get_nowait() + client.queue.task_done() + def test_basic_capture(self): with mock.patch("posthog.client.batch_post") as mock_post: client = Client(FAKE_TEST_API_KEY, on_error=self.set_fail, sync_mode=True) @@ -1850,6 +1867,14 @@ def test_shutdown(self): for consumer in client.consumers: self.assertFalse(consumer.is_alive()) + def test_shutdown_flushes_without_timeout(self): + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) + + with mock.patch.object(client, "flush") as mock_flush: + client.shutdown() + + mock_flush.assert_called_once_with(timeout_seconds=None) + def test_synchronous(self): with mock.patch("posthog.client.batch_post") as mock_post: client = Client(FAKE_TEST_API_KEY, sync_mode=True) From a8e0ffd817670a6e6f9d6ad137f670dd1478b47a Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 19 Jun 2026 15:45:56 +0200 Subject: [PATCH 3/3] update public api snapshot --- references/public_api_snapshot.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index 2ba8a253..a86983b8 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -926,7 +926,7 @@ function posthog.feature_flags.parse_semver(value: str) -> tuple function posthog.feature_flags.relative_date_parse_for_feature_flag_matching(value: str) -> Optional[datetime.datetime] function posthog.feature_flags.resolve_bucketing_value(flag, distinct_id, device_id=None) function posthog.feature_flags.variant_lookup_table(feature_flag) -function posthog.flush() -> None +function posthog.flush(timeout_seconds: Optional[float] = 10) -> None function posthog.get_all_flags(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> Optional[dict[str, FeatureFlag]] function posthog.get_all_flags_and_payloads(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> FlagsAndPayloads function posthog.get_feature_flag(key, distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id=None) -> Optional[FeatureFlag] @@ -1048,7 +1048,7 @@ method posthog.client.Client.capture_exception(exception: Optional[ExceptionArg] method posthog.client.Client.evaluate_flags(distinct_id: Optional[ID_TYPES] = None, *, groups: Optional[Dict[str, str]] = None, person_properties: Optional[Dict[str, Any]] = None, group_properties: Optional[Dict[str, Dict[str, Any]]] = None, only_evaluate_locally: bool = False, disable_geoip: Optional[bool] = None, flag_keys: Optional[List[str]] = None, device_id: Optional[str] = None) -> FeatureFlagEvaluations method posthog.client.Client.feature_enabled(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None) method posthog.client.Client.feature_flag_definitions() -method posthog.client.Client.flush() -> None +method posthog.client.Client.flush(timeout_seconds: Optional[float] = 10) -> None method posthog.client.Client.get_all_flags(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> Optional[dict[str, Union[bool, str]]] method posthog.client.Client.get_all_flags_and_payloads(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> FlagsAndPayloads method posthog.client.Client.get_feature_flag(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None) -> Optional[FlagValue]