sourcery provides building blocks for pragmatic event-sourced systems in Rust. It keeps your domain types pure while giving you tools to rebuild state, project read models, and persist events through a pluggable store interface.
The crate is intentionally minimal. You wire the repository into whichever pipeline you prefer—whether that’s a web framework, actor system, or CLI tool.
Event sourcing stores state as a sequence of events rather than as a single mutable record. Instead of updating a row in a database, you append an event describing what happened. Current state is derived by replaying events from the beginning.
When you need the current state of an entity, the repository replays its events in order into a fresh aggregate instance. This “replay” process runs every time you load an aggregate. For long-lived entities, snapshots optimise this by checkpointing state periodically.
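Conceptually, replay is a left fold of the event history into a fresh instance. A minimal self-contained sketch (the types here are illustrative, not this crate's API):

```rust
// Illustrative types only; the crate derives/implements these for you.
#[derive(Default, Debug)]
struct Account {
    balance: i64,
}

enum AccountEvent {
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

impl Account {
    // Mirrors Aggregate::apply: a pure state transition, no validation.
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrawn { amount } => self.balance -= amount,
        }
    }
}

// Replay: fold the full history into a fresh Default instance.
fn replay(events: &[AccountEvent]) -> Account {
    let mut account = Account::default();
    for event in events {
        account.apply(event);
    }
    account
}

fn main() {
    let history = vec![
        AccountEvent::Deposited { amount: 100 },
        AccountEvent::Withdrawn { amount: 30 },
        AccountEvent::Deposited { amount: 5 },
    ];
    assert_eq!(replay(&history).balance, 75);
}
```

A snapshot simply replaces the `Account::default()` starting point with a checkpointed state, so only events after the checkpoint need replaying.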
Command Query Responsibility Segregation (CQRS) separates the write model (how you change data) from the read model (how you query data). Event sourcing and CQRS complement each other naturally.
A single aggregate might feed multiple projections (account summary, transaction history, fraud detection, analytics). Each projection sees the same events but builds a different view.
With CQRS, read models may not immediately reflect the latest write. This is fine for most UIs—users expect slight delays. For operations requiring strong consistency, read from the aggregate itself (via repo.load()).
The load_projection flow (Application → Repository → EventStore → Projection):
1. The application calls load_projection::<P>(&instance_id) on the repository.
2. The repository calls P::filters(&instance_id) to build the filter set and handler map.
3. The repository asks the event store to load_events(filters) and receives a Vec<StoredEvent>.
4. The repository creates a fresh projection with P::init(&instance_id).
5. For each stored event, the repository calls apply_projection(id, event, meta) on the projection.
6. The populated projection is returned to the application.
Projections define their event filters centrally in the ProjectionFilters trait. The repository calls filters() to determine which events to load, then replays them into the projection.
Let's build a simple bank account as a compact, end-to-end example demonstrating events, commands, an aggregate, a projection, and the repository.
pub trait Aggregate: Default {
/// Aggregate type identifier used by the event store.
///
/// This is combined with the aggregate ID to create stream identifiers.
/// Use lowercase, kebab-case for consistency: `"product"`,
/// `"user-account"`, etc.
const KIND: &'static str;
type Event;
type Error;
type Id;
/// Apply an event to update aggregate state.
///
/// This is called during event replay to rebuild aggregate state from
/// history.
///
/// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
/// implementations. For hand-written aggregates, implement this
/// directly with a match expression.
fn apply(&mut self, event: &Self::Event);
}
pub trait Handle<C>: Aggregate {
/// Handle a command and produce events.
///
/// # Errors
///
/// Returns `Self::Error` if the command is invalid for the current
/// aggregate state.
fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}
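Putting the two traits together, a hand-written bank account might look like this. The trait definitions are repeated locally so the sketch compiles standalone; the event, command, and error names are illustrative:

```rust
// Local copies of the crate's traits so this sketch stands alone.
pub trait Aggregate: Default {
    const KIND: &'static str;
    type Event;
    type Error;
    type Id;
    fn apply(&mut self, event: &Self::Event);
}

pub trait Handle<C>: Aggregate {
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}

#[derive(Default)]
pub struct Account {
    balance: i64,
}

#[derive(Debug, PartialEq)]
pub enum AccountEvent {
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

#[derive(Debug, PartialEq)]
pub enum AccountError {
    InsufficientFunds,
}

pub struct Withdraw {
    pub amount: i64,
}

impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = AccountError;
    type Id = String;

    // Pure state transition: never fails, never validates.
    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrawn { amount } => self.balance -= amount,
        }
    }
}

impl Handle<Withdraw> for Account {
    // Validate against current state; emit events, never mutate here.
    fn handle(&self, command: &Withdraw) -> Result<Vec<Self::Event>, Self::Error> {
        if command.amount > self.balance {
            return Err(AccountError::InsufficientFunds);
        }
        Ok(vec![AccountEvent::Withdrawn { amount: command.amount }])
    }
}

fn main() {
    let mut account = Account::default();
    account.apply(&AccountEvent::Deposited { amount: 100 });
    let events = account.handle(&Withdraw { amount: 30 }).unwrap();
    for event in &events {
        account.apply(event);
    }
    assert_eq!(account.balance, 70);
    assert_eq!(
        account.handle(&Withdraw { amount: 1000 }),
        Err(AccountError::InsufficientFunds)
    );
}
```

Note the split of responsibilities: handle validates and produces events without mutating, while apply mutates without validating. This is what makes replay deterministic.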
Filters::new()
.event::<FundsDeposited>() // Global: all aggregates
.event_for::<Account, FundsWithdrawn>(&id) // One event type, one aggregate instance
.events::<AccountEvent>() // All kinds in an event enum
.events_for::<Account>(&id) // All event kinds for one aggregate instance
You can also inspect the trait definitions directly:
pub trait Projection {
/// Stable identifier for this projection type.
const KIND: &'static str;
}
pub trait ApplyProjection<E>: ProjectionFilters {
fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
}
pub trait ProjectionFilters: Sized {
/// Aggregate identifier type this subscriber is compatible with.
type Id;
/// Instance identifier for this subscriber.
///
/// For singleton subscribers use `()`.
type InstanceId;
/// Metadata type expected by this subscriber's handlers.
type Metadata;
/// Construct a fresh instance from the instance identifier.
///
/// For singleton projections (`InstanceId = ()`), this typically
/// delegates to `Self::default()`. For instance projections, this
/// captures the instance identifier at construction time.
fn init(instance_id: &Self::InstanceId) -> Self;
/// Build the filter set and handler map for this subscriber.
fn filters<S>(instance_id: &Self::InstanceId) -> Filters<S, Self>
where
S: EventStore<Id = Self::Id, Metadata = Self::Metadata>;
}
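A projection is ultimately just state plus per-event handlers. The sketch below shows the core of the handler pattern; note it is simplified: in the crate, Id and Metadata are associated types of ProjectionFilters and ApplyProjection extends that trait, whereas here everything is folded onto one local trait so the example stands alone:

```rust
// Simplified local mirror of the crate's per-event handler shape.
pub trait ApplyProjection<E> {
    type Id;
    type Metadata;
    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
}

pub struct FundsDeposited { pub amount: i64 }
pub struct FundsWithdrawn { pub amount: i64 }

/// Read model: a running balance and transaction count per account.
#[derive(Default)]
pub struct AccountSummary {
    pub balance: i64,
    pub transactions: u64,
}

impl ApplyProjection<FundsDeposited> for AccountSummary {
    type Id = String;
    type Metadata = ();
    fn apply_projection(&mut self, _id: &String, event: &FundsDeposited, _meta: &()) {
        self.balance += event.amount;
        self.transactions += 1;
    }
}

impl ApplyProjection<FundsWithdrawn> for AccountSummary {
    type Id = String;
    type Metadata = ();
    fn apply_projection(&mut self, _id: &String, event: &FundsWithdrawn, _meta: &()) {
        self.balance -= event.amount;
        self.transactions += 1;
    }
}

fn main() {
    let id = "ACC-001".to_string();
    let mut summary = AccountSummary::default();
    summary.apply_projection(&id, &FundsDeposited { amount: 100 }, &());
    summary.apply_projection(&id, &FundsWithdrawn { amount: 25 }, &());
    assert_eq!(summary.balance, 75);
    assert_eq!(summary.transactions, 2);
}
```

One handler impl per event type is what lets different projections subscribe to overlapping subsets of the same event stream.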
pub trait EventStore: Send + Sync {
/// Aggregate identifier type.
///
/// This type must be clonable so repositories can reuse IDs across calls.
/// Common choices: `String`, `Uuid`, or custom ID types.
type Id: Clone + Send + Sync + 'static;
/// Position type used for ordering events and version checking.
///
/// Must be `Clone + PartialEq` to support optimistic concurrency.
/// Use `()` if ordering is not needed.
type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;
/// Store-specific error type.
type Error: std::error::Error + Send + Sync + 'static;
/// Metadata type for infrastructure concerns.
type Metadata: Send + Sync + 'static;
/// Serialised event payload type.
///
/// This is the format used to store event data internally. Common choices:
/// - `serde_json::Value` for JSON-based stores
/// - `Vec<u8>` for binary stores
type Data: Clone + Send + Sync + 'static;
/// Decode a stored event into a concrete event type.
///
/// Deserialises the `data` field of a [`StoredEvent`] back into a domain
/// event.
///
/// # Errors
///
/// Returns an error if deserialisation fails.
fn decode_event<E>(
&self,
stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
) -> Result<E, Self::Error>
where
E: crate::event::DomainEvent + serde::de::DeserializeOwned;
/// Get the current version (latest position) for an aggregate stream.
///
/// Returns `None` for streams with no events.
///
/// # Errors
///
/// Returns a store-specific error when the operation fails.
fn stream_version<'a>(
&'a self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;
/// Commit events to an aggregate stream without version checking.
///
/// Events are serialised and persisted atomically. No conflict detection
/// is performed (last-writer-wins).
///
/// # Errors
///
/// Returns [`CommitError::Serialization`] if an event fails to serialise,
/// or [`CommitError::Store`] if persistence fails.
fn commit_events<'a, E>(
&'a self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
events: NonEmpty<E>,
metadata: &'a Self::Metadata,
) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
where
E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
Self::Metadata: Clone;
/// Commit events to an aggregate stream with optimistic concurrency
/// control.
///
/// Events are serialised and persisted atomically. The commit fails if:
/// - `expected_version` is `Some(v)` and the current version differs from
/// `v`
/// - `expected_version` is `None` and the stream already has events (new
/// aggregate expected)
///
/// # Errors
///
/// Returns [`OptimisticCommitError::Serialization`] if an event fails to
/// serialise, [`OptimisticCommitError::Conflict`] if the version check
/// fails, or [`OptimisticCommitError::Store`] if persistence fails.
#[allow(clippy::type_complexity)]
fn commit_events_optimistic<'a, E>(
&'a self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
expected_version: Option<Self::Position>,
events: NonEmpty<E>,
metadata: &'a Self::Metadata,
) -> impl Future<
Output = Result<
Committed<Self::Position>,
OptimisticCommitError<Self::Position, Self::Error>,
>,
> + Send
+ 'a
where
E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
Self::Metadata: Clone;
/// Load events matching the specified filters.
///
/// Each filter describes an event kind and optional aggregate identity:
/// - [`EventFilter::for_event`] loads every event of the given kind
/// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
///
/// The store optimises based on its storage model and returns events
/// merged by position (if positions are available).
///
/// # Errors
///
/// Returns a store-specific error when loading fails.
#[allow(clippy::type_complexity)]
fn load_events<'a>(
&'a self,
filters: &'a [EventFilter<Self::Id, Self::Position>],
) -> impl Future<
Output = LoadEventsResult<
Self::Id,
Self::Position,
Self::Data,
Self::Metadata,
Self::Error,
>,
> + Send
+ 'a;
}
/// Marker trait for stores that provide globally ordered positions.
///
/// Projection snapshots require this guarantee.
pub trait GloballyOrderedStore: EventStore {}
use sourcery::store::inmemory;
// Unit metadata
let store: inmemory::Store<String, ()> = inmemory::Store::new();
// Custom metadata
let store: inmemory::Store<String, MyMetadata> = inmemory::Store::new();
The in-memory store uses serde_json and supports globally ordered positions.
// All events of one kind
EventFilter::for_event("account.deposited")
// One aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")
// Incremental loading
EventFilter::for_event("account.deposited").after(100)
pub trait SnapshotStore<Id: Sync>: Send + Sync {
/// Position type for tracking snapshot positions.
///
/// Must match the `EventStore::Position` type used in the same repository.
type Position: Send + Sync;
/// Error type for snapshot operations.
type Error: std::error::Error + Send + Sync + 'static;
/// Load the most recent snapshot for an aggregate.
///
/// Returns `Ok(None)` if no snapshot exists.
///
/// # Errors
///
/// Returns an error if the underlying storage fails.
fn load<T>(
&self,
kind: &str,
id: &Id,
) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
where
T: DeserializeOwned;
/// Whether to store a snapshot, with lazy snapshot creation.
///
/// The repository calls this after successfully appending new events,
/// passing `events_since_last_snapshot` and a `create_snapshot`
/// callback. Implementations may decline without invoking
/// `create_snapshot`, avoiding unnecessary snapshot creation cost
/// (serialisation, extra I/O, etc.).
///
/// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
/// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
/// snapshot was stored.
///
/// # Errors
///
/// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
/// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
fn offer_snapshot<CE, T, Create>(
&self,
kind: &str,
id: &Id,
events_since_last_snapshot: u64,
create_snapshot: Create,
) -> impl std::future::Future<
Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
> + Send
where
CE: std::error::Error + Send + Sync + 'static,
T: Serialize,
Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
}
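The decision logic of an every-N policy (as used by the in-memory store's every constructor) can be sketched without the full trait machinery. This is illustrative only; the real implementation wraps errors in OfferSnapshotError rather than returning them directly:

```rust
#[derive(Debug, PartialEq)]
enum SnapshotOffer {
    Stored,
    Declined,
}

/// Hypothetical policy: snapshot once every `threshold` events.
struct EveryN {
    threshold: u64,
}

impl EveryN {
    /// Decide whether to snapshot. The `create_snapshot` callback is only
    /// invoked (and its serialisation cost paid) when the threshold is hit.
    fn offer_snapshot<T, E>(
        &self,
        events_since_last_snapshot: u64,
        create_snapshot: impl FnOnce() -> Result<T, E>,
        persist: impl FnOnce(T),
    ) -> Result<SnapshotOffer, E> {
        if events_since_last_snapshot < self.threshold {
            return Ok(SnapshotOffer::Declined);
        }
        let snapshot = create_snapshot()?;
        persist(snapshot);
        Ok(SnapshotOffer::Stored)
    }
}

fn main() {
    let policy = EveryN { threshold: 100 };

    // Below threshold: decline without ever invoking the callback.
    let mut created = 0;
    let r: Result<_, std::convert::Infallible> =
        policy.offer_snapshot(42, || { created += 1; Ok(()) }, |_| ());
    assert_eq!(r.unwrap(), SnapshotOffer::Declined);
    assert_eq!(created, 0);

    // At threshold: create the snapshot and persist it.
    let mut persisted = None;
    let r: Result<_, std::convert::Infallible> =
        policy.offer_snapshot(100, || Ok(7_i64), |s| persisted = Some(s));
    assert_eq!(r.unwrap(), SnapshotOffer::Stored);
    assert_eq!(persisted, Some(7));
}
```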
When multiple processes or threads write to the same aggregate simultaneously, you risk
losing updates. Optimistic concurrency control detects these conflicts by checking that
the stream version hasn’t changed between loading the aggregate and committing new events.
The two concurrency strategies use different error types:
Strategy             | Error Type   | Includes Concurrency Variant?
---------------------|--------------|------------------------------
Optimistic (default) | CommandError | Yes
Unchecked            | CommandError | No (Concurrency = Infallible)
When using optimistic concurrency, execute_command returns
CommandError::Concurrency(conflict) if the stream version changed between
loading and committing:
use sourcery::repository::CommandError;
match repo
.execute_command::<MyAggregate, MyCommand>(&id, &command, &metadata)
.await
{
Ok(()) => println!("Success!"),
Err(CommandError::Concurrency(conflict)) => {
println!(
"Conflict: expected version {:?}, actual {:?}",
conflict.expected,
conflict.actual
);
}
Err(e) => println!("Other error: {e}"),
}
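A common response to a conflict is to reload the aggregate and retry a bounded number of times. A generic retry sketch, independent of the crate's types (the error enum here is a simplified stand-in):

```rust
// Simplified stand-in for the crate's command error.
#[derive(Debug, PartialEq)]
enum CommandError {
    Concurrency { expected: u64, actual: u64 },
    Other(String),
}

/// Retry `attempt` while it reports a concurrency conflict, allowing up to
/// `max_retries` extra attempts. Each attempt should reload the aggregate so
/// the next commit sees the latest stream version.
fn with_retries<T>(
    max_retries: u32,
    mut attempt: impl FnMut() -> Result<T, CommandError>,
) -> Result<T, CommandError> {
    let mut last_err = None;
    for _ in 0..=max_retries {
        match attempt() {
            Err(CommandError::Concurrency { expected, actual }) => {
                last_err = Some(CommandError::Concurrency { expected, actual });
            }
            other => return other, // success, or a non-retryable error
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}

fn main() {
    // Simulate: first attempt conflicts, second succeeds.
    let mut calls = 0;
    let result = with_retries(3, || {
        calls += 1;
        if calls == 1 {
            Err(CommandError::Concurrency { expected: 5, actual: 6 })
        } else {
            Ok(())
        }
    });
    assert_eq!(result, Ok(()));
    assert_eq!(calls, 2);
}
```

Bound the retries: under heavy contention on a single aggregate, unbounded retry loops amplify load rather than resolve it.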
Loading with a snapshot (Application → SnapshotStore → EventStore → Aggregate):
1. The repository calls load("account", "ACC-001") on the snapshot store and receives a snapshot at position 1000.
2. The snapshot is deserialised into an aggregate instance.
3. The event store loads only newer events via load_events(after: 1000), returning events 1001–1050.
4. The repository calls apply(event) on the aggregate for each new event.
Instead of replaying all 1050 events, load the snapshot and replay only the 50 new ones.
Use with_snapshots() when creating the repository:
use sourcery::{Repository, snapshot::inmemory, store::inmemory as event_store};
let event_store = event_store::Store::new();
let snapshot_store = inmemory::Store::every(100);
let repository = Repository::new(event_store)
.with_snapshots(snapshot_store);
The repository calls offer_snapshot after successfully appending new events. Implementations may decline without invoking create_snapshot, avoiding unnecessary snapshot serialisation.
Projection snapshots use the same store and are keyed by (P::KIND, instance_id).
Use load_projection_with_snapshot on a repository configured with snapshots.
Metadata carries infrastructure concerns alongside events without polluting domain types. Common uses include correlation IDs, user context, timestamps, and causation tracking.
Track event relationships for debugging and workflows:
Correlation ID: Groups all events from a single user request
Causation ID: Points to the event that triggered this one
// When handling a saga or process manager
let follow_up_metadata = EventMetadata {
correlation_id: original_meta.correlation_id.clone(),
causation_id: Some(original_event_id.to_string()),
user_id: "system".to_string(),
timestamp: Utc::now(),
};
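The chaining rule is: copy the correlation ID, point the causation ID at the triggering event. A self-contained version of the snippet above (timestamp omitted for brevity; the crate leaves the metadata type entirely up to you):

```rust
// Example metadata shape; define whatever fields your infrastructure needs.
#[derive(Clone, Debug, PartialEq)]
struct EventMetadata {
    correlation_id: String,       // same for every event in one user request
    causation_id: Option<String>, // the event that triggered this one
    user_id: String,
}

/// Derive metadata for a follow-up event in a saga or process manager.
fn follow_up(original: &EventMetadata, original_event_id: &str) -> EventMetadata {
    EventMetadata {
        correlation_id: original.correlation_id.clone(),
        causation_id: Some(original_event_id.to_string()),
        user_id: "system".to_string(),
    }
}

fn main() {
    let original = EventMetadata {
        correlation_id: "req-42".into(),
        causation_id: None,
        user_id: "alice".into(),
    };
    let next = follow_up(&original, "evt-001");
    assert_eq!(next.correlation_id, "req-42"); // request grouping preserved
    assert_eq!(next.causation_id.as_deref(), Some("evt-001")); // causal link
}
```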
The projection follows the same pattern shown in Accessing Metadata in Projections, with InstanceId = String if each tenant needs its own projection instance.
While load_projection rebuilds a projection from scratch on each call, subscriptions maintain an in-memory projection that updates in real time as events are committed.
Use Repository::subscribe to create a SubscriptionBuilder, configure callbacks, then call start():
let subscription = repository
.subscribe::<Dashboard>(())
.on_update(|dashboard| println!("{dashboard:?}"))
.start()
.await?;
The instance ID argument matches the projection’s InstanceId type. For singleton projections (InstanceId = ()), pass ().
start() returns only after catch-up completes.
The start() method returns a SubscriptionHandle. Call stop() for graceful shutdown:
subscription.stop().await?;
The subscription processes any remaining events, offers a final snapshot, then terminates. Dropping the handle sends a best-effort stop signal, but use stop() when you need deterministic shutdown and error handling.
A common pattern is to share projection state between the subscription and the command side via Arc<Mutex<_>>:
let live_state = Arc::new(Mutex::new(Dashboard::default()));
let state_for_callback = live_state.clone();
let subscription = repository
.subscribe::<Dashboard>(())
.on_update(move |projection| {
*state_for_callback.lock().unwrap() = projection.clone();
})
.start()
.await?;
// Read the live projection from another task — no event replay needed
let current = live_state.lock().unwrap().clone();
The inmemory::Store is useful for testing, but production systems need durable storage. This guide walks through implementing EventStore for your database.
CREATE TABLE events (
position BIGSERIAL PRIMARY KEY,
aggregate_kind TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
event_kind TEXT NOT NULL,
data JSONB NOT NULL,
metadata JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_aggregate
ON events (aggregate_kind, aggregate_id, position);
CREATE INDEX idx_events_kind
ON events (event_kind, position);
Build a WHERE clause from filters with OR, ordered by position:
fn load_events(&self, filters: &[EventFilter<String, i64>])
-> Result<Vec<StoredEvent>, sqlx::Error>
{
// WHERE (event_kind = 'x' AND position > N)
// OR (aggregate_kind = 'a' AND aggregate_id = 'b')
// ORDER BY position ASC
todo!()
}
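The clause construction itself can be sketched without a database. The sketch below uses a local mirror of the filter shape and interpolates values only for readability; real code must bind them as query parameters ($1, $2, ...) to avoid SQL injection:

```rust
/// Local mirror of the crate's filter shape (sketch only).
struct EventFilter {
    event_kind: String,
    aggregate: Option<(String, String)>, // (aggregate_kind, aggregate_id)
    after: Option<i64>,                  // position, for incremental loading
}

/// Build one OR-branch per filter, ordered by global position.
fn build_where(filters: &[EventFilter]) -> String {
    let branches: Vec<String> = filters
        .iter()
        .map(|f| {
            let mut conds = vec![format!("event_kind = '{}'", f.event_kind)];
            if let Some((kind, id)) = &f.aggregate {
                conds.push(format!("aggregate_kind = '{kind}'"));
                conds.push(format!("aggregate_id = '{id}'"));
            }
            if let Some(pos) = f.after {
                conds.push(format!("position > {pos}"));
            }
            format!("({})", conds.join(" AND "))
        })
        .collect();
    format!("WHERE {} ORDER BY position ASC", branches.join(" OR "))
}

fn main() {
    let filters = vec![
        EventFilter {
            event_kind: "account.deposited".into(),
            aggregate: None,
            after: Some(100),
        },
        EventFilter {
            event_kind: "account.withdrawn".into(),
            aggregate: Some(("account".into(), "ACC-001".into())),
            after: None,
        },
    ];
    let sql = build_where(&filters);
    assert!(sql.starts_with("WHERE (event_kind = 'account.deposited' AND position > 100)"));
    assert!(sql.contains("OR (event_kind = 'account.withdrawn' AND aggregate_kind = 'account' AND aggregate_id = 'ACC-001')"));
    assert!(sql.ends_with("ORDER BY position ASC"));
}
```

Because every branch is anchored on event_kind or (aggregate_kind, aggregate_id), both indexes defined above can serve the query.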
To support optimistic concurrency control, track an explicit per-stream version:
ALTER TABLE events ADD COLUMN stream_version INT NOT NULL;
CREATE UNIQUE INDEX idx_events_stream_version
ON events (aggregate_kind, aggregate_id, stream_version);
// In commit_events_optimistic:
// 1. Get current max version for stream
// 2. Compare against expected_version
// 3. If mismatch, return OptimisticCommitError::Conflict(ConcurrencyConflict { expected, actual })
// 4. Insert with version + 1
// 5. Handle unique constraint violation as concurrency conflict
Decision: Events are standalone structs implementing DomainEvent, not just enum variants.
Why:
Events can be shared across multiple aggregates
Projections subscribe to event types, not aggregate enums
Easier to evolve events independently
Better compile-time isolation
Trade-off: More boilerplate (mitigated by derive macro).
// This crate: events stand alone
struct OrderPlaced { /* ... */ }
struct PaymentReceived { /* ... */ }
// Both Order and Payment aggregates can use PaymentReceived
Decision: Aggregate IDs are passed to the repository, not embedded in events or handlers.
Why:
Domain events stay pure (no infrastructure metadata)
Same event type works with different ID schemes
Command handlers focus on behaviour, not identity
IDs travel in the envelope alongside events
Trade-off: You can’t access the ID inside Handle<C>. If needed, include relevant IDs in the command.
// ID is infrastructure
repository
.execute_command::<Account, Deposit>(&account_id, &command, &metadata)
.await?;
// Event doesn't contain ID
struct FundsDeposited { amount: i64 } // No account_id field
Decision: EventStore implementations handle serialisation, not a separate Codec trait. The generated event enum implements serde::Serialize with custom logic that serialises only the inner event struct.