Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brave-otters-flush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"posthog/posthog-php": minor
---

Add configurable flush interval support for queued event batching.
8 changes: 7 additions & 1 deletion lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,23 @@ class Client implements FeatureFlagEvaluationsHost
*
* @param string|null $apiKey Your project API key. When omitted or empty, the client is disabled
* and uses the noop consumer.
* Time-based options use milliseconds unless the option name says otherwise:
* `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests,
* while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed
* to pfsockopen() and is in seconds.
*
* @param array{
* host?: string,
* ssl?: bool,
* timeout?: int,
* timeout?: int|float,
* verify_batch_events_request?: bool,
* feature_flag_request_timeout_ms?: int,
* maximum_backoff_duration?: int,
* consumer?: 'socket'|'file'|'fork_curl'|'lib_curl'|'noop',
* debug?: bool,
* max_queue_size?: int,
* batch_size?: int,
* flush_interval_seconds?: int|float,
Comment thread
marandaneto marked this conversation as resolved.
* compress_request?: bool|string,
* error_handler?: callable,
* filename?: string,
Expand Down
8 changes: 7 additions & 1 deletion lib/PostHog.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ class PostHog
* host option is omitted, POSTHOG_HOST is used when present.
*
* @param string|null $apiKey Your project API key.
* Time-based options use milliseconds unless the option name says otherwise:
* `timeout` and `maximum_backoff_duration` are in milliseconds for libcurl/HTTP requests,
* while `flush_interval_seconds` is in seconds. For the socket consumer, `timeout` is passed
* to pfsockopen() and is in seconds.
*
* @param array{
* host?: string,
* ssl?: bool,
* timeout?: int,
* timeout?: int|float,
* verify_batch_events_request?: bool,
* feature_flag_request_timeout_ms?: int,
* maximum_backoff_duration?: int,
* consumer?: 'socket'|'file'|'fork_curl'|'lib_curl'|'noop',
* debug?: bool,
* max_queue_size?: int,
* batch_size?: int,
* flush_interval_seconds?: int|float,
* compress_request?: bool|string,
* error_handler?: callable,
* filename?: string,
Expand Down
46 changes: 44 additions & 2 deletions lib/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

namespace PostHog;

use Symfony\Component\Clock\Clock;

/**
* Base class for consumers that batch messages before delivery.
*
* PHP has no portable in-process background timer, so flush_interval_seconds is enforced
* opportunistically when another message is enqueued. Explicit flushes and
* destruction still drain pending messages immediately.
*
* @internal
*/
abstract class QueueConsumer extends Consumer
Expand All @@ -17,14 +23,16 @@ abstract class QueueConsumer extends Consumer
protected $queue;
protected $max_queue_size = 1000;
protected $batch_size = 100;
protected $flush_interval = 5.0;
protected $flush_after = null;
protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s
protected $host = "us.i.posthog.com";
protected $compress_request = false;

/**
* Store our api key and options as part of this consumer
* @param string $apiKey
* @param array $options
* @param array $options Consumer options.
*/
public function __construct($apiKey, $options = array())
{
Expand All @@ -38,6 +46,16 @@ public function __construct($apiKey, $options = array())
$this->batch_size = (int) $options["batch_size"];
}

if (isset($options["flush_interval_seconds"])) {
$flushInterval = $options["flush_interval_seconds"];
if (is_int($flushInterval) || is_float($flushInterval)) {
$flushInterval = (float) $flushInterval;
if (is_finite($flushInterval) && $flushInterval >= 0) {
$this->flush_interval = $flushInterval;
}
}
}

if (isset($options["maximum_backoff_duration"])) {
$this->maximum_backoff_duration = (int) $options["maximum_backoff_duration"];
}
Expand Down Expand Up @@ -117,6 +135,8 @@ public function flush()
$count = count($this->queue);
}

$this->resetFlushTimer();

return $success;
}

Expand All @@ -133,15 +153,37 @@ public function enqueue($item)
return false;
}

$wasEmpty = $count === 0;
$count = array_push($this->queue, $item);

if ($count >= $this->batch_size) {
if ($wasEmpty) {
$this->resetFlushTimer();
}

if ($this->flush_interval === 0.0 || $count >= $this->batch_size || $this->flushIntervalElapsed()) {
return $this->flush(); // return ->flush() result: true on success
}

return true;
}

private function resetFlushTimer(): void
{
$this->flush_after = count($this->queue) > 0 && $this->flush_interval > 0
? $this->now() + $this->flush_interval
: null;
}

private function flushIntervalElapsed(): bool
{
return $this->flush_after !== null && $this->now() >= $this->flush_after;
}

private function now(): float
{
return (float) Clock::get()->now()->format('U.u');
}

/**
* Given a batch of messages the method returns
* a valid payload.
Expand Down
153 changes: 153 additions & 0 deletions test/PostHogTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
use PostHog\Consumer\NoOp;
use PostHog\PostHog;
use PostHog\Test\Assets\MockedResponses;
use Symfony\Component\Clock\Clock;
use Symfony\Component\Clock\MockClock;
use Symfony\Component\Clock\NativeClock;


class PostHogTest extends TestCase
Expand Down Expand Up @@ -496,6 +499,156 @@ public function testCapture(): void
);
}

public function testCaptureFlushesOnNextEnqueueAfterDefaultFlushInterval(): void
{
$mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
Clock::set($mockClock);

try {
$httpClient = new MockedHttpClient("app.posthog.com");
$client = new Client(
self::FAKE_API_KEY,
[
"debug" => true,
"batch_size" => 100,
],
$httpClient,
null,
false
);

$this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
$this->assertSame([], $httpClient->calls ?? []);

$mockClock->sleep(4);
$this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));
$this->assertSame([], $httpClient->calls ?? []);

$mockClock->sleep(1);
$this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"]));

$batchCalls = array_values(array_filter(
$httpClient->calls ?? [],
static fn(array $call): bool => ($call["path"] ?? null) === "/batch/"
));
$this->assertCount(1, $batchCalls);

$payload = json_decode($batchCalls[0]["payload"], true);
$this->assertSame(["one", "two", "three"], array_column($payload["batch"], "event"));
} finally {
Clock::set(new NativeClock());
}
}

public function testCaptureFlushIntervalCanBeConfiguredInSeconds(): void
{
$mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
Clock::set($mockClock);

try {
$httpClient = new MockedHttpClient("app.posthog.com");
$client = new Client(
self::FAKE_API_KEY,
[
"debug" => true,
"batch_size" => 100,
"flush_interval_seconds" => 1,
],
$httpClient,
null,
false
);

$this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
$mockClock->sleep(1);
$this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));

$batchCalls = array_values(array_filter(
$httpClient->calls ?? [],
static fn(array $call): bool => ($call["path"] ?? null) === "/batch/"
));
$this->assertCount(1, $batchCalls);
} finally {
Clock::set(new NativeClock());
}
}

public function testCaptureFlushIntervalZeroFlushesImmediately(): void
{
$httpClient = new MockedHttpClient("app.posthog.com");
$client = new Client(
self::FAKE_API_KEY,
[
"debug" => true,
"batch_size" => 100,
"flush_interval_seconds" => 0,
],
$httpClient,
null,
false
);

$this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));

$batchCalls = array_values(array_filter(
$httpClient->calls ?? [],
static fn(array $call): bool => ($call["path"] ?? null) === "/batch/"
));
$this->assertCount(1, $batchCalls);

$payload = json_decode($batchCalls[0]["payload"], true);
$this->assertSame(["one"], array_column($payload["batch"], "event"));
}

public static function invalidCaptureFlushIntervalCases(): array
{
return [
'negative interval' => [-1],
'numeric string interval' => ['1'],
'non-finite interval' => [INF],
];
}

/**
* @dataProvider invalidCaptureFlushIntervalCases
*/
public function testInvalidCaptureFlushIntervalDefaultsToFiveSeconds(mixed $flushInterval): void
{
$mockClock = new MockClock(new \DateTimeImmutable('2022-05-01 00:00:00'));
Clock::set($mockClock);

try {
$httpClient = new MockedHttpClient("app.posthog.com");
$client = new Client(
self::FAKE_API_KEY,
[
"debug" => true,
"batch_size" => 100,
"flush_interval_seconds" => $flushInterval,
],
$httpClient,
null,
false
);

$this->assertTrue($client->capture(["distinctId" => "john", "event" => "one"]));
$mockClock->sleep(4);
$this->assertTrue($client->capture(["distinctId" => "john", "event" => "two"]));
$this->assertSame([], $httpClient->calls ?? []);

$mockClock->sleep(1);
$this->assertTrue($client->capture(["distinctId" => "john", "event" => "three"]));

$batchCalls = array_values(array_filter(
$httpClient->calls ?? [],
static fn(array $call): bool => ($call["path"] ?? null) === "/batch/"
));
$this->assertCount(1, $batchCalls);
} finally {
Clock::set(new NativeClock());
}
}

public function testCaptureIncludesIsServerProperty(): void
{
self::assertTrue(
Comment thread
marandaneto marked this conversation as resolved.
Expand Down
Loading