Adding ModTimeStream and ModTimeBucket and Client#99
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis pull request introduces ModTimeBucket, a new streaming API for rblob that orders objects by modification time and emits reflex.Event payloads. It relocates the Decoder interface to utils.go and adds a generic BucketOption[T]. ModTimeBucket implements configurable backoff, decoder, and prefix filtering, a cursor with optional Offset for mid-object resume (legacy two-part cursor still supported), ordered listing by ModTime then Key, reader/decoder lifecycle with polling/backoff, Recv/Close semantics, and comprehensive unit and integration tests including S3 gating. Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rblob/modTimeBlob_test.go`:
- Around line 110-112: The test is using the wrong prefix literal so the created
blobs "prefix/a-obj" and "prefix/b-obj" are excluded; update the call to
openModTimeBucket that passes rblob.WithModTimePrefix to use the correct prefix
string ("prefix/" instead of "prefix.") so the filter matches the created blob
keys and Recv() does not hang; locate the call in the test (openModTimeBucket
with rblob.WithModTimePrefix and rblob.WithModTimeBackoff) and change the prefix
to include the trailing slash.
In `@rblob/modTimeBlob.go`:
- Around line 86-89: The current modtimeCursor only tracks Key and ModTime, so
when a decoder yields multiple records from a single blob all emitted events
share the same cursor (see modtimeCursor and uses of s.cursor.String()), causing
resumed reads to skip remaining records; update modtimeCursor to include an
intra-object position identifier (e.g., RecordIndex or ByteOffset) and ensure
the cursor is updated per decoded record (not once per object), then change
resume logic in the reader/iterator (the code that reopens the blob and iterates
decoded payloads) to, upon reopening the current Key/ModTime, advance/skip
decoded records until reaching the saved intra-object position before emitting
further records so previously-emitted records aren’t replayed or skipped.
- Around line 115-127: NewModTimeBucket should validate and clamp options before
returning: if ModTimeBucket.decoderFunc is nil (e.g. after
WithModTimeDecoder(nil)) reset it to JSONDecoder, and if ModTimeBucket.backoff
is <= 0 (e.g. after WithModTimeBackoff(0) or negative) reset it to time.Minute
(or your chosen default); do this validation in NewModTimeBucket right after
applying opts so the returned ModTimeBucket is always usable and won't panic or
spin at runtime.
In `@rblob/s3_test.go`:
- Around line 21-24: The modTimeS3Prefix flag currently defaults to "prefix",
causing tests to always filter by that prefix; change the flag default to an
empty string (modTimeS3Prefix := flag.String("test_modtime_s3_prefix", "", ...))
and update any code that interprets the prefix (e.g., the streaming/listing
logic that checks modTimeS3Prefix) so that an empty string is treated as "no
prefix filter" (i.e., do not scope the S3 listing when *modTimeS3Prefix == "").
Also update the flag help text to indicate that an empty value means no prefix
filtering and adjust any tests that relied on the old default.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7ed22e5f-5a7f-4f09-a7ab-eea1b93e413a
📒 Files selected for processing (6)
rblob/blob.gorblob/modTimeBlob.gorblob/modTimeBlob_internal_test.gorblob/modTimeBlob_test.gorblob/s3_test.gorblob/utils.go
💤 Files with no reviewable changes (1)
- rblob/blob.go
|
| t.Cleanup(func() { require.NoError(t, b.Close()) }) | ||
|
|
||
| s := &modtimeStream{ | ||
| ctx: context.Background(), |
There was a problem hiding this comment.
For all these context.Background() rather use t.Context().



For External Submissions:
Adding ModTimeStream and ModTimeBucket to allow for S3 bucket to read all objects (filtered by prefix) that are ordered by last modified time and processed in ordered queue.
This work addresses issue #94 and unit tests are included
Summary of how ModTimeBlob works:
Receiving an event (Recv)
Each call to Recv goes through recv, which follows this loop:
recv()
└─ while no active blob (decoder==nil or blobEOF)
└─ loadNextObject()
└─ emit one event from current blob
loadNextObject does the heavy lifting:
sorts them by (ModTime ASC, Key ASC).
the object is the cursor blob (isCursorBlob), it stops skipping to allow a mid-blob resume.
then sets resumeDone=true. Otherwise resets the cursor to {Key, ModTime, Offset:0}.
Back in recv, once a blob is active:
next blob.
Cursor and resume
The event ID is the serialised cursor: "events/2024.json|1700000000000000000|2". A consumer stores the last-seen
ID and passes it back as after on restart. The stream then:
Summary by CodeRabbit
New Features
Public API
Tests