Stores & Codecs

The crate separates storage concerns into three traits: EventStore for event persistence, SnapshotStore for aggregate snapshots, and Codec for serialization.

The EventStore Trait

pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Copy + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialization codec.
    type Codec: Codec + Clone + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    fn codec(&self) -> &Self::Codec;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Begin a transaction for appending events to an aggregate.
    ///
    /// The transaction type is determined by the concurrency strategy `C`.
    ///
    /// # Arguments
    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
    /// * `aggregate_id` - The aggregate instance identifier
    /// * `expected_version` - The version expected for optimistic concurrency
    fn begin<C: ConcurrencyStrategy>(
        &mut self,
        aggregate_kind: &str,
        aggregate_id: Self::Id,
        expected_version: Option<Self::Position>,
    ) -> Transaction<'_, Self, C>
    where
        Self: Sized;

    /// Append events with optional version checking.
    ///
    /// If `expected_version` is `Some`, the append fails with a concurrency
    /// conflict if the current stream version doesn't match.
    /// If `expected_version` is `None`, no version checking is performed.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
    /// [`AppendError::Store`] if persistence fails.
    fn append<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimizes based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
    > + Send
    + 'a;

    /// Append events expecting an empty stream.
    ///
    /// This method is used by optimistic concurrency when creating new
    /// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
    /// already has events.
    ///
    /// # Errors
    ///
    /// Returns [`AppendError::Conflict`] if the stream is not empty,
    /// or [`AppendError::Store`] if persistence fails.
    fn append_expecting_new<'a>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: Vec<PersistableEvent<Self::Metadata>>,
    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}
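The version-checking rules documented on `append` and `append_expecting_new` can be illustrated with a small synchronous stand-in. This is a sketch only: `ToyStream` and `ToyConflict` are hypothetical names that are not part of the crate, the real trait is async and generic, and using a 0-based per-stream index as the position is an assumption made for brevity.

```rust
/// Hypothetical conflict error, standing in for `AppendError::Conflict`.
#[derive(Debug, PartialEq)]
pub struct ToyConflict {
    pub expected: Option<u64>,
    pub actual: Option<u64>,
}

/// Hypothetical single-stream store; positions are 0-based indices here.
#[derive(Default)]
pub struct ToyStream {
    events: Vec<String>,
}

impl ToyStream {
    /// Latest position, or `None` for a stream with no events.
    pub fn stream_version(&self) -> Option<u64> {
        self.events.len().checked_sub(1).map(|i| i as u64)
    }

    /// Append with optional version checking (`None` skips the check).
    pub fn append(
        &mut self,
        expected: Option<u64>,
        new_events: Vec<String>,
    ) -> Result<(), ToyConflict> {
        let actual = self.stream_version();
        if expected.is_some() && expected != actual {
            return Err(ToyConflict { expected, actual });
        }
        self.events.extend(new_events);
        Ok(())
    }

    /// Append only if the stream has no events yet.
    pub fn append_expecting_new(&mut self, new_events: Vec<String>) -> Result<(), ToyConflict> {
        let actual = self.stream_version();
        if actual.is_some() {
            return Err(ToyConflict { expected: None, actual });
        }
        self.events.extend(new_events);
        Ok(())
    }
}
```

Note that `None` carries two different meanings, which is presumably why the trait has a separate `append_expecting_new`: as an `expected_version` it disables the check, whereas "the stream must be empty" needs its own entry point.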

Built-in: inmemory::Store

For testing and prototyping:

use sourcery::store::{inmemory, JsonCodec};

// With unit metadata
let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);

// With custom metadata
let store: inmemory::Store<String, JsonCodec, MyMetadata> = inmemory::Store::new(JsonCodec);

Features:

  • Uses u64 positions (global sequence number)
  • Events stored in memory (lost on drop)
  • Deduplicates overlapping filters when loading

Transactions

Events are appended within a transaction for atomicity:

use sourcery::concurrency::Unchecked;

let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
tx.append(event1, metadata.clone())?;
tx.append(event2, metadata.clone())?;
tx.commit().await?; // Events visible only after commit

If the transaction is dropped without committing, no events are persisted.
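One way to get this "nothing persists unless committed" behaviour is to buffer events inside the transaction and hand them to the store only in `commit`. The sketch below assumes that design; `ToyStore` and `ToyTransaction` are hypothetical, synchronous stand-ins and do not show how the crate's `Transaction` is actually implemented.

```rust
/// Hypothetical store: just a list of committed events.
#[derive(Default)]
pub struct ToyStore {
    pub committed: Vec<String>,
}

/// Hypothetical transaction that buffers events until `commit`.
pub struct ToyTransaction<'a> {
    store: &'a mut ToyStore,
    pending: Vec<String>,
}

impl ToyStore {
    /// Start a transaction that holds the only mutable borrow of the store.
    pub fn begin(&mut self) -> ToyTransaction<'_> {
        ToyTransaction { store: self, pending: Vec::new() }
    }
}

impl ToyTransaction<'_> {
    /// Buffer an event; nothing is visible in the store yet.
    pub fn append(&mut self, event: &str) {
        self.pending.push(event.to_string());
    }

    /// Consume the transaction and publish all buffered events together.
    pub fn commit(self) {
        let ToyTransaction { store, pending } = self;
        store.committed.extend(pending);
    }
}
```

Because `commit` takes `self` by value, a transaction cannot be committed twice, and dropping it simply discards the `pending` buffer.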

Event Filters

Control which events to load:

// All events of a specific kind (across all aggregates)
EventFilter::for_event("account.deposited")

// Events of a specific kind for a single aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")

// Events after a position (for incremental loading)
EventFilter::for_event("account.deposited").after(100)
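To make the filter semantics concrete, here is a self-contained sketch of how a store might evaluate a set of filters. `ToyFilter`, `ToyEvent`, and `load` are hypothetical stand-ins, not the crate's types. Treating `after` as a strict "greater than" comparison is an assumption, as is returning each event once when several filters match it.

```rust
/// Hypothetical stored event with only the fields needed for filtering.
pub struct ToyEvent {
    pub aggregate_kind: String,
    pub aggregate_id: String,
    pub kind: String,
    pub position: u64,
}

/// Hypothetical filter: an event kind, an optional aggregate, an optional lower bound.
pub struct ToyFilter {
    event_kind: String,
    aggregate: Option<(String, String)>,
    after: Option<u64>,
}

impl ToyFilter {
    pub fn for_event(kind: &str) -> Self {
        ToyFilter { event_kind: kind.to_string(), aggregate: None, after: None }
    }

    pub fn for_aggregate(kind: &str, aggregate_kind: &str, aggregate_id: &str) -> Self {
        ToyFilter {
            event_kind: kind.to_string(),
            aggregate: Some((aggregate_kind.to_string(), aggregate_id.to_string())),
            after: None,
        }
    }

    /// Restrict to events strictly after `position` (assumed semantics).
    pub fn after(mut self, position: u64) -> Self {
        self.after = Some(position);
        self
    }

    pub fn matches(&self, e: &ToyEvent) -> bool {
        let kind_ok = e.kind == self.event_kind;
        let aggregate_ok = match &self.aggregate {
            Some((kind, id)) => e.aggregate_kind == *kind && e.aggregate_id == *id,
            None => true,
        };
        let position_ok = self.after.map_or(true, |p| e.position > p);
        kind_ok && aggregate_ok && position_ok
    }
}

/// Return events matching any filter, each at most once, ordered by position.
pub fn load<'a>(events: &'a [ToyEvent], filters: &[ToyFilter]) -> Vec<&'a ToyEvent> {
    let mut out: Vec<&ToyEvent> = events
        .iter()
        .filter(|e| filters.iter().any(|f| f.matches(e)))
        .collect();
    out.sort_by_key(|e| e.position);
    out
}
```

Testing each event against "any filter" is what keeps overlapping filters from producing duplicates in this sketch; a database-backed store would more likely push the same predicate into its query.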

The Codec Trait

pub trait Codec {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Serialize a value for persistence.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the value cannot be serialized.
    fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
    where
        T: serde::Serialize;

    /// Deserialize a value from stored bytes.
    ///
    /// # Errors
    ///
    /// Returns an error from the codec if the bytes cannot be decoded.
    fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
    where
        T: serde::de::DeserializeOwned;
}

Built-in: JsonCodec

Uses serde_json for human-readable storage:

use sourcery::JsonCodec;

let codec = JsonCodec;

For production, consider implementing codecs for:

  • Protobuf — Compact, schema-enforced
  • MessagePack — Compact, schema-less
  • Avro — Schema evolution built-in

The SnapshotStore Trait

pub trait SnapshotStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// Must match the `EventStore::Id` type used in the same repository.
    type Id: Send + Sync + 'static;

    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync + 'static;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialization, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<'a, CE, Create>(
        &'a mut self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
    where
        CE: std::error::Error + Send + Sync + 'static,
        Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
}

See Snapshots for details.
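The point of passing a `create_snapshot` callback is that a store can decline before any snapshot is built. The sketch below shows that idea with an "every N events" policy. `EveryN` and `ToyOffer` are hypothetical, the snapshot is reduced to raw bytes, and the signature is simplified to synchronous code, so treat it as an illustration of the pattern and not as an implementation of the trait.

```rust
/// Hypothetical outcome, mirroring the `Stored` / `Declined` split.
#[derive(Debug, PartialEq)]
pub enum ToyOffer {
    Stored,
    Declined,
}

/// Hypothetical snapshot store that accepts a snapshot once `every` events have accumulated.
pub struct EveryN {
    pub every: u64,
    pub latest: Option<Vec<u8>>,
}

impl EveryN {
    /// Decide first, and only then pay for snapshot creation.
    pub fn offer_snapshot<E>(
        &mut self,
        events_since_last_snapshot: u64,
        create_snapshot: impl FnOnce() -> Result<Vec<u8>, E>,
    ) -> Result<ToyOffer, E> {
        if events_since_last_snapshot < self.every {
            // Declining here means `create_snapshot` is never invoked.
            return Ok(ToyOffer::Declined);
        }
        self.latest = Some(create_snapshot()?);
        Ok(ToyOffer::Stored)
    }
}
```

With `every: 3`, an offer made after two events returns `Declined` without running the closure, while an offer after three events runs it and stores the result.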

The StoredEvent Type

Events loaded from the store:

pub struct StoredEvent<Id, Pos, M> {
    pub aggregate_kind: String,
    pub aggregate_id: Id,
    pub kind: String,           // Event type (e.g., "account.deposited")
    pub position: Pos,          // Global or stream position
    pub data: Vec<u8>,          // Serialized event payload
    pub metadata: M,            // Store-provided metadata
}

The PersistableEvent Type

Events ready to be stored:

pub struct PersistableEvent<M> {
    pub kind: String,           // Event type
    pub data: Vec<u8>,          // Serialized payload
    pub metadata: M,            // Caller-provided metadata
}

The SerializableEvent trait (generated by the derive macro) converts domain events into this form.
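As a rough picture of what such a conversion involves, the sketch below maps a domain event to a kind string plus a byte payload by hand. Everything in it is an assumption for illustration: `AccountEvent`, `kind`, and `into_persistable` are hypothetical names, the struct is a local copy of the shape shown above so the block compiles on its own, and the big-endian encoding stands in for whatever a real `Codec` would produce. It does not show the derive macro's actual output.

```rust
/// Local copy of the struct shape shown above, so the sketch is self-contained.
pub struct PersistableEvent<M> {
    pub kind: String,
    pub data: Vec<u8>,
    pub metadata: M,
}

/// Hypothetical domain event for an account aggregate.
pub enum AccountEvent {
    Deposited { amount: u64 },
    Withdrawn { amount: u64 },
}

impl AccountEvent {
    /// Stable string identifier stored alongside the payload.
    pub fn kind(&self) -> &'static str {
        match self {
            AccountEvent::Deposited { .. } => "account.deposited",
            AccountEvent::Withdrawn { .. } => "account.withdrawn",
        }
    }

    /// Pair the event kind with an encoded payload and caller-provided metadata.
    pub fn into_persistable<M>(self, metadata: M) -> PersistableEvent<M> {
        let kind = self.kind().to_string();
        let amount = match self {
            AccountEvent::Deposited { amount } | AccountEvent::Withdrawn { amount } => amount,
        };
        PersistableEvent {
            kind,
            // Stand-in encoding: 8 big-endian bytes instead of a real codec.
            data: amount.to_be_bytes().to_vec(),
            metadata,
        }
    }
}
```

Storing the kind as a separate string is what lets a store filter by event type without decoding `data`.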

Implementing a Custom Store

See Custom Stores for a guide on implementing EventStore for your database.

Next

The Aggregate Derive — Reducing boilerplate with macros