feat: add flush interval batching#176
Conversation
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
spec/posthog/send_worker_spec.rb:117-173
**Missing test for the `notify` producer-wake path**
The three new tests cover: (1) waiting out the full interval when no new messages arrive, (2) breaking early when the batch is full, and (3) breaking early on `request_flush`. The fourth scenario — a second event arriving *during* the wait that should be consolidated into the same batch — is not tested. This is the primary reason `notify` exists (producers wake the sleeping worker), but there is no test that enqueues a second message while the worker is already inside `wait_for_more_messages`, verifies the worker is woken via `notify`, and asserts both events appear in the same batch before the deadline expires.
Reviews (1): Last reviewed commit: "feat: add flush interval batching" | Re-trigger Greptile |
# Conflicts: # lib/posthog/client.rb # lib/posthog/send_worker.rb
|
Reviews (2): Last reviewed commit: "refactor: rename flush interval option" | Re-trigger Greptile |
|
Reviews (3): Last reviewed commit: "fix: harden flush interval worker lifecy..." | Re-trigger Greptile |
ioannisj
left a comment
There was a problem hiding this comment.
Got this from my bot, may worth checking if this is a valid concern?
Pre-PR flush() removed the batch before sending:
$batch = array_splice($this->queue, 0, min($this->batch_size, $count)); // REMOVE first
$success = $this->flushBatch($batch); // then send
On a socket-open failure, old flushBatch() returned false → the batch was already gone (dropped/lost) and the loop stopped. The queue never grew.
Post-PR flush() copies, sends, then conditionally removes (QueueConsumer.php:119-130):
$batch = array_slice($this->queue, 0, $batchSize); // COPY, don't remove
$result = $this->flushBatch($batch);
$success = true === $result;
if ($success || self::FLUSH_BATCH_RETRYABLE_FAILURE !== $result) {
array_splice($this->queue, 0, $batchSize); // remove only if NOT retryable
}
Now a socket-open failure returns FLUSH_BATCH_RETRYABLE_FAILURE (Socket.php:52-53, changed from return false by this PR) → the batch is retained and the loop stops
|
I checked this concern and it doesn't apply to this Ruby PR. The referenced files/symbols ( |
this PR is ruby and not PHP |
💡 Motivation and Context
Ruby async batching previously sent whatever was queued as soon as the worker ran. Other server SDKs (Python, .NET, Go, Elixir) support a time-based flush threshold so partial batches can accumulate briefly instead of becoming one request per event under low throughput.
This adds a configurable
flush_interval_secondsfor async batching, defaulting to 5 seconds, aligned with the event-batcher spec and the Python SDK behavior from PostHog/posthog-python#672.💚 How did you test it?
bundle exec rubocop lib/posthog/send_worker.rb lib/posthog/client.rb lib/posthog/noop_worker.rb spec/posthog/send_worker_spec.rbbundle exec rspec📝 Checklist
If releasing new changes
pnpm changesetto generate a changeset file🤖 Agent context
Autonomy: Human-driven (agent-assisted)
Implemented with pi/coding-agent. I used the existing Ruby worker/client design and referenced batching behavior from the Python SDK and the shared event-batcher spec. The implementation keeps explicit
flushand shutdown paths immediate while allowing normal async workers to wait up to the configured interval for a fuller batch.