Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4d28854
feat: add distributed project manifest primitives
patrickleet Jun 2, 2026
4e87863
feat: return CommitReceipt from outbox commit
patrickleet Jun 3, 2026
e620e6c
feat: add BusPublisher (Bus -> AsyncMessagePublisher) adapter
patrickleet Jun 3, 2026
8dc2170
feat: add HasOutboxStore capability for repo wrappers
patrickleet Jun 3, 2026
023e5c8
feat: add Service::with_bus + Microservice runtime (produce side)
patrickleet Jun 3, 2026
4bf0cd0
feat: add OutboxCommit::commit_claimed (claim-in-transaction)
patrickleet Jun 3, 2026
dcb82ca
feat: Context::commit_outbox — publish-in-commit via attached bus
patrickleet Jun 3, 2026
cabb902
feat: Microservice::run derives listen/subscribe from handlers
patrickleet Jun 3, 2026
8c2a502
test: SQLite end-to-end durable-enqueue dispatch
patrickleet Jun 3, 2026
85881cb
feat: commit_outbox works for all repo shapes incl snapshots
patrickleet Jun 3, 2026
bef7691
refactor: snapshots are a transparent optimization (one repo type)
patrickleet Jun 5, 2026
8d6eda4
refactor: repo.outbox(msg).commit(agg) publishes — no wrapper
patrickleet Jun 5, 2026
bbe7405
refactor: with_bus is a Service builder step, not a separate type
patrickleet Jun 5, 2026
25d5895
refactor: Service::new().with_repo().with_read_model_store() builder
patrickleet Jun 5, 2026
3bb37df
docs: README durable-enqueue framing + close the Quick Start produce …
patrickleet Jun 5, 2026
3ad2b17
feat: compose read-models + snapshots in one aggregate commit
patrickleet Jun 5, 2026
9879c6c
test: aggregate + outbox + read-model + snapshot in one transaction
patrickleet Jun 5, 2026
dd6cb09
feat: distributed_tooling crate — pure service scaffold generation
patrickleet Jun 5, 2026
b79abb0
refactor: split distributed_tooling generate.rs into focused modules
patrickleet Jun 5, 2026
8ff658f
feat: port GitOps + GitHub workflow generation into distributed_tooling
patrickleet Jun 5, 2026
a4942c3
refactor: model the three GitHub scaffold flags independently
patrickleet Jun 5, 2026
a1b83d3
feat: expose package_name() for default output-dir derivation
patrickleet Jun 5, 2026
7547033
fix: address PR #53 review — generated-output correctness + builder g…
patrickleet Jun 6, 2026
d931a6e
ci: publish distributed_tooling on version tags
patrickleet Jun 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/on-v-tag-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ jobs:
manifest_path: distributed_macros/Cargo.toml
cargo_publish_args: "--locked"

# distributed_tooling is a standalone crate (only depends on serde_json), so it
# publishes independently of the macros/core crates.
publish-tooling:
uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@feat/cargo-publish
secrets:
crates_io_token: ${{ secrets.CRATES_IO_TOKEN }}
with:
manifest_path: distributed_tooling/Cargo.toml
cargo_publish_args: "--locked"
Comment on lines +20 to +28

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Pin workflow references to commit hashes per security policy.

The workflow reference on line 23 uses a branch reference (@feat/cargo-publish) instead of a pinned commit hash. This violates the blanket policy flagged by static analysis and creates a supply-chain security risk—if the referenced branch is compromised, secrets could be exfiltrated or malicious code executed.

Note that this issue also affects lines 13, 32, and 43 in this file. Consider pinning all workflow references to specific commit hashes.

🔒 Example fix for pinning workflow references
-    uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@feat/cargo-publish
+    uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@a1b2c3d4e5f6  # feat/cargo-publish as of 2026-06-02

Apply the same pattern to lines 13, 32, and 43.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# distributed_tooling is a standalone crate (only depends on serde_json), so it
# publishes independently of the macros/core crates.
publish-tooling:
uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@feat/cargo-publish
secrets:
crates_io_token: ${{ secrets.CRATES_IO_TOKEN }}
with:
manifest_path: distributed_tooling/Cargo.toml
cargo_publish_args: "--locked"
# distributed_tooling is a standalone crate (only depends on serde_json), so it
# publishes independently of the macros/core crates.
publish-tooling:
uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@a1b2c3d4e5f6 # feat/cargo-publish as of 2026-06-02
secrets:
crates_io_token: ${{ secrets.CRATES_IO_TOKEN }}
with:
manifest_path: distributed_tooling/Cargo.toml
cargo_publish_args: "--locked"
🧰 Tools
🪛 zizmor (1.25.2)

[error] 23-23: unpinned action reference (unpinned-uses): action is not pinned to a hash (required by blanket policy)

(unpinned-uses)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/on-v-tag-publish.yaml around lines 20 - 28, The workflow
currently references third-party workflow branches (e.g., the publish step under
the publish-tooling job using "uses:
unbounded-tech/workflows-rust/.github/workflows/publish.yaml@feat/cargo-publish"),
which must be pinned to commit SHAs; update each "uses:" reference in this file
(including the other occurrences of the same external workflow) to replace
branch refs like "`@feat/cargo-publish`" with the corresponding full commit hash
(e.g., "@<commit-sha>") for the referenced repository so the publish-tooling job
and the other uses entries point to immutable commits.

Source: Linters/SAST tools


publish:
needs: publish-macros
uses: unbounded-tech/workflows-rust/.github/workflows/publish.yaml@feat/cargo-publish
Expand All @@ -27,7 +37,7 @@ jobs:
cargo_publish_args: "--locked"

release:
needs: publish
needs: [publish, publish-tooling]
permissions:
contents: write
uses: unbounded-tech/workflow-simple-release/.github/workflows/workflow.yaml@main
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["distributed_macros"]
members = ["distributed_macros", "distributed_tooling"]
resolver = "2"

[workspace.package]
Expand Down
109 changes: 73 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ pub async fn handle(ctx: &Context<'_, Repo>) -> Result<Value, HandlerError> {
let mut todo = Todo::default();
todo.initialize(input.id.clone(), input.user_id, input.task)?;

// Publish a fact for other services. The outbox row commits atomically
// with the aggregate's events.
// Record a fact for other services. The outbox row commits atomically with
// the aggregate's events. Once a bus is attached (step 3) this `commit`
// publishes the row immediately; with no bus it stays pending for a worker.
let message = OutboxMessage::domain_event("todo.initialized", &todo)?;
ctx.repo().outbox(message).commit(&mut todo).await?;

Expand All @@ -115,41 +116,41 @@ pub async fn handle(ctx: &Context<'_, Repo>) -> Result<Value, HandlerError> {

### 3. Serve it

Register your handlers on a `microsvc::Service` with `register_handlers!`, then
expose the exact same service over direct dispatch, HTTP, gRPC, or the bus. Handlers
are written once and are transport-agnostic.
Build the service fluently from `Service::new()`, register handlers with
`register_handlers!`, then expose the exact same service over direct dispatch,
HTTP, gRPC, or the bus. Handlers are written once and are transport-agnostic.

```rust
use std::sync::Arc;
use distributed::microsvc::{self, Service, Session};
use distributed::bus::{InMemoryBus, RunOptions};
use distributed::{AggregateBuilder, HashMapRepository, Queueable};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = Arc::new(distributed::register_handlers!(
Service::with_repo(
let service = distributed::register_handlers!(
Service::new().with_repo(
HashMapRepository::new()
.queued()
.aggregate::<Todo>()
),
command handlers::todo_create,
command handlers::todo_complete,
));
);

// Direct, in-process dispatch
// Attach a bus and run. `with_bus` closes the loop from step 2: that
// `outbox(..).commit(..)` now publishes on commit, and `run` consumes the
// registered commands (and events). Same handlers, one line of wiring.
service
.dispatch(
"todo.initialize",
json!({ "id": "todo-1", "user_id": "alice", "task": "Ship it" }),
Session::new(),
)
.with_bus(InMemoryBus::new())
.run(RunOptions::idempotent())
.await?;

// ...or expose it over the network / a broker — pick any, they share handlers:
// microsvc::serve(service, "0.0.0.0:3000").await?; // HTTP (feature = "http")
// microsvc::serve_grpc(service, "[::1]:50051").await?; // gRPC (feature = "grpc")
// InMemoryBus::new().listen(service, RunOptions::idempotent()).await?; // bus
// Alternatives that share the same handlers:
// service.dispatch("todo.initialize", json!({ "id": "todo-1", .. }), Session::new()).await?; // in-process
// microsvc::serve(Arc::new(service), "0.0.0.0:3000").await?; // HTTP (feature = "http")
// microsvc::serve_grpc(Arc::new(service), "[::1]:50051").await?; // gRPC (feature = "grpc")

Ok(())
}
Expand All @@ -164,19 +165,19 @@ default you replace with a durable adapter.
```rust
// Persistence: HashMapRepository → durable SQL (features "postgres" / "sqlite")
let repo = distributed::PostgresRepository::connect_and_migrate(database_url).await?;
let service = Arc::new(distributed::register_handlers!(
Service::with_repo(repo.queued().aggregate::<Todo>()),
let service = distributed::register_handlers!(
Service::new().with_repo(repo.queued().aggregate::<Todo>()),
command handlers::todo_create,
command handlers::todo_complete,
));
);

// Transport: InMemoryBus → a real broker. send/listen/publish/subscribe + the
// handlers are unchanged; only this line differs.
// Transport: InMemoryBus → a real broker. The handlers and the
// `with_bus(..).run(..)` wiring are unchanged; only this constructor line differs.
// let bus = NatsBus::connect("nats://localhost:4222", "todos", "app").await?;
// let bus = PostgresBus::new(pool, "todos");
// let bus = RabbitBus::connect("amqp://localhost:5672/%2f", "todos", "app").await?;
// let bus = KafkaBus::connect("localhost:9092", "todos", "app").await?;
bus.listen(service, RunOptions::idempotent()).await?;
service.with_bus(bus).run(RunOptions::idempotent()).await?;
```

| Concern | In-memory default | Swap in for production |
Expand Down Expand Up @@ -708,7 +709,7 @@ read models, the outbox, **and** the durable transport (`PostgresBus`). See
Each outbox message is a durable delivery row committed alongside your domain entity. Aggregate event records are write-side replay history; they become domain events, integration events, commands, or transport messages only when application code creates an `OutboxMessage` for that purpose.

```rust
use distributed::{OutboxCommit, OutboxMessage};
use distributed::OutboxMessage;

let mut todo = Todo::default();
todo.entity.set_correlation_id("req-abc");
Expand All @@ -732,20 +733,44 @@ let message = OutboxMessage::encode_for_entity(
)?;
```

### Draining the Outbox
### Publishing the Outbox

How a committed row reaches the bus depends on whether a bus is attached to the
service:

`OutboxDispatcher` bridges durable outbox rows to a transport publisher, sharing one
claim → publish → complete path between background polling (`dispatch_batch`) and
after-commit immediate dispatch (`dispatch_ids`):
- **Bus attached (`service.with_bus(bus)`)** — `repo.outbox(msg).commit(agg)`
commits the row, then **immediately** after commit claims it under a short
lease and publishes it. A crash before the publish, or a publish failure,
leaves the row claimed under that lease; when the lease expires the polling
worker takes it.
- **No bus** — the row is committed `pending` and a worker publishes it.

The polling worker is the durable backstop in both cases. It is the same
`OutboxDispatcher` primitive composed with your runtime's timer — run it in the
service process or as a separate worker, against the same outbox store:

```rust
let dispatcher = OutboxDispatcher::new(store, publisher, "worker-1", lease, max_attempts);
let outcome = dispatcher.dispatch_ids(&committed_ids).await?; // claim-before-publish
use distributed::{BusPublisher, OutboxDispatcher};
use std::{sync::Arc, time::Duration};

let dispatcher = OutboxDispatcher::new(
repo.outbox_store(),
BusPublisher::new(Arc::new(bus)), // routes commands/events by kind
"outbox-worker-1",
Duration::from_secs(30), // claim lease
5, // max publish attempts
);

loop {
dispatcher.dispatch_batch(100).await?; // claim → publish → complete
tokio::time::sleep(Duration::from_secs(1)).await;
}
```

A row completes only after `publish()` resolves `Ok`; an unknown or failed publish
leaves it retryable (released until the attempt ceiling, then moved to `Failed`).
Claims use leases, so competing workers never publish the same row concurrently.
Claims use leases, so the immediate path and competing workers never publish the
same row concurrently.

## Service Bus

Expand Down Expand Up @@ -780,6 +805,12 @@ bus.subscribe(service.clone(), RunOptions::idempotent()).await?; // fan-out
// let bus = KafkaBus::connect("localhost:9092", "orders", "app").await?;
```

This is the low-level facade. For a `microsvc::Service`, the one-call convenience
is `service.with_bus(bus).run(opts)`: it derives the command names to `listen`
and the event names to `subscribe` from the registered handlers, and makes
`repo.outbox(msg).commit(agg)` publish on commit. Drop to `listen` / `subscribe`
/ `send` / `publish` directly when you need finer control.

Point-to-point vs fan-out is consistently a **consumer-group/identity** choice in
each transport's native topology — the same `group` competes, different `group`s
fan out:
Expand Down Expand Up @@ -827,7 +858,7 @@ The `microsvc` module provides a convention-based async command/event handler fr

### Defining a Service

A `Service<D>` is generic over a dependency type `D` that handlers read via `ctx`. Use `Service::with_repo` for aggregate command handlers, `Service::with_read_model_store` for projection handlers, `Service::with_repo_and_read_model_store` when a handler needs both, or `Service::new(deps)` for an arbitrary dependency.
A `Service<D>` is generic over a dependency type `D` that handlers read via `ctx`. Build one fluently from `Service::new()`: add `.with_repo(repo)` for aggregate command handlers, `.with_read_model_store(store)` for projection handlers (chain both when a handler needs both), and `.with_bus(bus)` to consume from / publish to a transport.

Handlers are registered with a fluent builder. `.command(name)` / `.event(name)` start a registration; `.handle(closure)` adds an unguarded handler and `.guarded(guard, closure)` adds a guarded one. The handler closure receives `&Context<D>` and returns a future:

Expand All @@ -838,7 +869,7 @@ use distributed::{AggregateBuilder, HashMapRepository, Queueable};
use serde_json::json;

let service = Arc::new(
Service::with_repo(HashMapRepository::new().queued().aggregate::<Counter>())
Service::new().with_repo(HashMapRepository::new().queued().aggregate::<Counter>())
.command("counter.initialize")
.handle(|ctx: &Context<Repo>| {
let input = ctx.input::<CreateCounter>();
Expand Down Expand Up @@ -927,7 +958,7 @@ Register them with the `register_handlers!` macro:

```rust
let service = distributed::register_handlers!(
Service::with_repo(HashMapRepository::new().queued().aggregate::<Counter>()),
Service::new().with_repo(HashMapRepository::new().queued().aggregate::<Counter>()),
command handlers::counter_create,
command handlers::counter_increment,
);
Expand Down Expand Up @@ -987,7 +1018,13 @@ Session handling mirrors HTTP — gRPC metadata headers are merged with payload

### Bus Transport

Drive a service from the bus with `listen` (point-to-point) or `subscribe` (fan-out). The same `Service` can handle commands from multiple transports simultaneously — HTTP, gRPC, bus, and direct dispatch all share the same handlers and repository. See [Service Bus](#service-bus) above.
Attach a bus with `service.with_bus(bus)` and drive it with `run(opts)`: it
derives `listen` (point-to-point commands) and `subscribe` (fan-out events) from
the registered handlers, and makes `repo.outbox(msg).commit(agg)` publish on
commit. The same `Service` can handle commands from multiple transports
simultaneously — HTTP, gRPC, bus, and direct dispatch all share the same handlers
and repository. For finer-grained control, call the `listen` / `subscribe` facade
methods directly. See [Service Bus](#service-bus) above.

### Error Handling

Expand Down
10 changes: 10 additions & 0 deletions distributed_tooling/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "distributed_tooling"
description = "Deterministic service-scaffold and artifact generation for Distributed services. Pure (no filesystem, network, or CLI): a ServiceScaffoldSpec in, a GeneratedProject out."
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
serde_json = "1"
Loading
Loading