Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sampo/changesets/steadfast-baroness-vainamoinen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
pypi/posthog: minor
---

Add a default timeout for flushing queued events.
8 changes: 6 additions & 2 deletions posthog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,14 @@ def load_feature_flags():
return _proxy("load_feature_flags")


def flush() -> None:
def flush(timeout_seconds: Optional[float] = 10) -> None:
"""
Tell the client to flush all queued events.

Args:
timeout_seconds: Maximum seconds to wait for the queue to flush.
Defaults to 10 seconds. Pass ``None`` to wait indefinitely.

Examples:
```python
from posthog import flush
Expand All @@ -1039,7 +1043,7 @@ def flush() -> None:
Category:
Client management
"""
_proxy("flush")
_proxy("flush", timeout_seconds=timeout_seconds)


def join() -> None:
Expand Down
30 changes: 27 additions & 3 deletions posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import sys
import threading
import time
import warnings
import weakref
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -1424,10 +1425,14 @@ def _enqueue(self, msg, disable_geoip):
self.log.warning("analytics-python queue is full")
return None

def flush(self) -> None:
def flush(self, timeout_seconds: Optional[float] = 10) -> None:
"""
Force a flush from the internal queue to the server. Do not use directly, call `shutdown()` instead.

Args:
timeout_seconds: Maximum seconds to wait for the queue to flush.
Defaults to 10 seconds. Pass ``None`` to wait indefinitely.

Examples:
```python
posthog.capture('event_name')
Expand All @@ -1436,7 +1441,26 @@ def flush(self) -> None:
"""
queue = self.queue
size = queue.qsize()
queue.join()
try:
if timeout_seconds is None:
queue.join()
else:
deadline = time.monotonic() + timeout_seconds
with queue.all_tasks_done:
while queue.unfinished_tasks:
remaining = deadline - time.monotonic()
if remaining <= 0:
self.log.warning(
"flush timed out after %s seconds with %s items pending.",
timeout_seconds,
queue.unfinished_tasks,
)
return
queue.all_tasks_done.wait(remaining)
except Exception as e:
self.log.exception("error flushing queue: %s", e)
return

# Note that this message may not be precise, because of threading.
self.log.debug("successfully flushed about %s items.", size)

Expand Down Expand Up @@ -1473,7 +1497,7 @@ def shutdown(self) -> None:
posthog.shutdown()
```
"""
self.flush()
self.flush(timeout_seconds=None)
self.join()

if self.exception_capture:
Expand Down
40 changes: 40 additions & 0 deletions posthog/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,38 @@ def test_client_flag_helpers_return_defaults_on_api_error(self, patch_flags):
def test_empty_flush(self):
self.client.flush()

def test_flush_timeout_returns_when_queue_does_not_drain(self):
client = Client(FAKE_TEST_API_KEY, send=False, thread=0)
client.queue.put({"event": "stuck"})

start = time.monotonic()
with self.assertLogs("posthog", level="WARNING") as logs:
client.flush(timeout_seconds=0.01)

self.assertLess(time.monotonic() - start, 1)
self.assertFalse(client.queue.empty())
self.assertIn("flush timed out", logs.output[0])

client.queue.get_nowait()
client.queue.task_done()
Comment thread
marandaneto marked this conversation as resolved.

def test_flush_logs_and_returns_on_unexpected_error(self):
client = Client(FAKE_TEST_API_KEY, send=False, thread=0)
client.queue.put({"event": "stuck"})

with mock.patch.object(
client.queue.all_tasks_done,
"wait",
side_effect=RuntimeError("boom"),
):
with self.assertLogs("posthog", level="ERROR") as logs:
client.flush(timeout_seconds=1)

self.assertIn("error flushing queue", logs.output[0])

client.queue.get_nowait()
client.queue.task_done()

def test_basic_capture(self):
with mock.patch("posthog.client.batch_post") as mock_post:
client = Client(FAKE_TEST_API_KEY, on_error=self.set_fail, sync_mode=True)
Expand Down Expand Up @@ -1835,6 +1867,14 @@ def test_shutdown(self):
for consumer in client.consumers:
self.assertFalse(consumer.is_alive())

def test_shutdown_flushes_without_timeout(self):
client = Client(FAKE_TEST_API_KEY, send=False, thread=0)

with mock.patch.object(client, "flush") as mock_flush:
client.shutdown()

mock_flush.assert_called_once_with(timeout_seconds=None)

def test_synchronous(self):
with mock.patch("posthog.client.batch_post") as mock_post:
client = Client(FAKE_TEST_API_KEY, sync_mode=True)
Expand Down
6 changes: 6 additions & 0 deletions posthog/test/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def test_alias(self):
def test_flush(self):
self.posthog.flush()

def test_module_flush_forwards_timeout(self):
with mock.patch.object(posthog, "_proxy") as proxy:
posthog.flush(timeout_seconds=1.5)

proxy.assert_called_once_with("flush", timeout_seconds=1.5)


class TestModuleLevelSetup(unittest.TestCase):
def setUp(self):
Expand Down
4 changes: 2 additions & 2 deletions references/public_api_snapshot.txt
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ function posthog.feature_flags.parse_semver(value: str) -> tuple
function posthog.feature_flags.relative_date_parse_for_feature_flag_matching(value: str) -> Optional[datetime.datetime]
function posthog.feature_flags.resolve_bucketing_value(flag, distinct_id, device_id=None)
function posthog.feature_flags.variant_lookup_table(feature_flag)
function posthog.flush() -> None
function posthog.flush(timeout_seconds: Optional[float] = 10) -> None
function posthog.get_all_flags(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> Optional[dict[str, FeatureFlag]]
function posthog.get_all_flags_and_payloads(distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, device_id=None, flag_keys_to_evaluate=None) -> FlagsAndPayloads
function posthog.get_feature_flag(key, distinct_id, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id=None) -> Optional[FeatureFlag]
Expand Down Expand Up @@ -1048,7 +1048,7 @@ method posthog.client.Client.capture_exception(exception: Optional[ExceptionArg]
method posthog.client.Client.evaluate_flags(distinct_id: Optional[ID_TYPES] = None, *, groups: Optional[Dict[str, str]] = None, person_properties: Optional[Dict[str, Any]] = None, group_properties: Optional[Dict[str, Dict[str, Any]]] = None, only_evaluate_locally: bool = False, disable_geoip: Optional[bool] = None, flag_keys: Optional[List[str]] = None, device_id: Optional[str] = None) -> FeatureFlagEvaluations
method posthog.client.Client.feature_enabled(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None)
method posthog.client.Client.feature_flag_definitions()
method posthog.client.Client.flush() -> None
method posthog.client.Client.flush(timeout_seconds: Optional[float] = 10) -> None
method posthog.client.Client.get_all_flags(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> Optional[dict[str, Union[bool, str]]]
method posthog.client.Client.get_all_flags_and_payloads(distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, disable_geoip=None, flag_keys_to_evaluate: Optional[list[str]] = None, device_id: Optional[str] = None) -> FlagsAndPayloads
method posthog.client.Client.get_feature_flag(key, distinct_id, *, groups=None, person_properties=None, group_properties=None, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, device_id: Optional[str] = None) -> Optional[FlagValue]
Expand Down
Loading