
Introduction

sourcery provides building blocks for pragmatic event-sourced systems in Rust. It keeps your domain types pure while giving you tools to rebuild state, project read models, and persist events through a pluggable store interface.

When to Use Event Sourcing

Event sourcing shines when you need:

  • Complete audit trails — every state change is recorded
  • Time travel — reconstruct state at any point in history
  • Decoupled read models — optimise queries independently of writes
  • Complex domain logic — model behaviour as a sequence of facts

It adds complexity, so consider simpler approaches for CRUD-heavy applications with minimal business logic.

What This Crate Provides

Included:

  • Core traits for aggregates, events, projections
  • Derive macros to reduce boilerplate
  • Repository for command execution
  • Push-based subscriptions for live projections
  • In-memory store for testing
  • PostgreSQL store (optional)
  • Test framework (given-when-then)

Not included:

  • Command bus / message broker
  • Outbox pattern
  • Snapshot scheduler

The crate is intentionally minimal. You wire the repository into whichever pipeline you prefer—whether that’s a web framework, actor system, or CLI tool.

A Taste of the API

#[derive(Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

The derive macro generates the event enum and serialisation glue. You focus on domain behaviour.

Next Steps

Event Sourcing Primer

Event sourcing stores state as a sequence of events rather than as a single mutable record. Instead of updating a row in a database, you append an event describing what happened. Current state is derived by replaying events from the beginning.

Traditional State vs Event Log

Traditional (mutable state):

  UPDATE balance = 150  →  Current: 150
  UPDATE balance = 50   →  Current: 50

Event sourced:

  FundsDeposited(100)
  FundsDeposited(50)
  FundsWithdrawn(100)
  Replay: 100 + 50 - 100 = 50

With traditional storage, you only know the current balance. With event sourcing, you know how you got there.

The Core Principle

Events are the source of truth. To “undo” something, append a compensating event.

  • Immutable — Events cannot be changed after creation
  • Past tense — Named as facts: OrderPlaced, FundsDeposited
  • Complete — Contains all data needed to understand what happened
  • Ordered — Sequence matters for correct state reconstruction

Reconstructing State

When you need the current state of an entity, the repository replays its events in order into a fresh aggregate instance. This “replay” process runs every time you load an aggregate. For long-lived entities, snapshots optimise this by checkpointing state periodically.
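
In crate-free terms, replay is a left fold over the event log. A minimal sketch in plain Rust (the `Event` and `Account` types here are illustrative, not this crate's):

```rust
// Illustrative event and state types for a bank account.
enum Event {
    Deposited(i64),
    Withdrawn(i64),
}

#[derive(Default)]
struct Account {
    balance: i64,
}

impl Account {
    // Applying an event is infallible and deterministic: events are facts.
    fn apply(&mut self, event: &Event) {
        match event {
            Event::Deposited(amount) => self.balance += amount,
            Event::Withdrawn(amount) => self.balance -= amount,
        }
    }
}

// Replay: fold the whole history into a fresh aggregate instance.
fn replay(events: &[Event]) -> Account {
    let mut account = Account::default();
    for event in events {
        account.apply(event);
    }
    account
}

fn main() {
    let log = vec![
        Event::Deposited(100),
        Event::Deposited(50),
        Event::Withdrawn(100),
    ];
    let account = replay(&log);
    assert_eq!(account.balance, 50);
    println!("balance = {}", account.balance);
}
```

The same fold always yields the same state, which is why snapshots are a pure optimisation: they only checkpoint a prefix of the fold.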

Benefits

  • Audit trail — Every change is recorded with full context
  • Debugging — Replay events to reproduce bugs exactly
  • Temporal queries — Answer “what was the state at time T?”
  • Event-driven architecture — Events naturally feed other systems

Trade-offs

  • Complexity — More concepts to understand than CRUD
  • Storage growth — Event logs grow indefinitely (though events compress well)
  • Eventual consistency — Read models may lag behind writes
  • Schema evolution — Old events must remain readable forever
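
One common mitigation for the schema-evolution trade-off is upcasting: deserialise old payloads in their historical shape, then convert them to the current shape before replay. A hypothetical sketch (the versioned types and the default currency are assumptions, not part of this crate):

```rust
// Hypothetical old payload shape: written before a `currency` field existed.
struct DepositedV1 {
    amount: i64,
}

// Current payload shape.
#[derive(Debug, PartialEq)]
struct DepositedV2 {
    amount: i64,
    currency: String,
}

// Upcast an old event to the current shape, supplying an assumed default
// for the field that did not exist when the event was written.
fn upcast(old: DepositedV1) -> DepositedV2 {
    DepositedV2 {
        amount: old.amount,
        currency: String::from("GBP"), // assumed historical default
    }
}

fn main() {
    let event = upcast(DepositedV1 { amount: 100 });
    assert_eq!(
        event,
        DepositedV2 { amount: 100, currency: String::from("GBP") }
    );
    println!("upcast: {} {}", event.amount, event.currency);
}
```

The old payload itself is never rewritten; only the in-memory representation changes, so the stored event log stays immutable.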

Next

CQRS Overview — Separating reads from writes

CQRS Overview

Command Query Responsibility Segregation (CQRS) separates the write model (how you change data) from the read model (how you query data). Event sourcing and CQRS complement each other naturally.

The Split

Write side (commands):

  Command → Aggregate → Events → Event Store

Read side (queries):

  Event Store → Projection → Read Model → Query

Write side: Commands validate against the aggregate and produce events. The aggregate enforces business rules.

Read side: Projections consume events and build optimised views for queries. Each projection can structure data however the UI needs.

Why Separate?

Concern       | Write Model               | Read Model
Optimised for | Consistency, validation   | Query performance
Structure     | Reflects domain behaviour | Reflects UI needs
Updates       | Transactional             | Eventually consistent
Scaling       | Consistent writes         | Replicated reads

A single aggregate might feed multiple projections (account summary, transaction history, fraud detection, analytics). Each projection sees the same events but builds a different view.
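
As a crate-independent illustration, here are two hand-rolled "projections" over the same event slice, each producing a different view (all types are illustrative):

```rust
// One shared event stream...
enum Event {
    Deposited(i64),
    Withdrawn(i64),
}

// ...feeding view 1: the current balance.
fn balance(events: &[Event]) -> i64 {
    events.iter().fold(0, |acc, event| match event {
        Event::Deposited(amount) => acc + amount,
        Event::Withdrawn(amount) => acc - amount,
    })
}

// ...and view 2: how many transactions occurred, regardless of direction.
fn transaction_count(events: &[Event]) -> usize {
    events.len()
}

fn main() {
    let events = vec![
        Event::Deposited(100),
        Event::Withdrawn(30),
        Event::Deposited(50),
    ];
    // Same facts, two independently shaped read models.
    assert_eq!(balance(&events), 120);
    assert_eq!(transaction_count(&events), 3);
    println!("balance = {}, transactions = {}", balance(&events), transaction_count(&events));
}
```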

In This Crate

The crate models CQRS through:

  • Aggregate + Handle<C> — the write side
  • Projection + ApplyProjection<E> — the read side
  • Repository — orchestrates both
// Write: execute a command
repository
    .execute_command::<Account, Deposit>(&id, &command, &metadata)
    .await?;

// Read: load a projection
let summary = repository
    .load_projection::<AccountSummary>(&())
    .await?;

Eventual Consistency

With CQRS, read models may not immediately reflect the latest write. This is fine for most UIs—users expect slight delays. For operations requiring strong consistency, read from the aggregate itself (via repo.load()).

Next

Architecture — How the components fit together

Architecture

This page shows how the crate’s components connect and where your domain code fits in.

Component Overview


You define: Aggregates, events, commands, projections

The crate provides: Repository orchestration, store abstraction, serialisation

The Envelope Pattern

Events travel with metadata, but domain types stay pure:

  • Aggregates receive only the pure event—no IDs or metadata
  • Projections receive the full envelope—aggregate ID and metadata included
  • Stores persist the envelope but deserialise only what’s needed

This keeps domain logic free of infrastructure concerns.
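
Conceptually the envelope looks something like the following sketch. The real field layout is defined by the crate's StoredEvent type; these names are assumptions for exposition:

```rust
// Illustrative envelope shape. Field names are assumptions, not the crate's
// exact StoredEvent layout.
#[derive(Debug)]
struct Envelope<Id, Data, Meta> {
    aggregate_kind: &'static str, // e.g. "account"
    aggregate_id: Id,             // which instance produced the event
    kind: &'static str,           // the event's stable KIND, e.g. "account.deposited"
    data: Data,                   // the pure domain event payload
    metadata: Meta,               // infrastructure context: timestamps, user IDs, ...
}

fn main() {
    let envelope = Envelope {
        aggregate_kind: "account",
        aggregate_id: String::from("ACC-001"),
        kind: "account.deposited",
        data: 100i64, // stand-in for a serialised event payload
        metadata: (),
    };
    // An aggregate's apply() would see only `data`; a projection's
    // apply_projection() would also see the aggregate ID and metadata.
    assert_eq!(envelope.aggregate_id, "ACC-001");
    assert_eq!(envelope.data, 100);
    println!("{}/{} -> {}", envelope.aggregate_kind, envelope.aggregate_id, envelope.kind);
}
```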

Command Execution Flow

  1. The application calls execute_command(id, cmd, meta) on the Repository.
  2. The Repository loads the latest snapshot from the SnapshotStore, if snapshots are enabled.
  3. The Repository calls load_events(filters) on the EventStore and receives Vec<StoredEvent>.
  4. Events are replayed into a fresh aggregate via apply.
  5. The Repository calls handle(command) on the aggregate, producing Vec<Event>.
  6. New events are persisted with commit_events().
  7. The Repository calls offer_snapshot() on the SnapshotStore, if enabled.
  8. Ok(()) is returned to the application.

Projection Loading Flow

  1. The application calls load_projection::<P>(&instance_id) on the Repository.
  2. The Repository calls P::filters(&instance_id) to obtain the event filters and handlers.
  3. The Repository calls load_events(filters) on the EventStore and receives Vec<StoredEvent>.
  4. A fresh projection is created with P::init(&instance_id).
  5. For each event, apply_projection(id, event, meta) is invoked.
  6. The populated projection is returned.

Projections define their event filters centrally in the ProjectionFilters trait. The repository calls filters() to determine which events to load, then replays them into the projection.

Key Types

Type               | Role
Repository         | Orchestrates command execution, aggregate loading, and projection loading
EventStore         | Trait for event persistence
SnapshotStore      | Trait for aggregate/projection snapshots
Aggregate          | Trait for command-side entities
ProjectionFilters  | Base trait for event subscribers (projections)
Projection         | Stable KIND identifier for projection snapshot storage
ApplyProjection<E> | Per-event handler for projections
Filters            | Builder for event filter specs + handler closures
DomainEvent        | Marker trait for event structs

For exact generic signatures, see the API docs on docs.rs. This page is intentionally focused on responsibilities and flow.

Next

Installation — Add the crate to your project

Installation

Add sourcery to your Cargo.toml:

[dependencies]
sourcery = "0.1"
serde = { version = "1", features = ["derive"] }

The crate re-exports the derive macro, so you don’t need a separate dependency for sourcery-macros.

Feature Flags

Feature   | Description
test-util | Enables TestFramework for given-when-then aggregate testing
postgres  | PostgreSQL event store via sqlx

To enable test utilities:

[dev-dependencies]
sourcery = { version = "0.1", features = ["test-util"] }

To use the PostgreSQL store:

[dependencies]
sourcery = { version = "0.1", features = ["postgres"] }

Minimum Rust Version

This crate requires Rust 1.88.0 or later (edition 2024).

Next

Quick Start — Build your first aggregate

Quick Start

Build a simple bank account in one compact, end-to-end example demonstrating events, commands, an aggregate, a projection, and the repository.

The Complete Example

use serde::{Deserialize, Serialize};
use sourcery::{Apply, ApplyProjection, DomainEvent, Handle, Repository, store::inmemory};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FundsDeposited {
    pub amount: i64,
}

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Debug)]
pub struct Deposit {
    pub amount: i64,
}

#[derive(Default, Serialize, Deserialize, sourcery::Aggregate)]
#[aggregate(
    id = String,
    error = String,
    events(FundsDeposited),
    derives(Debug, PartialEq, Eq)
)]
pub struct Account {
    balance: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount <= 0 {
            return Err("amount must be positive".into());
        }
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

#[derive(Debug, Default, sourcery::Projection)]
#[projection(events(FundsDeposited))]
pub struct TotalDeposits {
    pub total: i64,
}

impl ApplyProjection<FundsDeposited> for TotalDeposits {
    fn apply_projection(&mut self, _id: &Self::Id, event: &FundsDeposited, _meta: &Self::Metadata) {
        self.total += event.amount;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create an in-memory store
    let store = inmemory::Store::new();
    let repository = Repository::new(store);

    // Execute a command
    repository
        .execute_command::<Account, Deposit>(&"ACC-001".to_string(), &Deposit { amount: 100 }, &())
        .await?;

    // Load a projection
    let totals = repository.load_projection::<TotalDeposits>(&()).await?;

    println!("Total deposits: {}", totals.total);
    assert_eq!(totals.total, 100);

    Ok(())
}

What Just Happened?

  1. Defined types — FundsDeposited event with DomainEvent, Deposit command (plain struct)
  2. Created an aggregate — Account with the derive macro, Apply for state mutations, Handle for command validation
  3. Created a projection — TotalDeposits builds a read model from events
  4. Wired the repository — Connected everything with inmemory::Store

Key Points

  • Events are past-tense facts — FundsDeposited, not DepositFunds
  • Commands are imperative — Deposit, not Deposited
  • The derive macro generates the glue — the event enum, From impls, serialisation
  • Projections are decoupled — they receive events, not aggregate types
  • IDs are infrastructure — passed to the repository, not embedded in events

Next Steps

Aggregates

An aggregate rebuilds state from events and validates commands to produce new events.

Basic Usage

For most aggregates:

  1. #[derive(Aggregate)]
  2. Implement Apply<E> per event
  3. Implement Handle<C> per command
#[derive(Default, sourcery::Aggregate)]
#[aggregate(id = String, error = AccountError, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}

Loading Aggregates

let account: Account = repository
    .load(&account_id)
    .await?;

This replays events for that aggregate ID. If snapshots are configured, replay starts from the latest snapshot.

Trait Reference

The Aggregate Trait

pub trait Aggregate: Default {
    /// Aggregate type identifier used by the event store.
    ///
    /// This is combined with the aggregate ID to create stream identifiers.
    /// Use lowercase, kebab-case for consistency: `"product"`,
    /// `"user-account"`, etc.
    const KIND: &'static str;

    type Event;
    type Error;
    type Id;

    /// Apply an event to update aggregate state.
    ///
    /// This is called during event replay to rebuild aggregate state from
    /// history.
    ///
    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
    /// implementations. For hand-written aggregates, implement this
    /// directly with a match expression.
    fn apply(&mut self, event: &Self::Event);
}

#[derive(Aggregate)] generates most of this.

The Apply<E> Trait

pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

apply should be:

  • Infallible (events are facts)
  • Deterministic (same events -> same state)

Event Replay Model

Replay from the Event Store, in order:

  apply(FundsDeposited { amount: 100 })
  apply(FundsWithdrawn { amount: 30 })
  apply(FundsDeposited { amount: 50 })

Result: balance = 120

Snapshots and serde

Snapshots are opt-in via Repository::with_snapshots().

If enabled, aggregate state must implement Serialize + DeserializeOwned.
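
The effect of a snapshot on replay can be sketched without the crate: restore the checkpointed state, then apply only the events recorded after the snapshot position (the types and position scheme here are illustrative):

```rust
#[derive(Clone)]
struct Account {
    balance: i64,
}

enum Event {
    Deposited(i64),
}

// Illustrative snapshot: checkpointed state plus the number of events
// already folded into it.
struct Snapshot {
    position: usize,
    state: Account,
}

// Replay from a snapshot: start at the checkpoint, apply only the tail.
fn load(snapshot: &Snapshot, log: &[Event]) -> Account {
    let mut account = snapshot.state.clone();
    for event in &log[snapshot.position..] {
        match event {
            Event::Deposited(amount) => account.balance += amount,
        }
    }
    account
}

fn main() {
    let log = vec![
        Event::Deposited(100),
        Event::Deposited(50),
        Event::Deposited(25),
    ];
    // Checkpoint covers the first two events (balance 150).
    let snapshot = Snapshot { position: 2, state: Account { balance: 150 } };
    let account = load(&snapshot, &log);
    assert_eq!(account.balance, 175); // only Deposited(25) was replayed
    println!("balance = {}", account.balance);
}
```

Because replay is deterministic, loading via a snapshot must produce exactly the same state as replaying the full log from the beginning.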

Next

Domain Events — Defining events as first-class types

Domain Events

Events are facts: things that already happened.

Basic Usage

Define a plain struct and implement DomainEvent with a stable KIND.

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OrderPlaced {
    pub product_id: String,
    pub quantity: u32,
    pub unit_price: i64,
}

impl DomainEvent for OrderPlaced {
    const KIND: &'static str = "order.placed";
}

Rules of Thumb

  • Use past tense names (OrderPlaced, not PlaceOrder)
  • Keep KIND stable forever once persisted
  • Keep payload focused on a single fact

Why First-Class Structs?

This crate keeps events as standalone structs rather than forcing enum variants:

  • Reuse across aggregates
  • Decoupled projections by event type
  • Smaller, explicit imports

The aggregate derive still generates an internal event enum for replay and storage routing.

KIND Naming Guidance

const KIND: &'static str = "order.placed"; // Good
const KIND: &'static str = "OrderPlaced";  // Acceptable
const KIND: &'static str = "placed";       // Too generic

Event Design Guidelines

  1. Include values needed to replay decisions later.
  2. Keep aggregate IDs in the envelope, not event payload.
  3. Prefer multiple specific events over one broad event.

Trait Reference

pub trait DomainEvent {
    const KIND: &'static str;
}

Next

Commands — Handling requests that produce events

Commands

Commands request a state change. They can be rejected.

Basic Usage

  1. Define a command struct
  2. Implement Handle<C> on your aggregate
  3. Return events or a domain error
#[derive(Debug)]
pub struct Deposit {
    pub amount: i64,
}

impl Handle<Deposit> for Account {
    fn handle(&self, cmd: &Deposit) -> Result<Vec<Self::Event>, Self::Error> {
        if cmd.amount <= 0 {
            return Err(AccountError::InvalidAmount);
        }
        Ok(vec![FundsDeposited { amount: cmd.amount }.into()])
    }
}

Executing Commands

repository
    .execute_command::<Account, Deposit>(&account_id, &Deposit { amount: 100 }, &metadata)
    .await?;

Validation Patterns

  • Reject invalid input (Result::Err)
  • Emit one or more events when valid
  • Return vec![] for valid no-op commands
if cmd.amount == 0 {
    return Ok(vec![]);
}

Trait Reference

The Handle<C> Trait

pub trait Handle<C>: Aggregate {
    /// Handle a command and produce events.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the command is invalid for the current
    /// aggregate state.
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}

Key points:

  • Takes &self (validate against current state)
  • Returns Vec<Event> (0..n events)
  • Returns Result (commands can fail)

Command Naming

Commands are imperative:

Good           | Bad
Deposit        | FundsDeposited
PlaceOrder     | OrderPlaced
RegisterUser   | UserRegistered
ChangePassword | PasswordChanged

Next

Projections — Building read models from events

Projections

Projections are read models built by replaying events. They are query-oriented and eventually consistent.

For most projections, keep to this shape:

  1. #[derive(Projection)]
  2. #[projection(events(...))]
  3. impl ApplyProjection<E> for each event
use std::collections::HashMap;

use sourcery::ApplyProjection;

#[derive(Debug, Default, sourcery::Projection)]
#[projection(events(FundsDeposited, FundsWithdrawn))]
pub struct AccountSummary {
    pub accounts: HashMap<String, i64>,
}

impl ApplyProjection<FundsDeposited> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsDeposited, _: &Self::Metadata) {
        *self.accounts.entry(id.clone()).or_default() += event.amount;
    }
}

impl ApplyProjection<FundsWithdrawn> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsWithdrawn, _: &Self::Metadata) {
        *self.accounts.entry(id.clone()).or_default() -= event.amount;
    }
}

#[projection(events(...))] auto-generates ProjectionFilters for the common case.

Loading Projections

// Singleton projection (InstanceId = ())
let summary = repository
    .load_projection::<AccountSummary>(&())
    .await?;

// Instance projection (InstanceId = String)
let report = repository
    .load_projection::<CustomerReport>(&customer_id)
    .await?;

When to Implement ProjectionFilters Manually

Use manual filters when you need:

  • Dynamic filtering
  • Scoped filtering (event_for / events_for)
  • Non-default initialisation
  • Full control over Id, InstanceId, or Metadata

Example: Scoped Instance Filter

impl ProjectionFilters for AccountComparison {
    type Id = String;
    type InstanceId = (String, String);
    type Metadata = ();

    fn init(ids: &Self::InstanceId) -> Self {
        Self {
            left_id: ids.0.clone(),
            right_id: ids.1.clone(),
            ..Self::default()
        }
    }

    fn filters<S>(ids: &Self::InstanceId) -> Filters<S, Self>
    where
        S: EventStore<Id = Self::Id, Metadata = Self::Metadata>,
    {
        let (left, right) = ids;
        Filters::new()
            .events_for::<Account>(left)
            .events_for::<Account>(right)
    }
}

Filters Cheat Sheet

Filters::new()
    .event::<FundsDeposited>()                 // Global: all aggregates
    .event_for::<Account, FundsWithdrawn>(&id) // One event type, one aggregate instance
    .events::<AccountEvent>()                  // All kinds in an event enum
    .events_for::<Account>(&id)                // All event kinds for one aggregate instance

Method                  | Scope
.event::<E>()           | All events of type E across all aggregates
.event_for::<A, E>(&id) | Events of type E from one aggregate instance
.events::<Enum>()       | All event kinds in a ProjectionEvent enum
.events_for::<A>(&id)   | All event kinds for one aggregate instance

Metadata in Projections

Use metadata = ... in the derive attribute for the common case:

#[derive(Debug, Default, sourcery::Projection)]
#[projection(metadata = EventMetadata, events(FundsDeposited))]
struct AuditLog {
    entries: Vec<AuditEntry>,
}

impl ApplyProjection<FundsDeposited> for AuditLog {
    fn apply_projection(
        &mut self,
        id: &Self::Id,
        event: &FundsDeposited,
        meta: &Self::Metadata,
    ) {
        self.entries.push(AuditEntry {
            timestamp: meta.timestamp,
            user: meta.user_id.clone(),
            action: format!("Deposited {} to {}", event.amount, id),
        });
    }
}

Singleton vs Instance Projections

  • Singleton (InstanceId = ()): one global projection.
  • Instance (InstanceId = String or custom type): one projection per instance ID.

apply_projection receives the aggregate ID, not the instance ID. If handlers need instance context, store it during init.

Trait Reference

For full signatures, see the API docs:

  • Projection
  • ApplyProjection<E>
  • ProjectionFilters

You can also inspect the trait definitions directly:

pub trait Projection {
    /// Stable identifier for this projection type.
    const KIND: &'static str;
}
pub trait ApplyProjection<E>: ProjectionFilters {
    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
}
pub trait ProjectionFilters: Sized {
    /// Aggregate identifier type this subscriber is compatible with.
    type Id;
    /// Instance identifier for this subscriber.
    ///
    /// For singleton subscribers use `()`.
    type InstanceId;
    /// Metadata type expected by this subscriber's handlers.
    type Metadata;

    /// Construct a fresh instance from the instance identifier.
    ///
    /// For singleton projections (`InstanceId = ()`), this typically
    /// delegates to `Self::default()`. For instance projections, this
    /// captures the instance identifier at construction time.
    fn init(instance_id: &Self::InstanceId) -> Self;

    /// Build the filter set and handler map for this subscriber.
    fn filters<S>(instance_id: &Self::InstanceId) -> Filters<S, Self>
    where
        S: EventStore<Id = Self::Id, Metadata = Self::Metadata>;
}

Snapshotting Projections

Projections support snapshots for faster loading. See Snapshots — Projection Snapshots.

Projections vs Aggregates

Aspect            | Aggregate          | Projection
Purpose           | Enforce invariants | Serve queries
State source      | Own events only    | Any events
Receives IDs      | No (in envelope)   | Yes (as parameter)
Receives metadata | No                 | Yes
Consistency       | Strong             | Eventual
Mutability        | Via commands only  | Rebuilt on demand

Next

Stores — Event persistence and serialisation

Stores

The crate separates storage concerns into two traits:

  • EventStore for event persistence
  • SnapshotStore for aggregate/projection snapshots

Stores own serialisation and deserialisation.

The EventStore Trait

pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Clone + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    /// Serialised event payload type.
    ///
    /// This is the format used to store event data internally. Common choices:
    /// - `serde_json::Value` for JSON-based stores
    /// - `Vec<u8>` for binary stores
    type Data: Clone + Send + Sync + 'static;

    /// Decode a stored event into a concrete event type.
    ///
    /// Deserialises the `data` field of a [`StoredEvent`] back into a domain
    /// event.
    ///
    /// # Errors
    ///
    /// Returns an error if deserialisation fails.
    fn decode_event<E>(
        &self,
        stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
    ) -> Result<E, Self::Error>
    where
        E: crate::event::DomainEvent + serde::de::DeserializeOwned;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Commit events to an aggregate stream without version checking.
    ///
    /// Events are serialised and persisted atomically. No conflict detection
    /// is performed (last-writer-wins).
    ///
    /// # Errors
    ///
    /// Returns [`CommitError::Serialization`] if an event fails to serialise,
    /// or [`CommitError::Store`] if persistence fails.
    fn commit_events<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Commit events to an aggregate stream with optimistic concurrency
    /// control.
    ///
    /// Events are serialised and persisted atomically. The commit fails if:
    /// - `expected_version` is `Some(v)` and the current version differs from
    ///   `v`
    /// - `expected_version` is `None` and the stream already has events (new
    ///   aggregate expected)
    ///
    /// # Errors
    ///
    /// Returns [`OptimisticCommitError::Serialization`] if an event fails to
    /// serialise, [`OptimisticCommitError::Conflict`] if the version check
    /// fails, or [`OptimisticCommitError::Store`] if persistence fails.
    #[allow(clippy::type_complexity)]
    fn commit_events_optimistic<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<
        Output = Result<
            Committed<Self::Position>,
            OptimisticCommitError<Self::Position, Self::Error>,
        >,
    > + Send
    + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimises based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    #[allow(clippy::type_complexity)]
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<
            Self::Id,
            Self::Position,
            Self::Data,
            Self::Metadata,
            Self::Error,
        >,
    > + Send
    + 'a;
}

/// Marker trait for stores that provide globally ordered positions.
///
/// Projection snapshots require this guarantee.
pub trait GloballyOrderedStore: EventStore {}

Built-in: inmemory::Store

For testing and prototyping:

use sourcery::store::inmemory;

// Unit metadata
let store: inmemory::Store<String, ()> = inmemory::Store::new();

// Custom metadata
let store: inmemory::Store<String, MyMetadata> = inmemory::Store::new();

The in-memory store uses serde_json and supports globally ordered positions.

Committing Events

use nonempty::NonEmpty;

let events = NonEmpty::singleton(my_event);

// Unchecked (last-writer-wins)
store.commit_events("account", &account_id, events.clone(), &metadata).await?;

// Optimistic concurrency
store
    .commit_events_optimistic("account", &account_id, Some(expected_version), events, &metadata)
    .await?;

commit_events_optimistic fails with OptimisticCommitError::Conflict when the expected and actual stream versions differ.
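
The version check behind optimistic commits amounts to comparing the caller's expected version with the stream's actual version. A store-independent sketch (the CommitCheck type is illustrative, not the crate's error type):

```rust
#[derive(Debug, PartialEq)]
enum CommitCheck {
    Ok,
    Conflict { expected: Option<u64>, actual: Option<u64> },
}

// `None` means "I expect a brand-new stream with no events yet";
// `Some(v)` means "I expect the stream to still be at version v".
fn check_version(expected: Option<u64>, actual: Option<u64>) -> CommitCheck {
    if expected == actual {
        CommitCheck::Ok
    } else {
        CommitCheck::Conflict { expected, actual }
    }
}

fn main() {
    // Writer loaded the stream at version 3 and nobody wrote in between.
    assert_eq!(check_version(Some(3), Some(3)), CommitCheck::Ok);
    // A concurrent writer advanced the stream to 4: the commit must fail.
    assert_eq!(
        check_version(Some(3), Some(4)),
        CommitCheck::Conflict { expected: Some(3), actual: Some(4) }
    );
    // Expecting a new aggregate against an existing stream also conflicts.
    assert_ne!(check_version(None, Some(1)), CommitCheck::Ok);
    println!("version checks behaved as expected");
}
```

On conflict, the usual recovery is to reload the aggregate, re-validate the command against the fresh state, and retry the commit.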

Loading Events with Filters

// All events of one kind
EventFilter::for_event("account.deposited")

// One aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")

// Incremental loading
EventFilter::for_event("account.deposited").after(100)

StoredEvent in Practice

load_events returns StoredEvent<Id, Pos, Data, Metadata>, containing:

  • envelope fields (aggregate_kind, aggregate_id, kind, position)
  • serialised payload (data)
  • metadata (metadata)

Use EventStore::decode_event() to deserialise payloads into domain events.

For the exact field layout, see API docs for StoredEvent.

The SnapshotStore Trait

pub trait SnapshotStore<Id: Sync>: Send + Sync {
    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<T>(
        &self,
        kind: &str,
        id: &Id,
    ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
    where
        T: DeserializeOwned;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialisation, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<CE, T, Create>(
        &self,
        kind: &str,
        id: &Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl std::future::Future<
        Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
    > + Send
    where
        CE: std::error::Error + Send + Sync + 'static,
        T: Serialize,
        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
}

See Snapshots for policy and usage guidance.

Implementing a Custom Store

See Custom Stores for a practical guide.

Next

The Aggregate Derive — Reducing boilerplate with macros

The Aggregate Derive

#[derive(Aggregate)] removes aggregate boilerplate by generating the event enum and aggregate wiring.

Basic Usage

use sourcery::Aggregate;

#[derive(Debug, Default, Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

Then add:

  • Apply<E> impls for each event
  • Handle<C> impls for each command

Attribute Reference

Attribute            | Required | Description
id = Type            | Yes      | Aggregate identifier type
error = Type         | Yes      | Error type for command handling
events(E1, E2, ...)  | Yes      | Event types this aggregate produces
kind = "name"        | No       | Aggregate type identifier (default: lowercase struct name)
event_enum = "Name"  | No       | Generated enum name (default: {Struct}Event)
derives(T1, T2, ...) | No       | Additional derives for the generated event enum (always includes Clone)

Typical Customisation

Customising the Kind

#[derive(Aggregate)]
#[aggregate(
    kind = "bank-account",
    id = String,
    error = String,
    events(FundsDeposited)
)]
pub struct Account { /* ... */ }

Customising Event Enum Name

#[derive(Aggregate)]
#[aggregate(
    event_enum = "BankEvent",
    id = String,
    error = String,
    events(FundsDeposited)
)]
pub struct Account { /* ... */ }

Adding Derives to Event Enum

#[derive(Aggregate)]
#[aggregate(
    id = String,
    error = String,
    events(FundsDeposited),
    derives(Debug, PartialEq)
)]
pub struct Account { /* ... */ }

What Gets Generated (Reference)

Given:

#[derive(Aggregate)]
#[aggregate(id = String, error = AccountError, events(FundsDeposited, FundsWithdrawn))]
pub struct Account { balance: i64 }

The macro generates:

  • enum AccountEvent { ... }
  • impl Aggregate for Account
  • impl From<E> for AccountEvent per event
  • impl EventKind for AccountEvent
  • impl serde::Serialize for AccountEvent
  • impl ProjectionEvent for AccountEvent

Event Enum

#[derive(Clone)]
pub enum AccountEvent {
    FundsDeposited(FundsDeposited),
    FundsWithdrawn(FundsWithdrawn),
}

From Implementations

impl From<FundsDeposited> for AccountEvent {
    fn from(event: FundsDeposited) -> Self {
        AccountEvent::FundsDeposited(event)
    }
}

Aggregate Implementation

impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = AccountError;
    type Id = String;

    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::FundsDeposited(e) => Apply::apply(self, e),
            AccountEvent::FundsWithdrawn(e) => Apply::apply(self, e),
        }
    }
}

ProjectionEvent Implementation

The generated enum supports replay/deserialisation from stored events by kind.

Requirements

  • Aggregate struct must implement Default.
  • Event types must implement DomainEvent.
  • Serialize + DeserializeOwned on aggregate state is only required when using snapshots.

Next

Manual Implementation — Implementing without the macro

The Projection Derive

#[derive(Projection)] generates a stable projection KIND.

For common cases, #[projection(events(...))] also generates ProjectionFilters.

Basic Usage

use std::collections::HashMap;

use sourcery::ApplyProjection;

#[derive(Debug, Default, sourcery::Projection)]
#[projection(events(FundsDeposited, FundsWithdrawn))]
pub struct AccountSummary {
    totals: HashMap<String, i64>,
}

impl ApplyProjection<FundsDeposited> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsDeposited, _: &Self::Metadata) {
        *self.totals.entry(id.clone()).or_default() += event.amount;
    }
}

impl ApplyProjection<FundsWithdrawn> for AccountSummary {
    fn apply_projection(&mut self, id: &Self::Id, event: &FundsWithdrawn, _: &Self::Metadata) {
        *self.totals.entry(id.clone()).or_default() -= event.amount;
    }
}

That is the recommended default path.

Attributes

| Attribute | Default | Description |
|---|---|---|
| kind = "name" | kebab-case struct name | Projection identifier for snapshots |
| events(E1, E2, ...) | none | Auto-generate ProjectionFilters::filters() |
| id = Type | String | Auto-generated ProjectionFilters::Id |
| instance_id = Type | () | Auto-generated ProjectionFilters::InstanceId |
| metadata = Type | () | Auto-generated ProjectionFilters::Metadata |

What Is Generated

Always:

impl Projection for MyProjection {
    const KIND: &'static str = "my-projection";
}

When events(...) (or filter type overrides) are present, also generates:

  • impl ProjectionFilters for MyProjection
  • init(...) -> Self::default()
  • filters(...) -> Filters::new().event::<...>()...

When to Go Manual

Manually implement ProjectionFilters when you need:

  • Dynamic filtering
  • event_for / events_for
  • Non-default initialisation
  • Full control over filter construction

use std::collections::HashMap;

use sourcery::{ApplyProjection, Filters, ProjectionFilters, store::EventStore};

#[derive(Debug, Default, sourcery::Projection)]
pub struct AccountSummary {
    totals: HashMap<String, i64>,
}

impl ProjectionFilters for AccountSummary {
    type Id = String;
    type InstanceId = ();
    type Metadata = ();

    fn init((): &()) -> Self {
        Self::default()
    }

    fn filters<S>((): &()) -> Filters<S, Self>
    where
        S: EventStore<Id = String, Metadata = ()>,
    {
        Filters::new()
            .event::<FundsDeposited>()
            .event::<FundsWithdrawn>()
    }
}

Loading

let projection = repository
    .load_projection::<AccountSummary>(&())
    .await?;

Manual Implementation

While #[derive(Aggregate)] handles most cases, you might implement the traits manually for:

  • Custom serialisation logic
  • Non-standard event routing
  • Learning how the system works

Side-by-Side Comparison

With Derive Macro

use sourcery::{Aggregate, Apply, DomainEvent, Handle};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsDeposited { pub amount: i64 }

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsWithdrawn { pub amount: i64 }

impl DomainEvent for FundsWithdrawn {
    const KIND: &'static str = "account.withdrawn";
}

#[derive(Debug, Default, Serialize, Deserialize, Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, e: &FundsDeposited) { self.balance += e.amount; }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, e: &FundsWithdrawn) { self.balance -= e.amount; }
}

Without Derive Macro

use sourcery::{Aggregate, DomainEvent, EventKind, ProjectionEvent};
use sourcery::event::EventDecodeError;
use sourcery::store::{EventStore, StoredEvent};
use serde::{Deserialize, Serialize};

// Events (same as before)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsDeposited { pub amount: i64 }

impl DomainEvent for FundsDeposited {
    const KIND: &'static str = "account.deposited";
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FundsWithdrawn { pub amount: i64 }

impl DomainEvent for FundsWithdrawn {
    const KIND: &'static str = "account.withdrawn";
}

// Manual event enum
#[derive(Clone, Debug)]
pub enum AccountEvent {
    Deposited(FundsDeposited),
    Withdrawn(FundsWithdrawn),
}

// From implementations for ergonomic .into()
impl From<FundsDeposited> for AccountEvent {
    fn from(e: FundsDeposited) -> Self { Self::Deposited(e) }
}

impl From<FundsWithdrawn> for AccountEvent {
    fn from(e: FundsWithdrawn) -> Self { Self::Withdrawn(e) }
}

// EventKind for runtime kind dispatch
impl EventKind for AccountEvent {
    fn kind(&self) -> &'static str {
        match self {
            Self::Deposited(_) => FundsDeposited::KIND,
            Self::Withdrawn(_) => FundsWithdrawn::KIND,
        }
    }
}

// Serialize — serialises only the inner event (no enum wrapper)
impl serde::Serialize for AccountEvent {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Self::Deposited(inner) => inner.serialize(serializer),
            Self::Withdrawn(inner) => inner.serialize(serializer),
        }
    }
}

// ProjectionEvent for loading/deserialisation
impl ProjectionEvent for AccountEvent {
    const EVENT_KINDS: &'static [&'static str] = &[
        FundsDeposited::KIND,
        FundsWithdrawn::KIND,
    ];

    fn from_stored<S: EventStore>(
        stored: &StoredEvent<S::Id, S::Position, S::Data, S::Metadata>,
        store: &S,
    ) -> Result<Self, EventDecodeError<S::Error>> {
        match stored.kind() {
            FundsDeposited::KIND => Ok(Self::Deposited(
                store.decode_event(stored).map_err(EventDecodeError::Store)?
            )),
            FundsWithdrawn::KIND => Ok(Self::Withdrawn(
                store.decode_event(stored).map_err(EventDecodeError::Store)?
            )),
            _ => Err(EventDecodeError::UnknownKind {
                kind: stored.kind().to_string(),
                expected: Self::EVENT_KINDS,
            }),
        }
    }
}

// Aggregate struct
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Account {
    balance: i64,
}

// Manual Aggregate implementation
impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = String;
    type Id = String;

    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::Deposited(e) => self.balance += e.amount,
            AccountEvent::Withdrawn(e) => self.balance -= e.amount,
        }
    }
}

Trade-offs

You gain: Custom enum structure, custom serialisation (compress/encrypt), fallback handling for unknown events, conditional replay logic.

You lose: it is more code to maintain, match arms are easy to get wrong, and EVENT_KINDS must be kept in sync with from_stored by hand.

When to Go Manual

| Scenario | Recommendation |
|---|---|
| Standard CRUD aggregate | Use derive macro |
| Learning the crate | Start with derive, then explore manual |
| Custom event serialisation | Manual |
| Dynamic event types | Manual |
| Unusual enum structure | Manual |

Next

Snapshots — Optimising aggregate loading

Optimistic Concurrency

When multiple processes or threads write to the same aggregate simultaneously, you risk losing updates. Optimistic concurrency control detects these conflicts by checking that the stream version hasn’t changed between loading the aggregate and committing new events.

Default Behaviour

By default, repositories use optimistic concurrency—version checking is performed on every write. This is the safe default for production systems.

use sourcery::{Repository, store::inmemory};

let store: inmemory::Store<String, ()> = inmemory::Store::new();
let repo = Repository::new(store); // Optimistic concurrency enabled

The concurrency strategy is encoded in the type system, so you get compile-time guarantees about which error types you need to handle.

Disabling Concurrency Checking

For single-writer scenarios where concurrency checking is unnecessary, you can opt out:

let mut repo = Repository::new(store)
    .without_concurrency_checking();

This returns a Repository<S, Unchecked> which uses last-writer-wins semantics.

Error Types

The two concurrency strategies use different error types:

| Strategy | Error Type | Includes Concurrency Variant? |
|---|---|---|
| Optimistic (default) | CommandError | Yes |
| Unchecked | CommandError | No (Concurrency = Infallible) |

When using optimistic concurrency, execute_command returns CommandError::Concurrency(conflict) if the stream version changed between loading and committing:

use sourcery::repository::CommandError;

match repo
    .execute_command::<MyAggregate, MyCommand>(&id, &command, &metadata)
    .await
{
    Ok(()) => println!("Success!"),
    Err(CommandError::Concurrency(conflict)) => {
        println!(
            "Conflict: expected version {:?}, actual {:?}",
            conflict.expected,
            conflict.actual
        );
    }
    Err(e) => println!("Other error: {e}"),
}

Handling Conflicts

The most common pattern for handling conflicts is to retry the operation.

The repository provides a helper for this: execute_with_retry.

use sourcery::repository::RetryResult;

let attempts: RetryResult<MyAggregate, MyStore> =
    repo.execute_with_retry::<MyAggregate, MyCommand>(&id, &command, &metadata, 3).await?;
println!("Succeeded after {attempts} attempt(s)");

Each retry loads fresh state from the event store, so business rules are always validated against the current aggregate state.

max_retries controls how many retries after the first attempt are allowed, so the operation is attempted up to max_retries + 1 times total.
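A sketch of those retry semantics with plain std types (this is illustrative, not the crate's implementation; op stands in for one load-execute-commit round trip, which in the real repository reloads fresh state each time):

```rust
// Run `op` until it succeeds or `max_retries` retries are exhausted,
// returning the number of attempts it took.
fn run_with_retry<E>(
    mut op: impl FnMut() -> Result<(), E>,
    max_retries: u32,
) -> Result<u32, E> {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match op() {
            Ok(()) => return Ok(attempts),
            // Out of retries: surface the last conflict to the caller.
            Err(e) if attempts > max_retries => return Err(e),
            // Otherwise loop; the next attempt sees the current state.
            Err(_) => {}
        }
    }
}
```

With max_retries = 3, a command that conflicts twice and then succeeds completes on the third of up to four allowed attempts.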

When to Use Optimistic Concurrency

Use optimistic concurrency when:

  • Multiple writers might modify the same aggregate simultaneously
  • Business rules depend on current state (e.g., balance checks, inventory limits)
  • Data integrity is more important than write throughput

Consider last-writer-wins when:

  • Aggregates are rarely modified concurrently
  • Events are append-only without state-dependent validation
  • You have a single writer per aggregate (e.g., actor-per-entity pattern)

How It Works

  1. When loading an aggregate, the repository records the current stream version
  2. When committing, it passes the expected version to the event store
  3. The store checks if the actual version matches the expected version
  4. If they differ, the store returns a ConcurrencyConflict error

The inmemory::Store supports this via its stream_version() method and the expected_version parameter on commit_events_optimistic().
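The version check can be sketched with a toy store. These are local stand-in types, not the crate's API; the real ConcurrencyConflict and commit paths are richer, but the compare-then-append logic is the same:

```rust
use std::collections::HashMap;

// Stand-in for the conflict error carrying expected vs actual versions.
#[derive(Debug, PartialEq)]
struct Conflict {
    expected: Option<u64>,
    actual: Option<u64>,
}

#[derive(Default)]
struct TinyStore {
    streams: HashMap<String, Vec<String>>,
}

impl TinyStore {
    // Step 1: the current stream version is derived from the stored events.
    fn stream_version(&self, id: &str) -> Option<u64> {
        self.streams.get(id).map(|s| s.len() as u64)
    }

    // Steps 2-4: commit only if the caller's expected version still matches.
    fn commit_optimistic(
        &mut self,
        id: &str,
        expected: Option<u64>,
        events: Vec<String>,
    ) -> Result<(), Conflict> {
        let actual = self.stream_version(id);
        if actual != expected {
            return Err(Conflict { expected, actual });
        }
        self.streams.entry(id.to_string()).or_default().extend(events);
        Ok(())
    }
}
```

A writer still holding a stale expected version gets a Conflict back; re-reading the version and retrying succeeds.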

Example

See examples/optimistic_concurrency.rs for a complete working example demonstrating:

  • Basic optimistic concurrency usage
  • Conflict detection with concurrent modifications
  • Retry patterns for handling conflicts
  • Business rule enforcement with fresh state

Snapshots

Replaying events gets expensive as aggregates accumulate history. Snapshots checkpoint aggregate state, allowing you to skip replaying old events.

How Snapshots Work

The load sequence between the application, snapshot store, event store, and aggregate:

  1. The application calls load("account", "ACC-001")
  2. The snapshot store returns a snapshot at position 1000, which is deserialised into the aggregate
  3. The event store is asked for load_events(after: 1000) and returns events 1001-1050
  4. apply(event) runs for each remaining event

Instead of replaying all 1050 events, load the snapshot and replay only the 50 new ones.
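The position arithmetic can be sketched with plain stand-in types (illustrative only, not the crate's API):

```rust
// Stand-ins: a stored event with a global position, and a snapshot
// recording the position it was taken at.
struct StoredEvent {
    position: u64,
    amount: i64,
}

struct Snapshot {
    position: u64,
    balance: i64,
}

// Rebuild state: start from the snapshot (if any) and replay only the
// events after its position.
fn rebuild(snapshot: Option<Snapshot>, events: &[StoredEvent]) -> i64 {
    let (after, mut balance) = match snapshot {
        Some(s) => (s.position, s.balance),
        None => (0, 0),
    };
    for event in events.iter().filter(|e| e.position > after) {
        balance += event.amount;
    }
    balance
}
```

With a snapshot at position 1000, only the 50 events after it contribute to the replay loop; the result is the same as replaying the full history from scratch.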

Enabling Snapshots

Use with_snapshots() when creating the repository:

use sourcery::{Repository, snapshot::inmemory, store::inmemory as event_store};

let event_store = event_store::Store::new();
let snapshot_store = inmemory::Store::every(100);

let repository = Repository::new(event_store)
    .with_snapshots(snapshot_store);

Snapshot Policies

snapshot::inmemory::Store provides three policies:

Always

Save a snapshot after every command:

let snapshots = inmemory::Store::always();

Use for: Aggregates with expensive replay, testing.

Every N Events

Save after accumulating N events since the last snapshot:

let snapshots = inmemory::Store::every(100);

Use for: Production workloads balancing storage vs. replay cost.

Never

Never save (load-only mode):

let snapshots = inmemory::Store::never();

Use for: Read-only replicas, debugging.

The SnapshotStore Trait

pub trait SnapshotStore<Id: Sync>: Send + Sync {
    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<T>(
        &self,
        kind: &str,
        id: &Id,
    ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
    where
        T: DeserializeOwned;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialisation, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<CE, T, Create>(
        &self,
        kind: &str,
        id: &Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl std::future::Future<
        Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
    > + Send
    where
        CE: std::error::Error + Send + Sync + 'static,
        T: Serialize,
        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
}

The repository calls offer_snapshot after successfully appending new events. Implementations may decline without invoking create_snapshot, avoiding unnecessary snapshot serialisation.

The Snapshot Type

pub struct Snapshot<Pos, Data> {
    pub position: Pos,
    pub data: Data,
}

The position indicates which event this snapshot was taken after. When loading, only events after this position need to be replayed.

Projection Snapshots

Projection snapshots use the same store and are keyed by (P::KIND, instance_id). Use load_projection_with_snapshot on a repository configured with snapshots:

#[derive(Default, Serialize, Deserialize, sourcery::Projection)]
#[projection(kind = "loyalty.summary")]
struct LoyaltySummary {
    total_earned: u64,
}

// (ProjectionFilters impl defines filters and associated types)

let repo = Repository::new(store).with_snapshots(inmemory::Store::every(100));

let summary: LoyaltySummary = repo
    .load_projection_with_snapshot::<LoyaltySummary>(&instance_id)
    .await?;

Singleton projections use InstanceId = () and pass &(). Instance projections pass their instance identifier.

Projection snapshots require a globally ordered store (S: GloballyOrderedStore) and P: Serialize + DeserializeOwned.

When to Snapshot

| Aggregate Type | Recommendation |
|---|---|
| Short-lived (< 100 events) | Skip snapshots |
| Medium (100-1000 events) | Every 100-500 events |
| Long-lived (1000+ events) | Every 100 events |
| High-throughput | Every N events, tuned to your SLA |

Implementing a Custom Store

For production, implement SnapshotStore with your database. See Custom Stores for a complete guide.

Snapshot Invalidation

Snapshots are tied to your aggregate’s serialised form. When you change the struct:

  1. Add fields — Use #[serde(default)] for backwards compatibility
  2. Remove fields — Old snapshots still deserialise (extra fields ignored)
  3. Rename fields — Use #[serde(alias = "old_name")]
  4. Change types — Old snapshots become invalid; delete them

For major changes, delete old snapshots and let them rebuild from events.

Next

Event Versioning — Evolving event schemas over time

Event Versioning

Events are immutable and stored forever. When your domain model evolves, you need strategies to read old events with new code.

Old events lack fields that new code expects.

Strategy 1: Serde Defaults

The simplest approach—use serde attributes:

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegistered {
    pub email: String,
    #[serde(default)]
    pub marketing_consent: bool,  // Defaults to false for old events
}

Works for:

  • Adding optional fields
  • Fields with sensible defaults

Strategy 2: Explicit Versioning

Keep old event types, migrate at deserialisation:

// Original event (still in storage)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV1 {
    pub email: String,
}

// Current event
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegistered {
    pub email: String,
    pub marketing_consent: bool,
}

impl From<UserRegisteredV1> for UserRegistered {
    fn from(v1: UserRegisteredV1) -> Self {
        Self {
            email: v1.email,
            marketing_consent: false, // Assumed for old users
        }
    }
}

Strategy 3: Using serde-evolve

The serde-evolve crate automates version chains:

use serde_evolve::Evolve;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV1 {
    pub email: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRegisteredV2 {
    pub email: String,
    pub marketing_consent: bool,
}

impl From<UserRegisteredV1> for UserRegisteredV2 {
    fn from(v1: UserRegisteredV1) -> Self {
        Self { email: v1.email, marketing_consent: false }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Evolve)]
#[evolve(ancestors(UserRegisteredV1, UserRegisteredV2))]
pub struct UserRegistered {
    pub email: String,
    pub marketing_consent: bool,
    pub signup_source: Option<String>,  // New in V3
}

impl From<UserRegisteredV2> for UserRegistered {
    fn from(v2: UserRegisteredV2) -> Self {
        Self {
            email: v2.email,
            marketing_consent: v2.marketing_consent,
            signup_source: None,
        }
    }
}

When deserialising, serde-evolve tries each ancestor in order and applies the From chain.

Which Strategy to Use?

| Scenario | Recommended Approach |
|---|---|
| Adding optional field | Serde defaults |
| Adding required field with known default | Serde defaults |
| Complex migration logic | Explicit versions + From |
| Multiple version hops | serde-evolve |

Event KIND Stability

The KIND constant must never change for stored events:

// BAD: Changing KIND breaks deserialisation
impl DomainEvent for UserRegistered {
    const KIND: &'static str = "user.created";  // Was "user.registered"
}

// GOOD: Use new event type, migrate in code
impl DomainEvent for UserCreated {
    const KIND: &'static str = "user.created";
}
// Old UserRegistered events still deserialise, then convert

Testing Migrations

Include serialised old events in your test suite:

#[test]
fn deserializes_v1_events() {
    let v1_json = r#"{"email":"old@example.com"}"#;
    let event: UserRegistered = serde_json::from_str(v1_json).unwrap();
    assert!(!event.marketing_consent);
}

This catches regressions when refactoring.

Next

Custom Metadata — Adding infrastructure context to events

Custom Metadata

Metadata carries infrastructure concerns alongside events without polluting domain types. Common uses include correlation IDs, user context, timestamps, and causation tracking.

Defining Metadata

Create a struct for your metadata:

use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EventMetadata {
    pub correlation_id: String,
    pub causation_id: Option<String>,
    pub user_id: String,
    pub timestamp: DateTime<Utc>,
}

Using Metadata with the Store

Configure your store with the metadata type:

use sourcery::store::inmemory;

let store: inmemory::Store<String, EventMetadata> = inmemory::Store::new();

Passing Metadata to Commands

Provide metadata when executing commands:

let metadata = EventMetadata {
    correlation_id: Uuid::new_v4().to_string(),
    causation_id: None,
    user_id: current_user.id.clone(),
    timestamp: Utc::now(),
};

repository.execute_command::<Account, Deposit>(
    &account_id,
    &Deposit { amount: 100 },
    &metadata,
)
.await?;

Each event produced by the command receives this metadata.

Accessing Metadata in Projections

Set metadata in the derive attribute and receive it in ApplyProjection:

use sourcery::ApplyProjection;

#[derive(Debug, Default, sourcery::Projection)]
#[projection(metadata = EventMetadata, events(FundsDeposited))]
pub struct AuditLog {
    pub entries: Vec<AuditEntry>,
}

impl ApplyProjection<FundsDeposited> for AuditLog {
    fn apply_projection(
        &mut self,
        aggregate_id: &Self::Id,
        event: &FundsDeposited,
        meta: &Self::Metadata,
    ) {
        self.entries.push(AuditEntry {
            timestamp: meta.timestamp,
            user: meta.user_id.clone(),
            correlation_id: meta.correlation_id.clone(),
            action: format!("Deposited {} to account {}", event.amount, aggregate_id),
        });
    }
}

Projection handlers use the same metadata type as the store for that repository.

Correlation and Causation

Track event relationships for debugging and workflows:

  • Correlation ID: Groups all events from a single user request
  • Causation ID: Points to the event that triggered this one

// When handling a saga or process manager
let follow_up_metadata = EventMetadata {
    correlation_id: original_meta.correlation_id.clone(),
    causation_id: Some(original_event_id.to_string()),
    user_id: "system".to_string(),
    timestamp: Utc::now(),
};

Unit Metadata

If you don’t need metadata, use ():

let store: inmemory::Store<String, ()> = inmemory::Store::new();

repository
    .execute_command::<Account, Deposit>(&id, &cmd, &())
    .await?;

Projections with type Metadata = () ignore the metadata parameter.

Metadata vs Event Data

| Put in Metadata | Put in Event |
|---|---|
| Who did it (user ID) | What happened (domain facts) |
| When (timestamp) | Domain-relevant times (due date) |
| Request tracing (correlation) | Business identifiers |
| Infrastructure context | Domain context |

Events should be understandable without metadata. Metadata enhances observability.

Example: Multi-Tenant Filtering

Metadata enables tenant-scoped projections. With type Metadata = TenantMetadata on ProjectionFilters, the handler can filter by tenant:

impl ApplyProjection<OrderPlaced> for TenantDashboard {
    fn apply_projection(
        &mut self,
        _id: &Self::Id,
        event: &OrderPlaced,
        meta: &Self::Metadata,
    ) {
        if meta.tenant_id == self.tenant_id {
            self.order_count += 1;
            self.total_revenue += event.total;
        }
    }
}

The projection follows the same pattern shown in Accessing Metadata in Projections, with instance_id = String if each tenant needs its own projection instance.

Next

Custom Stores — Implementing your own persistence layer

Subscriptions

While load_projection rebuilds a projection from scratch on each call, subscriptions maintain an in-memory projection that updates in real time as events are committed.

Basic Usage

Use Repository::subscribe to create a SubscriptionBuilder, configure callbacks, then call start():

let subscription = repository
    .subscribe::<Dashboard>(())
    .on_update(|dashboard| println!("{dashboard:?}"))
    .start()
    .await?;

The instance ID argument matches the projection’s InstanceId type. For singleton projections (InstanceId = ()), pass (). start() returns only after catch-up completes.

How Subscriptions Work

A subscription:

  1. Catch-up phase — Replays all historical events matching the projection’s filters
  2. Live phase — Transitions to processing events as they are committed
  3. Callbacks — Fires on_update after each event

The flow between the application, subscription, and event store:

  1. The application calls subscribe::<P>(instance_id)
  2. The subscription calls load_events(filters) and replays the historical events (catch-up)
  3. start() returns once catch-up completes
  4. Each live event is applied via apply_projection(), then on_update(&projection) fires

Callbacks

on_update

Called after every event is applied to the projection. Receives an immutable reference to the current projection state:

.on_update(|projection| {
    // Update a shared cache, send to WebSocket clients, etc.
    cache.lock().unwrap().clone_from(projection);
})

Callbacks must complete quickly. Long-running work should be dispatched to a separate task via a channel.

Stopping a Subscription

The start() method returns a SubscriptionHandle. Call stop() for graceful shutdown:

subscription.stop().await?;

The subscription processes any remaining events, offers a final snapshot, then terminates. Dropping the handle sends a best-effort stop signal, but use stop() when you need deterministic shutdown and error handling.

Subscription Snapshots

By default, subscriptions don’t persist snapshots. Use subscribe_with_snapshots to provide a snapshot store for faster restart:

let snapshot_store = inmemory::Store::every(100);

let subscription = repository
    .subscribe_with_snapshots::<Dashboard>((), snapshot_store)
    .on_update(|d| println!("{d:?}"))
    .start()
    .await?;

The subscription loads the most recent snapshot on startup and periodically offers new snapshots as events are processed.

The SubscribableStore Trait

Not all stores support push notifications. The SubscribableStore trait extends EventStore with a subscribe method that returns a stream of events:

pub trait SubscribableStore: EventStore + GloballyOrderedStore {
    fn subscribe(
        &self,
        filters: &[EventFilter<Self::Id, Self::Position>],
        from_position: Option<Self::Position>,
    ) -> EventStream<'_, Self>
    where
        Self::Position: Ord;
}

The in-memory store implements this via tokio::sync::broadcast. A PostgreSQL implementation would use LISTEN/NOTIFY.

Shared State Pattern

A common pattern is to share projection state between the subscription and the command side via Arc<Mutex<_>>:

let live_state = Arc::new(Mutex::new(Dashboard::default()));
let state_for_callback = live_state.clone();

let subscription = repository
    .subscribe::<Dashboard>(())
    .on_update(move |projection| {
        *state_for_callback.lock().unwrap() = projection.clone();
    })
    .start()
    .await?;

// Read the live projection from another task — no event replay needed
let current = live_state.lock().unwrap().clone();

When to Use Subscriptions

| Use Case | Approach |
|---|---|
| One-off query | load_projection |
| Real-time dashboard | subscribe |
| Pre-computed read model | subscribe with snapshots |
| Guard condition from live state | subscribe + Arc<Mutex<_>> |
| Batch reporting | load_projection |

Example

See examples/subscription_billing.rs for a complete working example demonstrating:

  • Live subscription with on_update
  • Multi-aggregate projection (Subscription + Invoice)
  • Guard conditions from live state
  • Graceful shutdown

Next

Custom Stores — Implementing your own persistence layer

Custom Stores

The inmemory::Store is useful for testing, but production systems need durable storage. This guide walks through implementing EventStore for your database.

The EventStore Trait

See Stores for the full trait definition.

Design Decisions

Position Type

Choose based on your ordering needs:

| Position Type | Use Case |
|---|---|
| () | Unordered, append-only log |
| u64 / i64 | Global sequence number |
| (i64, i32) | Timestamp + sequence for distributed systems |

Storage Schema

A typical SQL schema:

CREATE TABLE events (
    position BIGSERIAL PRIMARY KEY,
    aggregate_kind TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    event_kind TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_aggregate
    ON events (aggregate_kind, aggregate_id, position);

CREATE INDEX idx_events_kind
    ON events (event_kind, position);

Implementation Skeleton

use std::future::Future;
use nonempty::NonEmpty;
use sourcery::store::{
    CommitError, Committed, EventFilter, EventStore,
    OptimisticCommitError, StoredEvent,
};
use sourcery::concurrency::ConcurrencyConflict;

pub struct PostgresEventStore {
    pool: sqlx::PgPool,
}

impl EventStore for PostgresEventStore {
    type Id = String;
    type Position = i64;
    type Error = sqlx::Error;
    type Metadata = serde_json::Value;
    type Data = serde_json::Value;

    fn decode_event<E>(
        &self,
        stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
    ) -> Result<E, Self::Error>
    where
        E: DomainEvent + serde::de::DeserializeOwned
    {
        serde_json::from_value(stored.data.clone())
            .map_err(Into::into)
    }

    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a
    {
        async move {
            todo!("SELECT MAX(position) WHERE aggregate_kind = $1 AND aggregate_id = $2")
        }
    }

    fn commit_events<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
    where
        E: EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone,
    {
        async move {
            // Serialize each event with e.kind() and serde_json::to_value(e)
            // INSERT atomically, return Committed { last_position }
            todo!()
        }
    }

    fn commit_events_optimistic<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, OptimisticCommitError<Self::Position, Self::Error>>> + Send + 'a
    where
        E: EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone,
    {
        async move {
            // Check version, return Conflict on mismatch
            // Serialize and INSERT atomically
            todo!()
        }
    }

    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<Output = Result<Vec<StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>>, Self::Error>> + Send + 'a
    {
        async move { todo!("SELECT with filters") }
    }
}

Loading Events

Build a WHERE clause from filters with OR, ordered by position:

// Simplified, synchronous sketch of the shape; the real trait method is async
fn load_events(&self, filters: &[EventFilter<String, i64>])
    -> Result<Vec<StoredEvent<String, i64, serde_json::Value, serde_json::Value>>, sqlx::Error>
{
    // WHERE (event_kind = 'x' AND position > N)
    //    OR (aggregate_kind = 'a' AND aggregate_id = 'b')
    // ORDER BY position ASC
    todo!()
}
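
To make the shape concrete, here is a self-contained sketch of turning filters into SQL. `Filter` below is a simplified stand-in for the crate's `EventFilter`, and parameter values are collected as strings purely for illustration; real code would bind typed parameters through sqlx rather than build them by hand.

```rust
// Simplified stand-in for EventFilter (illustration only).
enum Filter {
    ByEventKind { kind: String, after: i64 },
    ByAggregate { aggregate_kind: String, aggregate_id: String },
}

// Each filter becomes one OR-branch; placeholders are numbered $1, $2, ...
// in the order their values are pushed onto `params`.
fn build_query(filters: &[Filter]) -> (String, Vec<String>) {
    let mut params: Vec<String> = Vec::new();
    let mut branches: Vec<String> = Vec::new();
    for f in filters {
        match f {
            Filter::ByEventKind { kind, after } => {
                params.push(kind.clone());
                let k = params.len();
                params.push(after.to_string());
                let p = params.len();
                branches.push(format!("(event_kind = ${k} AND position > ${p})"));
            }
            Filter::ByAggregate { aggregate_kind, aggregate_id } => {
                params.push(aggregate_kind.clone());
                let k = params.len();
                params.push(aggregate_id.clone());
                let i = params.len();
                branches.push(format!("(aggregate_kind = ${k} AND aggregate_id = ${i})"));
            }
        }
    }
    (
        format!(
            "SELECT * FROM events WHERE {} ORDER BY position ASC",
            branches.join(" OR ")
        ),
        params,
    )
}
```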

Optimistic Concurrency

For systems requiring strict ordering, add version checking:

ALTER TABLE events ADD COLUMN stream_version INT NOT NULL;

CREATE UNIQUE INDEX idx_events_stream_version
    ON events (aggregate_kind, aggregate_id, stream_version);

// In commit_events_optimistic:
// 1. Get current max version for stream
// 2. Compare against expected_version
// 3. If mismatch, return OptimisticCommitError::Conflict(ConcurrencyConflict { expected, actual })
// 4. Insert with version + 1
// 5. Handle unique constraint violation as concurrency conflict
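
Steps 2–4 reduce to a small pure function. The sketch below uses a local `ConcurrencyConflict` stand-in (not the crate's type) and treats `None` as "stream does not exist yet":

```rust
// Local stand-in for sourcery's ConcurrencyConflict, for illustration.
#[derive(Debug, PartialEq)]
struct ConcurrencyConflict {
    expected: Option<i64>,
    actual: Option<i64>,
}

// Compare the caller's expected version against the stream's current max.
// On match, return the version to write next; on mismatch, report a conflict.
fn check_expected_version(
    expected: Option<i64>,
    actual: Option<i64>,
) -> Result<i64, ConcurrencyConflict> {
    if expected == actual {
        // Next version: 0 for a fresh stream, max + 1 otherwise.
        Ok(actual.map_or(0, |v| v + 1))
    } else {
        Err(ConcurrencyConflict { expected, actual })
    }
}
```

The unique index above still matters: two writers can pass this check concurrently, and the constraint violation on insert is the backstop that one of them must translate into the same conflict error (step 5).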

Event Stores for Different Databases

DynamoDB

Table: events
  PK: {aggregate_kind}#{aggregate_id}
  SK: {position:012d}  (zero-padded for sorting)
  GSI: event_kind-position-index
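
Composing those keys is a one-liner each; the function names below are illustrative, not part of any SDK:

```rust
// Partition key: "{aggregate_kind}#{aggregate_id}" as in the layout above.
fn partition_key(aggregate_kind: &str, aggregate_id: &str) -> String {
    format!("{aggregate_kind}#{aggregate_id}")
}

// Sort key: zero-padding to 12 digits makes lexicographic order
// match numeric position order.
fn sort_key(position: u64) -> String {
    format!("{position:012}")
}
```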

MongoDB

{
  _id: ObjectId,
  aggregateKind: "account",
  aggregateId: "ACC-001",
  eventKind: "account.deposited",
  position: NumberLong(1234),
  data: { ... },
  metadata: { ... }
}

S3 (Append-Only Log)

s3://bucket/events/{aggregate_kind}/{aggregate_id}/{position}.json
s3://bucket/events-by-kind/{event_kind}/{position}.json  (symlinks/copies)

Testing Your Store

Use the same test patterns as inmemory::Store:

#[tokio::test]
async fn test_commit_and_load() -> Result<(), Box<dyn std::error::Error>> {
    let store = PostgresEventStore::new(test_pool()).await;
    let metadata = serde_json::json!({});

    // Commit events
    let events = NonEmpty::singleton(MyEvent { data: "test".into() });
    store.commit_events("account", &"ACC-001".into(), events, &metadata).await?;

    // Load and verify
    let loaded = store
        .load_events(&[
            EventFilter::for_aggregate("my-event", "account", "ACC-001".to_string())
        ])
        .await?;

    assert_eq!(loaded.len(), 1);
    Ok(())
}

#[tokio::test]
async fn test_optimistic_concurrency() -> Result<(), Box<dyn std::error::Error>> {
    let store = PostgresEventStore::new(test_pool()).await;
    let id = "ACC-001".to_string();
    let metadata = serde_json::json!({});

    // Create initial event
    let events = NonEmpty::singleton(MyEvent { data: "first".into() });
    store.commit_events_optimistic("account", &id, None, events, &metadata).await?;

    // A concurrent write with the wrong expected version should fail
    let events = NonEmpty::singleton(MyEvent { data: "second".into() });
    let result = store
        .commit_events_optimistic("account", &id, Some(999), events, &metadata)
        .await;

    assert!(matches!(result, Err(OptimisticCommitError::Conflict(_))));
    Ok(())
}

Next

Test Framework — Testing aggregates in isolation

Test Framework

The crate provides two testing utilities:

  • TestFramework: Unit testing aggregates in isolation (no stores, no serialization)
  • RepositoryTestExt: Integration testing with real repositories (seeding data, simulating concurrency)

Enabling the Test Framework

Add the test-util feature to your dev dependencies:

[dev-dependencies]
sourcery = { version = "0.1", features = ["test-util"] }

Basic Usage

use sourcery::test::TestFramework;

#[test]
fn deposit_increases_balance() {
    TestFramework::<Account>::given(&[FundsDeposited { amount: 100 }.into()])
        .when(&Deposit { amount: 50 })
        .then_expect_events(&[FundsDeposited { amount: 50 }.into()]);
}

Given Methods

Set up initial state with given(events):

// With existing events
TestFramework::<Account>::given(&[
    FundsDeposited { amount: 100 }.into(),
    FundsWithdrawn { amount: 30 }.into(),
])  // Balance is now 70

// Fresh aggregate (pass empty slice)
TestFramework::<Account>::given(&[])  // Balance is 0

When Methods

when(command)

Execute a command against the aggregate:

.when(&Withdraw { amount: 50 })

Then Methods

Assert outcomes with these methods:

// Expect specific events (requires PartialEq on events)
.then_expect_events(&[FundsWithdrawn { amount: 50 }.into()])

// Expect no events (valid no-op)
.then_expect_no_events()

// Expect any error
.then_expect_error()

// Expect specific error
.then_expect_error_eq(&AccountError::InsufficientFunds)

Additional methods: then_expect_error_message(substring) for substring matching, inspect_result() to get the raw Result for custom assertions.

Complete Test Suite Example

use sourcery::test::TestFramework;

#[test]
fn deposits_positive_amount() {
    TestFramework::<Account>::given(&[])
        .when(&Deposit { amount: 100 })
        .then_expect_events(&[FundsDeposited { amount: 100 }.into()]);
}

#[test]
fn rejects_overdraft() {
    TestFramework::<Account>::given(&[FundsDeposited { amount: 100 }.into()])
        .when(&Withdraw { amount: 150 })
        .then_expect_error_eq(&AccountError::InsufficientFunds);
}

#[test]
fn rejects_invalid_deposit() {
    TestFramework::<Account>::given(&[])
        .when(&Deposit { amount: -50 })
        .then_expect_error();
}

Testing Projections

Projections don’t use TestFramework. Test them directly by calling init() and apply_projection():

#[test]
fn projection_aggregates_deposits() {
    let mut proj = AccountSummary::init(&());

    proj.apply_projection(&"ACC-001".to_string(), &FundsDeposited { amount: 100 }, &());
    proj.apply_projection(&"ACC-002".to_string(), &FundsDeposited { amount: 50 }, &());
    proj.apply_projection(&"ACC-001".to_string(), &FundsDeposited { amount: 25 }, &());

    assert_eq!(proj.accounts.get("ACC-001"), Some(&125));
    assert_eq!(proj.accounts.get("ACC-002"), Some(&50));
}
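
The same pattern works without the crate at all: the projection is just a fold over events. A minimal stand-in for `AccountSummary` (local types, not the crate's) makes the accumulation logic explicit:

```rust
use std::collections::HashMap;

// Local stand-in for the AccountSummary projection above.
#[derive(Default)]
struct AccountSummary {
    accounts: HashMap<String, i64>,
}

impl AccountSummary {
    // Equivalent of ApplyProjection<FundsDeposited>: fold the amount
    // into the per-account running total.
    fn apply_deposit(&mut self, account_id: &str, amount: i64) {
        *self.accounts.entry(account_id.to_string()).or_insert(0) += amount;
    }
}
```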

Next

Design Decisions — Why the crate works this way

Design Decisions

This page explains the reasoning behind key architectural choices in the crate.

Events as First-Class Structs

Decision: Events are standalone structs implementing DomainEvent, not just enum variants.

Why:

  • Events can be shared across multiple aggregates
  • Projections subscribe to event types, not aggregate enums
  • Easier to evolve events independently
  • Better compile-time isolation

Trade-off: More boilerplate (mitigated by derive macro).

// This crate: events stand alone
struct OrderPlaced { /* ... */ }
struct PaymentReceived { /* ... */ }

// Both Order and Payment aggregates can use PaymentReceived

IDs as Infrastructure

Decision: Aggregate IDs are passed to the repository, not embedded in events or handlers.

Why:

  • Domain events stay pure (no infrastructure metadata)
  • Same event type works with different ID schemes
  • Command handlers focus on behaviour, not identity
  • IDs travel in the envelope alongside events

Trade-off: You can’t access the ID inside Handle<C>. If needed, include relevant IDs in the command.

// ID is infrastructure
repository
    .execute_command::<Account, Deposit>(&account_id, &command, &metadata)
    .await?;

// Event doesn't contain ID
struct FundsDeposited { amount: i64 }  // No account_id field

No Built-In Command Bus

Decision: The crate provides Repository::execute_command() but no command routing infrastructure.

Why:

  • Command buses vary widely (sync, async, distributed, in-process)
  • Many applications don’t need one (web handlers call repository directly)
  • Easy to add on top: trait object dispatch, actor messages, etc.
  • Keeps the crate focused on event sourcing, not messaging

What you might build:

// Your command bus (not provided)
trait CommandBus {
    fn dispatch<C: Command>(&self, id: &str, command: C) -> Result<(), Error>;
}
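
If you do want in-process routing, a toy dispatcher shows how little is needed on top of the repository. This is purely illustrative and not part of sourcery; in a real system each closure would call `repository.execute_command` for its aggregate/command pair:

```rust
use std::collections::HashMap;

// Handlers keyed by command name. Each handler receives an aggregate ID
// and would normally delegate to the repository.
type Handler = Box<dyn Fn(&str) -> Result<(), String>>;

#[derive(Default)]
struct SimpleBus {
    handlers: HashMap<&'static str, Handler>,
}

impl SimpleBus {
    fn register(&mut self, name: &'static str, handler: Handler) {
        self.handlers.insert(name, handler);
    }

    fn dispatch(&self, name: &str, aggregate_id: &str) -> Result<(), String> {
        match self.handlers.get(name) {
            Some(handler) => handler(aggregate_id),
            None => Err(format!("no handler registered for `{name}`")),
        }
    }
}
```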

Event Serialisation in Stores

Decision: EventStore implementations handle serialisation, not a separate Codec trait. The generated event enum implements serde::Serialize with custom logic that serialises only the inner event struct.

Why:

  • Eliminates unnecessary abstraction layer (Codec trait)
  • Store can choose optimal format (JSON, JSONB, MessagePack, etc.)
  • Simpler architecture with fewer generic parameters
  • Event enum variants are never serialised as tagged enums; only the inner event is serialised

How it works:

// The Aggregate macro generates an event enum
#[derive(Aggregate)]
#[aggregate(id = String, error = String, events(FundsDeposited, FundsWithdrawn))]
struct Account { /* ... */ }

// Generated code (simplified):
enum AccountEvent {
    FundsDeposited(FundsDeposited),
    FundsWithdrawn(FundsWithdrawn),
}

// Generated Serialize impl — serialises only the inner event
impl serde::Serialize for AccountEvent {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where S: serde::Serializer {
        match self {
            Self::FundsDeposited(inner) => inner.serialize(serializer),
            Self::FundsWithdrawn(inner) => inner.serialize(serializer),
        }
    }
}

// EventStore uses kind() for routing and serde::Serialize for data
// - kind() returns the event type identifier ("account.deposited")
// - Serialize serialises just the inner event data

Event versioning: Use serde attributes on the event structs themselves:

#[derive(serde::Serialize, serde::Deserialize)]
struct FundsDeposited {
    amount: i64,
    #[serde(default)]  // Field added in v2
    currency: String,
}

Projections Decoupled from Aggregates

Decision: Projections implement ApplyProjection<E> for event types, not aggregate types.

Why:

  • A projection can consume events from many aggregates
  • Projections don’t need to know about aggregate enums
  • Enables cross-aggregate views naturally
  • Better separation of read and write concerns

// Projection consumes from multiple aggregates
#[derive(Default, sourcery::Projection)]
#[projection(kind = "dashboard")]
struct Dashboard { /* ... */ }

impl ProjectionFilters for Dashboard {
    type Id = String;
    type InstanceId = ();
    type Metadata = ();
    fn init((): &()) -> Self { Self::default() }
    fn filters<S>((): &()) -> Filters<S, Self>
    where S: EventStore<Id = String, Metadata = ()> {
        Filters::new()
            .event::<OrderPlaced>()
            .event::<PaymentReceived>()
            .event::<ShipmentDispatched>()
    }
}

impl ApplyProjection<OrderPlaced> for Dashboard { /* ... */ }
impl ApplyProjection<PaymentReceived> for Dashboard { /* ... */ }
impl ApplyProjection<ShipmentDispatched> for Dashboard { /* ... */ }

Minimal Infrastructure

Decision: No outbox, no scheduler, no event streaming, no saga orchestrator.

Why:

  • Event sourcing is the foundation; infrastructure varies by use case
  • Message brokers differ (Kafka, RabbitMQ, NATS, none)
  • Database choices affect outbox patterns
  • Keeps dependencies minimal
  • Some of these pieces may be added in later releases

What we do provide:

  • Core traits for aggregates and projections
  • Repository for command execution
  • In-memory store for testing
  • Test framework for aggregate testing

What you add:

  • Production event store
  • Outbox pattern (if needed)
  • Process managers / sagas
  • Event publishing to message broker

How Sourcery Compares

Sourcery draws inspiration from projects like eventually and cqrs, but makes different trade-offs.

| Area | Typical ES/CQRS libraries | Sourcery |
|---|---|---|
| Event modelling | Events are enum variants tied to one aggregate | Events are standalone structs, reusable across aggregates |
| Projections | Coupled to a specific aggregate enum | Decoupled — subscribe to event types from any aggregate |
| Metadata | Often embedded in event payloads | Carried alongside events in the envelope; domain types stay pure |
| Infrastructure | Bundled command bus, outbox, saga orchestrator | Minimal — repository + store only; you add what you need |
| Event versioning | Explicit upcaster pipelines | Serde attributes on event structs (details) |
| Concurrency control | Runtime configuration | Type-level — Optimistic vs Unchecked (details) |

See Design Decisions for the full rationale behind each choice.