Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Custom Stores

The inmemory::Store is useful for testing, but production systems need durable storage. This guide walks through implementing EventStore for your database.

The EventStore Trait

/// Storage backend contract for event streams.
///
/// An implementation appends events to per-aggregate streams (optionally
/// guarded by an expected version) and loads them back through
/// [`EventFilter`]s. Every method that returns a future requires that future
/// to be `Send`.
pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be cloneable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Copy + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialization codec.
    type Codec: Codec + Clone + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    /// Returns a reference to the store's serialization codec.
    fn codec(&self) -> &Self::Codec;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Begin a transaction for appending events to an aggregate.
    ///
    /// The transaction type is determined by the concurrency strategy `C`.
    /// The returned [`Transaction`] mutably borrows the store for as long as
    /// it is alive. Only callable when `Self: Sized`.
    ///
    /// # Arguments
    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
    /// * `aggregate_id` - The aggregate instance identifier
    /// * `expected_version` - The version expected for optimistic concurrency
    fn begin<C: ConcurrencyStrategy>(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: Self::Id,
        expected_version: Option<Self::Position>,
    ) -> Transaction<'_, Self, C>
    where
        Self: Sized;

    /// Append events with optional version checking.
    ///
    /// If `expected_version` is `Some`, the append fails with a concurrency
    /// conflict if the current stream version doesn't match.
    /// If `expected_version` is `None`, no version checking is performed.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
    /// [`AppendError::Store`] if persistence fails.
    fn append<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimizes based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
    > + Send
    + 'a;

    /// Append events expecting an empty stream.
    ///
    /// This method is used by optimistic concurrency when creating new
    /// aggregates. It fails with [`AppendError::Conflict`] if the stream
    /// already has events.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the stream is not empty,
    /// or [`AppendError::Store`] if persistence fails.
    fn append_expecting_new<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}

Design Decisions

Position Type

Choose based on your ordering needs:

| Position Type | Use Case |
|---|---|
| `()` | Unordered, append-only log |
| `u64` | Global sequence number |
| `(i64, i32)` | Timestamp + sequence for distributed systems |
| `Uuid` | Event IDs for deduplication |

Storage Schema

A typical SQL schema:

-- Event log table: one row per stored event.
CREATE TABLE events (
    -- Ordering key, auto-assigned from a sequence (BIGSERIAL).
    position BIGSERIAL PRIMARY KEY,
    -- Stream identity: aggregate type plus aggregate instance id.
    aggregate_kind TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    -- Event type name, used when filtering by event kind.
    event_kind TEXT NOT NULL,
    -- Serialized event payload.
    data BYTEA NOT NULL,
    -- Infrastructure metadata stored as JSON.
    metadata JSONB NOT NULL,
    -- Insertion time; defaults to the current timestamp.
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Intended for per-stream reads in position order
-- (e.g. MAX(position) for one aggregate_kind/aggregate_id pair).
CREATE INDEX idx_events_aggregate
    ON events (aggregate_kind, aggregate_id, position);

-- Intended for loading every event of one kind in position order.
CREATE INDEX idx_events_kind
    ON events (event_kind, position);

Implementation Skeleton

use std::future::Future;
use sourcery::store::{
    AppendError, EventFilter, EventStore, JsonCodec, PersistableEvent, StoredEvent, Transaction,
};
use sourcery::concurrency::ConcurrencyStrategy;

/// Event store backed by a PostgreSQL connection pool.
pub struct PostgresEventStore {
    /// sqlx connection pool; `append_batch` opens its database transaction from it.
    pool: sqlx::PgPool,
    /// JSON codec returned by `EventStore::codec`.
    codec: JsonCodec,
}

impl EventStore for PostgresEventStore {
    type Id = String;
    type Position = i64;
    type Error = sqlx::Error;
    type Codec = JsonCodec;
    type Metadata = serde_json::Value;

    fn codec(&self) -> &Self::Codec { &self.codec }

    fn stream_version<'a>(&'a self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id)
        -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a
    {
        async move { todo!("SELECT MAX(position) WHERE aggregate_kind = $1 AND aggregate_id = $2") }
    }

    fn begin<C: ConcurrencyStrategy>(&mut self, aggregate_kind: &str, aggregate_id: Self::Id, expected_version: Option<Self::Position>)
        -> Transaction<'_, Self, C>
    {
        Transaction::new(self, aggregate_kind.to_string(), aggregate_id, expected_version)
    }

    fn append<'a>(&'a mut self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id, expected_version: Option<Self::Position>, events: Vec<PersistableEvent<Self::Metadata>>)
        -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a
    {
        async move { todo!("INSERT with version check") }
    }

    fn append_expecting_new<'a>(&'a mut self, aggregate_kind: &'a str, aggregate_id: &'a Self::Id, events: Vec<PersistableEvent<Self::Metadata>>)
        -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a
    {
        async move { todo!("INSERT only if stream empty") }
    }

    fn load_events<'a>(&'a self, filters: &'a [EventFilter<Self::Id, Self::Position>])
        -> impl Future<Output = Result<Vec<StoredEvent<Self::Id, Self::Position, Self::Metadata>>, Self::Error>> + Send + 'a
    {
        async move { todo!("SELECT with filters") }
    }
}

Implementing Transactions

The Transaction type manages event batching:

impl PostgresEventStore {
    /// Writes `events` for one aggregate stream inside a single database
    /// transaction.
    ///
    /// Rows are inserted one at a time, in the order given, and the
    /// transaction is committed only after every insert has succeeded.
    ///
    /// # Errors
    ///
    /// Returns the first `sqlx::Error` raised while opening the transaction,
    /// inserting a row, or committing. On an early return the transaction is
    /// dropped without being committed.
    pub async fn append_batch(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: &str,
        events: Vec<PersistableEvent<serde_json::Value>>,
    ) -> Result<(), sqlx::Error> {
        let mut txn = self.pool.begin().await?;

        for event in events {
            // Build the parameterised insert first, then run it on the open transaction.
            let insert = sqlx::query(
                "INSERT INTO events (aggregate_kind, aggregate_id, event_kind, data, metadata)
                 VALUES ($1, $2, $3, $4, $5)"
            )
            .bind(aggregate_kind)
            .bind(aggregate_id)
            .bind(&event.kind)
            .bind(&event.data)
            .bind(&event.metadata);

            insert.execute(&mut *txn).await?;
        }

        // The commit result is the function result.
        txn.commit().await
    }
}

Loading Events

Handle multiple filters efficiently:

/// Sketch of a multi-filter load for the PostgreSQL store (body not implemented).
///
/// NOTE(review): this is a simplified synchronous signature. The
/// `EventStore::load_events` trait method returns a `Send` future, so a real
/// implementation needs the future-returning signature instead.
fn load_events(&self, filters: &[EventFilter<String, i64>])
    -> Result<Vec<StoredEvent<String, i64, serde_json::Value>>, sqlx::Error>
{
    // 1. Deduplicate overlapping filters so no row is matched twice.
    // 2. Build one WHERE clause that ORs a predicate per filter, e.g.:
    //      (event_kind = 'x' AND position > N)
    //      OR (event_kind = 'y' AND aggregate_kind = 'a' AND aggregate_id = 'b')
    //    The quoted values are illustrative; pass real values as bound
    //    parameters rather than interpolating them into the SQL text.
    // 3. ORDER BY position ASC so events come back merged by position.
    // 4. Map each row to a StoredEvent.
    todo!()
}

Optimistic Concurrency

For systems requiring strict ordering, add version checking:

-- Per-stream sequence number used for optimistic concurrency.
-- NOTE(review): adding a NOT NULL column without a DEFAULT is expected to fail
-- on a table that already contains rows; add it before loading data or
-- backfill first. Confirm against your database's ALTER TABLE rules.
ALTER TABLE events ADD COLUMN stream_version INT NOT NULL;

-- At most one event per (stream, version): a second writer that picks the
-- same version violates this index instead of being stored.
CREATE UNIQUE INDEX idx_events_stream_version
    ON events (aggregate_kind, aggregate_id, stream_version);
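// In append_batch:
// 1. Read the current max stream_version for the stream (no rows means the stream is empty)
// 2. Insert each event with the next version (current + 1, incrementing once per event)
// 3. Treat a unique-constraint violation on idx_events_stream_version as a concurrency conflict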

Event Stores for Different Databases

DynamoDB

Table: events
  PK: {aggregate_kind}#{aggregate_id}
  SK: {position:012d}  (zero-padded so string order matches numeric order; widen the padding if positions can exceed 12 digits)
  GSI: event_kind-position-index

MongoDB

{
  _id: ObjectId,
  aggregateKind: "account",
  aggregateId: "ACC-001",
  eventKind: "account.deposited",
  position: NumberLong(1234),
  data: BinData(...),
  metadata: { ... }
}

S3 (Append-Only Log)

s3://bucket/events/{aggregate_kind}/{aggregate_id}/{position}.json
s3://bucket/events-by-kind/{event_kind}/{position}.json  (symlinks/copies)

Testing Your Store

Use the same test patterns as inmemory::Store:

/// Round-trip check: stage one event through a transaction, commit it, then
/// load it back with an aggregate-scoped filter and expect exactly one event.
///
/// `event` and `metadata` are placeholders: substitute a domain event and a
/// `serde_json::Value` (the store's `Metadata` type).
#[tokio::test]
async fn test_append_and_load() {
    let mut store = PostgresEventStore::new(test_pool()).await;

    // Append events. The test returns `()`, so `?` is not available here;
    // failures are surfaced with `expect` instead.
    let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
    tx.append(event, metadata)
        .expect("staging the event in the transaction should succeed");
    // NOTE(review): assumes `commit` is async because it has to drive the
    // store's future-returning `append` — confirm against `Transaction`.
    tx.commit().await.expect("committing the transaction should succeed");

    // Load and verify. `load_events` returns a future, so it must be awaited
    // before the result can be inspected.
    let events = store
        .load_events(&[EventFilter::for_aggregate(
            "account.deposited",
            "account",
            "ACC-001".to_string(),
        )])
        .await
        .expect("loading events should succeed");

    assert_eq!(events.len(), 1);
}

Next

Test Framework — Testing aggregates in isolation