Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 51 additions & 51 deletions README.md

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions src/bus/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
/// How a consumer acknowledges successful handler execution back to its
/// transport.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum ConsumerAckKind {
/// Complete / delete / archive a durable table row (Postgres).
TableRow,
Expand All @@ -29,6 +30,7 @@ pub enum ConsumerAckKind {

/// How a transport integrates with Knative Eventing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum KnativeIntegrationKind {
/// First-class Knative Eventing path (Kafka and RabbitMQ Broker/Source,
/// Knative HTTP CloudEvents).
Expand All @@ -54,6 +56,7 @@ pub enum KnativeIntegrationKind {
/// owns retry, backoff, and dead-lettering) from direct transports where the
/// adapter and this crate own that operational contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct TransportCapabilities {
/// The transport durably stores received messages until acknowledged (a
/// table, queue, stream, or broker), rather than relying on a live process
Expand All @@ -72,6 +75,30 @@ pub struct TransportCapabilities {
}

impl TransportCapabilities {
/// Build a custom capability profile for a third-party transport.
///
/// The named constructors below cover the transports this crate ships;
/// `new` lets an external adapter declare its own profile. It is the
/// supported construction path now that the struct is `#[non_exhaustive]`
/// (an external crate can no longer use a struct literal). New optional
/// dimensions added later default to a conservative value through this
/// constructor rather than breaking callers.
pub const fn new(
durable_receive: bool,
publish_confirm: bool,
platform_managed_retry: bool,
consumer_ack: ConsumerAckKind,
knative_integration: KnativeIntegrationKind,
) -> Self {
Self {
durable_receive,
publish_confirm,
platform_managed_retry,
consumer_ack,
knative_integration,
}
}

/// Postgres: durable table-backed transport, transaction-commit publish
/// confirmation, app-owned retry, row-completion ack, custom Knative bridge.
pub const fn postgres() -> Self {
Expand Down
5 changes: 4 additions & 1 deletion src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ pub use run_options::{ConsumerDeliveryMode, InboxHook, NoInbox, RunOptions};
pub use runner::run_source;
pub use source::{AsyncMessageSource, ReceivedMessage};
pub use stable_id::{validate_stable_message_id, StableMessageIdError, MAX_STABLE_MESSAGE_ID_LEN};
pub(crate) use topology::BusTopologyConfig;
pub use topology::{
resolve_consumer_group, validate_consumer_group, validate_namespace, BusTopologyConfig,
DEFAULT_BUS_NAMESPACE, MAX_TOPOLOGY_NAME_LEN,
};
53 changes: 36 additions & 17 deletions src/bus/topology.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
use super::router::MessageRouter;
use super::TransportError;

pub(crate) const DEFAULT_BUS_NAMESPACE: &str = "default";
pub(crate) const MAX_TOPOLOGY_NAME_LEN: usize = 128;

/// Default broker namespace applied when a transport is constructed without an
/// explicit one.
pub const DEFAULT_BUS_NAMESPACE: &str = "default";
/// Maximum byte length accepted for a consumer group or namespace name.
pub const MAX_TOPOLOGY_NAME_LEN: usize = 128;

/// Consumer-group and namespace topology for a broker-backed bus.
///
/// Third-party transports can reuse this to apply the same portable naming
/// rules (`group`/`namespace` validation) the built-in transports use, instead
/// of reimplementing them. Construct via [`BusTopologyConfig::default`] and
/// refine with [`group`](Self::group) / [`namespace`](Self::namespace), then
/// [`validate_for`](Self::validate_for) before touching broker topology.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct BusTopologyConfig {
pub struct BusTopologyConfig {
group: Option<String>,
namespace: String,
}
Expand All @@ -20,37 +30,45 @@ impl Default for BusTopologyConfig {
}

impl BusTopologyConfig {
pub(crate) fn default_namespace() -> &'static str {
/// The default namespace used when none is set.
pub fn default_namespace() -> &'static str {
DEFAULT_BUS_NAMESPACE
}

pub(crate) fn group(mut self, group: impl Into<String>) -> Self {
/// Set the durable consumer group.
pub fn group(mut self, group: impl Into<String>) -> Self {
self.group = Some(group.into());
self
}

pub(crate) fn namespace(mut self, namespace: impl Into<String>) -> Self {
/// Set the broker namespace/prefix.
pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = namespace.into();
self
}

pub(crate) fn namespace_unchecked(&self) -> &str {
/// The configured namespace without re-validating it.
pub fn namespace_unchecked(&self) -> &str {
&self.namespace
}

pub(crate) fn resolve_consumer_group<R: MessageRouter>(
/// Resolve the effective consumer group, falling back to the router's
/// identity, validating the result.
pub fn resolve_consumer_group<R: MessageRouter>(
&self,
router: &R,
transport: &str,
) -> Result<String, TransportError> {
resolve_consumer_group(self.group.as_deref(), router, transport)
}

pub(crate) fn namespace_for(&self, transport: &str) -> Result<String, TransportError> {
/// Validate and return the namespace for the given transport.
pub fn namespace_for(&self, transport: &str) -> Result<String, TransportError> {
validate_namespace(&self.namespace, transport)
}

pub(crate) fn validate_for(self, transport: &str) -> Result<Self, TransportError> {
/// Validate both group and namespace, returning a checked config.
pub fn validate_for(self, transport: &str) -> Result<Self, TransportError> {
let group = self
.group
.map(|group| validate_consumer_group(&group, transport))
Expand Down Expand Up @@ -79,18 +97,19 @@ impl TopologyNameKind {
}
}

pub(crate) fn validate_consumer_group(
value: &str,
transport: &str,
) -> Result<String, TransportError> {
/// Validate a consumer-group name against the portable topology rules.
pub fn validate_consumer_group(value: &str, transport: &str) -> Result<String, TransportError> {
validate_topology_name(value, TopologyNameKind::ConsumerGroup, transport)
}

pub(crate) fn validate_namespace(value: &str, transport: &str) -> Result<String, TransportError> {
/// Validate a namespace name against the portable topology rules (dots allowed).
pub fn validate_namespace(value: &str, transport: &str) -> Result<String, TransportError> {
validate_topology_name(value, TopologyNameKind::Namespace, transport)
}

pub(crate) fn resolve_consumer_group<R: MessageRouter>(
/// Resolve a consumer group from an explicit value or the router identity,
/// validating the result.
pub fn resolve_consumer_group<R: MessageRouter>(
explicit: Option<&str>,
router: &R,
transport: &str,
Expand Down
36 changes: 20 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::module_inception)]
#![doc = include_str!("../README.md")]

// Allow proc-macros to reference this crate by name even when used internally
extern crate self as distributed;
Expand All @@ -15,8 +16,8 @@ mod hashmap_repo;
pub mod lock;
pub mod manifest;
pub mod microsvc;
mod outbox;
mod outbox_worker;
pub mod outbox;
pub mod outbox_worker;
#[cfg(feature = "postgres")]
pub mod postgres_repo;
pub mod queued_repo;
Expand Down Expand Up @@ -64,11 +65,14 @@ pub use lock::{PostgresLock, PostgresLockManager};
#[cfg(feature = "sqlite")]
pub use lock::{SqliteLock, SqliteLockManager};

// Outbox: commit concerns (aggregate + outbox in one commit)
// Outbox: commit concerns (aggregate + outbox in one commit).
//
// Low-level row plumbing (`outbox_message_insert_plan`, `outbox_message_row_values`)
// stays reachable under `distributed::outbox::*` and is intentionally NOT re-exported
// at the crate root — it is an adapter-authoring detail, not part of the quick-start API.
pub use outbox::{
outbox_message_insert_plan, outbox_message_key, outbox_message_row_values,
outbox_message_schema, AggregateCommit, CommitReceipt, OutboxMessage, OutboxMessageStatus,
OutboxPublishHook, OutboxPublisherConfig, OUTBOX_MESSAGES_TABLE,
outbox_message_key, outbox_message_schema, AggregateCommit, CommitReceipt, OutboxMessage,
OutboxMessageStatus, OutboxPublishHook, OutboxPublisherConfig, OUTBOX_MESSAGES_TABLE,
};

// Outbox Worker: drain and publish concerns
Expand Down Expand Up @@ -123,16 +127,16 @@ pub use read_model::{
DEFAULT_READ_MODEL_VERSION_COLUMN,
};

// Neutral table/row primitives shared by read models and operational tables.
pub use table::{
generate_table_migration_artifacts, table_schema_bootstrap_result, table_schema_statements,
DeleteTableRowMutation, PatchTableRowMutation, TableAdapterCapabilities, TableColumn,
TableCommitOutcome, TableIndex, TableMigrationArtifact, TableModel, TableMutation,
TableRowMutation, TableSchema, TableSchemaAdapter, TableSchemaAdapterCapabilities,
TableSchemaBootstrap, TableSchemaIssue, TableSchemaIssueKind, TableSchemaRegistry,
TableSchemaRegistryExt, TableSchemaVerification, TableSqlDialect, TableSqlSchemaAdapter,
TableStoreError, TableWritePlan, DEFAULT_TABLE_VERSION_COLUMN,
};
// Neutral table/row primitives shared by read models and operational tables stay
// reachable under their module path (`distributed::table::*`). They are low-level
// schema/adapter plumbing, not part of the quick-start surface, so they are not
// re-exported at the crate root.
//
// Exception: `TableSchemaRegistry` is the entry point for registering operational
// table schemas (it is what callers build before bootstrapping migrations), so it
// is consumed directly by downstream users and integration tests. It stays at the
// crate root as part of the public surface.
pub use table::TableSchemaRegistry;

pub use manifest::{
DistributedManifestEnvelope, DistributedProjectManifest, MessageEndpointManifest,
Expand Down
6 changes: 3 additions & 3 deletions src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde::{Deserialize, Serialize};

use crate::{
generate_table_migration_artifacts, table_schema_statements, ReadModelError,
ReadModelMigrationArtifact, RelationalReadModel, TableSchema, TableSchemaRegistry,
use crate::table::{
generate_table_migration_artifacts, table_schema_statements, TableSchema, TableSchemaRegistry,
TableSqlDialect,
};
use crate::{ReadModelError, ReadModelMigrationArtifact, RelationalReadModel};

pub const DISTRIBUTED_MANIFEST_SCHEMA_VERSION: u32 = 1;

Expand Down
5 changes: 3 additions & 2 deletions src/microsvc/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use crate::bus::Message;
/// ## Example
///
/// ```ignore
/// pub fn handle<D: HasRepo>(
/// ctx: &Context<D>,
/// pub async fn handle(
/// ctx: &Context<'_, Repo>,
/// ) -> Result<Value, HandlerError> {
/// let user_id = ctx.user_id()?;
/// let input = ctx.input::<CreateOrderInput>()?;
/// let repo = ctx.repo();
/// // ... commit via repo.commit(&mut agg).await? ...
/// }
/// ```
pub struct Context<'a, D> {
Expand Down
1 change: 1 addition & 0 deletions src/microsvc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{repository::RepositoryError, EventRecordError};

/// Error type for command handler operations.
#[derive(Debug)]
#[non_exhaustive]
pub enum HandlerError {
/// No handler registered for this command name.
UnknownCommand(String),
Expand Down
27 changes: 14 additions & 13 deletions src/microsvc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
//!
//! ## Quick Start
//!
//! Dispatch is **async** — `dispatch`, `handle`, and the commit path all return
//! futures and are awaited.
//!
//! ```ignore
//! use std::sync::Arc;
//! use distributed::{microsvc, HashMapRepository};
Expand All @@ -15,40 +18,38 @@
//! microsvc::Service::new().with_repo(HashMapRepository::new())
//! .command("order.create")
//! .handle(|ctx| {
//! let input = ctx.input::<CreateOrderInput>()?;
//! Ok(json!({ "id": input.id }))
//! let input = ctx.input::<CreateOrderInput>();
//! async move { Ok(json!({ "id": input?.id })) }
//! })
//! );
//!
//! // Direct dispatch
//! let result = service.dispatch("order.create", json!({ "id": "o1" }), microsvc::Session::new());
//! // Direct dispatch (async)
//! let result = service
//! .dispatch("order.create", json!({ "id": "o1" }), microsvc::Session::new())
//! .await?;
//!
//! // HTTP transport (requires "http" feature)
//! // microsvc::serve(service, "0.0.0.0:3000").await?;
//! ```
//!
//! ## Handler Convention
//!
//! Each handler file follows this convention:
//! Each handler file follows this convention. `handle` is **async**:
//!
//! ```ignore
//! // src/handlers/order_create.rs
//!
//! pub const COMMAND: &str = "order.create";
//!
//! pub fn guard<D>(ctx: &microsvc::Context<D>) -> bool {
//! pub fn guard(ctx: &microsvc::Context<Repo>) -> bool {
//! ctx.has_fields(&["id", "product_id"])
//! }
//!
//! pub fn handle<D>(ctx: &microsvc::Context<D>) -> Result<Value, microsvc::HandlerError>
//! where
//! D: microsvc::HasRepo,
//! D::Repo: CommitAggregate,
//! {
//! pub async fn handle(ctx: &microsvc::Context<'_, Repo>) -> Result<Value, microsvc::HandlerError> {
//! let input = ctx.input::<CreateOrderInput>()?;
//! let mut order = Order::default();
//! order.create(input.id);
//! ctx.repo().commit_aggregate(&mut order)?;
//! order.create(input.id)?;
//! ctx.repo().commit(&mut order).await?;
//! Ok(json!({ "id": order.entity().id() }))
//! }
//! ```
Expand Down
10 changes: 7 additions & 3 deletions src/microsvc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@
//!
//! ## Example
//!
//! The handler closure returns a future, and `dispatch` is awaited:
//!
//! ```ignore
//! use distributed::microsvc;
//! use serde_json::json;
//!
//! let service = microsvc::Service::new()
//! .command("order.create")
//! .handle(|ctx| {
//! let input = ctx.input::<CreateOrderInput>()?;
//! Ok(json!({ "id": input.id }))
//! let input = ctx.input::<CreateOrderInput>();
//! async move { Ok(json!({ "id": input?.id })) }
//! });
//!
//! let result = service.dispatch("order.create", json!({"id": "1"}), Session::new());
//! let result = service
//! .dispatch("order.create", json!({"id": "1"}), Session::new())
//! .await?;
//! ```

use std::collections::HashMap;
Expand Down
1 change: 1 addition & 0 deletions src/outbox/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl std::str::FromStr for OutboxMessageStatus {
/// It is not an aggregate stream; repositories store it in their outbox storage
/// and workers update delivery state directly.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct OutboxMessage {
pub id: String,
pub event_type: String,
Expand Down
18 changes: 14 additions & 4 deletions src/outbox_worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
//! Outbox Worker - Drains and publishes outbox messages.
//!
//! This module provides the worker infrastructure for processing outbox messages:
//! This module provides the worker infrastructure for processing outbox messages.
//!
//! There are two drain paths:
//! - **Production / extension point**: the async `OutboxDispatcher` +
//! `AsyncMessagePublisher` (`BusPublisher` over a `Bus`), wired by
//! `service.with_bus(bus)`.
//! - **Dev/test**: the synchronous `OutboxWorker` + `OutboxPublisher` +
//! `LogPublisher` trio — no async runtime, no real transport.
//!
//! Items:
//! - `OutboxStore` - Store operations for claiming and completing messages
//! - `OutboxWorker` - Synchronous message processor
//! - `OutboxPublisher` - Trait for publishing to external systems
//! - `LogPublisher` - Simple logging publisher for testing
//! - `OutboxDispatcher` / `BusPublisher` - the async production drain path
//! - `OutboxWorker` - synchronous dev/test message processor
//! - `OutboxPublisher` - synchronous dev/test publish trait
//! - `LogPublisher` - simple logging publisher for tests
//! - `LocalEmitterPublisher` - In-process event emitter (requires `emitter` feature)
//!
//! ## Separation of Concerns
Expand Down
Loading
Loading