Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stores

The crate separates storage concerns into two traits:

  • EventStore for event persistence
  • SnapshotStore for aggregate/projection snapshots

Stores own serialisation and deserialisation.

The EventStore Trait

/// Persistence interface for event streams.
///
/// Implementations own serialisation and deserialisation: events are passed
/// to the commit methods as typed values, and stored events are turned back
/// into domain events with [`EventStore::decode_event`].
pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be cloneable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Clone + PartialEq` to support optimistic concurrency; the
    /// bound additionally requires `Debug`.
    /// Use `()` if ordering is not needed.
    type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    ///
    /// The commit methods additionally require `Self::Metadata: Clone`.
    type Metadata: Send + Sync + 'static;

    /// Serialised event payload type.
    ///
    /// This is the format used to store event data internally. Common choices:
    /// - `serde_json::Value` for JSON-based stores
    /// - `Vec<u8>` for binary stores
    type Data: Clone + Send + Sync + 'static;

    /// Decode a stored event into a concrete event type.
    ///
    /// Deserialises the `data` field of a [`StoredEvent`] back into a domain
    /// event. The target type `E` is chosen by the caller and must implement
    /// both `DomainEvent` and `serde::de::DeserializeOwned`.
    ///
    /// # Errors
    ///
    /// Returns an error if deserialisation fails.
    fn decode_event<E>(
        &self,
        stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
    ) -> Result<E, Self::Error>
    where
        E: crate::event::DomainEvent + serde::de::DeserializeOwned;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// The stream is identified by `aggregate_kind` together with
    /// `aggregate_id`. The returned future borrows the store and both
    /// arguments for `'a`.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Commit events to an aggregate stream without version checking.
    ///
    /// Events are serialised and persisted atomically. No conflict detection
    /// is performed (last-writer-wins).
    ///
    /// `events` is taken by value as a `NonEmpty<E>`, so a commit always
    /// carries at least one event. `metadata` is passed once by reference for
    /// the whole batch.
    ///
    /// # Errors
    ///
    /// Returns [`CommitError::Serialization`] if an event fails to serialise,
    /// or [`CommitError::Store`] if persistence fails.
    fn commit_events<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Commit events to an aggregate stream with optimistic concurrency
    /// control.
    ///
    /// Events are serialised and persisted atomically. The commit fails if:
    /// - `expected_version` is `Some(v)` and the current version differs from
    ///   `v`
    /// - `expected_version` is `None` and the stream already has events (new
    ///   aggregate expected)
    ///
    /// Apart from `expected_version`, the parameters are the same as for
    /// [`EventStore::commit_events`].
    ///
    /// # Errors
    ///
    /// Returns [`OptimisticCommitError::Serialization`] if an event fails to
    /// serialise, [`OptimisticCommitError::Conflict`] if the version check
    /// fails, or [`OptimisticCommitError::Store`] if persistence fails.
    // Silences the lint for the deeply nested `impl Future<Output = Result<..>>`
    // return type below.
    #[allow(clippy::type_complexity)]
    fn commit_events_optimistic<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<
        Output = Result<
            Committed<Self::Position>,
            OptimisticCommitError<Self::Position, Self::Error>,
        >,
    > + Send
    + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimises based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// The returned future borrows the store and the `filters` slice for `'a`.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    // Silences the lint for the five-parameter `LoadEventsResult` wrapped in
    // `impl Future` below.
    #[allow(clippy::type_complexity)]
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<
            Self::Id,
            Self::Position,
            Self::Data,
            Self::Metadata,
            Self::Error,
        >,
    > + Send
    + 'a;
}

/// Marker trait for stores that provide globally ordered positions.
///
/// The trait declares no methods of its own; implementing it on an
/// [`EventStore`] is how a store states that it offers this guarantee.
///
/// Projection snapshots require this guarantee.
pub trait GloballyOrderedStore: EventStore {}

Built-in: inmemory::Store

For testing and prototyping:

use sourcery::store::inmemory;

// Unit metadata: the second type parameter is `()`, so no metadata type is needed
let store: inmemory::Store<String, ()> = inmemory::Store::new();

// Custom metadata: `MyMetadata` stands in for an application-defined type
// (not defined in this snippet)
let store: inmemory::Store<String, MyMetadata> = inmemory::Store::new();

The in-memory store uses serde_json and supports globally ordered positions.

Committing Events

use nonempty::NonEmpty;

// `my_event`, `account_id`, `metadata` and `expected_version` are assumed to be
// defined by the surrounding code.
let events = NonEmpty::singleton(my_event);

// Unchecked (last-writer-wins)
// `commit_events` takes the events by value, so clone them to reuse `events` below.
store.commit_events("account", &account_id, events.clone(), &metadata).await?;

// Optimistic concurrency
// `Some(v)` requires the stream's current version to equal `v`;
// `None` would instead require that the stream has no events yet.
store
    .commit_events_optimistic("account", &account_id, Some(expected_version), events, &metadata)
    .await?;

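commit_events_optimistic fails with a ConcurrencyConflict (surfaced as OptimisticCommitError::Conflict) when the expected and actual stream versions differ, or when expected_version is None and the stream already has events.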

Loading Events with Filters

// All events of one kind, across every aggregate
EventFilter::for_event("account.deposited")

// One aggregate instance
// NOTE(review): arguments appear to be (event kind, aggregate kind, aggregate ID) — confirm against the API docs
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")

// Incremental loading
// NOTE(review): presumably restricts results to events after position 100 — confirm whether the bound is exclusive
EventFilter::for_event("account.deposited").after(100)

StoredEvent in Practice

load_events returns StoredEvent<Id, Pos, Data, Metadata>, containing:

  • envelope fields (aggregate_kind, aggregate_id, kind, position)
  • serialised payload (data)
  • metadata (metadata)

Use EventStore::decode_event() to deserialise payloads into domain events.

For the exact field layout, see API docs for StoredEvent.

The SnapshotStore Trait

/// Persistence interface for aggregate/projection snapshots.
///
/// `Id` is the identifier type used to address a snapshot together with its
/// `kind` string. Like the event store, implementations own serialisation
/// and deserialisation of the snapshot payload.
pub trait SnapshotStore<Id: Sync>: Send + Sync {
    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// The snapshot is looked up by `kind` and `id`. `T` is the caller-chosen
    /// payload type the stored snapshot is deserialised into.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<T>(
        &self,
        kind: &str,
        id: &Id,
    ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
    where
        T: DeserializeOwned;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialisation, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Type parameters
    ///
    /// - `T`: snapshot payload type; must be `Serialize` so the store can
    ///   persist it.
    /// - `CE`: error type returned by `create_snapshot`.
    /// - `Create`: the callback type. It is `FnOnce`, so it can be invoked at
    ///   most once.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<CE, T, Create>(
        &self,
        kind: &str,
        id: &Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl std::future::Future<
        Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
    > + Send
    where
        CE: std::error::Error + Send + Sync + 'static,
        T: Serialize,
        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
}

See Snapshots for policy and usage guidance.

Implementing a Custom Store

See Custom Stores for a practical guide.

Next

The Aggregate Derive — Reducing boilerplate with macros