Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Introduction

sourcery provides building blocks for pragmatic event-sourced systems in Rust. It keeps your domain types pure while giving you tools to rebuild state, project read models, and persist events through a pluggable store interface.

When to Use Event Sourcing

Event sourcing shines when you need:

  • Complete audit trails — every state change is recorded
  • Time travel — reconstruct state at any point in history
  • Decoupled read models — optimize queries independently of writes
  • Complex domain logic — model behavior as a sequence of facts

It adds complexity, so consider simpler approaches for CRUD-heavy applications with minimal business logic.

What This Crate Provides

IncludedNot Included
Core traits for aggregates, events, projectionsCommand bus / message broker
Derive macro to reduce boilerplateOutbox pattern
Repository for command executionSnapshot scheduler
In-memory store for testingEvent streaming infrastructure
PostgreSQL store (optional)
Test framework (given-when-then)

The crate is intentionally minimal. You wire the repository into whichever pipeline you prefer—whether that’s a web framework, actor system, or CLI tool.

A Taste of the API

#[derive(Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

The derive macro generates the event enum and serialization glue. You focus on domain behavior.

Next Steps

Event Sourcing Primer

Event sourcing stores state as a sequence of events rather than as a single mutable record. Instead of updating a row in a database, you append an event describing what happened. Current state is derived by replaying events from the beginning.

Traditional State vs Event Log

Traditional (Mutable State)Event SourcedUPDATE balance = 150Current: 150UPDATE balance = 50Current: 50FundsDeposited(100)FundsDeposited(50)FundsWithdrawn(100)Replay: 100 + 50 - 100 = 50  



With traditional storage, you only know the current balance. With event sourcing, you know how you got there.

The Core Principle

Events are the source of truth.

An event represents something that happened in the past. It is immutable—you never modify or delete events. To “undo” something, you append a compensating event.

PropertyDescription
ImmutableEvents cannot be changed after creation
Past tenseNamed as facts: OrderPlaced, FundsDeposited
CompleteContains all data needed to understand what happened
OrderedSequence matters for correct state reconstruction

Reconstructing State

When you need the current state of an entity:

Event StoreAggregate  Load events for ID "ACC-001"apply(event) [For each event]Start with default stateCurrent state ready




This “replay” process runs every time you load an aggregate. For long-lived entities, snapshots optimize this by checkpointing state periodically.

Benefits

  • Audit trail — Every change is recorded with full context
  • Debugging — Replay events to reproduce bugs exactly
  • Temporal queries — Answer “what was the state at time T?”
  • Event-driven architecture — Events naturally feed other systems

Trade-offs

  • Complexity — More concepts to understand than CRUD
  • Storage growth — Event logs grow indefinitely (though events compress well)
  • Eventual consistency — Read models may lag behind writes
  • Schema evolution — Old events must remain readable forever

Next

CQRS Overview — Separating reads from writes

CQRS Overview

Command Query Responsibility Segregation (CQRS) separates the write model (how you change data) from the read model (how you query data). Event sourcing and CQRS complement each other naturally.

The Split

Write Side (Commands)Read Side (Queries)CommandAggregateEventsStoreEvent StoreProjectionViewQuery  Event StoreEvent StoreEvent StoreRead ModelRead Model  







Write side: Commands validate against the aggregate and produce events. The aggregate enforces business rules.

Read side: Projections consume events and build optimized views for queries. Each projection can structure data however the UI needs.

Why Separate?

ConcernWrite ModelRead Model
Optimized forConsistency, validationQuery performance
StructureReflects domain behaviorReflects UI needs
UpdatesTransactionalEventually consistent
ScalingConsistent writesReplicated reads

A single aggregate might feed multiple projections:

EventsAccount SummaryTransaction HistoryFraud DetectionAnalytics Dashboard  



Each projection sees the same events but builds a different view.

In This Crate

The crate models CQRS through:

  • Aggregate + Handle<C> — the write side
  • Projection + ApplyProjection<E> — the read side
  • Repository — orchestrates both
// Write: execute a command
repository
    .execute_command::<Account, Deposit>(&id, &command, &metadata)
    .await?;

// Read: build a projection
let summary = repository
    .build_projection::<AccountSummary>()
    .event::<FundsDeposited>()
    .event::<FundsWithdrawn>()
    .load()
    .await?;

Eventual Consistency

With CQRS, read models may not immediately reflect the latest write. This is fine for most UIs—users expect slight delays. For operations requiring strong consistency, read from the aggregate itself (via aggregate_builder).

Next

Architecture — How the components fit together

Architecture

This page shows how the crate’s components connect and where your domain code fits in.

Component Overview

Your Applicationsourcery crateCommandAggregate DefinitionEvent DefinitionsProjection DefinitionRepositoryEventStoreSnapshotStoreCodec    



You define: Aggregates, events, commands, projections

The crate provides: Repository orchestration, store abstraction, serialization

The Envelope Pattern

Events travel with metadata, but domain types stay pure:

Event Envelope

aggregate_id, kind, correlation_id, timestamp

Domain Event (pure business data)

  • Aggregates receive only the pure event—no IDs or metadata
  • Projections receive the full envelope—aggregate ID and metadata included
  • Stores persist the envelope but deserialize only what’s needed

This keeps domain logic free of infrastructure concerns.

Command Execution Flow

ApplicationRepository (snapshots enabled)EventStoreAggregateSnapshotStore  execute_command(id, cmd, meta)load(id)?  Optional snapshotload_events(filters)Vec<StoredEvent>replay events (apply)handle(command)Vec<Event>begin transactionappend eventscommitoffer_snapshot()?Ok(())















  1. Optionally load a snapshot to skip replaying old events
  2. Load remaining events from the store
  3. Replay events to rebuild aggregate state
  4. Execute the command handler
  5. Persist new events atomically
  6. Optionally create a new snapshot

Projection Building Flow

ApplicationRepositoryEventStoreProjection  build_projection().event::<E1>().event::<E2>().load()load_events(filters)  Vec<StoredEvent>default()apply_projection(id, event, meta) [For each event]Projection










Projections specify which events they care about. The repository loads only those events and replays them into the projection.

Key Types

TypeRole
Repository<S>Orchestrates aggregates and projections (no snapshots)
Repository<S, C, Snapshots<SS>>Snapshot-enabled repository orchestration
EventStoreTrait for event persistence
SnapshotStoreTrait for aggregate snapshots
CodecTrait for serialization/deserialization
AggregateTrait for command-side entities
ProjectionTrait for read-side views
DomainEventMarker trait for event structs

Next

Installation — Add the crate to your project

Installation

Add sourcery to your Cargo.toml:

[dependencies]
sourcery = "0.1"
serde = { version = "1", features = ["derive"] }

The crate re-exports the derive macro, so you don’t need a separate dependency for sourcery-macros.

Feature Flags

FeatureDescription
test-utilEnables TestFramework for given-when-then aggregate testing
postgresPostgreSQL event store via sqlx

To enable test utilities:

[dev-dependencies]
sourcery = { version = "0.1", features = ["test-util"] }

To use the PostgreSQL store:

[dependencies]
sourcery = { version = "0.1", features = ["postgres"] }

Minimum Rust Version

This crate requires Rust 1.88.0 or later (edition 2024).

Next

Quick Start — Build your first aggregate

Quick Start

Build a simple bank account aggregate in under 50 lines. This example demonstrates events, commands, an aggregate, a projection, and the repository.

The Complete Example

use serde::{Deserialize, Serialize};
use sourcery::{
    Apply, ApplyProjection, DomainEvent, Handle, Projection, Repository,
    store::{JsonCodec, inmemory},
};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FundsDeposited {
    pub amount: i64,
}

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Debug)]
pub struct Deposit {
    pub amount: i64,
}

#[derive(Debug, Default, Serialize, Deserialize, sourcery::Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited))]
pub struct Account {
    balance: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount <= 0 {
            return Err("amount must be positive".into());
        }
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

#[derive(Debug, Default)]
pub struct TotalDeposits {
    pub total: i64,
}

impl Projection for TotalDeposits {
    type Id = String;
    type Metadata = ();
}

impl ApplyProjection<FundsDeposited> for TotalDeposits {
    fn apply_projection(&mut self, _id: &Self::Id, event: &FundsDeposited, _meta: &Self::Metadata) {
        self.total += event.amount;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create an in-memory store
    let store = inmemory::Store::new(JsonCodec);
    let mut repository = Repository::new(store);

    // Execute a command
    repository
        .execute_command::<Account, Deposit>(&"ACC-001".to_string(), &Deposit { amount: 100 }, &())
        .await?;

    // Build a projection
    let totals = repository
        .build_projection::<TotalDeposits>()
        .event::<FundsDeposited>()
        .load()
        .await?;

    println!("Total deposits: {}", totals.total);
    assert_eq!(totals.total, 100);

    Ok(())
}

What Just Happened?

  1. Defined an eventFundsDeposited is a simple struct with DomainEvent
  2. Defined a commandDeposit is a plain struct (no traits required)
  3. Created an aggregateAccount uses the derive macro to generate boilerplate
  4. Implemented Apply — How events mutate state
  5. Implemented Handle — How commands produce events
  6. Created a projectionTotalDeposits builds a read model
  7. Wired the repository — Connected everything with inmemory::Store

Key Points

  • Events are past tense factsFundsDeposited, not DepositFunds
  • Commands are imperativeDeposit, not Deposited
  • The derive macro generates — The event enum, From impls, serialization
  • Projections are decoupled — They receive events, not aggregate types
  • IDs are infrastructure — Passed to the repository, not embedded in events

Next Steps

Aggregates

An aggregate is a cluster of domain objects treated as a single unit for data changes. In event sourcing, aggregates rebuild their state by replaying events and validate commands to produce new events.

The Aggregate Trait

pub trait Aggregate: Default + Sized {
    /// Aggregate type identifier used by the event store.
    ///
    /// This is combined with the aggregate ID to create stream identifiers.
    /// Use lowercase, kebab-case for consistency: `"product"`,
    /// `"user-account"`, etc.
    const KIND: &'static str;

    type Event;
    type Error;
    type Id;

    /// Apply an event to update aggregate state.
    ///
    /// This is called during event replay to rebuild aggregate state from
    /// history.
    ///
    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
    /// implementations. For hand-written aggregates, implement this
    /// directly with a match expression.
    fn apply(&mut self, event: &Self::Event);
}

Most of this is generated by #[derive(Aggregate)]. You focus on the behavior.

Snapshots and serde

Snapshots are opt-in. If you enable snapshots (via Repository::with_snapshots), the aggregate state must be serializable (Serialize + DeserializeOwned).

The Apply<E> Trait

When using #[derive(Aggregate)], implement Apply<E> for each event type:

pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

This is where state mutation happens:

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}

Key rules: apply must be infallible (events are facts) and deterministic (same events → same state).

Event Replay

Event Storebalance = 120  apply(FundsDeposited { amount: 100 })apply(FundsWithdrawn { amount: 30 })apply(FundsDeposited { amount: 50 })





The aggregate starts in its Default state. Each event is applied in order. The final state matches what would exist if you had executed all the original commands.

Loading Aggregates

Use AggregateBuilder to load an aggregate’s current state:

let account: Account = repository
    .aggregate_builder()
    .load(&account_id)?;

println!("Current balance: {}", account.balance);

On Repository this replays all events for that aggregate ID. On Repository<S, C, Snapshots<SS>> it loads a snapshot first (when present) and replays only the delta.

Next

Domain Events — Defining events as first-class types

Domain Events

Events are the heart of event sourcing. They represent facts—things that have happened. In this crate, events are first-class structs, not just enum variants.

The DomainEvent Trait

pub trait DomainEvent: Serialize + DeserializeOwned {
    /// Unique identifier for this event type (used for serialization routing)
    const KIND: &'static str;
}

Every event struct implements this trait:

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrderPlaced {
    pub product_id: String,
    pub quantity: u32,
    pub unit_price: i64,
}

impl DomainEvent for OrderPlaced {
    const KIND: &'static str = "order.placed";
}

Why First-Class Structs?

This crate keeps events as separate structs rather than enum variants. Benefits:

  • Reuse — Same event type in multiple aggregates
  • Decoupling — Projections subscribe to event types, not aggregate enums
  • Smaller scope — Import individual event structs

The derive macro generates the aggregate’s event enum internally—you get the best of both worlds.

Naming Conventions

Events are past tense because they describe things that already happened:

GoodBad
OrderPlacedPlaceOrder
FundsDepositedDepositFunds
UserRegisteredRegisterUser
PasswordChangedChangePassword

Use KIND constants that are stable and namespaced:

const KIND: &'static str = "order.placed";      // Good
const KIND: &'static str = "OrderPlaced";       // OK
const KIND: &'static str = "placed";            // Too generic

The KIND is stored in the event log and used for deserialization, so it must never change for existing events.

Event Design Guidelines

  1. Include all necessary data — Capture values at event time (e.g., price when ordered)
  2. Avoid aggregate IDs — They travel in the envelope, not the event payload
  3. Keep events small — One fact per event (prefer AddressChanged + PhoneChanged over ContactInfoChanged)

Next

Commands — Handling requests that produce events

Commands

Commands represent requests to change the system. Unlike events (which are facts), commands can be rejected. An aggregate validates a command and either produces events or returns an error.

The Handle<C> Trait

pub trait Handle<C>: Aggregate {
    /// Handle a command and produce events.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the command is invalid for the current
    /// aggregate state.
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}

Key points:

  • Takes &self — the handler reads state but doesn’t mutate it
  • Returns Vec<Event> — a command may produce zero, one, or many events
  • Returns Result — commands can fail validation

Command Structs

Commands are plain structs. No traits required:

#[derive(Debug)]
pub struct Deposit {
    pub amount: i64,
}

#[derive(Debug)]
pub struct Withdraw {
    pub amount: i64,
}

#[derive(Debug)]
pub struct Transfer {
    pub to_account: String,
    pub amount: i64,
}

Implementing Handlers

Each command gets its own Handle implementation:

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount <= 0 {
            return Err(AccountError::InvalidAmount);
        }
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

impl Handle<Withdraw> for Account {
    fn handle(&self, cmd: &Withdraw) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount <= 0 {
            return Err(AccountError::InvalidAmount);
        }
        if cmd.amount > self.balance {
            return Err(AccountError::InsufficientFunds);
        }
        Ok(vec![FundsWithdrawn { amount: cmd.amount }.into()])
    }
}

The .into() call converts your concrete event type into the aggregate’s event enum (generated by the derive macro).

Multiple Events

A single command can produce multiple events:

impl Handle<CloseAccount> for Account {
    fn handle(&self, _cmd: &CloseAccount) -> Result<Vec<Self::Event>, Self::Error> {
        if self.balance < 0 {
            return Err(AccountError::NegativeBalance);
        }

        let mut events = Vec::new();

        // Withdraw remaining balance
        if self.balance > 0 {
            events.push(FundsWithdrawn { amount: self.balance }.into());
        }

        // Mark as closed
        events.push(AccountClosed {}.into());

        Ok(events)
    }
}

No Events

Return an empty vector when the command is valid but produces no change:

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount == 0 {
            return Ok(vec![]); // Valid but no-op
        }
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

Naming Conventions

Commands are imperative because they request an action:

GoodBad
DepositFundsDeposited
PlaceOrderOrderPlaced
RegisterUserUserRegistered
ChangePasswordPasswordChanged

Executing Commands

Use the repository to execute commands:

repository
    .execute_command::<Account, Deposit>(&account_id, &Deposit { amount: 100 }, &metadata)
    .await?;

The repository:

  1. Loads the aggregate from events
  2. Calls handle()
  3. Persists any resulting events
  4. Returns the result

Next

Projections — Building read models from events

Projections

Projections are read models built by replaying events. They’re optimized for queries rather than consistency. A single event stream can feed many projections, each structuring data differently.

The Projection Trait

pub trait Projection: Default + Sized {
    /// Aggregate identifier type this projection is compatible with.
    type Id;
    /// Metadata type expected by this projection
    type Metadata;
}

Projections must be Default because they start empty and build up through event replay.

The ApplyProjection<E> Trait

pub trait ApplyProjection<E>: Projection {
    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
}

Unlike aggregate Apply, projections receive:

  • aggregate_id — Which instance produced this event
  • event — The domain event
  • metadata — Store-provided metadata (timestamps, correlation IDs, etc.)

Basic Example

#[derive(Debug, Default)]
pub struct AccountSummary {
    pub accounts: HashMap<String, i64>,
}

impl Projection for AccountSummary {
    type Id = String;
    type Metadata = ();
}

impl ApplyProjection<FundsDeposited> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsDeposited, _: &Self::Metadata) {
        *self.accounts.entry(id.clone()).or_default() += event.amount;
    }
}

impl ApplyProjection<FundsWithdrawn> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsWithdrawn, _: &Self::Metadata) {
        *self.accounts.entry(id.clone()).or_default() -= event.amount;
    }
}

Building Projections

Use ProjectionBuilder to specify which events to load:

let summary = repository
    .build_projection::<AccountSummary>()
    .event::<FundsDeposited>()     // All deposits across all accounts
    .event::<FundsWithdrawn>()      // All withdrawals across all accounts
    .load()
    .await?;

Multi-Aggregate Projections

Projections can consume events from multiple aggregate types. Register each event type with .event::<E>():

let report = repository
    .build_projection::<InventoryReport>()
    .event::<ProductCreated>()   // From Product aggregate
    .event::<SaleRecorded>()     // From Sale aggregate
    .load()
    .await?;

Implement ApplyProjection<E> for each event type the projection handles.

Filtering by Aggregate

Use .events_for() to load all events for a specific aggregate instance:

let account_history = repository
    .build_projection::<TransactionHistory>()
    .events_for::<Account>(&account_id)
    .load()
    .await?;

Using Metadata

Projections can access event metadata for cross-cutting concerns:

#[derive(Debug)]
pub struct EventMetadata {
    pub timestamp: DateTime<Utc>,
    pub user_id: String,
}

#[derive(Debug, Default)]
pub struct AuditLog {
    pub entries: Vec<AuditEntry>,
}

impl Projection for AuditLog {
    type Id = String;
    type Metadata = EventMetadata;
}

impl ApplyProjection<FundsDeposited> for AuditLog {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsDeposited, meta: &Self::Metadata) {
        self.entries.push(AuditEntry {
            timestamp: meta.timestamp,
            user: meta.user_id.clone(),
            action: format!("Deposited {} to {}", event.amount, id),
        });
    }
}

Projections vs Aggregates

AspectAggregateProjection
PurposeEnforce invariantsServe queries
State sourceOwn events onlyAny events
Receives IDsNo (in envelope)Yes (as parameter)
Receives metadataNoYes
ConsistencyStrongEventual
MutabilityVia commands onlyRebuilt on demand

Next

Stores & Codecs — Event persistence and serialization

Stores & Codecs

The crate separates storage concerns into three traits: EventStore for event persistence, SnapshotStore for aggregate snapshots, and Codec for serialization.

The EventStore Trait

pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Copy + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialization codec.
    type Codec: Codec + Clone + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    fn codec(&self) -> &Self::Codec;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Begin a transaction for appending events to an aggregate.
    ///
    /// The transaction type is determined by the concurrency strategy `C`.
    ///
    /// # Arguments
    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
    /// * `aggregate_id` - The aggregate instance identifier
    /// * `expected_version` - The version expected for optimistic concurrency
    fn begin<C: ConcurrencyStrategy>(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: Self::Id,
        expected_version: Option<Self::Position>,
    ) -> Transaction<'_, Self, C>
    where
        Self: Sized;

    /// Append events with optional version checking.
    ///
    /// If `expected_version` is `Some`, the append fails with a concurrency
    /// conflict if the current stream version doesn't match.
    /// If `expected_version` is `None`, no version checking is performed.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
    /// [`AppendError::Store`] if persistence fails.
    fn append<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimizes based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
    > + Send
    + 'a;

    /// Append events expecting an empty stream.
    ///
    /// This method is used by optimistic concurrency when creating new
    /// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
    /// already has events.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the stream is not empty,
    /// or [`AppendError::Store`] if persistence fails.
    fn append_expecting_new<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}

Built-in: inmemory::Store

For testing and prototyping:

use sourcery::store::{inmemory, JsonCodec};

// With unit metadata
let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);

// With custom metadata
let store: inmemory::Store<String, JsonCodec, MyMetadata> = inmemory::Store::new(JsonCodec);

Features:

  • Uses u64 positions (global sequence number)
  • Events stored in memory (lost on drop)
  • Deduplicates overlapping filters when loading

Transactions

Events are appended within a transaction for atomicity:

use sourcery::concurrency::Unchecked;

let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
tx.append(event1, metadata.clone())?;
tx.append(event2, metadata.clone())?;
tx.commit().await?; // Events visible only after commit

If the transaction is dropped without committing, no events are persisted.

Event Filters

Control which events to load:

// All events of a specific kind (across all aggregates)
EventFilter::for_event("account.deposited")

// All events for a specific aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")

// Events after a position (for incremental loading)
EventFilter::for_event("account.deposited").after(100)

The Codec Trait

pub trait Codec {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialize a value for persistence.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the value cannot be serialized.
    fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
    where
        T: serde::Serialize;

    /// Deserialize a value from stored bytes.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the bytes cannot be decoded.
    fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
    where
        T: serde::de::DeserializeOwned;
}

Built-in: JsonCodec

Uses serde_json for human-readable storage:

use sourcery::JsonCodec;

let codec = JsonCodec;

For production, consider implementing codecs for:

  • Protobuf — Compact, schema-enforced
  • MessagePack — Compact, schema-less
  • Avro — Schema evolution built-in

The SnapshotStore Trait

pub trait SnapshotStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// Must match the `EventStore::Id` type used in the same repository.
    type Id: Send + Sync + 'static;

    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync + 'static;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialization, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<'a, CE, Create>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
    where
        CE: std::error::Error + Send + Sync + 'static,
        Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
}

See Snapshots for details.

The StoredEvent Type

Events loaded from the store:

pub struct StoredEvent<Id, Pos, M> {
    pub aggregate_kind: String,
    pub aggregate_id: Id,
    pub kind: String,           // Event type (e.g., "account.deposited")
    pub position: Pos,          // Global or stream position
    pub data: Vec<u8>,          // Serialized event payload
    pub metadata: M,            // Store-provided metadata
}

The PersistableEvent Type

Events ready to be stored:

pub struct PersistableEvent<M> {
    pub kind: String,           // Event type
    pub data: Vec<u8>,          // Serialized payload
    pub metadata: M,            // Caller-provided metadata
}

The SerializableEvent trait (generated by the derive macro) converts domain events into this form.

Implementing a Custom Store

See Custom Stores for a guide on implementing EventStore for your database.

Next

The Aggregate Derive — Reducing boilerplate with macros

The Aggregate Derive

The #[derive(Aggregate)] macro eliminates boilerplate by generating the event enum and trait implementations.

Basic Usage

use sourcery::Aggregate;

#[derive(Debug, Default, Serialize, Deserialize, Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

Attribute Reference

AttributeRequiredDescription
id = TypeYesAggregate identifier type
error = TypeYesError type for command handling
events(E1, E2, ...)YesEvent types this aggregate produces
kind = "name"NoAggregate type identifier (default: lowercase struct name)
event_enum = "Name"NoGenerated enum name (default: {Struct}Event)

What Gets Generated

For this input:

#[derive(Aggregate)]
#[aggregate(id = String, error = AccountError, events(FundsDeposited, FundsWithdrawn))]
pub struct Account { balance: i64 }

The macro generates:


#[derive(Aggregate)] struct Account

enum AccountEventimpl Aggregate for Accountimpl From<FundsDeposited>impl From<FundsWithdrawn>impl SerializableEventimpl ProjectionEvent

1. Event Enum

pub enum AccountEvent {
    FundsDeposited(FundsDeposited),
    FundsWithdrawn(FundsWithdrawn),
}

2. From Implementations

impl From<FundsDeposited> for AccountEvent {
    fn from(event: FundsDeposited) -> Self {
        AccountEvent::FundsDeposited(event)
    }
}

impl From<FundsWithdrawn> for AccountEvent {
    fn from(event: FundsWithdrawn) -> Self {
        AccountEvent::FundsWithdrawn(event)
    }
}

This enables the .into() call in command handlers:

Ok(vec![FundsDeposited { amount: 100 }.into()])

3. Aggregate Implementation

impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = AccountError;
    type Id = String;

    fn apply(&mut self, event: Self::Event) {
        match event {
            AccountEvent::FundsDeposited(e) => Apply::apply(self, &e),
            AccountEvent::FundsWithdrawn(e) => Apply::apply(self, &e),
        }
    }
}

4. SerializableEvent Implementation

Converts domain events to persistable form:

impl SerializableEvent for AccountEvent {
    fn to_persistable<C: Codec, M>(
        self,
        codec: &C,
        metadata: M,
    ) -> Result<PersistableEvent<M>, C::Error> {
        match self {
            AccountEvent::FundsDeposited(e) => /* serialize with kind */,
            AccountEvent::FundsWithdrawn(e) => /* serialize with kind */,
        }
    }
}

5. ProjectionEvent Implementation

Deserializes stored events:

impl ProjectionEvent for AccountEvent {
    const EVENT_KINDS: &'static [&'static str] = &[
        "account.deposited",
        "account.withdrawn",
    ];

    fn from_stored<C: Codec>(
        kind: &str,
        data: &[u8],
        codec: &C,
    ) -> Result<Self, C::Error> {
        match kind {
            "account.deposited" => Ok(Self::FundsDeposited(codec.deserialize(data)?)),
            "account.withdrawn" => Ok(Self::FundsWithdrawn(codec.deserialize(data)?)),
            _ => /* unknown event error */,
        }
    }
}

Customizing the Kind

By default, KIND is the lowercase struct name. Override it:

#[derive(Aggregate)]
#[aggregate(
    kind = "bank-account",
    id = String,
    error = String,
    events(FundsDeposited)
)]
pub struct Account { /* ... */ }

Now Account::KIND is "bank-account".

Customizing the Event Enum Name

#[derive(Aggregate)]
#[aggregate(
    event_enum = "BankEvent",
    id = String,
    error = String,
    events(FundsDeposited)
)]
pub struct Account { /* ... */ }

Now the generated enum is BankEvent instead of AccountEvent.

Requirements

Your struct must also derive/implement:

  • Default — Fresh aggregate state
  • Serialize + Deserialize — For snapshotting

Each event type must implement DomainEvent.

Next

Manual Implementation — Implementing without the macro

Manual Implementation

While #[derive(Aggregate)] handles most cases, you might implement the traits manually for:

  • Custom serialization logic
  • Non-standard event routing
  • Learning how the system works

Side-by-Side Comparison

With Derive Macro

use sourcery::{Aggregate, Apply, DomainEvent, Handle};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsDeposited { pub amount: i64 }

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsWithdrawn { pub amount: i64 }

impl DomainEvent for FundsWithdrawn {
    const KIND: &'static str = "account.withdrawn";
}

#[derive(Debug, Default, Serialize, Deserialize, Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, e: &FundsDeposited) { self.balance += e.amount; }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, e: &FundsWithdrawn) { self.balance -= e.amount; }
}

Without Derive Macro

use sourcery::{Aggregate, DomainEvent};
use sourcery::codec::{Codec, EventDecodeError, ProjectionEvent, SerializableEvent};
use sourcery::store::PersistableEvent;
use serde::{Deserialize, Serialize};

// Events (same as before)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsDeposited { pub amount: i64 }

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsWithdrawn { pub amount: i64 }

impl DomainEvent for FundsWithdrawn {
    const KIND: &'static str = "account.withdrawn";
}

// Manual event enum
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AccountEvent {
    Deposited(FundsDeposited),
    Withdrawn(FundsWithdrawn),
}

// From implementations for ergonomic .into()
impl From<FundsDeposited> for AccountEvent {
    fn from(e: FundsDeposited) -> Self { Self::Deposited(e) }
}

impl From<FundsWithdrawn> for AccountEvent {
    fn from(e: FundsWithdrawn) -> Self { Self::Withdrawn(e) }
}

// SerializableEvent for persistence
impl SerializableEvent for AccountEvent {
    fn to_persistable<C: Codec, M>(
        self,
        codec: &C,
        metadata: M,
    ) -> Result<PersistableEvent<M>, C::Error> {
        let (kind, data) = match &self {
            Self::Deposited(e) => (FundsDeposited::KIND, codec.serialize(e)?),
            Self::Withdrawn(e) => (FundsWithdrawn::KIND, codec.serialize(e)?),
        };
        Ok(PersistableEvent {
            kind: kind.to_string(),
            data,
            metadata,
        })
    }
}

// ProjectionEvent for loading
impl ProjectionEvent for AccountEvent {
    const EVENT_KINDS: &'static [&'static str] = &[
        FundsDeposited::KIND,
        FundsWithdrawn::KIND,
    ];

    fn from_stored<C: Codec>(
        kind: &str,
        data: &[u8],
        codec: &C,
    ) -> Result<Self, EventDecodeError<C::Error>> {
        match kind {
            FundsDeposited::KIND => Ok(Self::Deposited(
                codec.deserialize(data).map_err(EventDecodeError::Codec)?,
            )),
            FundsWithdrawn::KIND => Ok(Self::Withdrawn(
                codec.deserialize(data).map_err(EventDecodeError::Codec)?,
            )),
            _ => Err(EventDecodeError::UnknownKind {
                kind: kind.to_string(),
                expected: Self::EVENT_KINDS,
            }),
        }
    }
}

// Aggregate struct
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Account {
    balance: i64,
}

// Manual Aggregate implementation — Apply<E> not needed, just match directly
impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = String;
    type Id = String;

    fn apply(&mut self, event: Self::Event) {
        match event {
            AccountEvent::Deposited(e) => self.balance += e.amount,
            AccountEvent::Withdrawn(e) => self.balance -= e.amount,
        }
    }
}

What You Gain

Manual implementation lets you:

  1. Custom enum structure — Different variant names, nested enums
  2. Custom serialization — Compress, encrypt, or version events
  3. Fallback handling — Gracefully handle unknown event types
  4. Conditional logic — Skip certain events during replay

What You Lose

  • More code to maintain
  • Easy to introduce bugs in the match arms
  • Must keep EVENT_KINDS array in sync with from_stored

When to Go Manual

ScenarioRecommendation
Standard CRUD aggregateUse derive macro
Learning the crateStart with derive, then explore manual
Custom event codecManual
Dynamic event typesManual
Unusual enum structureManual

Next

Snapshots — Optimizing aggregate loading

Optimistic Concurrency

When multiple processes or threads write to the same aggregate simultaneously, you risk losing updates. Optimistic concurrency control detects these conflicts by checking that the stream version hasn’t changed between loading the aggregate and committing new events.

Default Behavior

By default, repositories use optimistic concurrency—version checking is performed on every write. This is the safe default for production systems.

use sourcery::{Repository, store::{inmemory, JsonCodec}};

let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
let mut repo = Repository::new(store); // Optimistic concurrency enabled

The concurrency strategy is encoded in the type system, so you get compile-time guarantees about which error types you need to handle.

Disabling Concurrency Checking

For single-writer scenarios where concurrency checking is unnecessary, you can opt out:

let mut repo = Repository::new(store)
    .without_concurrency_checking();

This returns a Repository<S, Unchecked> which uses last-writer-wins semantics.

Error Types

The two concurrency strategies use different error types:

StrategyError TypeIncludes Concurrency Variant?
Optimistic (default)OptimisticCommandErrorYes
UncheckedCommandErrorNo

When using optimistic concurrency, execute_command returns OptimisticCommandError::Concurrency(conflict) if the stream version changed between loading and committing:

use sourcery::OptimisticCommandError;

match repo
    .execute_command::<MyAggregate, MyCommand>(&id, &command, &metadata)
    .await
{
    Ok(()) => println!("Success!"),
    Err(OptimisticCommandError::Concurrency(conflict)) => {
        println!(
            "Conflict: expected version {:?}, actual {:?}",
            conflict.expected,
            conflict.actual
        );
    }
    Err(e) => println!("Other error: {e}"),
}

Handling Conflicts

The most common pattern for handling conflicts is to retry the operation.

The repository provides a helper for this: execute_with_retry.

use sourcery::RetryResult;

let attempts: RetryResult<MyAggregate, MyStore> =
    repo.execute_with_retry::<MyAggregate, MyCommand>(&id, &command, &metadata, 3).await?;
println!("Succeeded after {attempts} attempt(s)");

Each retry loads fresh state from the event store, so business rules are always validated against the current aggregate state.

max_retries controls how many retries after the first attempt are allowed, so the operation is attempted up to max_retries + 1 times total.

When to Use Optimistic Concurrency

Use optimistic concurrency when:

  • Multiple writers might modify the same aggregate simultaneously
  • Business rules depend on current state (e.g., balance checks, inventory limits)
  • Data integrity is more important than write throughput

Consider last-writer-wins when:

  • Aggregates are rarely modified concurrently
  • Events are append-only without state-dependent validation
  • You have a single writer per aggregate (e.g., actor-per-entity pattern)

How It Works

  1. When loading an aggregate, the repository records the current stream version
  2. When committing, it passes the expected version to the event store
  3. The store checks if the actual version matches the expected version
  4. If they differ, the store returns a ConcurrencyConflict error

The inmemory::Store supports this via its stream_version() method and the expected_version parameter on append().

Example

See examples/optimistic_concurrency.rs for a complete working example demonstrating:

  • Basic optimistic concurrency usage
  • Conflict detection with concurrent modifications
  • Retry patterns for handling conflicts
  • Business rule enforcement with fresh state

Snapshots

Replaying events gets expensive as aggregates accumulate history. Snapshots checkpoint aggregate state, allowing you to skip replaying old events.

How Snapshots Work

ApplicationSnapshotStoreEventStoreAggregate  load("account", "ACC-001")  Snapshot at position 1000Deserialize snapshotload_events(after: 1000)Events 1001-1050apply(event) [For each new event]balance = 5000 (from snapshot)balance = 5250 (current)








Instead of replaying 1050 events, you load the snapshot and replay only 50.

Enabling Snapshots

Use with_snapshots() when creating the repository:

use sourcery::{Repository, snapshot::InMemorySnapshotStore, store::{inmemory, JsonCodec}};

let event_store = inmemory::Store::new(JsonCodec);
let snapshot_store = InMemorySnapshotStore::always();

let mut repository = Repository::new(event_store)
    .with_snapshots(snapshot_store);

Snapshot Policies

InMemorySnapshotStore provides three policies:

Always

Save a snapshot after every command:

let snapshots = InMemorySnapshotStore::always();

Use for: Aggregates with expensive replay, testing.

Every N Events

Save after accumulating N events since the last snapshot:

let snapshots = InMemorySnapshotStore::every(100);

Use for: Production workloads balancing storage vs. replay cost.

Never

Never save (load-only mode):

let snapshots = InMemorySnapshotStore::never();

Use for: Read-only replicas, debugging.

The SnapshotStore Trait

pub trait SnapshotStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// Must match the `EventStore::Id` type used in the same repository.
    type Id: Send + Sync + 'static;

    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync + 'static;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialization, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<'a, CE, Create>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
    where
        CE: std::error::Error + Send + Sync + 'static,
        Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
}

The repository calls offer_snapshot after successfully appending new events. Implementations may decline without invoking create_snapshot, avoiding unnecessary snapshot encoding work.

The Snapshot Type

pub struct Snapshot<Pos> {
    pub position: Pos,
    pub data: Vec<u8>,
}

The data is the serialized aggregate state encoded using the repository’s event codec.

When to Snapshot

Aggregate TypeRecommendation
Short-lived (< 100 events)Skip snapshots
Medium (100-1000 events)Every 100-500 events
Long-lived (1000+ events)Every 100 events
High-throughputEvery N events, tuned to your SLA

Implementing a Custom Store

For production, implement SnapshotStore with your database. See Custom Stores for a complete guide.

Snapshot Invalidation

Snapshots are tied to your aggregate’s serialized form. When you change the struct:

  1. Add fields — Use #[serde(default)] for backwards compatibility
  2. Remove fields — Old snapshots still deserialize (extra fields ignored)
  3. Rename fields — Use #[serde(alias = "old_name")]
  4. Change types — Old snapshots become invalid; delete them

For major changes, delete old snapshots and let them rebuild from events.

Next

Event Versioning — Evolving event schemas over time

Event Versioning

Events are immutable and stored forever. When your domain model evolves, you need strategies to read old events with new code.

The Challenge

20242025

UserRegistered { email: String }

UserRegistered { email: String, marketing_consent: bool }

How to read?

Old events lack fields that new code expects.

Strategy 1: Serde Defaults

The simplest approach—use serde attributes:

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegistered {
    pub email: String,
    #[serde(default)]
    pub marketing_consent: bool,  // Defaults to false for old events
}

Works for:

  • Adding optional fields
  • Fields with sensible defaults

Strategy 2: Explicit Versioning

Keep old event types, migrate at deserialization:

// Original event (still in storage)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV1 {
    pub email: String,
}

// Current event
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegistered {
    pub email: String,
    pub marketing_consent: bool,
}

impl From<UserRegisteredV1> for UserRegistered {
    fn from(v1: UserRegisteredV1) -> Self {
        Self {
            email: v1.email,
            marketing_consent: false, // Assumed for old users
        }
    }
}

Strategy 3: Using serde-evolve

The serde-evolve crate automates version chains:

use serde_evolve::Evolve;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV1 {
    pub email: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV2 {
    pub email: String,
    pub marketing_consent: bool,
}

impl From<UserRegisteredV1> for UserRegisteredV2 {
    fn from(v1: UserRegisteredV1) -> Self {
        Self { email: v1.email, marketing_consent: false }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Evolve)]
#[evolve(ancestors(UserRegisteredV1, UserRegisteredV2))]
pub struct UserRegistered {
    pub email: String,
    pub marketing_consent: bool,
    pub signup_source: Option<String>,  // New in V3
}

impl From<UserRegisteredV2> for UserRegistered {
    fn from(v2: UserRegisteredV2) -> Self {
        Self {
            email: v2.email,
            marketing_consent: v2.marketing_consent,
            signup_source: None,
        }
    }
}

When deserializing, serde-evolve tries each ancestor in order and applies the From chain.

Strategy 4: Codec-Level Migration

Handle versioning in a custom codec:

pub struct VersionedJsonCodec;

impl Codec for VersionedJsonCodec {
    type Error = serde_json::Error;

    fn deserialize<E: DeserializeOwned>(&self, data: &[u8]) -> Result<E, Self::Error> {
        // Parse as Value first
        let mut value: serde_json::Value = serde_json::from_slice(data)?;

        // Apply migrations based on type or missing fields
        if value.get("marketing_consent").is_none() {
            value["marketing_consent"] = serde_json::Value::Bool(false);
        }

        // Deserialize to target type
        serde_json::from_value(value)
    }

    fn serialize<E: Serialize>(&self, event: &E) -> Result<Vec<u8>, Self::Error> {
        serde_json::to_vec(event)
    }
}

Which Strategy to Use?

ScenarioRecommended Approach
Adding optional fieldSerde defaults
Adding required field with known defaultSerde defaults
Complex migration logicExplicit versions + From
Multiple version hopsserde-evolve
Schema changes with external validationCodec-level

Event KIND Stability

The KIND constant must never change for stored events:

// BAD: Changing KIND breaks deserialization
impl DomainEvent for UserRegistered {
    const KIND: &'static str = "user.created";  // Was "user.registered"
}

// GOOD: Use new event type, migrate in code
impl DomainEvent for UserCreated {
    const KIND: &'static str = "user.created";
}
// Old UserRegistered events still deserialize, then convert

Testing Migrations

Include serialized old events in your test suite:

#[test]
fn deserializes_v1_events() {
    let v1_json = r#"{"email":"old@example.com"}"#;
    let event: UserRegistered = serde_json::from_str(v1_json).unwrap();
    assert!(!event.marketing_consent);
}

This catches regressions when refactoring.

Next

Custom Metadata — Adding infrastructure context to events

Custom Metadata

Metadata carries infrastructure concerns alongside events without polluting domain types. Common uses include correlation IDs, user context, timestamps, and causation tracking.

Defining Metadata

Create a struct for your metadata:

use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EventMetadata {
    pub correlation_id: String,
    pub causation_id: Option<String>,
    pub user_id: String,
    pub timestamp: DateTime<Utc>,
}

Using Metadata with the Store

Configure your store with the metadata type:

use sourcery::store::{inmemory, JsonCodec};

let store: inmemory::Store<String, JsonCodec, EventMetadata> =
    inmemory::Store::new(JsonCodec);

Passing Metadata to Commands

Provide metadata when executing commands:

let metadata = EventMetadata {
    correlation_id: Uuid::new_v4().to_string(),
    causation_id: None,
    user_id: current_user.id.clone(),
    timestamp: Utc::now(),
};

repository.execute_command::<Account, Deposit>(
    &account_id,
    &Deposit { amount: 100 },
    &metadata,
)
.await?;

Each event produced by the command receives this metadata.

Accessing Metadata in Projections

Projections receive metadata as the third parameter:

#[derive(Debug, Default)]
pub struct AuditLog {
    pub entries: Vec<AuditEntry>,
}

impl Projection for AuditLog {
    type Id = String;
    type Metadata = EventMetadata;
}

impl ApplyProjection<FundsDeposited> for AuditLog {
    fn apply_projection(
        &mut self,
        aggregate_id: &Self::Id,
        event: &FundsDeposited,
        meta: &Self::Metadata,
    ) {
        self.entries.push(AuditEntry {
            timestamp: meta.timestamp,
            user: meta.user_id.clone(),
            correlation_id: meta.correlation_id.clone(),
            action: format!("Deposited {} to account {}", event.amount, aggregate_id),
        });
    }
}

Correlation and Causation

Track event relationships for debugging and workflows:

Request A (correlation: abc)

OrderPlaced causation: null

InventoryReserved causation: OrderPlaced

PaymentProcessed causation: OrderPlaced

  • Correlation ID: Groups all events from a single user request
  • Causation ID: Points to the event that triggered this one
// When handling a saga or process manager
let follow_up_metadata = EventMetadata {
    correlation_id: original_meta.correlation_id.clone(),
    causation_id: Some(original_event_id.to_string()),
    user_id: "system".to_string(),
    timestamp: Utc::now(),
};

Unit Metadata

If you don’t need metadata, use ():

let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);

repository
    .execute_command::<Account, Deposit>(&id, &cmd, &())
    .await?;

Projections with type Metadata = () ignore the metadata parameter.

Metadata vs Event Data

Put in MetadataPut in Event
Who did it (user ID)What happened (domain facts)
When (timestamp)Domain-relevant times (due date)
Request tracing (correlation)Business identifiers
Infrastructure contextDomain context

Events should be understandable without metadata. Metadata enhances observability.

Example: Multi-Tenant Metadata

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TenantMetadata {
    pub tenant_id: String,
    pub user_id: String,
    pub request_id: String,
}

impl Projection for TenantDashboard {
    type Id = String;
    type Metadata = TenantMetadata;
}

impl ApplyProjection<OrderPlaced> for TenantDashboard {
    fn apply_projection(
        &mut self,
        _id: &Self::Id,
        event: &OrderPlaced,
        meta: &Self::Metadata,
    ) {
        // Only count orders for our tenant
        if meta.tenant_id == self.tenant_id {
            self.order_count += 1;
            self.total_revenue += event.total;
        }
    }
}

Next

Custom Stores — Implementing your own persistence layer

Custom Stores

The inmemory::Store is useful for testing, but production systems need durable storage. This guide walks through implementing EventStore for your database.

The EventStore Trait

pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Copy + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialization codec.
    type Codec: Codec + Clone + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    fn codec(&self) -> &Self::Codec;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Begin a transaction for appending events to an aggregate.
    ///
    /// The transaction type is determined by the concurrency strategy `C`.
    ///
    /// # Arguments
    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
    /// * `aggregate_id` - The aggregate instance identifier
    /// * `expected_version` - The version expected for optimistic concurrency
    fn begin<C: ConcurrencyStrategy>(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: Self::Id,
        expected_version: Option<Self::Position>,
    ) -> Transaction<'_, Self, C>
    where
        Self: Sized;

    /// Append events with optional version checking.
    ///
    /// If `expected_version` is `Some`, the append fails with a concurrency
    /// conflict if the current stream version doesn't match.
    /// If `expected_version` is `None`, no version checking is performed.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
    /// [`AppendError::Store`] if persistence fails.
    fn append<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimizes based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
    > + Send
    + 'a;

    /// Append events expecting an empty stream.
    ///
    /// This method is used by optimistic concurrency when creating new
    /// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
    /// already has events.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the stream is not empty,
    /// or [`AppendError::Store`] if persistence fails.
    fn append_expecting_new<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}

Design Decisions

Position Type

Choose based on your ordering needs:

Position TypeUse Case
()Unordered, append-only log
u64Global sequence number
(i64, i32)Timestamp + sequence for distributed systems
UuidEvent IDs for deduplication

Storage Schema

A typical SQL schema:

CREATE TABLE events (
    position BIGSERIAL PRIMARY KEY,
    aggregate_kind TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    event_kind TEXT NOT NULL,
    data BYTEA NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_aggregate
    ON events (aggregate_kind, aggregate_id, position);

CREATE INDEX idx_events_kind
    ON events (event_kind, position);

Implementation Skeleton

use std::future::Future;
use sourcery::store::{
    AppendError, EventFilter, EventStore, JsonCodec, PersistableEvent, StoredEvent, Transaction,
};
use sourcery::concurrency::ConcurrencyStrategy;

pub struct PostgresEventStore {
    pool: sqlx::PgPool,
    codec: JsonCodec,
}

impl EventStore for PostgresEventStore {
    type Id = String;
    type Position = i64;
    type Error = sqlx::Error;
    type Codec = JsonCodec;
    type Metadata = serde_json::Value;

    fn codec(&self) -> &Self::Codec { &self.codec }

    fn stream_version<'a>(&'a self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id)
        -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a
    {
        async move { todo!("SELECT MAX(position) WHERE aggregate_kind = $1 AND aggregate_id = $2") }
    }

    fn begin<C: ConcurrencyStrategy>(&mut self, aggregate_kind: &str, aggregate_id: Self::Id, expected_version: Option<Self::Position>)
        -> Transaction<'_, Self, C>
    {
        Transaction::new(self, aggregate_kind.to_string(), aggregate_id, expected_version)
    }

    fn append<'a>(&'a mut self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id, expected_version: Option<Self::Position>, events: Vec<PersistableEvent<Self::Metadata>>)
        -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a
    {
        async move { todo!("INSERT with version check") }
    }

    fn append_expecting_new<'a>(&'a mut self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id, events: Vec<PersistableEvent<Self::Metadata>>)
        -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a
    {
        async move { todo!("INSERT only if stream empty") }
    }

    fn load_events<'a>(&'a self, filters: &'a [EventFilter<Self::Id, Self::Position>])
        -> impl Future<Output = Result<Vec<StoredEvent<Self::Id, Self::Position, Self::Metadata>>, Self::Error>> + Send + 'a
    {
        async move { todo!("SELECT with filters") }
    }
}

Implementing Transactions

The Transaction type manages event batching:

impl PostgresEventStore {
    pub async fn append_batch(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: &str,
        events: Vec<PersistableEvent<serde_json::Value>>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        for event in events {
            sqlx::query(
                "INSERT INTO events (aggregate_kind, aggregate_id, event_kind, data, metadata)
                 VALUES ($1, $2, $3, $4, $5)"
            )
            .bind(aggregate_kind)
            .bind(aggregate_id)
            .bind(&event.kind)
            .bind(&event.data)
            .bind(&event.metadata)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
}

Loading Events

Handle multiple filters efficiently:

fn load_events(&self, filters: &[EventFilter<String, i64>])
    -> Result<Vec<StoredEvent<String, i64, serde_json::Value>>, sqlx::Error>
{
    // Deduplicate overlapping filters
    // Build WHERE clause:
    //   (event_kind = 'x' AND position > N)
    //   OR (event_kind = 'y' AND aggregate_kind = 'a' AND aggregate_id = 'b')
    // ORDER BY position ASC
    // Map rows to StoredEvent
    todo!()
}

Optimistic Concurrency

For systems requiring strict ordering, add version checking:

ALTER TABLE events ADD COLUMN stream_version INT NOT NULL;

CREATE UNIQUE INDEX idx_events_stream_version
    ON events (aggregate_kind, aggregate_id, stream_version);
// In append_batch:
// 1. Get current max version for stream
// 2. Insert with version + 1
// 3. Handle unique constraint violation as concurrency conflict

Event Stores for Different Databases

DynamoDB

Table: events
  PK: {aggregate_kind}#{aggregate_id}
  SK: {position:012d}  (zero-padded for sorting)
  GSI: event_kind-position-index

MongoDB

{
  _id: ObjectId,
  aggregateKind: "account",
  aggregateId: "ACC-001",
  eventKind: "account.deposited",
  position: NumberLong(1234),
  data: BinData(...),
  metadata: { ... }
}

S3 (Append-Only Log)

s3://bucket/events/{aggregate_kind}/{aggregate_id}/{position}.json
s3://bucket/events-by-kind/{event_kind}/{position}.json  (symlinks/copies)

Testing Your Store

Use the same test patterns as inmemory::Store:

#[tokio::test]
async fn test_append_and_load() {
    let mut store = PostgresEventStore::new(test_pool()).await;

    // Append events
    let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
    tx.append(event, metadata)?;
    tx.commit()?;

    // Load and verify
    let events = store.load_events(&[
        EventFilter::for_aggregate("account.deposited", "account", "ACC-001".to_string())
    ])?;

    assert_eq!(events.len(), 1);
}

Next

Test Framework — Testing aggregates in isolation

Test Framework

The crate provides two testing utilities:

  • TestFramework: Unit testing aggregates in isolation (no stores, no serialization)
  • RepositoryTestExt: Integration testing with real repositories (seeding data, simulating concurrency)

Unit Testing with TestFramework

TestFramework tests aggregates in isolation using the Given-When-Then pattern. No stores, no serialization—just pure domain logic.

Enabling the Test Framework

Add the test-util feature to your dev dependencies:

[dev-dependencies]
sourcery = { version = "0.1", features = ["test-util"] }

Basic Usage

use sourcery::test::TestFramework;

type AccountTest = TestFramework<Account>;

#[test]
fn deposit_increases_balance() {
    AccountTest::new()
        .given(vec![FundsDeposited { amount: 100 }.into()])
        .when(&Deposit { amount: 50 })
        .then_expect_events(&[FundsDeposited { amount: 50 }.into()]);
}

The Given-When-Then Pattern

flowchart LR
    Given["Given<br/>(past events)"] --> When["When<br/>(command)"]
    When --> Then["Then<br/>(expected outcome)"]
  • Given: Events that have already occurred (establishes state)
  • When: The command being tested
  • Then: Expected events or error

Given Methods

given(events)

Start with a list of past events:

AccountTest::new()
    .given(vec![
        FundsDeposited { amount: 100 }.into(),
        FundsWithdrawn { amount: 30 }.into(),
    ])
    // Balance is now 70

given_no_previous_events()

Start with a fresh aggregate:

AccountTest::new()
    .given_no_previous_events()
    // Balance is 0

When Methods

when(command)

Execute a command against the aggregate:

.when(&Withdraw { amount: 50 })

Then Methods

then_expect_events(events)

Assert that specific events were produced:

.then_expect_events(&[
    FundsWithdrawn { amount: 50 }.into(),
]);

Events are compared using PartialEq, so your event types must derive it.

then_expect_no_events()

Assert that the command produced no events:

AccountTest::new()
    .given_no_previous_events()
    .when(&Deposit { amount: 0 })  // No-op deposit
    .then_expect_no_events();

then_expect_error()

Assert that the command failed (any error):

AccountTest::new()
    .given(vec![FundsDeposited { amount: 50 }.into()])
    .when(&Withdraw { amount: 100 })  // Insufficient funds
    .then_expect_error();

then_expect_error_eq(error)

Assert a specific error:

.then_expect_error_eq(&AccountError::InsufficientFunds);

then_expect_error_message(substring)

Assert the error message contains a substring:

.then_expect_error_message("insufficient");

inspect_result(closure)

Custom assertions on the result:

.inspect_result(|result| {
    let events = result.as_ref().unwrap();
    assert_eq!(events.len(), 2);
    // Custom validation...
});

Complete Test Suite Example

use sourcery::test::TestFramework;

type AccountTest = TestFramework<Account>;

#[test]
fn deposits_positive_amount() {
    AccountTest::new()
        .given_no_previous_events()
        .when(&Deposit { amount: 100 })
        .then_expect_events(&[FundsDeposited { amount: 100 }.into()]);
}

#[test]
fn rejects_overdraft() {
    AccountTest::new()
        .given(vec![FundsDeposited { amount: 100 }.into()])
        .when(&Withdraw { amount: 150 })
        .then_expect_error_eq(&AccountError::InsufficientFunds);
}

#[test]
fn rejects_invalid_deposit() {
    AccountTest::new()
        .given_no_previous_events()
        .when(&Deposit { amount: -50 })
        .then_expect_error();
}

Testing Projections

Projections don’t use TestFramework. Test them directly:

#[test]
fn projection_aggregates_deposits() {
    let mut proj = AccountSummary::default();

    proj.apply_projection("ACC-001", &FundsDeposited { amount: 100 }, &());
    proj.apply_projection("ACC-002", &FundsDeposited { amount: 50 }, &());
    proj.apply_projection("ACC-001", &FundsDeposited { amount: 25 }, &());

    assert_eq!(proj.accounts.get("ACC-001"), Some(&125));
    assert_eq!(proj.accounts.get("ACC-002"), Some(&50));
}

Next

Design Decisions — Why the crate works this way

Design Decisions

This page explains the reasoning behind key architectural choices in the crate.

Events as First-Class Structs

Decision: Events are standalone structs implementing DomainEvent, not just enum variants.

Why:

  • Events can be shared across multiple aggregates
  • Projections subscribe to event types, not aggregate enums
  • Easier to evolve events independently
  • Better compile-time isolation

Trade-off: More boilerplate (mitigated by derive macro).

// This crate: events stand alone
struct OrderPlaced { /* ... */ }
struct PaymentReceived { /* ... */ }

// Both Order and Payment aggregates can use PaymentReceived

IDs as Infrastructure

Decision: Aggregate IDs are passed to the repository, not embedded in events or handlers.

Why:

  • Domain events stay pure (no infrastructure metadata)
  • Same event type works with different ID schemes
  • Command handlers focus on behavior, not identity
  • IDs travel in the envelope alongside events

Trade-off: You can’t access the ID inside Handle<C>. If needed, include relevant IDs in the command.

// ID is infrastructure
repository
    .execute_command::<Account, Deposit>(&account_id, &command, &metadata)
    .await?;

// Event doesn't contain ID
struct FundsDeposited { amount: i64 }  // No account_id field

No Built-In Command Bus

Decision: The crate provides Repository::execute_command() but no command routing infrastructure.

Why:

  • Command buses vary widely (sync, async, distributed, in-process)
  • Many applications don’t need one (web handlers call repository directly)
  • Easy to add on top: trait object dispatch, actor messages, etc.
  • Keeps the crate focused on event sourcing, not messaging

What you might build:

// Your command bus (not provided)
trait CommandBus {
    fn dispatch<C: Command>(&self, id: &str, command: C) -> Result<(), Error>;
}

Versioning at the Codec Layer

Decision: No explicit “upcaster” concept. Event migration happens in the codec or via serde.

Why:

  • Simpler mental model—one place for serialization concerns
  • Works with any serde-compatible migration library
  • Codec can optimize (e.g., cache parsed schemas)
  • Avoids framework lock-in for versioning strategy

How:

// Option 1: serde defaults
#[serde(default)]
pub marketing_consent: bool,

// Option 2: serde-evolve
#[derive(Evolve)]
#[evolve(ancestors(V1, V2))]
pub struct Event { /* ... */ }

// Option 3: Codec-level
impl Codec for VersionedCodec {
    fn deserialize(&self, data: &[u8]) -> Result<E> {
        // Parse, migrate, return current version
    }
}

Projections Decoupled from Aggregates

Decision: Projections implement ApplyProjection<E> for event types, not aggregate types.

Why:

  • A projection can consume events from many aggregates
  • Projections don’t need to know about aggregate enums
  • Enables cross-aggregate views naturally
  • Better separation of read and write concerns
// Projection consumes from multiple aggregates
impl Projection for Dashboard { type Id = String; type Metadata = (); }
impl ApplyProjection<OrderPlaced> for Dashboard { /* ... */ }
impl ApplyProjection<PaymentReceived> for Dashboard { /* ... */ }
impl ApplyProjection<ShipmentDispatched> for Dashboard { /* ... */ }

Minimal Infrastructure

Decision: No outbox, no scheduler, no event streaming, no saga orchestrator.

Why:

  • Event sourcing is the foundation; infrastructure varies by use case
  • Async runtimes differ (tokio, async-std, sync-only)
  • Message brokers differ (Kafka, RabbitMQ, NATS, none)
  • Database choices affect outbox patterns
  • Keeps dependencies minimal
  • may be added in future!!

What we do provide:

  • Core traits for aggregates and projections
  • Repository for command execution
  • In-memory store for testing
  • Test framework for aggregate testing

What you add:

  • Production event store
  • Outbox pattern (if needed)
  • Process managers / sagas
  • Event publishing to message broker