diff --git a/.sampo/changesets/steadfast-baroness-vainamoinen.md b/.sampo/changesets/steadfast-baroness-vainamoinen.md new file mode 100644 index 00000000..0325e552 --- /dev/null +++ b/.sampo/changesets/steadfast-baroness-vainamoinen.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: minor +--- + +Add a default timeout for flushing queued events. diff --git a/posthog/__init__.py b/posthog/__init__.py index 9f0b4893..e645fe7b 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -1026,10 +1026,14 @@ def load_feature_flags(): return _proxy("load_feature_flags") -def flush() -> None: +def flush(timeout_seconds: Optional[float] = 10) -> None: """ Tell the client to flush all queued events. + Args: + timeout_seconds: Maximum seconds to wait for the queue to flush. + Defaults to 10 seconds. Pass ``None`` to wait indefinitely. + Examples: ```python from posthog import flush @@ -1039,7 +1043,7 @@ def flush() -> None: Category: Client management """ - _proxy("flush") + _proxy("flush", timeout_seconds=timeout_seconds) def join() -> None: diff --git a/posthog/client.py b/posthog/client.py index 800cc577..1622b67d 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -5,6 +5,7 @@ import os import sys import threading +import time import warnings import weakref from datetime import datetime, timedelta, timezone @@ -1424,10 +1425,14 @@ def _enqueue(self, msg, disable_geoip): self.log.warning("analytics-python queue is full") return None - def flush(self) -> None: + def flush(self, timeout_seconds: Optional[float] = 10) -> None: """ Force a flush from the internal queue to the server. Do not use directly, call `shutdown()` instead. + Args: + timeout_seconds: Maximum seconds to wait for the queue to flush. + Defaults to 10 seconds. Pass ``None`` to wait indefinitely. + Examples: ```python posthog.capture('event_name') @@ -1436,7 +1441,26 @@ def flush(self) -> None: """ queue = self.queue size = queue.qsize() - queue.join() + try: + if timeout_seconds is None: + queue.join() + else: + deadline = time.monotonic() + timeout_seconds + with queue.all_tasks_done: + while queue.unfinished_tasks: + remaining = deadline - time.monotonic() + if remaining <= 0: + self.log.warning( + "flush timed out after %s seconds with %s items pending.", + timeout_seconds, + queue.unfinished_tasks, + ) + return + queue.all_tasks_done.wait(remaining) + except Exception as e: + self.log.exception("error flushing queue: %s", e) + return + # Note that this message may not be precise, because of threading. self.log.debug("successfully flushed about %s items.", size) @@ -1473,7 +1497,7 @@ def shutdown(self) -> None: posthog.shutdown() ``` """ - self.flush() + self.flush(timeout_seconds=None) self.join() if self.exception_capture: diff --git a/posthog/test/test_client.py b/posthog/test/test_client.py index e9d7054c..aeb17741 100644 --- a/posthog/test/test_client.py +++ b/posthog/test/test_client.py @@ -141,6 +141,38 @@ def test_client_flag_helpers_return_defaults_on_api_error(self, patch_flags): def test_empty_flush(self): self.client.flush() + def test_flush_timeout_returns_when_queue_does_not_drain(self): + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) + client.queue.put({"event": "stuck"}) + + start = time.monotonic() + with self.assertLogs("posthog", level="WARNING") as logs: + client.flush(timeout_seconds=0.01) + + self.assertLess(time.monotonic() - start, 1) + self.assertFalse(client.queue.empty()) + self.assertIn("flush timed out", logs.output[0]) + + client.queue.get_nowait() + client.queue.task_done() + + def test_flush_logs_and_returns_on_unexpected_error(self): + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) + client.queue.put({"event": "stuck"}) + + with mock.patch.object( + client.queue.all_tasks_done, + "wait", + side_effect=RuntimeError("boom"), + ): + with self.assertLogs("posthog", level="ERROR") as logs: + client.flush(timeout_seconds=1) + + self.assertIn("error flushing queue", logs.output[0]) + + client.queue.get_nowait() + client.queue.task_done() + def test_basic_capture(self): with mock.patch("posthog.client.batch_post") as mock_post: client = Client(FAKE_TEST_API_KEY, on_error=self.set_fail, sync_mode=True) @@ -1835,6 +1867,14 @@ def test_shutdown(self): for consumer in client.consumers: self.assertFalse(consumer.is_alive()) + def test_shutdown_flushes_without_timeout(self): + client = Client(FAKE_TEST_API_KEY, send=False, thread=0) + + with mock.patch.object(client, "flush") as mock_flush: + client.shutdown() + + mock_flush.assert_called_once_with(timeout_seconds=None) + def test_synchronous(self): with mock.patch("posthog.client.batch_post") as mock_post: client = Client(FAKE_TEST_API_KEY, sync_mode=True) diff --git a/posthog/test/test_module.py b/posthog/test/test_module.py index b9dea8c5..ec3031c4 100644 --- a/posthog/test/test_module.py +++ b/posthog/test/test_module.py @@ -35,6 +35,12 @@ def test_alias(self): def test_flush(self): self.posthog.flush() + def test_module_flush_forwards_timeout(self): + with mock.patch.object(posthog, "_proxy") as proxy: + posthog.flush(timeout_seconds=1.5) + + proxy.assert_called_once_with("flush", timeout_seconds=1.5) + class TestModuleLevelSetup(unittest.TestCase): def setUp(self): diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index 2ba8a253..a86983b8 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -926,7 +926,7 @@ function posthog.feature_flags.parse_semver(value: str) -> tuple function posthog.feature_flags.relative_date_parse_for_feature_flag_matching(value: str) -> Optional[datetime.datetime] function posthog.feature_flags.resolve_bucketing_value(flag, distinct_id, device_id=None) function posthog.feature_flags.variant_lookup_table(feature_flag) -function posthog.flush() -> None +function posthog.flush(timeout_seconds: Optional[float] = 10) -> None function posthog.get_all_flags(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> Optional[dict[str, FeatureFlag]] function posthog.get_all_flags_and_payloads(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> FlagsAndPayloads function posthog.get_feature_flag(key, distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id=None) -> Optional[FeatureFlag] @@ -1048,7 +1048,7 @@ method posthog.client.Client.capture_exception(exception: Optional[ExceptionArg] method posthog.client.Client.evaluate_flags(distinct_id: Optional[ID_TYPES] = None, *, groups: Optional[Dict[str, str]] = None, person_properties: Optional[Dict[str, Any]] = None, group_properties: Optional[Dict[str, Dict[str, Any]]] = None, only_evaluate_locally: bool = False, disable_geoip: Optional[bool] = None, flag_keys: Optional[List[str]] = None, device_id: Optional[str] = None) -> FeatureFlagEvaluations method posthog.client.Client.feature_enabled(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None) method posthog.client.Client.feature_flag_definitions() -method posthog.client.Client.flush() -> None +method posthog.client.Client.flush(timeout_seconds: Optional[float] = 10) -> None method posthog.client.Client.get_all_flags(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> Optional[dict[str, Union[bool, str]]] method posthog.client.Client.get_all_flags_and_payloads(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> FlagsAndPayloads method posthog.client.Client.get_feature_flag(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None) -> Optional[FlagValue]