sourcery provides building blocks for pragmatic event-sourced systems in Rust. It keeps your domain types pure while giving you tools to rebuild state, project read models, and persist events through a pluggable store interface.
The crate is intentionally minimal. You wire the repository into whichever pipeline you prefer—whether that’s a web framework, actor system, or CLI tool.
Event sourcing stores state as a sequence of events rather than as a single mutable record. Instead of updating a row in a database, you append an event describing what happened. Current state is derived by replaying events from the beginning.
An event represents something that happened in the past. It is immutable—you never modify or delete events. To “undo” something, you append a compensating event.
| Property | Description |
|---|---|
| Immutable | Events cannot be changed after creation |
| Past tense | Named as facts: OrderPlaced, FundsDeposited |
| Complete | Contains all data needed to understand what happened |
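To make replay and compensating events concrete, here is a minimal sketch that uses only the standard library. The `AccountEvent` enum and the `replay` function are hypothetical names chosen for illustration and are not part of sourcery; the crate itself models events as standalone structs rather than enum variants.

```rust
/// Hypothetical events for a bank account (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    FundsDeposited { amount: i64 },
    FundsWithdrawn { amount: i64 },
    /// Compensating event: cancels an earlier deposit without deleting it.
    DepositReversed { amount: i64 },
}

/// Derive the current balance by folding over the full history.
pub fn replay(events: &[AccountEvent]) -> i64 {
    events.iter().fold(0, |balance, event| match event {
        AccountEvent::FundsDeposited { amount } => balance + amount,
        AccountEvent::FundsWithdrawn { amount } => balance - amount,
        AccountEvent::DepositReversed { amount } => balance - amount,
    })
}
```

The log only ever grows: reversing a deposit adds a third kind of fact instead of editing or removing the original `FundsDeposited` entry.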
Command Query Responsibility Segregation (CQRS) separates the write model (how you change data) from the read model (how you query data). Event sourcing and CQRS complement each other naturally.
With CQRS, read models may not immediately reflect the latest write. This is fine for most UIs—users expect slight delays. For operations requiring strong consistency, read from the aggregate itself (via aggregate_builder).
An aggregate is a cluster of domain objects treated as a single unit for data changes. In event sourcing, aggregates rebuild their state by replaying events and validate commands to produce new events.
pub trait Aggregate: Default + Sized {
    /// Aggregate type identifier used by the event store.
    ///
    /// This is combined with the aggregate ID to create stream identifiers.
    /// Use lowercase, kebab-case for consistency: `"product"`,
    /// `"user-account"`, etc.
    const KIND: &'static str;

    type Event;
    type Error;
    type Id;

    /// Apply an event to update aggregate state.
    ///
    /// This is called during event replay to rebuild aggregate state from
    /// history.
    ///
    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
    /// implementations. For hand-written aggregates, implement this
    /// directly with a match expression.
    fn apply(&mut self, event: &Self::Event);
}
Most of this is generated by #[derive(Aggregate)]. You focus on the behavior.
Snapshots are opt-in. If you enable snapshots (via Repository::with_snapshots), the aggregate state must be serializable (Serialize + DeserializeOwned).
The aggregate starts in its Default state. Each event is applied in order. The final state matches what would exist if you had executed all the original commands.
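The replay described above can be sketched with a hand-written aggregate. This is a simplified illustration, not the crate's code: the local `Aggregate` trait omits the `Error` and `Id` associated types, and `Account`, `AccountEvent`, and `rebuild` are hypothetical names.

```rust
/// Simplified local stand-in for the `Aggregate` trait
/// (`Error` and `Id` are omitted for brevity).
pub trait Aggregate: Default + Sized {
    const KIND: &'static str;
    type Event;
    fn apply(&mut self, event: &Self::Event);
}

/// Hypothetical event sum type for a hand-written aggregate.
#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    Opened { owner: String },
    FundsDeposited { amount: i64 },
    FundsWithdrawn { amount: i64 },
}

#[derive(Debug, Default, PartialEq)]
pub struct Account {
    pub owner: Option<String>,
    pub balance: i64,
}

impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;

    /// Hand-written dispatch with a match expression.
    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::Opened { owner } => self.owner = Some(owner.clone()),
            AccountEvent::FundsDeposited { amount } => self.balance += amount,
            AccountEvent::FundsWithdrawn { amount } => self.balance -= amount,
        }
    }
}

/// Start from `Default` and apply each event in order.
pub fn rebuild<A: Aggregate>(history: &[A::Event]) -> A {
    let mut aggregate = A::default();
    for event in history {
        aggregate.apply(event);
    }
    aggregate
}
```

Because `apply` cannot fail and takes no command, replaying the same history always yields the same state.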
On Repository this replays all events for that aggregate ID.
On Repository<S, C, Snapshots<SS>> it loads a snapshot first (when present) and replays only the delta.
Events are the heart of event sourcing. They represent facts—things that have happened. In this crate, events are first-class structs, not just enum variants.
Commands represent requests to change the system. Unlike events (which are facts), commands can be rejected. An aggregate validates a command and either produces events or returns an error.
pub trait Handle<C>: Aggregate {
    /// Handle a command and produce events.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the command is invalid for the current
    /// aggregate state.
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}
Key points:
- Takes &self: the handler reads state but doesn’t mutate it
- Returns Vec<Event>: a command may produce zero, one, or many events
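As a rough sketch of these points, the handler below validates a hypothetical `Withdraw` command against the current state. The traits are simplified local copies of the ones shown above, and `Account`, `AccountEvent`, and `AccountError` are invented for the example.

```rust
/// Simplified local stand-ins for the crate's traits.
pub trait Aggregate: Default + Sized {
    type Event;
    type Error;
    fn apply(&mut self, event: &Self::Event);
}

pub trait Handle<C>: Aggregate {
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}

#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    FundsWithdrawn { amount: i64 },
}

#[derive(Debug, PartialEq)]
pub enum AccountError {
    InsufficientFunds { available: i64, requested: i64 },
}

/// Hypothetical command: a request that may be rejected.
pub struct Withdraw {
    pub amount: i64,
}

#[derive(Debug, Default)]
pub struct Account {
    pub balance: i64,
}

impl Aggregate for Account {
    type Event = AccountEvent;
    type Error = AccountError;

    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::FundsWithdrawn { amount } => self.balance -= amount,
        }
    }
}

impl Handle<Withdraw> for Account {
    fn handle(&self, command: &Withdraw) -> Result<Vec<Self::Event>, Self::Error> {
        if command.amount == 0 {
            // Nothing to record: a valid command may produce zero events.
            return Ok(Vec::new());
        }
        if command.amount > self.balance {
            return Err(AccountError::InsufficientFunds {
                available: self.balance,
                requested: command.amount,
            });
        }
        Ok(vec![AccountEvent::FundsWithdrawn { amount: command.amount }])
    }
}
```

State changes only when the returned events are later applied; `handle` itself leaves the aggregate untouched.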
Projections are read models built by replaying events. They’re optimized for queries rather than consistency. A single event stream can feed many projections, each structuring data differently.
pub trait Projection: Default + Sized {
    /// Aggregate identifier type this projection is compatible with.
    type Id;
    /// Metadata type expected by this projection
    type Metadata;
}
Projections must be Default because they start empty and build up through event replay.
Use ProjectionBuilder to specify which events to load:
let summary = repository
    .build_projection::<AccountSummary>()
    .event::<FundsDeposited>() // All deposits across all accounts
    .event::<FundsWithdrawn>() // All withdrawals across all accounts
    .load()
    .await?;
Projections can consume events from multiple aggregate types. Register each event type with .event::<E>():
let report = repository
    .build_projection::<InventoryReport>()
    .event::<ProductCreated>() // From Product aggregate
    .event::<SaleRecorded>() // From Sale aggregate
    .load()
    .await?;
Implement ApplyProjection<E> for each event type the projection handles.
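A per-event projection handler might look like the sketch below. The `ApplyProjection` trait here is a local definition whose method name and parameters are assumptions made for the example; consult the crate documentation for the real signature. `AccountSummary` and the event structs are likewise illustrative.

```rust
use std::collections::HashMap;

/// Local copy of the `Projection` trait shown above.
pub trait Projection: Default + Sized {
    type Id;
    type Metadata;
}

/// ASSUMED shape of the per-event trait (illustration only).
pub trait ApplyProjection<E>: Projection {
    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
}

pub struct FundsDeposited {
    pub amount: i64,
}

pub struct FundsWithdrawn {
    pub amount: i64,
}

/// Read model: current balance per account ID.
#[derive(Debug, Default)]
pub struct AccountSummary {
    pub balances: HashMap<String, i64>,
}

impl Projection for AccountSummary {
    type Id = String;
    type Metadata = ();
}

impl ApplyProjection<FundsDeposited> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsDeposited, _: &Self::Metadata) {
        *self.balances.entry(id.clone()).or_insert(0) += event.amount;
    }
}

impl ApplyProjection<FundsWithdrawn> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsWithdrawn, _: &Self::Metadata) {
        *self.balances.entry(id.clone()).or_insert(0) -= event.amount;
    }
}
```

The projection starts from `Default` (an empty map) and accumulates state one event at a time, across every account whose events are fed to it.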
The crate separates storage concerns into three traits: EventStore for event persistence, SnapshotStore for aggregate snapshots, and Codec for serialization.
pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Copy + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialization codec.
    type Codec: Codec + Clone + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    fn codec(&self) -> &Self::Codec;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Begin a transaction for appending events to an aggregate.
    ///
    /// The transaction type is determined by the concurrency strategy `C`.
    ///
    /// # Arguments
    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
    /// * `aggregate_id` - The aggregate instance identifier
    /// * `expected_version` - The version expected for optimistic concurrency
    fn begin<C: ConcurrencyStrategy>(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: Self::Id,
        expected_version: Option<Self::Position>,
    ) -> Transaction<'_, Self, C>
    where
        Self: Sized;

    /// Append events with optional version checking.
    ///
    /// If `expected_version` is `Some`, the append fails with a concurrency
    /// conflict if the current stream version doesn't match.
    /// If `expected_version` is `None`, no version checking is performed.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
    /// [`AppendError::Store`] if persistence fails.
    fn append<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimizes based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
    > + Send
    + 'a;

    /// Append events expecting an empty stream.
    ///
    /// This method is used by optimistic concurrency when creating new
    /// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
    /// already has events.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the stream is not empty,
    /// or [`AppendError::Store`] if persistence fails.
    fn append_expecting_new<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}
use sourcery::store::{inmemory, JsonCodec};
// With unit metadata
let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
// With custom metadata
let store: inmemory::Store<String, JsonCodec, MyMetadata> = inmemory::Store::new(JsonCodec);
Events are appended within a transaction for atomicity:
use sourcery::concurrency::Unchecked;
let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
tx.append(event1, metadata.clone())?;
tx.append(event2, metadata.clone())?;
tx.commit().await?; // Events visible only after commit
If the transaction is dropped without committing, no events are persisted.
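One way to get this commit-or-nothing behaviour is to buffer events inside the transaction and write them out only in `commit`. The sketch below shows that idea with hypothetical `Log` and `Tx` types; it is synchronous and is not how sourcery's `Transaction` is necessarily implemented.

```rust
/// Hypothetical in-memory log standing in for an event store.
#[derive(Debug, Default)]
pub struct Log {
    pub committed: Vec<String>,
}

/// Buffers events and writes them to the log only on `commit`.
pub struct Tx<'a> {
    log: &'a mut Log,
    pending: Vec<String>,
}

impl Log {
    pub fn begin(&mut self) -> Tx<'_> {
        Tx { log: self, pending: Vec::new() }
    }
}

impl Tx<'_> {
    /// Stage an event; nothing is visible in the log yet.
    pub fn append(&mut self, event: &str) {
        self.pending.push(event.to_string());
    }

    /// Consume the transaction and publish all staged events at once.
    /// Returns the number of events written.
    pub fn commit(self) -> usize {
        let Tx { log, pending } = self;
        let count = pending.len();
        log.committed.extend(pending);
        count
    }
}
```

Dropping a `Tx` simply discards its `pending` buffer, so no explicit rollback step is needed in this design.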
// All events of a specific kind (across all aggregates)
EventFilter::for_event("account.deposited")
// All events for a specific aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")
// Events after a position (for incremental loading)
EventFilter::for_event("account.deposited").after(100)
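To illustrate what these three filter shapes select, here is a small in-memory sketch. `Filter`, `StoredEvent`, and `load` are hypothetical stand-ins written for this example; they mimic the constructors above but are not the crate's `EventFilter` implementation, and the input slice is assumed to be ordered by position already.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct StoredEvent {
    pub position: u64,
    pub aggregate_kind: String,
    pub aggregate_id: String,
    pub event_kind: String,
}

/// Hypothetical filter: event kind, optional aggregate identity, optional lower bound.
#[derive(Debug, Clone)]
pub struct Filter {
    event_kind: String,
    aggregate: Option<(String, String)>,
    after: Option<u64>,
}

impl Filter {
    pub fn for_event(event_kind: &str) -> Self {
        Filter { event_kind: event_kind.to_string(), aggregate: None, after: None }
    }

    pub fn for_aggregate(event_kind: &str, aggregate_kind: &str, aggregate_id: &str) -> Self {
        Filter {
            event_kind: event_kind.to_string(),
            aggregate: Some((aggregate_kind.to_string(), aggregate_id.to_string())),
            after: None,
        }
    }

    /// Only match events strictly after `position`.
    pub fn after(mut self, position: u64) -> Self {
        self.after = Some(position);
        self
    }

    pub fn matches(&self, event: &StoredEvent) -> bool {
        self.event_kind == event.event_kind
            && self.aggregate.as_ref().map_or(true, |(kind, id)| {
                *kind == event.aggregate_kind && *id == event.aggregate_id
            })
            && self.after.map_or(true, |position| event.position > position)
    }
}

/// Return every event matched by at least one filter, preserving input order.
pub fn load<'a>(events: &'a [StoredEvent], filters: &[Filter]) -> Vec<&'a StoredEvent> {
    events
        .iter()
        .filter(|event| filters.iter().any(|filter| filter.matches(event)))
        .collect()
}
```

A real store would usually push these predicates down into its query layer rather than scan in memory.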
pub trait Codec {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialize a value for persistence.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the value cannot be serialized.
    fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
    where
        T: serde::Serialize;

    /// Deserialize a value from stored bytes.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the bytes cannot be decoded.
    fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
    where
        T: serde::de::DeserializeOwned;
}
pub trait SnapshotStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// Must match the `EventStore::Id` type used in the same repository.
    type Id: Send + Sync + 'static;

    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync + 'static;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialization, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<'a, CE, Create>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
    where
        CE: std::error::Error + Send + Sync + 'static,
        Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
}
When multiple processes or threads write to the same aggregate simultaneously, you risk
losing updates. Optimistic concurrency control detects these conflicts by checking that
the stream version hasn’t changed between loading the aggregate and committing new events.
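The check itself is small. The sketch below models a single stream in memory; `Stream`, `Conflict`, and `append_if_unchanged` are hypothetical names, and unlike the crate's `append` (where `None` disables checking), here the writer always passes the version it observed when loading, with `None` meaning "the stream was empty".

```rust
/// Reported when the stream moved on between load and commit.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    pub expected: Option<u64>,
    pub actual: Option<u64>,
}

/// Hypothetical single event stream held in memory.
#[derive(Debug, Default)]
pub struct Stream {
    events: Vec<String>,
}

impl Stream {
    /// Position of the latest event, or `None` for an empty stream.
    pub fn version(&self) -> Option<u64> {
        (self.events.len() as u64).checked_sub(1)
    }

    /// Append only if the stream is still at the version the writer loaded.
    pub fn append_if_unchanged(
        &mut self,
        loaded_version: Option<u64>,
        new_events: Vec<String>,
    ) -> Result<(), Conflict> {
        let actual = self.version();
        if actual != loaded_version {
            return Err(Conflict { expected: loaded_version, actual });
        }
        self.events.extend(new_events);
        Ok(())
    }
}
```

Two writers that load the same version cannot both succeed: the first append advances the stream, so the second one sees a mismatch and must reload before trying again.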
The two concurrency strategies use different error types:
| Strategy | Error Type | Includes Concurrency Variant? |
|---|---|---|
| Optimistic (default) | OptimisticCommandError | Yes |
| Unchecked | CommandError | No |
When using optimistic concurrency, execute_command returns
OptimisticCommandError::Concurrency(conflict) if the stream version changed between
loading and committing:
use sourcery::OptimisticCommandError;
match repo
    .execute_command::<MyAggregate, MyCommand>(&id, &command, &metadata)
    .await
{
    Ok(()) => println!("Success!"),
    Err(OptimisticCommandError::Concurrency(conflict)) => {
        println!(
            "Conflict: expected version {:?}, actual {:?}",
            conflict.expected,
            conflict.actual
        );
    }
    Err(e) => println!("Other error: {e}"),
}
[Sequence diagram. Participants: Application, SnapshotStore, EventStore, Aggregate. Steps in order: load("account", "ACC-001"); snapshot at position 1000 returned; deserialize snapshot (balance = 5000, from snapshot); load_events(after: 1000); events 1001-1050 returned; apply(event) for each new event; balance = 5250 (current).]
Instead of replaying 1050 events, you load the snapshot and replay only 50.
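The arithmetic in that example can be checked with a short sketch. `Snapshot` and `load_balance` are hypothetical, events are reduced to `(position, delta)` pairs, and positions are assumed to start at 1.

```rust
/// Hypothetical snapshot: aggregate state captured at a stream position.
#[derive(Debug, Clone, PartialEq)]
pub struct Snapshot {
    pub position: u64,
    pub balance: i64,
}

/// Rebuild a balance from an optional snapshot plus the events after it.
/// Each event is a `(position, delta)` pair.
/// Returns `(balance, number_of_events_replayed)`.
pub fn load_balance(snapshot: Option<&Snapshot>, events: &[(u64, i64)]) -> (i64, usize) {
    // Without a snapshot, start from the default state before position 1.
    let (start, mut balance) = match snapshot {
        Some(s) => (s.position, s.balance),
        None => (0, 0),
    };
    let mut replayed = 0;
    for (position, delta) in events {
        if *position > start {
            balance += delta;
            replayed += 1;
        }
    }
    (balance, replayed)
}
```

Both paths arrive at the same state; the snapshot only changes how many events have to be applied to get there.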
Use with_snapshots() when creating the repository:
use sourcery::{Repository, snapshot::InMemorySnapshotStore, store::{inmemory, JsonCodec}};
let event_store = inmemory::Store::new(JsonCodec);
let snapshot_store = InMemorySnapshotStore::always();
let mut repository = Repository::new(event_store)
    .with_snapshots(snapshot_store);
The repository calls offer_snapshot after successfully appending new events. Implementations may decline without invoking create_snapshot, avoiding unnecessary snapshot encoding work.
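The lazy-callback idea can be sketched synchronously as follows. `EveryN` and `Offer` are hypothetical types for this example, and the snapshot is reduced to raw bytes; the real `offer_snapshot` is async and has a richer error type.

```rust
#[derive(Debug, PartialEq)]
pub enum Offer {
    Stored,
    Declined,
}

/// Hypothetical policy: keep a snapshot only once enough events have accumulated.
#[derive(Debug, Default)]
pub struct EveryN {
    pub threshold: u64,
    pub stored: Vec<Vec<u8>>,
}

impl EveryN {
    pub fn offer<F>(&mut self, events_since_last_snapshot: u64, create_snapshot: F) -> Offer
    where
        F: FnOnce() -> Vec<u8>,
    {
        if events_since_last_snapshot < self.threshold {
            // Decline early: the closure is never called, so no encoding work happens.
            return Offer::Declined;
        }
        self.stored.push(create_snapshot());
        Offer::Stored
    }
}
```

Passing a closure instead of an already-built snapshot is what lets the store skip serialization entirely when it declines.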
Metadata carries infrastructure concerns alongside events without polluting domain types. Common uses include correlation IDs, user context, timestamps, and causation tracking.
Track event relationships for debugging and workflows:
Request A (correlation: abc)
  OrderPlaced (causation: null)
    InventoryReserved (causation: OrderPlaced)
    PaymentProcessed (causation: OrderPlaced)
Correlation ID: Groups all events from a single user request
Causation ID: Points to the event that triggered this one
// When handling a saga or process manager
let follow_up_metadata = EventMetadata {
    correlation_id: original_meta.correlation_id.clone(),
    causation_id: Some(original_event_id.to_string()),
    user_id: "system".to_string(),
    timestamp: Utc::now(),
};
The inmemory::Store is useful for testing, but production systems need durable storage. This guide walks through implementing EventStore for your database.
CREATE TABLE events (
    position BIGSERIAL PRIMARY KEY,
    aggregate_kind TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    event_kind TEXT NOT NULL,
    data BYTEA NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_aggregate
    ON events (aggregate_kind, aggregate_id, position);
CREATE INDEX idx_events_kind
    ON events (event_kind, position);
For systems requiring strict ordering, add version checking:
ALTER TABLE events ADD COLUMN stream_version INT NOT NULL;
CREATE UNIQUE INDEX idx_events_stream_version
    ON events (aggregate_kind, aggregate_id, stream_version);
// In append:
// 1. Get current max version for stream
// 2. Insert with version + 1
// 3. Handle unique constraint violation as concurrency conflict
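The three steps can be simulated without a database. In this sketch a `BTreeMap` keyed by `(aggregate_kind, aggregate_id, stream_version)` plays the role of the unique index; `Table`, `insert_at`, and this `AppendError` are hypothetical and only model the constraint behaviour, not a real driver or the crate's error type.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
pub enum AppendError {
    /// Another writer already took this stream version.
    Conflict,
}

/// In-memory stand-in for the `events` table; the map key mimics the
/// unique index on (aggregate_kind, aggregate_id, stream_version).
#[derive(Debug, Default)]
pub struct Table {
    rows: BTreeMap<(String, String, u32), String>,
}

impl Table {
    /// Step 1: current max version for the stream, if any.
    pub fn max_version(&self, kind: &str, id: &str) -> Option<u32> {
        self.rows
            .keys()
            .filter(|(k, i, _)| k.as_str() == kind && i.as_str() == id)
            .map(|(_, _, version)| *version)
            .max()
    }

    /// Steps 2 and 3: insert at a specific version; a duplicate key is
    /// reported as a concurrency conflict.
    pub fn insert_at(&mut self, kind: &str, id: &str, version: u32, event: &str) -> Result<(), AppendError> {
        let key = (kind.to_string(), id.to_string(), version);
        if self.rows.contains_key(&key) {
            return Err(AppendError::Conflict);
        }
        self.rows.insert(key, event.to_string());
        Ok(())
    }

    /// All three steps together; versions start at 1 in this sketch.
    pub fn append(&mut self, kind: &str, id: &str, event: &str) -> Result<u32, AppendError> {
        let next = self.max_version(kind, id).map_or(1, |version| version + 1);
        self.insert_at(kind, id, next, event)?;
        Ok(next)
    }
}
```

In a real database the read in step 1 and the insert in step 2 can interleave between connections, which is exactly the case the unique index catches.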
Decision: Events are standalone structs implementing DomainEvent, not just enum variants.
Why:
- Events can be shared across multiple aggregates
- Projections subscribe to event types, not aggregate enums
- Easier to evolve events independently
- Better compile-time isolation
Trade-off: More boilerplate (mitigated by derive macro).
// This crate: events stand alone
struct OrderPlaced { /* ... */ }
struct PaymentReceived { /* ... */ }
// Both Order and Payment aggregates can use PaymentReceived
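A sketch of what sharing looks like in practice: one `PaymentReceived` struct applied by two different aggregates. The `Apply<E>` trait below is a local definition with an assumed method shape, and the struct fields are invented for the example.

```rust
/// ASSUMED shape of a per-event apply trait (illustration only).
pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

pub struct OrderPlaced {
    pub total: i64,
}

pub struct PaymentReceived {
    pub amount: i64,
}

#[derive(Debug, Default)]
pub struct Order {
    pub total: i64,
    pub paid: i64,
}

#[derive(Debug, Default)]
pub struct Payment {
    pub received: i64,
}

impl Apply<OrderPlaced> for Order {
    fn apply(&mut self, event: &OrderPlaced) {
        self.total = event.total;
    }
}

// The same event type is understood by two different aggregates.
impl Apply<PaymentReceived> for Order {
    fn apply(&mut self, event: &PaymentReceived) {
        self.paid += event.amount;
    }
}

impl Apply<PaymentReceived> for Payment {
    fn apply(&mut self, event: &PaymentReceived) {
        self.received += event.amount;
    }
}
```

With an enum per aggregate, `PaymentReceived` would have to be duplicated or wrapped in both `OrderEvent` and `PaymentEvent`; as a standalone struct it is defined once.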
Decision: Aggregate IDs are passed to the repository, not embedded in events or handlers.
Why:
- Domain events stay pure (no infrastructure metadata)
- Same event type works with different ID schemes
- Command handlers focus on behavior, not identity
- IDs travel in the envelope alongside events
Trade-off: You can’t access the ID inside Handle<C>. If needed, include relevant IDs in the command.
// ID is infrastructure
repository
    .execute_command::<Account, Deposit>(&account_id, &command, &metadata)
    .await?;
// Event doesn't contain ID
struct FundsDeposited { amount: i64 } // No account_id field