Skip to content

Adding ModTimeStream and ModTimeBucket and Client#99

Open
alice-luno wants to merge 7 commits into
mainfrom
ayang-modTime-streamfunc
Open

Adding ModTimeStream and ModTimeBucket and Client#99
alice-luno wants to merge 7 commits into
mainfrom
ayang-modTime-streamfunc

Conversation

@alice-luno
Copy link
Copy Markdown
Contributor

@alice-luno alice-luno commented May 29, 2026

For External Submissions:

Adding ModTimeStream and ModTimeBucket to allow for S3 bucket to read all objects (filtered by prefix) that are ordered by last modified time and processed in ordered queue.

This work addresses issue #94 and unit tests are included

Summary of how ModTimeBlob works:

Receiving an event (Recv)

Each call to Recv goes through recv, which follows this loop:

recv()
└─ while no active blob (decoder==nil or blobEOF)
└─ loadNextObject()
└─ emit one event from current blob

loadNextObject does the heavy lifting:

  1. Closes the previous blob reader if one is open.
  2. Refills the object list if exhausted — calls listSorted(), which fetches all objects matching the prefix and
    sorts them by (ModTime ASC, Key ASC).
  3. Skips objects at or before the cursor via after(), with one exception: if resumeDone=false and Offset>0 and
    the object is the cursor blob (isCursorBlob), it stops skipping to allow a mid-blob resume.
  4. Polls (with backoff) if no new objects exist yet.
  5. Opens the next object's reader and decoder.
  6. Mid-blob resume: if this is the cursor blob, decodes and discards the first Offset records (already-consumed),
    then sets resumeDone=true. Otherwise resets the cursor to {Key, ModTime, Offset:0}.
  7. Pre-fetches the first record into s.next.

Back in recv, once a blob is active:

  1. Takes s.next as the payload.
  2. Peeks ahead — decodes the next record. If EOF, sets blobEOF=true so the next Recv call will advance to the
    next blob.
  3. Increments s.cursor.Offset.
  4. Emits a reflex.Event with ID = "key|nano|offset", Timestamp = ModTime, MetaData = payload.

Cursor and resume

The event ID is the serialised cursor: "events/2024.json|1700000000000000000|2". A consumer stores the last-seen
ID and passes it back as after on restart. The stream then:

  • Skips all blobs strictly before that cursor (by ModTime, then Key).
  • Re-opens the cursor blob and discards the first Offset records.
  • Resumes emitting from record Offset+1 onwards.

  • Have you followed the guidelines in our Contributing file?
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?
  • If it's a non-trivial change, I've linked a new or existing issue to this PR
  • I have performed a self-review of my code
  • Does your submission pass existing tests?
  • Have you written new tests for your new changes, if applicable?
  • Have you added an explanation of what your changes do and why you'd like us to include them?

Summary by CodeRabbit

  • New Features

    • Stream objects ordered by modification time with resumable cursors, prefix filtering, configurable backoff, and per-record event timestamps with stable ordering.
  • Public API

    • Added a Decoder abstraction and a generic BucketOption type to configure buckets and decoders; new ModTimeBucket and stream entry points exposed.
  • Tests

    • Added extensive unit and integration tests covering ordering, resumption, prefix filtering, multi-record handling and S3 integration.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 29, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This pull request introduces ModTimeBucket, a new streaming API for rblob that orders objects by modification time and emits reflex.Event payloads. It relocates the Decoder interface to utils.go and adds a generic BucketOption[T]. ModTimeBucket implements configurable backoff, decoder, and prefix filtering, a cursor with optional Offset for mid-object resume (legacy two-part cursor still supported), ordered listing by ModTime then Key, reader/decoder lifecycle with polling/backoff, Recv/Close semantics, and comprehensive unit and integration tests including S3 gating.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • luno/reflex#97: Introduces the modtimeCursor encode/decode format that this PR's cursor parsing and event ID handling directly depend upon.

Suggested reviewers

  • ScaleneZA
  • onkuh
  • RieshBissessur
  • donovan-luno

Poem

🐇 I hopped through blobs by mod-time light,
I sniffed each cursor through the night,
With tiny paws I skipped the rest,
Decoders cheered — the stream's the best,
Hooray — events ordered just right!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately reflects the main additions in the changeset: ModTimeStream, ModTimeBucket, and related functionality are prominently featured across new and modified files.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The PR description clearly relates to the changeset, explaining ModTimeStream and ModTimeBucket additions for ordered S3 bucket reading by modification time.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch ayang-modTime-streamfunc

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rblob/modTimeBlob_test.go`:
- Around line 110-112: The test is using the wrong prefix literal so the created
blobs "prefix/a-obj" and "prefix/b-obj" are excluded; update the call to
openModTimeBucket that passes rblob.WithModTimePrefix to use the correct prefix
string ("prefix/" instead of "prefix.") so the filter matches the created blob
keys and Recv() does not hang; locate the call in the test (openModTimeBucket
with rblob.WithModTimePrefix and rblob.WithModTimeBackoff) and change the prefix
to include the trailing slash.

In `@rblob/modTimeBlob.go`:
- Around line 86-89: The current modtimeCursor only tracks Key and ModTime, so
when a decoder yields multiple records from a single blob all emitted events
share the same cursor (see modtimeCursor and uses of s.cursor.String()), causing
resumed reads to skip remaining records; update modtimeCursor to include an
intra-object position identifier (e.g., RecordIndex or ByteOffset) and ensure
the cursor is updated per decoded record (not once per object), then change
resume logic in the reader/iterator (the code that reopens the blob and iterates
decoded payloads) to, upon reopening the current Key/ModTime, advance/skip
decoded records until reaching the saved intra-object position before emitting
further records so previously-emitted records aren’t replayed or skipped.
- Around line 115-127: NewModTimeBucket should validate and clamp options before
returning: if ModTimeBucket.decoderFunc is nil (e.g. after
WithModTimeDecoder(nil)) reset it to JSONDecoder, and if ModTimeBucket.backoff
is <= 0 (e.g. after WithModTimeBackoff(0) or negative) reset it to time.Minute
(or your chosen default); do this validation in NewModTimeBucket right after
applying opts so the returned ModTimeBucket is always usable and won't panic or
spin at runtime.

In `@rblob/s3_test.go`:
- Around line 21-24: The modTimeS3Prefix flag currently defaults to "prefix",
causing tests to always filter by that prefix; change the flag default to an
empty string (modTimeS3Prefix := flag.String("test_modtime_s3_prefix", "", ...))
and update any code that interprets the prefix (e.g., the streaming/listing
logic that checks modTimeS3Prefix) so that an empty string is treated as "no
prefix filter" (i.e., do not scope the S3 listing when *modTimeS3Prefix == "").
Also update the flag help text to indicate that an empty value means no prefix
filtering and adjust any tests that relied on the old default.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7ed22e5f-5a7f-4f09-a7ab-eea1b93e413a

📥 Commits

Reviewing files that changed from the base of the PR and between 6547e00 and 0b102bd.

📒 Files selected for processing (6)
  • rblob/blob.go
  • rblob/modTimeBlob.go
  • rblob/modTimeBlob_internal_test.go
  • rblob/modTimeBlob_test.go
  • rblob/s3_test.go
  • rblob/utils.go
💤 Files with no reviewable changes (1)
  • rblob/blob.go

Comment thread rblob/modTimeBlob_test.go
Comment thread rblob/modTimeBlob.go
Comment thread rblob/modTimeBlob.go
Comment thread rblob/s3_test.go
@sonarqubecloud
Copy link
Copy Markdown

t.Cleanup(func() { require.NoError(t, b.Close()) })

s := &modtimeStream{
ctx: context.Background(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all these context.Background() rather use t.Context().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants