From b6b2ec31b393bc2ccbdd717f520f0d9b2d7dc274 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 17:03:54 +0200 Subject: [PATCH 1/4] feat: add configurable flush interval --- .changeset/brave-otters-flush.md | 5 + lib/Client.php | 1 + lib/PostHog.php | 1 + lib/QueueConsumer.php | 46 +++++++- test/PostHogTest.php | 178 +++++++++++++++++++++++++++++++ 5 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 .changeset/brave-otters-flush.md diff --git a/.changeset/brave-otters-flush.md b/.changeset/brave-otters-flush.md new file mode 100644 index 0000000..6aa3729 --- /dev/null +++ b/.changeset/brave-otters-flush.md @@ -0,0 +1,5 @@ +--- +"posthog/posthog-php": minor +--- + +Add configurable flush interval support for queued event batching. diff --git a/lib/Client.php b/lib/Client.php index ba2c8ab..258dc07 100644 --- a/lib/Client.php +++ b/lib/Client.php @@ -118,6 +118,7 @@ class Client implements FeatureFlagEvaluationsHost * debug?: bool, * max_queue_size?: int, * batch_size?: int, + * flush_interval_seconds?: int|float, * compress_request?: bool|string, * error_handler?: callable, * filename?: string, diff --git a/lib/PostHog.php b/lib/PostHog.php index 850578c..3a87646 100644 --- a/lib/PostHog.php +++ b/lib/PostHog.php @@ -34,6 +34,7 @@ class PostHog * debug?: bool, * max_queue_size?: int, * batch_size?: int, + * flush_interval_seconds?: int|float, * compress_request?: bool|string, * error_handler?: callable, * filename?: string, diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php index 9c5bc42..06c2761 100644 --- a/lib/QueueConsumer.php +++ b/lib/QueueConsumer.php @@ -2,9 +2,15 @@ namespace PostHog; +use Symfony\Component\Clock\Clock; + /** * Base class for consumers that batch messages before delivery. * + * PHP has no portable in-process background timer, so flush_interval_seconds is enforced + * opportunistically when another message is enqueued. Explicit flushes and + * destruction still drain pending messages immediately. + * * @internal */ abstract class QueueConsumer extends Consumer @@ -17,6 +23,8 @@ abstract class QueueConsumer extends Consumer protected $queue; protected $max_queue_size = 1000; protected $batch_size = 100; + protected $flush_interval = 5.0; + protected $flush_after = null; protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s protected $host = "us.i.posthog.com"; protected $compress_request = false; @@ -24,7 +32,7 @@ abstract class QueueConsumer extends Consumer /** * Store our api key and options as part of this consumer * @param string $apiKey - * @param array $options + * @param array $options Consumer options. */ public function __construct($apiKey, $options = array()) { @@ -38,6 +46,16 @@ public function __construct($apiKey, $options = array()) $this->batch_size = $options["batch_size"]; } + if (isset($options["flush_interval_seconds"])) { + $flushInterval = $options["flush_interval_seconds"]; + if (is_int($flushInterval) || is_float($flushInterval)) { + $flushInterval = (float) $flushInterval; + if ($flushInterval >= 0) { + $this->flush_interval = $flushInterval; + } + } + } + if (isset($options["maximum_backoff_duration"])) { $this->maximum_backoff_duration = (int) $options["maximum_backoff_duration"]; } @@ -117,6 +135,8 @@ public function flush() $count = count($this->queue); } + $this->resetFlushTimer(); + return $success; } @@ -133,15 +153,37 @@ public function enqueue($item) return false; } + $wasEmpty = $count === 0; $count = array_push($this->queue, $item); - if ($count >= $this->batch_size) { + if ($wasEmpty) { + $this->resetFlushTimer(); + } + + if ($this->flush_interval === 0.0 || $count >= $this->batch_size || $this->flushIntervalElapsed()) { return $this->flush(); // return ->flush() result: true on success } return true; } + private function resetFlushTimer(): void + { + $this->flush_after = count($this->queue) > 0 && $this->flush_interval > 0 + ? $this->now() + $this->flush_interval + : null; + } + + private function flushIntervalElapsed(): bool + { + return $this->flush_after !== null && $this->now() >= $this->flush_after; + } + + private function now(): float + { + return (float) Clock::get()->now()->format('U.u'); + } + /** * Given a batch of messages the method returns * a valid payload. diff --git a/test/PostHogTest.php b/test/PostHogTest.php index 130f5d4..b8a1025 100644 --- a/test/PostHogTest.php +++ b/test/PostHogTest.php @@ -11,6 +11,9 @@ use PostHog\Consumer\NoOp; use PostHog\PostHog; use PostHog\Test\Assets\MockedResponses; +use Symfony\Component\Clock\Clock; +use Symfony\Component\Clock\MockClock; +use Symfony\Component\Clock\NativeClock; class PostHogTest extends TestCase @@ -448,6 +451,181 @@ public function testCapture(): void ); } + public function testCaptureFlushesOnNextEnqueueAfterDefaultFlushInterval(): void + { + $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); + Clock::set($mockClock); + + try { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + $this->assertSame([], $httpClient->calls ?? []); + + $mockClock->sleep(4); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); + $this->assertSame([], $httpClient->calls ?? []); + + $mockClock->sleep(1); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + + $payload = json_decode($batchCalls[0]["payload"], true); + $this->assertSame(["one", "two", "three"], array_column($payload["batch"], "event")); + } finally { + Clock::set(new NativeClock()); + } + } + + public function testCaptureFlushIntervalCanBeConfiguredInSeconds(): void + { + $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); + Clock::set($mockClock); + + try { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + "flush_interval_seconds" => 1, + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + $mockClock->sleep(1); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + } finally { + Clock::set(new NativeClock()); + } + } + + public function testCaptureFlushIntervalZeroFlushesImmediately(): void + { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + "flush_interval_seconds" => 0, + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + + $payload = json_decode($batchCalls[0]["payload"], true); + $this->assertSame(["one"], array_column($payload["batch"], "event")); + } + + public function testNegativeCaptureFlushIntervalDefaultsToFiveSeconds(): void + { + $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); + Clock::set($mockClock); + + try { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + "flush_interval_seconds" => -1, + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + $mockClock->sleep(4); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); + $this->assertSame([], $httpClient->calls ?? []); + + $mockClock->sleep(1); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + } finally { + Clock::set(new NativeClock()); + } + } + + public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(): void + { + $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); + Clock::set($mockClock); + + try { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + "flush_interval_seconds" => "1", + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + $mockClock->sleep(4); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); + $this->assertSame([], $httpClient->calls ?? []); + + $mockClock->sleep(1); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + } finally { + Clock::set(new NativeClock()); + } + } + public function testCaptureIncludesIsServerProperty(): void { self::assertTrue( From a0fcc28d959b0ee216202bd675e19388c4ff781c Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 17:08:05 +0200 Subject: [PATCH 2/4] fix: reject non-finite flush intervals --- lib/QueueConsumer.php | 2 +- test/PostHogTest.php | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php index 06c2761..354e456 100644 --- a/lib/QueueConsumer.php +++ b/lib/QueueConsumer.php @@ -50,7 +50,7 @@ public function __construct($apiKey, $options = array()) $flushInterval = $options["flush_interval_seconds"]; if (is_int($flushInterval) || is_float($flushInterval)) { $flushInterval = (float) $flushInterval; - if ($flushInterval >= 0) { + if (is_finite($flushInterval) && $flushInterval >= 0) { $this->flush_interval = $flushInterval; } } diff --git a/test/PostHogTest.php b/test/PostHogTest.php index b8a1025..a982442 100644 --- a/test/PostHogTest.php +++ b/test/PostHogTest.php @@ -626,6 +626,43 @@ public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(): void } } + public function testNonFiniteCaptureFlushIntervalDefaultsToFiveSeconds(): void + { + $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); + Clock::set($mockClock); + + try { + $httpClient = new MockedHttpClient("app.posthog.com"); + $client = new Client( + self::FAKE_API_KEY, + [ + "debug" => true, + "batch_size" => 100, + "flush_interval_seconds" => INF, + ], + $httpClient, + null, + false + ); + + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); + $mockClock->sleep(4); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); + $this->assertSame([], $httpClient->calls ?? []); + + $mockClock->sleep(1); + $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); + + $batchCalls = array_values(array_filter( + $httpClient->calls ?? [], + static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" + )); + $this->assertCount(1, $batchCalls); + } finally { + Clock::set(new NativeClock()); + } + } + public function testCaptureIncludesIsServerProperty(): void { self::assertTrue( From d590711b6e4e266fefb74212e7c3e1ac5baa35fc Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 17:14:02 +0200 Subject: [PATCH 3/4] test: parameterize invalid flush interval cases --- test/PostHogTest.php | 84 ++++++-------------------------------------- 1 file changed, 11 insertions(+), 73 deletions(-) diff --git a/test/PostHogTest.php b/test/PostHogTest.php index a982442..ada65a6 100644 --- a/test/PostHogTest.php +++ b/test/PostHogTest.php @@ -552,81 +552,19 @@ public function testCaptureFlushIntervalZeroFlushesImmediately(): void $this->assertSame(["one"], array_column($payload["batch"], "event")); } - public function testNegativeCaptureFlushIntervalDefaultsToFiveSeconds(): void + public static function invalidCaptureFlushIntervalCases(): array { - $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); - Clock::set($mockClock); - - try { - $httpClient = new MockedHttpClient("app.posthog.com"); - $client = new Client( - self::FAKE_API_KEY, - [ - "debug" => true, - "batch_size" => 100, - "flush_interval_seconds" => -1, - ], - $httpClient, - null, - false - ); - - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); - $mockClock->sleep(4); - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); - $this->assertSame([], $httpClient->calls ?? []); - - $mockClock->sleep(1); - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); - - $batchCalls = array_values(array_filter( - $httpClient->calls ?? [], - static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" - )); - $this->assertCount(1, $batchCalls); - } finally { - Clock::set(new NativeClock()); - } - } - - public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(): void - { - $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); - Clock::set($mockClock); - - try { - $httpClient = new MockedHttpClient("app.posthog.com"); - $client = new Client( - self::FAKE_API_KEY, - [ - "debug" => true, - "batch_size" => 100, - "flush_interval_seconds" => "1", - ], - $httpClient, - null, - false - ); - - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"])); - $mockClock->sleep(4); - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"])); - $this->assertSame([], $httpClient->calls ?? []); - - $mockClock->sleep(1); - $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"])); - - $batchCalls = array_values(array_filter( - $httpClient->calls ?? [], - static fn(array $call): bool => ($call["path"] ?? null) === "/batch/" - )); - $this->assertCount(1, $batchCalls); - } finally { - Clock::set(new NativeClock()); - } + return [ + 'negative interval' => [-1], + 'numeric string interval' => ['1'], + 'non-finite interval' => [INF], + ]; } - public function testNonFiniteCaptureFlushIntervalDefaultsToFiveSeconds(): void + /** + * @dataProvider invalidCaptureFlushIntervalCases + */ + public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(mixed $flushInterval): void { $mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00')); Clock::set($mockClock); @@ -638,7 +576,7 @@ public function testNonFiniteCaptureFlushIntervalDefaultsToFiveSeconds(): void [ "debug" => true, "batch_size" => 100, - "flush_interval_seconds" => INF, + "flush_interval_seconds" => $flushInterval, ], $httpClient, null, From 3d66e416b5cd4e5603ecdd650ecd3bf1a0396d90 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Thu, 18 Jun 2026 13:50:18 +0200 Subject: [PATCH 4/4] docs: clarify time option units --- lib/Client.php | 7 ++++++- lib/PostHog.php | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/Client.php b/lib/Client.php index 16f98ad..446f204 100644 --- a/lib/Client.php +++ b/lib/Client.php @@ -107,10 +107,15 @@ class Client implements FeatureFlagEvaluationsHost * * @param string|null $apiKey Your project API key. When omitted or empty, the client is disabled * and uses the noop consumer. + * Time-based options use milliseconds unless the option name says otherwise: + * `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests, + * while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed + * to pfsockopen() and is in seconds. + * * @param array{ * host?: string, * ssl?: bool, - * timeout?: int, + * timeout?: int|float, * verify_batch_events_request?: bool, * feature_flag_request_timeout_ms?: int, * maximum_backoff_duration?: int, diff --git a/lib/PostHog.php b/lib/PostHog.php index 07a08d8..63db4cf 100644 --- a/lib/PostHog.php +++ b/lib/PostHog.php @@ -24,10 +24,15 @@ class PostHog * host option is omitted, POSTHOG_HOST is used when present. * * @param string|null $apiKey Your project API key. + * Time-based options use milliseconds unless the option name says otherwise: + * `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests, + * while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed + * to pfsockopen() and is in seconds. + * * @param array{ * host?: string, * ssl?: bool, - * timeout?: int, + * timeout?: int|float, * verify_batch_events_request?: bool, * feature_flag_request_timeout_ms?: int, * maximum_backoff_duration?: int,