diff --git a/.changeset/brave-otters-flush.md b/.changeset/brave-otters-flush.md
new file mode 100644
index 0000000..6aa3729
--- /dev/null
+++ b/.changeset/brave-otters-flush.md
@@ -0,0 +1,5 @@
+---
+"posthog/posthog-php": minor
+---
+
+Add configurable flush interval support for queued event batching.
diff --git a/lib/Client.php b/lib/Client.php
index 9f8b149..446f204 100644
--- a/lib/Client.php
+++ b/lib/Client.php
@@ -107,10 +107,17 @@ class Client implements FeatureFlagEvaluationsHost
      *
+     * Time-based options use milliseconds unless the option name says otherwise:
+     * `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests,
+     * while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed
+     * to pfsockopen() and is in seconds.
+     * Queue-based consumers check `flush_interval_seconds` (default 5) only when an event is enqueued;
+     * `0` flushes on every enqueue, and negative, non-finite or non-numeric values keep the default.
+     *
      * @param string|null $apiKey Your project API key. When omitted or empty, the client is disabled
      * and uses the noop consumer.
      * @param array{
      *     host?: string,
      *     ssl?: bool,
-     *     timeout?: int,
+     *     timeout?: int|float,
      *     verify_batch_events_request?: bool,
      *     feature_flag_request_timeout_ms?: int,
      *     maximum_backoff_duration?: int,
@@ -118,6 +125,7 @@ class Client implements FeatureFlagEvaluationsHost
      *     debug?: bool,
      *     max_queue_size?: int,
      *     batch_size?: int,
+     *     flush_interval_seconds?: int|float,
      *     compress_request?: bool|string,
      *     error_handler?: callable,
      *     filename?: string,
diff --git a/lib/PostHog.php b/lib/PostHog.php
index 61fc725..a7a0c6b 100644
--- a/lib/PostHog.php
+++ b/lib/PostHog.php
@@ -24,10 +24,17 @@ class PostHog
      * host option is omitted, POSTHOG_HOST is used when present.
      *
+     * Time-based options use milliseconds unless the option name says otherwise:
+     * `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests,
+     * while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed
+     * to pfsockopen() and is in seconds.
+     * Queue-based consumers check `flush_interval_seconds` (default 5) only when an event is enqueued;
+     * `0` flushes on every enqueue, and negative, non-finite or non-numeric values keep the default.
+     *
      * @param string|null $apiKey Your project API key.
      * @param array{
      *     host?: string,
      *     ssl?: bool,
-     *     timeout?: int,
+     *     timeout?: int|float,
      *     verify_batch_events_request?: bool,
      *     feature_flag_request_timeout_ms?: int,
      *     maximum_backoff_duration?: int,
@@ -35,6 +42,7 @@ class PostHog
      *     debug?: bool,
      *     max_queue_size?: int,
      *     batch_size?: int,
+     *     flush_interval_seconds?: int|float,
      *     compress_request?: bool|string,
      *     error_handler?: callable,
      *     filename?: string,
diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php
index b1fa52e..c072bbc 100644
--- a/lib/QueueConsumer.php
+++ b/lib/QueueConsumer.php
@@ -2,9 +2,15 @@
 
 namespace PostHog;
 
+use Symfony\Component\Clock\Clock;
+
 /**
  * Base class for consumers that batch messages before delivery.
  *
+ * PHP has no portable in-process background timer, so flush_interval_seconds is enforced
+ * opportunistically when another message is enqueued. Explicit flushes and
+ * destruction still drain pending messages immediately.
+ *
  * @internal
  */
 abstract class QueueConsumer extends Consumer
@@ -17,6 +23,8 @@ abstract class QueueConsumer extends Consumer
     protected $queue;
     protected $max_queue_size = 1000;
     protected $batch_size = 100;
+    protected $flush_interval = 5.0; // Seconds between time-based flushes; 0 flushes on every enqueue
+    protected $flush_after = null; // Unix time in seconds of the next time-based flush, null if unarmed
     protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s
     protected $host = "us.i.posthog.com";
     protected $compress_request = false;
@@ -24,7 +32,7 @@ abstract class QueueConsumer extends Consumer
     /**
      * Store our api key and options as part of this consumer
      * @param string $apiKey
-     * @param array $options
+     * @param array $options Consumer options (batch_size, flush_interval_seconds, maximum_backoff_duration, ...).
      */
     public function __construct($apiKey, $options = array())
     {
@@ -38,6 +46,16 @@ public function __construct($apiKey, $options = array())
             $this->batch_size = (int) $options["batch_size"];
         }
 
+        if (isset($options["flush_interval_seconds"])) {
+            $flushInterval = $options["flush_interval_seconds"];
+            // Accept only real, finite, non-negative numbers. Numeric strings, negatives, INF
+            // and NAN are ignored so the default interval stays in effect.
+            $isNumber = is_int($flushInterval) || is_float($flushInterval);
+            if ($isNumber && is_finite((float) $flushInterval) && $flushInterval >= 0) {
+                $this->flush_interval = (float) $flushInterval;
+            }
+        }
+
         if (isset($options["maximum_backoff_duration"])) {
             $this->maximum_backoff_duration = (int) $options["maximum_backoff_duration"];
         }
@@ -117,6 +135,9 @@ public function flush()
             $count = count($this->queue);
         }
 
+        // Re-arm the deadline for anything still queued, or clear it once the queue is drained.
+        $this->resetFlushTimer();
+
         return $success;
     }
 
@@ -133,15 +154,50 @@ public function enqueue($item)
             return false;
         }
 
+        $wasEmpty = $count === 0;
         $count = array_push($this->queue, $item);
 
-        if ($count >= $this->batch_size) {
+        if ($wasEmpty) {
+            // The interval runs from the first message queued after the queue was empty.
+            $this->resetFlushTimer();
+        }
+
+        // Flush when batching by time is disabled (interval <= 0), the batch is full, or the deadline passed.
+        if ($this->flush_interval <= 0 || $count >= $this->batch_size || $this->flushIntervalElapsed()) {
             return $this->flush(); // return ->flush() result: true on success
         }
 
         return true;
     }
 
+    /**
+     * Arm the flush deadline while messages are pending, or clear it otherwise.
+     *
+     * A non-positive interval never arms the deadline; enqueue() flushes immediately instead.
+     */
+    private function resetFlushTimer(): void
+    {
+        $this->flush_after = count($this->queue) > 0 && $this->flush_interval > 0
+            ? $this->now() + $this->flush_interval
+            : null;
+    }
+
+    /**
+     * Whether an armed flush deadline has been reached.
+     */
+    private function flushIntervalElapsed(): bool
+    {
+        return $this->flush_after !== null && $this->now() >= $this->flush_after;
+    }
+
+    /**
+     * Current time as fractional Unix seconds, read through the Symfony clock so tests can mock it.
+     */
+    private function now(): float
+    {
+        return (float) Clock::get()->now()->format('U.u');
+    }
+
     /**
      * Given a batch of messages the method returns
      * a valid payload.
diff --git a/test/PostHogTest.php b/test/PostHogTest.php
index abbe64c..264a280 100644
--- a/test/PostHogTest.php
+++ b/test/PostHogTest.php
@@ -11,6 +11,9 @@
 use PostHog\Consumer\NoOp;
 use PostHog\PostHog;
 use PostHog\Test\Assets\MockedResponses;
+use Symfony\Component\Clock\Clock;
+use Symfony\Component\Clock\MockClock;
+use Symfony\Component\Clock\NativeClock;
 
 
 class PostHogTest extends TestCase
@@ -496,6 +499,122 @@ public function testCapture(): void
         );
     }
 
+    public function testCaptureFlushesOnNextEnqueueAfterDefaultFlushInterval(): void
+    {
+        $clock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
+        Clock::set($clock);
+
+        try {
+            $http = new MockedHttpClient("app.posthog.com");
+            $options = ["debug" => true, "batch_size" => 100];
+            $client = new Client(self::FAKE_API_KEY, $options, $http, null, false);
+
+            // The first event arms the default five second deadline; nothing is sent yet.
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
+            $this->assertSame([], $http->calls ?? []);
+
+            // Four seconds in, the deadline has not been reached.
+            $clock->sleep(4);
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));
+            $this->assertSame([], $http->calls ?? []);
+
+            // At five seconds the next enqueue sends everything queued so far.
+            $clock->sleep(1);
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"]));
+
+            $batches = $this->batchCallsSentBy($http);
+            $this->assertCount(1, $batches);
+
+            $decoded = json_decode($batches[0]["payload"], true);
+            $this->assertSame(["one", "two", "three"], array_column($decoded["batch"], "event"));
+        } finally {
+            Clock::set(new NativeClock());
+        }
+    }
+
+    public function testCaptureFlushIntervalCanBeConfiguredInSeconds(): void
+    {
+        $clock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
+        Clock::set($clock);
+
+        try {
+            $http = new MockedHttpClient("app.posthog.com");
+            $options = ["debug" => true, "batch_size" => 100, "flush_interval_seconds" => 1];
+            $client = new Client(self::FAKE_API_KEY, $options, $http, null, false);
+
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
+            $clock->sleep(1);
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));
+
+            $this->assertCount(1, $this->batchCallsSentBy($http));
+        } finally {
+            Clock::set(new NativeClock());
+        }
+    }
+
+    public function testCaptureFlushIntervalZeroFlushesImmediately(): void
+    {
+        $http = new MockedHttpClient("app.posthog.com");
+        $options = ["debug" => true, "batch_size" => 100, "flush_interval_seconds" => 0];
+        $client = new Client(self::FAKE_API_KEY, $options, $http, null, false);
+
+        $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
+
+        $batches = $this->batchCallsSentBy($http);
+        $this->assertCount(1, $batches);
+
+        $decoded = json_decode($batches[0]["payload"], true);
+        $this->assertSame(["one"], array_column($decoded["batch"], "event"));
+    }
+
+    public static function invalidCaptureFlushIntervalCases(): array
+    {
+        $cases = [];
+        $cases['negative interval'] = [-1];
+        $cases['numeric string interval'] = ['1'];
+        $cases['non-finite interval'] = [INF];
+
+        return $cases;
+    }
+
+    /**
+     * @dataProvider invalidCaptureFlushIntervalCases
+     */
+    public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(mixed $flushInterval): void
+    {
+        $clock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
+        Clock::set($clock);
+
+        try {
+            $http = new MockedHttpClient("app.posthog.com");
+            $options = ["debug" => true, "batch_size" => 100, "flush_interval_seconds" => $flushInterval];
+            $client = new Client(self::FAKE_API_KEY, $options, $http, null, false);
+
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
+            $clock->sleep(4);
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));
+            $this->assertSame([], $http->calls ?? []);
+
+            $clock->sleep(1);
+            $this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"]));
+
+            $this->assertCount(1, $this->batchCallsSentBy($http));
+        } finally {
+            Clock::set(new NativeClock());
+        }
+    }
+
+    /**
+     * Return the recorded HTTP calls that targeted the /batch/ endpoint, reindexed from zero.
+     */
+    private function batchCallsSentBy(MockedHttpClient $http): array
+    {
+        return array_values(array_filter(
+            $http->calls ?? [],
+            static fn(array $call): bool => ($call["path"] ?? null) === "/batch/"
+        ));
+    }
+
     public function testCaptureIncludesIsServerProperty(): void
     {
         self::assertTrue(