From 382dbef46f3df38ed01f7ec6dac2a4e7fd64e4ee Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 03:37:24 -0500 Subject: [PATCH 1/2] =?UTF-8?q?refactor:=20public=20API=20hygiene=20?= =?UTF-8?q?=E2=80=94=20non=5Fexhaustive,=20re-export=20pruning,=20crate=20?= =?UTF-8?q?docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit API-affecting cleanup that is free at 0.1.0 and expensive later (2026-06-10 review). Root re-export pruning and #[non_exhaustive] additions change the public surface; reachability is preserved under module paths. - #[non_exhaustive] sweep on the remaining growing types: ReadModelError, HandlerError, ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities, and OutboxMessage. Added TransportCapabilities::new(..) so external transport authors can still build a custom capability profile now that the struct literal is blocked. OutboxMessage keeps its new()/create()/encode()/Default constructors. NOTE: RepositoryError and LockError were intentionally NOT re-touched — #[non_exhaustive] was already added to them in #83. - Crate-level docs: #![doc = include_str!("../README.md")] so docs.rs renders the README. README rust fences are tagged `rust,ignore` (the snippets are deliberately abbreviated fragments referencing app-specific types, per the README's own "Example Conventions" note); the Project Structure tree is now `text`. cargo test --doc compiles clean. The real drift-catching fix is in the in-source doctests: the stale microsvc examples that showed a SYNC dispatch / `fn handle` are corrected to async (`dispatch(..).await`, `pub async fn handle`, closures returning a future) in microsvc/mod.rs, context.rs, and service.rs. - Pruned crate-root re-exports: the low-level outbox row plumbing (outbox_message_insert_plan, outbox_message_row_values) and the whole table::* surface are no longer re-exported at the crate root. 
They stay reachable under distributed::outbox::* and distributed::table::* (the outbox and outbox_worker modules are now `pub`). The documented quick-start API is unchanged. Internal/test references were repointed to the module paths. - Made BusTopologyConfig and the namespace/consumer-group validators (validate_namespace, validate_consumer_group, resolve_consumer_group, DEFAULT_BUS_NAMESPACE, MAX_TOPOLOGY_NAME_LEN) pub and re-exported from `bus`, matching validate_stable_message_id. Third-party transports can now reuse the portable naming rules, and this removes the dead-code warnings that appeared under --no-default-features and partial feature combos. - Resolved the dual outbox publisher story by documenting the boundary (no parallel hierarchies introduced): AsyncMessagePublisher (BusPublisher over a Bus, wired by service.with_bus) is THE production extension point; the sync OutboxPublisher/OutboxWorker/LogPublisher trio is rustdoc'd as the dev/test drain path. Folding/retiring the sync trait was deferred — it is exercised by tests/todos and retiring it is out of scope for a hygiene PR. Verified: cargo fmt --all --check; cargo build for default, --no-default-features, postgres+nats, sqlite+rabbitmq+http (all zero warnings, no topology dead code); cargo test --doc; cargo test and cargo test --features sqlite (all green). 
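For reviewers unfamiliar with the `#[non_exhaustive]` plus constructor pattern described above, here is a minimal, self-contained sketch. It is not the crate's actual code: the field names and the `new` signature follow this patch, but the module layout, the public fields, and the variants `BrokerAck`, `FirstClass`, and `CustomBridge` are assumptions made for illustration. Note that `#[non_exhaustive]` only restricts code in other crates, so this single-file version shows the shape of the API rather than the compiler enforcement.

```rust
// Stand-in for the crate's capability types. Only the field names and the
// `new` signature are taken from the patch; the rest is hypothetical.
#[allow(dead_code)]
mod capabilities {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[non_exhaustive]
    pub enum ConsumerAckKind {
        TableRow,
        BrokerAck, // hypothetical variant, for illustration only
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[non_exhaustive]
    pub enum KnativeIntegrationKind {
        FirstClass,   // hypothetical variant name
        CustomBridge, // hypothetical variant name
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[non_exhaustive]
    pub struct TransportCapabilities {
        // Public fields are an assumption of this sketch.
        pub durable_receive: bool,
        pub publish_confirm: bool,
        pub platform_managed_retry: bool,
        pub consumer_ack: ConsumerAckKind,
        pub knative_integration: KnativeIntegrationKind,
    }

    impl TransportCapabilities {
        // Downstream crates cannot use a struct literal on a
        // `#[non_exhaustive]` struct, so a constructor is the way in.
        pub const fn new(
            durable_receive: bool,
            publish_confirm: bool,
            platform_managed_retry: bool,
            consumer_ack: ConsumerAckKind,
            knative_integration: KnativeIntegrationKind,
        ) -> Self {
            Self {
                durable_receive,
                publish_confirm,
                platform_managed_retry,
                consumer_ack,
                knative_integration,
            }
        }
    }
}

use capabilities::{ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities};

// A profile that a third-party adapter might declare (values are made up).
const MY_QUEUE: TransportCapabilities = TransportCapabilities::new(
    true,
    false,
    false,
    ConsumerAckKind::BrokerAck,
    KnativeIntegrationKind::CustomBridge,
);

// Downstream code matching on a `#[non_exhaustive]` enum must carry a
// wildcard arm; inside the defining crate that arm is merely unreachable.
fn describe(kind: ConsumerAckKind) -> &'static str {
    match kind {
        ConsumerAckKind::TableRow => "table row",
        ConsumerAckKind::BrokerAck => "broker ack",
        #[allow(unreachable_patterns)]
        _ => "unknown",
    }
}
```

Because `new` is a `const fn`, a profile can live in a `const`, in the same way the named constructors such as `postgres()` are declared in this patch. The wildcard arm in `describe` is the cost downstream callers pay so that adding a variant later is not a breaking change.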
Implements [[tasks/public-api-docs-hygiene]] Co-Authored-By: Claude Fable 5 --- README.md | 102 +++++++++--------- src/bus/capabilities.rs | 27 +++++ src/bus/mod.rs | 5 +- src/bus/topology.rs | 53 ++++++--- src/lib.rs | 30 +++--- src/manifest.rs | 6 +- src/microsvc/context.rs | 5 +- src/microsvc/error.rs | 1 + src/microsvc/mod.rs | 27 ++--- src/microsvc/service.rs | 10 +- src/outbox/message.rs | 1 + src/outbox_worker/mod.rs | 18 +++- src/outbox_worker/publisher.rs | 34 +++++- src/outbox_worker/worker.rs | 15 ++- src/read_model/mod.rs | 1 + .../distributed_read_model/read_models/mod.rs | 2 +- tests/sqlite_repository/main.rs | 5 +- 17 files changed, 225 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index 7a398c4..e478a75 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ A domain model is a plain Rust struct with an embedded `Entity`. `#[sourced]` tu its command methods into recorded, replayable events; `#[derive(Snapshot)]` adds a hydration cache for long streams. -```rust +```rust,ignore use serde::Deserialize; use distributed::{sourced, Entity, Snapshot}; @@ -84,7 +84,7 @@ Each handler is a module exporting a `COMMAND` name, a `guard`, and an **async** `handle`. It loads/creates the aggregate, runs a command, and commits the resulting events — optionally alongside a durable outbox message in the same transaction. -```rust +```rust,ignore // handlers/todo_create.rs use serde_json::{json, Value}; use distributed::microsvc::{Context, HandlerError}; @@ -120,7 +120,7 @@ Build the service fluently from `Service::new()`, register handlers with `register_handlers!`, then expose the exact same service over direct dispatch, HTTP, gRPC, or the bus. Handlers are written once and are transport-agnostic. -```rust +```rust,ignore use std::sync::Arc; use distributed::microsvc::{self, Service, Session}; use distributed::bus::{InMemoryBus, RunOptions}; @@ -162,7 +162,7 @@ Everything above is in-memory. 
Moving to production is a **constructor change**, not a handler change — every infrastructure concern is an async trait with an in-memory default you replace with a durable adapter. -```rust +```rust,ignore // Persistence: HashMapRepository → durable SQL (features "postgres" / "sqlite") let repo = distributed::PostgresRepository::connect_and_migrate(database_url).await?; let service = distributed::register_handlers!( @@ -284,7 +284,7 @@ Every infrastructure concern in `distributed` follows the same pattern: an **asy | Messaging | `Bus` + `BusConsumer` | `InMemoryBus` | `NatsBus`, `PostgresBus`, `RabbitBus`, `KafkaBus`, `KnativeBus` | | Read model rows | `ReadModelWritePlanStore` + `RelationalReadModelQueryStore` | `InMemoryReadModelStore` | Postgres, SQLite | | Snapshot store | `SnapshotStore` | `InMemorySnapshotStore` | Postgres, SQLite, … | -| Outbox publishing | `AsyncMessagePublisher` / `OutboxPublisher` | `LogPublisher` | Any transport publisher | +| Outbox publishing | `AsyncMessagePublisher` (production; the extension point) — sync `OutboxPublisher` is dev/test only | `LogPublisher` (dev/test) | Any `AsyncMessagePublisher` (e.g. `BusPublisher` over a real `Bus`) | | Locking | `AsyncLock` + `AsyncLockManager` | `InMemoryAsyncLockManager` | `PostgresLockManager`, `SqliteLockManager` (durable leases), Redis, … | All in-memory defaults are `Clone` and `Send + Sync`, so they work in single-task tests and multi-task servers alike. When you're ready for production, implement the trait for your infrastructure and plug it in — handler code does not change. 
@@ -297,7 +297,7 @@ Event methods are rewritten to return `SourcedResult`, even when the source meth ### Basic Usage -```rust +```rust,ignore use distributed::{sourced, Entity}; #[derive(Default)] @@ -326,7 +326,7 @@ impl Todo { This generates: -```rust +```rust,ignore // Typed event enum with named fields from method parameters #[derive(Debug, Clone, PartialEq)] pub enum TodoEvent { @@ -349,7 +349,7 @@ impl Aggregate for Todo { /* ... */ } `Aggregate::aggregate_type()` provides the type component of a persistence stream's identity (the pair `(aggregate_type, aggregate_id)`). The default uses Rust's type name for development convenience, but **production persistence should set an explicit, stable durable name**: -```rust +```rust,ignore #[sourced(entity, aggregate_type = "todo")] impl Todo { // events are stored under the durable stream type "todo" @@ -360,7 +360,7 @@ impl Todo { The generated enum enables exhaustive matching — if you add or remove an event, the compiler tells you everywhere that needs updating: -```rust +```rust,ignore use distributed::EventRecord; fn print_todo_event(record: &EventRecord) -> Result<(), String> { @@ -377,7 +377,7 @@ fn print_todo_event(record: &EventRecord) -> Result<(), String> { ### Custom Enum Name -```rust +```rust,ignore #[sourced(entity, events = "TodoCommand")] impl Todo { // generates TodoCommand enum instead of TodoEvent @@ -388,7 +388,7 @@ impl Todo { Create events at a specific version for [upcasting](#event-upcasting--versioning): -```rust +```rust,ignore type InitV1 = (String, String); type InitV2 = (String, String, u8); @@ -414,7 +414,7 @@ impl TodoV2 { ### Custom Entity Field -```rust +```rust,ignore #[sourced(my_entity)] impl MyAggregate { #[event("initialized")] @@ -428,7 +428,7 @@ impl MyAggregate { Add `enqueue` to `#[sourced]` to automatically queue events for in-process emission alongside digest. 
Every `#[event]` method both records to the entity stream and enqueues for emission: -```rust +```rust,ignore use distributed::{sourced, Entity}; use distributed::emitter::EntityEmitter; @@ -456,7 +456,7 @@ impl Order { **Custom emitter field** — when your emitter field isn't named `emitter`: -```rust +```rust,ignore #[sourced(entity, enqueue(my_emitter))] impl Notifier { #[event("sent")] @@ -473,7 +473,7 @@ The `#[digest]` and `aggregate!()` macros are the lower-level building blocks th ### The `#[digest]` Macro -```rust +```rust,ignore // Basic — captures function parameters #[digest("initialized")] fn initialize(&mut self, id: String, user_id: String, task: String) { @@ -501,7 +501,7 @@ fn create(&mut self, name: String) { /* uses self.my_entity */ } Generates the `Aggregate` trait implementation with replay logic: -```rust +```rust,ignore aggregate!(Todo, entity, aggregate_type = "todo" { "initialized"(id, user_id, task) => initialize, "completed"() => complete(), @@ -510,7 +510,7 @@ aggregate!(Todo, entity, aggregate_type = "todo" { With [upcasters](#event-upcasting--versioning) for event schema evolution: -```rust +```rust,ignore type InitV1 = (String, String); type InitV2 = (String, String, u8); @@ -534,7 +534,7 @@ Metadata lets you attach cross-cutting context — correlation IDs, causation ID Set metadata on the entity before calling command methods. Every event produced by `#[event]` or `#[digest]` automatically inherits it: -```rust +```rust,ignore let mut todo = Todo::default(); todo.entity.set_correlation_id("req-abc-123"); @@ -552,7 +552,7 @@ Entity metadata is **transient** — it is not serialized with the entity. 
It is Use `encode_for_entity` to create outbox messages that automatically inherit the entity's metadata context: -```rust +```rust,ignore let outbox = OutboxMessage::encode_for_entity( format!("{}:created", order.entity.id()), "order.initialized", @@ -577,7 +577,7 @@ Framework-derived metadata (codec, destination, source aggregate) is namespaced ### Reading Metadata -```rust +```rust,ignore // On EventRecord (event store) event_record.correlation_id() // Option<&str> event_record.causation_id() @@ -596,7 +596,7 @@ The `emitter` feature (enabled by default) adds in-process event-driven choreogr Every `#[event]` method automatically records to the entity stream (for replay) and enqueues for in-process emission: -```rust +```rust,ignore use serde::{Deserialize, Serialize}; use distributed::{sourced, Entity}; use distributed::emitter::EntityEmitter; @@ -630,7 +630,7 @@ impl OrderSaga { Queued events are held until you explicitly emit them after a successful commit: -```rust +```rust,ignore let mut saga = OrderSaga::default(); saga.start("order-1".into())?; @@ -643,7 +643,7 @@ saga.emitter.emit_queued(); ### Registering Listeners -```rust +```rust,ignore let shared_state = Arc::new(Mutex::new(Vec::new())); let state = Arc::clone(&shared_state); @@ -660,7 +660,7 @@ This pattern is useful for reactive workflows within the same process. For cross Per-entity async locking for serialized workflows. `get` acquires the lock, `commit` releases it: -```rust +```rust,ignore use distributed::{AggregateBuilder, HashMapRepository, Queueable, RepositoryError}; let repo = HashMapRepository::new().queued().aggregate::(); @@ -683,7 +683,7 @@ on restart. For **cross-process** serialization, back the queue with a durable SQLx lease lock (feature `postgres` or `sqlite`). 
It implements the same `AsyncLockManager` trait, so it's a drop-in via `queued_with`: -```rust +```rust,ignore use distributed::{PostgresLockManager, PostgresRepository}; let repo = PostgresRepository::connect_and_migrate(&database_url).await?; @@ -708,7 +708,7 @@ event streams, relational read-model write plans, processed-message marks, snapshots, and outbox rows — staging everything through one SQL transaction when committed via `CommitBatch`. -```rust +```rust,ignore // SQLite — local persistence and conformance (requires `sqlite`) let repo = distributed::SqliteRepository::connect_and_migrate("sqlite::memory:").await?; @@ -728,7 +728,7 @@ read models, the outbox, **and** the durable transport (`PostgresBus`). See Each outbox message is a durable delivery row committed alongside your domain entity. Aggregate event records are write-side replay history; they become domain events, integration events, commands, or transport messages only when application code creates an `OutboxMessage` for that purpose. -```rust +```rust,ignore use distributed::OutboxMessage; let mut todo = Todo::default(); @@ -744,7 +744,7 @@ repo.outbox(message).commit(&mut todo).await?; For custom payloads or IDs, use `encode_for_entity`: -```rust +```rust,ignore let message = OutboxMessage::encode_for_entity( format!("{}:init", todo.entity.id()), "todo.initialized", @@ -769,7 +769,7 @@ The polling worker is the durable backstop in both cases. 
It is the same `OutboxDispatcher` primitive composed with your runtime's timer — run it in the service process or as a separate worker, against the same outbox store: -```rust +```rust,ignore use distributed::{BusPublisher, OutboxDispatcher}; use std::{sync::Arc, time::Duration}; @@ -803,7 +803,7 @@ two messaging patterns through two traits: A concrete `*Bus` implements both, so the **application surface is identical across transports; only the constructor line changes.** -```rust +```rust,ignore use std::sync::Arc; use distributed::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; @@ -868,7 +868,7 @@ role-based `Broker` + per-name `Trigger` YAML, and the service mounts carries a `FailurePolicy` controlling what happens to a **permanent** handler failure — `Retry`, `DeadLetter`, `Park`, `LogAndAck`, or `Stop`: -```rust +```rust,ignore use distributed::bus::{FailurePolicy, RunOptions}; bus.listen( @@ -895,7 +895,7 @@ A `Service` is generic over a dependency type `D` that handlers read via `ctx Handlers are registered with a fluent builder. `.command(name)` / `.event(name)` start a registration; `.handle(closure)` adds an unguarded handler and `.guarded(guard, closure)` adds a guarded one. The handler closure receives `&Context` and returns a future: -```rust +```rust,ignore use std::sync::Arc; use distributed::microsvc::{Context, HandlerError, Service, Session}; use distributed::{AggregateBuilder, HashMapRepository, Queueable}; @@ -938,7 +938,7 @@ let _result = service `.guarded(guard, handler)` runs the guard before the handler — if it returns `false`, the command is rejected: -```rust +```rust,ignore service .command("admin.reset") .guarded( @@ -951,7 +951,7 @@ service For larger services, organize handlers into separate files. 
Each handler module exports a `COMMAND` (or `EVENT` / `EVENTS`) name, a `guard`, and an async `handle`: -```rust +```rust,ignore // src/handlers/counter_create.rs use serde::Deserialize; use serde_json::{json, Value}; @@ -989,7 +989,7 @@ pub async fn handle(ctx: &Context<'_, Repo>) -> Result { Register them with the `register_handlers!` macro: -```rust +```rust,ignore let service = distributed::register_handlers!( Service::new().with_repo(HashMapRepository::new().queued().aggregate::()), command handlers::counter_create, @@ -1003,7 +1003,7 @@ Event projection handlers use `EVENT` / `EVENTS` and `event handlers::...` in th The `http` feature adds an axum-based HTTP transport. Every registered command becomes a `POST /:command` endpoint. Request headers flow into the `Session` verbatim — including `x-hasura-*` identity headers, which the framework does **not** authenticate. Deploy behind a trusted proxy that strips client-supplied identity headers and injects authenticated ones (see [Security / Trust Boundary](#security--trust-boundary)). 
-```rust +```rust,ignore use std::sync::Arc; use distributed::microsvc; @@ -1034,7 +1034,7 @@ curl http://localhost:3000/health The `grpc` feature adds a tonic-based gRPC transport using standard protobuf wire format (no `.proto` file needed): -```rust +```rust,ignore // Get a CommandServiceServer to compose with other tonic routes let grpc_svc = microsvc::grpc_server(service.clone()); @@ -1109,7 +1109,7 @@ Read models are query-optimized relational projections derived from aggregates, ### Defining a Read Model -```rust +```rust,ignore use serde::{Deserialize, Serialize}; use distributed::ReadModel; @@ -1129,7 +1129,7 @@ pub struct GameView { When the response to a command must include the fully consistent, updated view, commit the aggregate and read model together in one transaction: -```rust +```rust,ignore use distributed::{ReadModelWritePlanCommitExt, ReadModelWritePlanBuilder}; // Player submits a move @@ -1148,7 +1148,7 @@ repo.read_models(read_models).commit(&mut game).await?; For related rows, build the same structured write plan: -```rust +```rust,ignore let mut read_models = ReadModelWritePlanBuilder::new(); read_models.upsert(&player_view)?; read_models.upsert_related(&player_view, "weapons", &weapon_view)?; @@ -1161,7 +1161,7 @@ This is a deliberate consistency tradeoff: the read model is in sync with the ag Distributed projectors subscribe to published messages and commit read-model rows through a workspace, marking the message processed in the same adapter transaction for SQL idempotency: -```rust +```rust,ignore use distributed::ReadModelWorkspaceExt; let mut workspace = ctx.read_model_store().workspace(); @@ -1171,7 +1171,7 @@ workspace.commit().await?; ### Loading -```rust +```rust,ignore use distributed::{ReadModelWorkspaceExt, RowKey, RowValue}; let loaded = repo @@ -1191,7 +1191,7 @@ As aggregates accumulate events, replaying from scratch gets expensive. The fram Add `#[derive(Snapshot)]` to your aggregate struct. 
This generates a state snapshot payload DTO (e.g. `TodoSnapshot`), a `fn snapshot()` method, and the full `impl Snapshottable`: -```rust +```rust,ignore use distributed::{Entity, Snapshot}; #[derive(Default, Snapshot)] @@ -1207,7 +1207,7 @@ Fields with `#[serde(skip)]` (like `emitter: EntityEmitter`) are automatically e **Custom ID key** — when the entity ID maps to a domain field like `sku`: -```rust +```rust,ignore #[derive(Default, Snapshot)] #[snapshot(id = "sku")] struct Inventory { @@ -1219,7 +1219,7 @@ struct Inventory { **Custom entity field name**: -```rust +```rust,ignore #[derive(Default, Snapshot)] #[snapshot(entity = "my_entity")] struct Widget { @@ -1232,7 +1232,7 @@ struct Widget { Chain `.with_snapshots(frequency)` onto any aggregate repository. The frequency is how many events between automatic snapshots: -```rust +```rust,ignore use distributed::{AggregateBuilder, HashMapRepository, Queueable, RepositoryError}; let repo = HashMapRepository::new() @@ -1265,7 +1265,7 @@ Event schemas evolve over time. When you add a field to an event (e.g., `priorit An upcaster is a plain function that converts a typed payload from one version to the next. The crate handles payload decoding and encoding: -```rust +```rust,ignore type InitV1 = (String, String); type InitV2 = (String, String, u8); @@ -1279,7 +1279,7 @@ fn upcast_init_v1_v2((id, task): InitV1) -> InitV2 { With `#[sourced]`, add upcasters directly in the attribute: -```rust +```rust,ignore #[sourced(entity, upcasters( ("initialized", 1 => 2, InitV1 => InitV2, upcast_init_v1_v2), ))] @@ -1304,7 +1304,7 @@ Old events stored as `(id, task)` at v1 are transparently upcast to `(id, task, Upcasters chain automatically. 
Each transforms one version to the next (v1→v2→v3): -```rust +```rust,ignore #[sourced(entity, upcasters( ("initialized", 1 => 2, InitV1 => InitV2, upcast_init_v1_v2), ("initialized", 2 => 3, InitV2 => InitV3, upcast_init_v2_v3), @@ -1340,7 +1340,7 @@ dsvc schema --dialect postgres # render migration SQL entrypoint (override with `--entrypoint`), which registers the [read models](#read-models) and tables that define the schema: -```rust +```rust,ignore pub fn distributed_manifest() -> distributed::DistributedProjectManifest { distributed::DistributedProjectManifest::new("orders").read_model::() } @@ -1364,7 +1364,7 @@ reference: [`distributed_cli/README.md`](distributed_cli/README.md). ## Project Structure -``` +```text src/ aggregate/ # Aggregate trait, hydration, async aggregate repository helpers commit_builder/ # Async transactional batches for aggregates, outbox, and read models diff --git a/src/bus/capabilities.rs b/src/bus/capabilities.rs index 069a3d3..a15ef74 100644 --- a/src/bus/capabilities.rs +++ b/src/bus/capabilities.rs @@ -12,6 +12,7 @@ /// How a consumer acknowledges successful handler execution back to its /// transport. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] pub enum ConsumerAckKind { /// Complete / delete / archive a durable table row (Postgres). TableRow, @@ -29,6 +30,7 @@ pub enum ConsumerAckKind { /// How a transport integrates with Knative Eventing. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] pub enum KnativeIntegrationKind { /// First-class Knative Eventing path (Kafka and RabbitMQ Broker/Source, /// Knative HTTP CloudEvents). @@ -54,6 +56,7 @@ pub enum KnativeIntegrationKind { /// owns retry, backoff, and dead-lettering) from direct transports where the /// adapter and this crate own that operational contract. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] pub struct TransportCapabilities { /// The transport durably stores received messages until acknowledged (a /// table, queue, stream, or broker), rather than relying on a live process @@ -72,6 +75,30 @@ pub struct TransportCapabilities { } impl TransportCapabilities { + /// Build a custom capability profile for a third-party transport. + /// + /// The named constructors below cover the transports this crate ships; + /// `new` lets an external adapter declare its own profile. It is the + /// supported construction path now that the struct is `#[non_exhaustive]` + /// (an external crate can no longer use a struct literal). New optional + /// dimensions added later default to a conservative value through this + /// constructor rather than breaking callers. + pub const fn new( + durable_receive: bool, + publish_confirm: bool, + platform_managed_retry: bool, + consumer_ack: ConsumerAckKind, + knative_integration: KnativeIntegrationKind, + ) -> Self { + Self { + durable_receive, + publish_confirm, + platform_managed_retry, + consumer_ack, + knative_integration, + } + } + /// Postgres: durable table-backed transport, transaction-commit publish /// confirmation, app-owned retry, row-completion ack, custom Knative bridge. 
pub const fn postgres() -> Self { diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 210c37f..eafd04a 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -148,4 +148,7 @@ pub use run_options::{ConsumerDeliveryMode, InboxHook, NoInbox, RunOptions}; pub use runner::run_source; pub use source::{AsyncMessageSource, ReceivedMessage}; pub use stable_id::{validate_stable_message_id, StableMessageIdError, MAX_STABLE_MESSAGE_ID_LEN}; -pub(crate) use topology::BusTopologyConfig; +pub use topology::{ + resolve_consumer_group, validate_consumer_group, validate_namespace, BusTopologyConfig, + DEFAULT_BUS_NAMESPACE, MAX_TOPOLOGY_NAME_LEN, +}; diff --git a/src/bus/topology.rs b/src/bus/topology.rs index f08989a..0c181ca 100644 --- a/src/bus/topology.rs +++ b/src/bus/topology.rs @@ -1,11 +1,21 @@ use super::router::MessageRouter; use super::TransportError; -pub(crate) const DEFAULT_BUS_NAMESPACE: &str = "default"; -pub(crate) const MAX_TOPOLOGY_NAME_LEN: usize = 128; - +/// Default broker namespace applied when a transport is constructed without an +/// explicit one. +pub const DEFAULT_BUS_NAMESPACE: &str = "default"; +/// Maximum byte length accepted for a consumer group or namespace name. +pub const MAX_TOPOLOGY_NAME_LEN: usize = 128; + +/// Consumer-group and namespace topology for a broker-backed bus. +/// +/// Third-party transports can reuse this to apply the same portable naming +/// rules (`group`/`namespace` validation) the built-in transports use, instead +/// of reimplementing them. Construct via [`BusTopologyConfig::default`] and +/// refine with [`group`](Self::group) / [`namespace`](Self::namespace), then +/// [`validate_for`](Self::validate_for) before touching broker topology. 
#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct BusTopologyConfig { +pub struct BusTopologyConfig { group: Option, namespace: String, } @@ -20,25 +30,31 @@ impl Default for BusTopologyConfig { } impl BusTopologyConfig { - pub(crate) fn default_namespace() -> &'static str { + /// The default namespace used when none is set. + pub fn default_namespace() -> &'static str { DEFAULT_BUS_NAMESPACE } - pub(crate) fn group(mut self, group: impl Into) -> Self { + /// Set the durable consumer group. + pub fn group(mut self, group: impl Into) -> Self { self.group = Some(group.into()); self } - pub(crate) fn namespace(mut self, namespace: impl Into) -> Self { + /// Set the broker namespace/prefix. + pub fn namespace(mut self, namespace: impl Into) -> Self { self.namespace = namespace.into(); self } - pub(crate) fn namespace_unchecked(&self) -> &str { + /// The configured namespace without re-validating it. + pub fn namespace_unchecked(&self) -> &str { &self.namespace } - pub(crate) fn resolve_consumer_group( + /// Resolve the effective consumer group, falling back to the router's + /// identity, validating the result. + pub fn resolve_consumer_group( &self, router: &R, transport: &str, @@ -46,11 +62,13 @@ impl BusTopologyConfig { resolve_consumer_group(self.group.as_deref(), router, transport) } - pub(crate) fn namespace_for(&self, transport: &str) -> Result { + /// Validate and return the namespace for the given transport. + pub fn namespace_for(&self, transport: &str) -> Result { validate_namespace(&self.namespace, transport) } - pub(crate) fn validate_for(self, transport: &str) -> Result { + /// Validate both group and namespace, returning a checked config. 
+ pub fn validate_for(self, transport: &str) -> Result { let group = self .group .map(|group| validate_consumer_group(&group, transport)) @@ -79,18 +97,19 @@ impl TopologyNameKind { } } -pub(crate) fn validate_consumer_group( - value: &str, - transport: &str, -) -> Result { +/// Validate a consumer-group name against the portable topology rules. +pub fn validate_consumer_group(value: &str, transport: &str) -> Result { validate_topology_name(value, TopologyNameKind::ConsumerGroup, transport) } -pub(crate) fn validate_namespace(value: &str, transport: &str) -> Result { +/// Validate a namespace name against the portable topology rules (dots allowed). +pub fn validate_namespace(value: &str, transport: &str) -> Result { validate_topology_name(value, TopologyNameKind::Namespace, transport) } -pub(crate) fn resolve_consumer_group( +/// Resolve a consumer group from an explicit value or the router identity, +/// validating the result. +pub fn resolve_consumer_group( explicit: Option<&str>, router: &R, transport: &str, diff --git a/src/lib.rs b/src/lib.rs index eac232f..1aac3a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::module_inception)] +#![doc = include_str!("../README.md")] // Allow proc-macros to reference this crate by name even when used internally extern crate self as distributed; @@ -15,8 +16,8 @@ mod hashmap_repo; pub mod lock; pub mod manifest; pub mod microsvc; -mod outbox; -mod outbox_worker; +pub mod outbox; +pub mod outbox_worker; #[cfg(feature = "postgres")] pub mod postgres_repo; pub mod queued_repo; @@ -64,11 +65,14 @@ pub use lock::{PostgresLock, PostgresLockManager}; #[cfg(feature = "sqlite")] pub use lock::{SqliteLock, SqliteLockManager}; -// Outbox: commit concerns (aggregate + outbox in one commit) +// Outbox: commit concerns (aggregate + outbox in one commit). 
+// +// Low-level row plumbing (`outbox_message_insert_plan`, `outbox_message_row_values`) +// stays reachable under `distributed::outbox::*` and is intentionally NOT re-exported +// at the crate root — it is an adapter-authoring detail, not part of the quick-start API. pub use outbox::{ - outbox_message_insert_plan, outbox_message_key, outbox_message_row_values, - outbox_message_schema, AggregateCommit, CommitReceipt, OutboxMessage, OutboxMessageStatus, - OutboxPublishHook, OutboxPublisherConfig, OUTBOX_MESSAGES_TABLE, + outbox_message_key, outbox_message_schema, AggregateCommit, CommitReceipt, OutboxMessage, + OutboxMessageStatus, OutboxPublishHook, OutboxPublisherConfig, OUTBOX_MESSAGES_TABLE, }; // Outbox Worker: drain and publish concerns @@ -123,16 +127,10 @@ pub use read_model::{ DEFAULT_READ_MODEL_VERSION_COLUMN, }; -// Neutral table/row primitives shared by read models and operational tables. -pub use table::{ - generate_table_migration_artifacts, table_schema_bootstrap_result, table_schema_statements, - DeleteTableRowMutation, PatchTableRowMutation, TableAdapterCapabilities, TableColumn, - TableCommitOutcome, TableIndex, TableMigrationArtifact, TableModel, TableMutation, - TableRowMutation, TableSchema, TableSchemaAdapter, TableSchemaAdapterCapabilities, - TableSchemaBootstrap, TableSchemaIssue, TableSchemaIssueKind, TableSchemaRegistry, - TableSchemaRegistryExt, TableSchemaVerification, TableSqlDialect, TableSqlSchemaAdapter, - TableStoreError, TableWritePlan, DEFAULT_TABLE_VERSION_COLUMN, -}; +// Neutral table/row primitives shared by read models and operational tables stay +// reachable under their module path (`distributed::table::*`). They are low-level +// schema/adapter plumbing, not part of the quick-start surface, so they are not +// re-exported at the crate root. 
pub use manifest::{ DistributedManifestEnvelope, DistributedProjectManifest, MessageEndpointManifest, diff --git a/src/manifest.rs b/src/manifest.rs index a8ae32d..ef0759f 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -1,10 +1,10 @@ use serde::{Deserialize, Serialize}; -use crate::{ - generate_table_migration_artifacts, table_schema_statements, ReadModelError, - ReadModelMigrationArtifact, RelationalReadModel, TableSchema, TableSchemaRegistry, +use crate::table::{ + generate_table_migration_artifacts, table_schema_statements, TableSchema, TableSchemaRegistry, TableSqlDialect, }; +use crate::{ReadModelError, ReadModelMigrationArtifact, RelationalReadModel}; pub const DISTRIBUTED_MANIFEST_SCHEMA_VERSION: u32 = 1; diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index 6a7dbc2..454caa4 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -21,12 +21,13 @@ use crate::bus::Message; /// ## Example /// /// ```ignore -/// pub fn handle( -/// ctx: &Context, +/// pub async fn handle( +/// ctx: &Context<'_, Repo>, /// ) -> Result { /// let user_id = ctx.user_id()?; /// let input = ctx.input::()?; /// let repo = ctx.repo(); +/// // ... commit via repo.commit(&mut agg).await? ... /// } /// ``` pub struct Context<'a, D> { diff --git a/src/microsvc/error.rs b/src/microsvc/error.rs index 1c1baf4..faeb341 100644 --- a/src/microsvc/error.rs +++ b/src/microsvc/error.rs @@ -9,6 +9,7 @@ use crate::{repository::RepositoryError, EventRecordError}; /// Error type for command handler operations. #[derive(Debug)] +#[non_exhaustive] pub enum HandlerError { /// No handler registered for this command name. UnknownCommand(String), diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 695b5dc..caa2e69 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -6,6 +6,9 @@ //! //! ## Quick Start //! +//! Dispatch is **async** — `dispatch`, `handle`, and the commit path all return +//! futures and are awaited. +//! //! ```ignore //! 
use std::sync::Arc; //! use distributed::{microsvc, HashMapRepository}; @@ -15,13 +18,15 @@ //! microsvc::Service::new().with_repo(HashMapRepository::new()) //! .command("order.create") //! .handle(|ctx| { -//! let input = ctx.input::()?; -//! Ok(json!({ "id": input.id })) +//! let input = ctx.input::(); +//! async move { Ok(json!({ "id": input?.id })) } //! }) //! ); //! -//! // Direct dispatch -//! let result = service.dispatch("order.create", json!({ "id": "o1" }), microsvc::Session::new()); +//! // Direct dispatch (async) +//! let result = service +//! .dispatch("order.create", json!({ "id": "o1" }), microsvc::Session::new()) +//! .await?; //! //! // HTTP transport (requires "http" feature) //! // microsvc::serve(service, "0.0.0.0:3000").await?; @@ -29,26 +34,22 @@ //! //! ## Handler Convention //! -//! Each handler file follows this convention: +//! Each handler file follows this convention. `handle` is **async**: //! //! ```ignore //! // src/handlers/order_create.rs //! //! pub const COMMAND: &str = "order.create"; //! -//! pub fn guard(ctx: &microsvc::Context) -> bool { +//! pub fn guard(ctx: &microsvc::Context) -> bool { //! ctx.has_fields(&["id", "product_id"]) //! } //! -//! pub fn handle(ctx: &microsvc::Context) -> Result -//! where -//! D: microsvc::HasRepo, -//! D::Repo: CommitAggregate, -//! { +//! pub async fn handle(ctx: &microsvc::Context<'_, Repo>) -> Result { //! let input = ctx.input::()?; //! let mut order = Order::default(); -//! order.create(input.id); -//! ctx.repo().commit_aggregate(&mut order)?; +//! order.create(input.id)?; +//! ctx.repo().commit(&mut order).await?; //! Ok(json!({ "id": order.entity().id() })) //! } //! ``` diff --git a/src/microsvc/service.rs b/src/microsvc/service.rs index 8155a5d..cab964c 100644 --- a/src/microsvc/service.rs +++ b/src/microsvc/service.rs @@ -5,6 +5,8 @@ //! //! ## Example //! +//! The handler closure returns a future, and `dispatch` is awaited: +//! //! ```ignore //! use distributed::microsvc; //! 
use serde_json::json; @@ -12,11 +14,13 @@ //! let service = microsvc::Service::new() //! .command("order.create") //! .handle(|ctx| { -//! let input = ctx.input::()?; -//! Ok(json!({ "id": input.id })) +//! let input = ctx.input::(); +//! async move { Ok(json!({ "id": input?.id })) } //! }); //! -//! let result = service.dispatch("order.create", json!({"id": "1"}), Session::new()); +//! let result = service +//! .dispatch("order.create", json!({"id": "1"}), Session::new()) +//! .await?; //! ``` use std::collections::HashMap; diff --git a/src/outbox/message.rs b/src/outbox/message.rs index f992b9c..7e67212 100644 --- a/src/outbox/message.rs +++ b/src/outbox/message.rs @@ -48,6 +48,7 @@ impl std::str::FromStr for OutboxMessageStatus { /// It is not an aggregate stream; repositories store it in their outbox storage /// and workers update delivery state directly. #[derive(Clone, Debug, Serialize, Deserialize)] +#[non_exhaustive] pub struct OutboxMessage { pub id: String, pub event_type: String, diff --git a/src/outbox_worker/mod.rs b/src/outbox_worker/mod.rs index 498dcb9..98cc9d1 100644 --- a/src/outbox_worker/mod.rs +++ b/src/outbox_worker/mod.rs @@ -1,10 +1,20 @@ //! Outbox Worker - Drains and publishes outbox messages. //! -//! This module provides the worker infrastructure for processing outbox messages: +//! This module provides the worker infrastructure for processing outbox messages. +//! +//! There are two drain paths: +//! - **Production / extension point**: the async `OutboxDispatcher` + +//! `AsyncMessagePublisher` (`BusPublisher` over a `Bus`), wired by +//! `service.with_bus(bus)`. +//! - **Dev/test**: the synchronous `OutboxWorker` + `OutboxPublisher` + +//! `LogPublisher` trio — no async runtime, no real transport. +//! +//! Items: //! - `OutboxStore` - Store operations for claiming and completing messages -//! - `OutboxWorker` - Synchronous message processor -//! - `OutboxPublisher` - Trait for publishing to external systems -//! 
- `LogPublisher` - Simple logging publisher for testing +//! - `OutboxDispatcher` / `BusPublisher` - the async production drain path +//! - `OutboxWorker` - synchronous dev/test message processor +//! - `OutboxPublisher` - synchronous dev/test publish trait +//! - `LogPublisher` - simple logging publisher for tests //! - `LocalEmitterPublisher` - In-process event emitter (requires `emitter` feature) //! //! ## Separation of Concerns diff --git a/src/outbox_worker/publisher.rs b/src/outbox_worker/publisher.rs index c36228b..5f320b3 100644 --- a/src/outbox_worker/publisher.rs +++ b/src/outbox_worker/publisher.rs @@ -5,7 +5,31 @@ use std::sync::{Arc, Mutex}; #[cfg(feature = "emitter")] use crate::EventEmitter; -/// Trait for publishing outbox records to external systems. +/// Synchronous publisher trait used by the in-process [`OutboxWorker`] — a +/// **dev/test** drain loop. +/// +/// # Which publisher path to use +/// +/// This crate has two outbox-publishing paths, and they are not parallel +/// hierarchies — they sit at different layers: +/// +/// - **Production / the extension point: [`AsyncMessagePublisher`]**. The async +/// [`OutboxDispatcher`] drains durable rows and publishes them through an +/// `AsyncMessagePublisher`. [`BusPublisher`] adapts any [`Bus`] into one, and +/// `service.with_bus(bus)` wires this path automatically so +/// `repo.outbox(msg).commit(agg)` publishes on commit. Implement +/// `AsyncMessagePublisher` to plug in a custom transport. +/// - **Dev/test only: this trait + [`OutboxWorker`] + [`LogPublisher`]**. A +/// synchronous, in-process processor with no async runtime and no real +/// transport. Useful in unit tests and examples that just need to observe +/// that rows drain; not the production drain path. 
+/// +/// [`AsyncMessagePublisher`]: crate::bus::AsyncMessagePublisher +/// [`OutboxDispatcher`]: crate::OutboxDispatcher +/// [`BusPublisher`]: crate::BusPublisher +/// [`Bus`]: crate::bus::Bus +/// [`OutboxWorker`]: crate::OutboxWorker +/// [`LogPublisher`]: crate::LogPublisher pub trait OutboxPublisher { type Error: fmt::Display; @@ -35,7 +59,13 @@ impl fmt::Display for LogPublisherError { impl std::error::Error for LogPublisherError {} -/// A simple publisher that logs events to stdout or a buffer. +/// A simple **dev/test** publisher that logs events to stdout or a buffer. +/// +/// It is the in-memory default for the synchronous [`OutboxPublisher`] / +/// [`OutboxWorker`] processor. For production publishing implement +/// [`AsyncMessagePublisher`](crate::bus::AsyncMessagePublisher) (or use +/// [`BusPublisher`](crate::BusPublisher) over a real [`Bus`](crate::bus::Bus)); +/// see [`OutboxPublisher`] for the full comparison of the two paths. pub struct LogPublisher { buffer: Option>>>, } diff --git a/src/outbox_worker/worker.rs b/src/outbox_worker/worker.rs index 6edc342..5ca7686 100644 --- a/src/outbox_worker/worker.rs +++ b/src/outbox_worker/worker.rs @@ -28,10 +28,21 @@ pub struct ProcessOneResult { pub failed: bool, } -/// Worker for processing outbox messages. +/// Synchronous, in-process worker for processing outbox messages — a **dev/test** +/// drain loop. /// /// The repository is responsible for claiming pending messages. The worker -/// processes messages that are already loaded and (optionally) claimed. +/// processes messages that are already loaded and (optionally) claimed via a +/// synchronous [`OutboxPublisher`]. +/// +/// For the production drain path use the async [`OutboxDispatcher`] with an +/// [`AsyncMessagePublisher`] (e.g. [`BusPublisher`] over a real [`Bus`]); see +/// [`OutboxPublisher`] for the full comparison. 
+/// +/// [`OutboxDispatcher`]: crate::OutboxDispatcher +/// [`AsyncMessagePublisher`]: crate::bus::AsyncMessagePublisher +/// [`BusPublisher`]: crate::BusPublisher +/// [`Bus`]: crate::bus::Bus pub struct OutboxWorker<P>
{ publisher: P, worker_id: String, diff --git a/src/read_model/mod.rs b/src/read_model/mod.rs index d9ce412..d8beed2 100644 --- a/src/read_model/mod.rs +++ b/src/read_model/mod.rs @@ -51,6 +51,7 @@ pub struct Versioned { /// Error type for read model store operations. #[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] pub enum ReadModelError { /// Optimistic concurrency conflict. ConcurrencyConflict { diff --git a/tests/distributed_read_model/read_models/mod.rs b/tests/distributed_read_model/read_models/mod.rs index f832fb7..af1576c 100644 --- a/tests/distributed_read_model/read_models/mod.rs +++ b/tests/distributed_read_model/read_models/mod.rs @@ -11,7 +11,7 @@ pub use checkout_view::CheckoutView; pub use seat_view::SeatView; #[cfg(any(feature = "postgres", feature = "sqlite"))] -use distributed::TableSchemaRegistry; +use distributed::table::TableSchemaRegistry; use distributed::{InMemoryReadModelStore, ReadModelError, RowKey, RowValue}; pub fn register_schemas(store: &InMemoryReadModelStore) -> Result<(), ReadModelError> { diff --git a/tests/sqlite_repository/main.rs b/tests/sqlite_repository/main.rs index a935aa3..10a652a 100644 --- a/tests/sqlite_repository/main.rs +++ b/tests/sqlite_repository/main.rs @@ -2,12 +2,13 @@ use std::collections::HashMap; +use distributed::table::TableSchemaRegistry; use distributed::{ sourced, Aggregate, AggregateBuilder, AsyncOutboxStore, CommitBatch, Entity, GetStream, OutboxMessage, OutboxMessageStatus, ReadModel, ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt, RepositoryError, RowKey, RowPatch, RowValue, SnapshotRecord, - SnapshotStore, SqliteRepository, StreamIdentity, StreamWrite, TableSchemaRegistry, - TransactionalCommit, OUTBOX_MESSAGES_TABLE, + SnapshotStore, SqliteRepository, StreamIdentity, StreamWrite, TransactionalCommit, + OUTBOX_MESSAGES_TABLE, }; use serde::{Deserialize, Serialize}; From da55d6ba2b4179961d574e1f2d6370310806e50e Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Thu, 11 Jun 2026 
04:16:26 -0500 Subject: [PATCH 2/2] fix: keep TableSchemaRegistry re-exported at crate root MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The root re-export pruning in this PR was too aggressive: it dropped `table::*` (including `TableSchemaRegistry`) and the outbox row-plumbing functions from the crate root, but `TableSchemaRegistry` is consumed by the crate's own integration tests via `distributed::TableSchemaRegistry` (tests/postgres_repository/main.rs) and is the entry point downstream users build before bootstrapping table migrations. A symbol that the integration tests import through the crate root is effectively public API, so it must stay re-exported. The local verification used `--features sqlite`, which does not compile the postgres/all-features test crates, so the unresolved-import breaks were missed locally and only surfaced in CI (the coverage `--all-features` and postgres jobs). Re-add only `TableSchemaRegistry` to the crate root. The remaining table primitives stay reachable under `distributed::table::*` (no test/external root consumers), and the genuinely internal outbox row plumbing (`outbox_message_insert_plan`, `outbox_message_row_values`) stays pruned (confirmed zero consumers outside `src/outbox/`). All test targets now compile, including the postgres and all-features (librdkafka) coverage targets that previously failed. Refines [[tasks/public-api-docs-hygiene]] --- src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 1aac3a3..09c8cab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -131,6 +131,12 @@ pub use read_model::{ // reachable under their module path (`distributed::table::*`). They are low-level // schema/adapter plumbing, not part of the quick-start surface, so they are not // re-exported at the crate root. 
+// +// Exception: `TableSchemaRegistry` is the entry point for registering operational +// table schemas (it is what callers build before bootstrapping migrations), so it +// is consumed directly by downstream users and integration tests. It stays at the +// crate root as part of the public surface. +pub use table::TableSchemaRegistry; pub use manifest::{ DistributedManifestEnvelope, DistributedProjectManifest, MessageEndpointManifest,