Stores
The crate separates storage concerns into two traits:
- EventStore for event persistence
- SnapshotStore for aggregate/projection snapshots
Stores own serialisation and deserialisation.
The EventStore Trait
pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Clone + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    /// Serialised event payload type.
    ///
    /// This is the format used to store event data internally. Common choices:
    /// - `serde_json::Value` for JSON-based stores
    /// - `Vec<u8>` for binary stores
    type Data: Clone + Send + Sync + 'static;

    /// Decode a stored event into a concrete event type.
    ///
    /// Deserialises the `data` field of a [`StoredEvent`] back into a domain
    /// event.
    ///
    /// # Errors
    ///
    /// Returns an error if deserialisation fails.
    fn decode_event<E>(
        &self,
        stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
    ) -> Result<E, Self::Error>
    where
        E: crate::event::DomainEvent + serde::de::DeserializeOwned;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Commit events to an aggregate stream without version checking.
    ///
    /// Events are serialised and persisted atomically. No conflict detection
    /// is performed (last-writer-wins).
    ///
    /// # Errors
    ///
    /// Returns [`CommitError::Serialization`] if an event fails to serialise,
    /// or [`CommitError::Store`] if persistence fails.
    fn commit_events<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Commit events to an aggregate stream with optimistic concurrency
    /// control.
    ///
    /// Events are serialised and persisted atomically. The commit fails if:
    /// - `expected_version` is `Some(v)` and the current version differs from
    ///   `v`
    /// - `expected_version` is `None` and the stream already has events (new
    ///   aggregate expected)
    ///
    /// # Errors
    ///
    /// Returns [`OptimisticCommitError::Serialization`] if an event fails to
    /// serialise, [`OptimisticCommitError::Conflict`] if the version check
    /// fails, or [`OptimisticCommitError::Store`] if persistence fails.
    #[allow(clippy::type_complexity)]
    fn commit_events_optimistic<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<
        Output = Result<
            Committed<Self::Position>,
            OptimisticCommitError<Self::Position, Self::Error>,
        >,
    > + Send
    + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimises based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    #[allow(clippy::type_complexity)]
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<
            Self::Id,
            Self::Position,
            Self::Data,
            Self::Metadata,
            Self::Error,
        >,
    > + Send
    + 'a;
}

/// Marker trait for stores that provide globally ordered positions.
///
/// Projection snapshots require this guarantee.
pub trait GloballyOrderedStore: EventStore {}
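A marker trait like GloballyOrderedStore carries no methods; its value is as a compile-time bound. The sketch below shows the general pattern with stripped-down stand-ins for the two traits. PerStreamStore, GlobalLogStore, and projection_resume_point are hypothetical names invented for this illustration and are not part of the crate.

```rust
// Simplified stand-ins for the traits above: only the associated type
// needed for this illustration is kept.
trait EventStore {
    type Position: Clone + PartialEq + std::fmt::Debug;
}

/// Marker: positions are comparable across every stream in the store.
trait GloballyOrderedStore: EventStore {}

/// Hypothetical store whose positions are per-stream sequence numbers.
struct PerStreamStore;
impl EventStore for PerStreamStore {
    type Position = u64;
}

/// Hypothetical store backed by one global log.
struct GlobalLogStore;
impl EventStore for GlobalLogStore {
    type Position = u64;
}
impl GloballyOrderedStore for GlobalLogStore {}

/// Hypothetical helper: the position a projection snapshot would record.
/// The bound makes this callable only for globally ordered stores.
fn projection_resume_point<S: GloballyOrderedStore>(
    _store: &S,
    last_applied: S::Position,
) -> S::Position {
    last_applied
}
```

Calling projection_resume_point(&PerStreamStore, 42) is rejected by the compiler because PerStreamStore does not implement the marker. A projection reads across many streams, so a single resume position is only meaningful when positions from different streams can be compared.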
Built-in: inmemory::Store
For testing and prototyping:
use sourcery::store::inmemory;
// Unit metadata
let store: inmemory::Store<String, ()> = inmemory::Store::new();
// Custom metadata
let store: inmemory::Store<String, MyMetadata> = inmemory::Store::new();
The in-memory store uses serde_json and supports globally ordered positions.
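To make "globally ordered positions" concrete, here is a toy log in which every stream draws positions from one shared counter. This is not how inmemory::Store is implemented; ToyStore and its methods are made up for illustration, and event payloads are left out entirely.

```rust
use std::collections::HashMap;

/// Toy event log: all streams share one monotonically increasing counter,
/// so any two positions in the store can be compared.
#[derive(Default)]
struct ToyStore {
    next_position: u64,
    /// (aggregate_kind, aggregate_id) -> positions of that stream's events.
    streams: HashMap<(String, String), Vec<u64>>,
}

impl ToyStore {
    /// Append `count` events to a stream and return the stream's latest
    /// position. (The real API takes a `NonEmpty` of events instead.)
    fn append(&mut self, kind: &str, id: &str, count: usize) -> Option<u64> {
        let stream = self
            .streams
            .entry((kind.to_owned(), id.to_owned()))
            .or_default();
        for _ in 0..count {
            stream.push(self.next_position);
            self.next_position += 1;
        }
        stream.last().copied()
    }

    /// Latest position in a stream, or `None` if it has no events.
    fn stream_version(&self, kind: &str, id: &str) -> Option<u64> {
        self.streams
            .get(&(kind.to_owned(), id.to_owned()))
            .and_then(|stream| stream.last().copied())
    }
}
```

Interleaved appends to two accounts yield positions that never repeat across streams, which is the property a per-stream sequence number would not give.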
Committing Events
use nonempty::NonEmpty;
let events = NonEmpty::singleton(my_event);
// Unchecked (last-writer-wins)
store.commit_events("account", &account_id, events.clone(), &metadata).await?;
// Optimistic concurrency
store
    .commit_events_optimistic("account", &account_id, Some(expected_version), events, &metadata)
    .await?;
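commit_events_optimistic fails with ConcurrencyConflict (the Conflict case of OptimisticCommitError) when the expected and actual stream versions differ. Passing None as the expected version asserts a new stream, so the commit also fails if the stream already has events.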
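The version check itself is small. A minimal sketch, assuming a store that compares the caller's expected version against the stream's current one before writing; Conflict and check_expected_version are hypothetical names, not the crate's types.

```rust
/// Hypothetical conflict value: what the caller expected versus what the
/// store currently holds.
#[derive(Debug, PartialEq)]
struct Conflict<P> {
    expected: Option<P>,
    actual: Option<P>,
}

/// Both documented rules reduce to one equality test:
/// - `Some(v)` must equal the current version
/// - `None` requires a stream with no events (current version `None`)
fn check_expected_version<P: Clone + PartialEq>(
    expected: &Option<P>,
    actual: &Option<P>,
) -> Result<(), Conflict<P>> {
    if expected == actual {
        Ok(())
    } else {
        Err(Conflict {
            expected: expected.clone(),
            actual: actual.clone(),
        })
    }
}
```

A caller would typically read stream_version, make its decision, commit with that version, and on conflict reload the aggregate and retry.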
Loading Events with Filters
// All events of one kind
EventFilter::for_event("account.deposited")
// One aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")
// Incremental loading
EventFilter::for_event("account.deposited").after(100)
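The three filter shapes can be read as predicates over stored events. The sketch below models that reading with simplified stand-ins (string IDs, u64 positions). Filter, Event, event, and select are hypothetical, and treating after(n) as "strictly greater than n" is an assumption rather than something the crate documents here.

```rust
/// Hypothetical, simplified filter: string IDs and `u64` positions.
#[derive(Debug, Clone, PartialEq)]
struct Filter {
    event_kind: String,
    /// Optional (aggregate_kind, aggregate_id) restriction.
    aggregate: Option<(String, String)>,
    /// Optional lower bound; assumed exclusive.
    after: Option<u64>,
}

impl Filter {
    fn for_event(kind: &str) -> Self {
        Filter { event_kind: kind.to_owned(), aggregate: None, after: None }
    }

    fn for_aggregate(kind: &str, aggregate_kind: &str, aggregate_id: &str) -> Self {
        Filter {
            event_kind: kind.to_owned(),
            aggregate: Some((aggregate_kind.to_owned(), aggregate_id.to_owned())),
            after: None,
        }
    }

    fn after(mut self, position: u64) -> Self {
        self.after = Some(position);
        self
    }

    fn matches(&self, event: &Event) -> bool {
        let kind_ok = self.event_kind == event.kind;
        let aggregate_ok = self.aggregate.as_ref().map_or(true, |(kind, id)| {
            *kind == event.aggregate_kind && *id == event.aggregate_id
        });
        let position_ok = self.after.map_or(true, |after| event.position > after);
        kind_ok && aggregate_ok && position_ok
    }
}

/// Envelope fields only; the payload is omitted.
struct Event {
    kind: String,
    aggregate_kind: String,
    aggregate_id: String,
    position: u64,
}

/// Convenience constructor for the example.
fn event(kind: &str, aggregate_kind: &str, aggregate_id: &str, position: u64) -> Event {
    Event {
        kind: kind.to_owned(),
        aggregate_kind: aggregate_kind.to_owned(),
        aggregate_id: aggregate_id.to_owned(),
        position,
    }
}

/// An event is selected if any filter matches it. Input order is kept, so
/// events already sorted by position stay sorted.
fn select<'a>(events: &'a [Event], filters: &[Filter]) -> Vec<&'a Event> {
    events
        .iter()
        .filter(|e| filters.iter().any(|f| f.matches(e)))
        .collect()
}
```

Passing several filters in one call gives a union of their matches, which is one way to read "returns events merged by position" in the trait documentation.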
StoredEvent in Practice
load_events returns StoredEvent<Id, Pos, Data, Metadata> values, each containing:
- envelope fields (aggregate_kind, aggregate_id, kind, position)
- serialised payload (data)
- metadata (metadata)
Use EventStore::decode_event() to deserialise payloads into domain events.
For the exact field layout, see the API docs for StoredEvent.
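The split between envelope and payload means a consumer can choose the target type from kind before touching data. The following sketch imitates that flow without serde: Stored is a simplified stand-in for StoredEvent, the payload is assumed to be a plain decimal string, and AccountEvent, DecodeError, decode, and the account.withdrawn kind are hypothetical.

```rust
/// Simplified stand-in for `StoredEvent`: string IDs, `u64` positions,
/// text payloads, and unit metadata.
#[allow(dead_code)]
struct Stored {
    aggregate_kind: String,
    aggregate_id: String,
    kind: String,
    position: u64,
    data: String,
    metadata: (),
}

#[derive(Debug, PartialEq)]
enum AccountEvent {
    Deposited { amount: u64 },
    Withdrawn { amount: u64 },
}

#[derive(Debug, PartialEq)]
enum DecodeError {
    UnknownKind(String),
    BadPayload(String),
}

/// Pick the variant from the envelope's `kind`, then parse the payload.
fn decode(stored: &Stored) -> Result<AccountEvent, DecodeError> {
    // Payload parsing is deferred until the kind is known to be supported.
    let amount = || {
        stored
            .data
            .parse::<u64>()
            .map_err(|_| DecodeError::BadPayload(stored.data.clone()))
    };
    match stored.kind.as_str() {
        "account.deposited" => Ok(AccountEvent::Deposited { amount: amount()? }),
        "account.withdrawn" => Ok(AccountEvent::Withdrawn { amount: amount()? }),
        other => Err(DecodeError::UnknownKind(other.to_owned())),
    }
}
```

With a real store, the parsing step is what EventStore::decode_event() does for you, using the store's own Data format.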
The SnapshotStore Trait
pub trait SnapshotStore<Id: Sync>: Send + Sync {
    /// Position type for tracking snapshot positions.
    ///
    /// Must match the `EventStore::Position` type used in the same repository.
    type Position: Send + Sync;

    /// Error type for snapshot operations.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load the most recent snapshot for an aggregate.
    ///
    /// Returns `Ok(None)` if no snapshot exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage fails.
    fn load<T>(
        &self,
        kind: &str,
        id: &Id,
    ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
    where
        T: DeserializeOwned;

    /// Whether to store a snapshot, with lazy snapshot creation.
    ///
    /// The repository calls this after successfully appending new events,
    /// passing `events_since_last_snapshot` and a `create_snapshot`
    /// callback. Implementations may decline without invoking
    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
    /// (serialisation, extra I/O, etc.).
    ///
    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
    /// snapshot was stored.
    ///
    /// # Errors
    ///
    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
    fn offer_snapshot<CE, T, Create>(
        &self,
        kind: &str,
        id: &Id,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> impl std::future::Future<
        Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
    > + Send
    where
        CE: std::error::Error + Send + Sync + 'static,
        T: Serialize,
        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
}
See Snapshots for policy and usage guidance.
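The point of passing create_snapshot as a callback is that a store can decline before any snapshot is built. Below is a synchronous, std-only sketch of an "every N events" policy that follows that contract. EveryN is hypothetical, the two enums are cut-down stand-ins that reuse the crate's names, and a String stands in for the snapshot; the real trait is async, takes &self, and also has a Snapshot error case.

```rust
/// Simplified stand-in for the crate's `SnapshotOffer`.
#[derive(Debug, PartialEq)]
enum SnapshotOffer {
    Declined,
    Stored,
}

/// Simplified stand-in for `OfferSnapshotError`: only the `Create` case,
/// because this toy store cannot fail to persist.
#[derive(Debug, PartialEq)]
enum OfferSnapshotError<CE> {
    Create(CE),
}

/// Hypothetical policy: keep a snapshot once at least `threshold` events
/// have accumulated since the previous one.
struct EveryN {
    threshold: u64,
    stored: Vec<String>,
}

impl EveryN {
    fn offer_snapshot<CE, Create>(
        &mut self,
        events_since_last_snapshot: u64,
        create_snapshot: Create,
    ) -> Result<SnapshotOffer, OfferSnapshotError<CE>>
    where
        Create: FnOnce() -> Result<String, CE>,
    {
        // Decline first: the callback is never invoked, so the caller pays
        // nothing for building a snapshot that would be thrown away.
        if events_since_last_snapshot < self.threshold {
            return Ok(SnapshotOffer::Declined);
        }
        let snapshot = create_snapshot().map_err(OfferSnapshotError::Create)?;
        self.stored.push(snapshot);
        Ok(SnapshotOffer::Stored)
    }
}
```

With a threshold of 10, an offer after 3 events returns Declined without running the callback, while an offer after 10 events runs it once and stores the result.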
Implementing a Custom Store
See Custom Stores for a practical guide.
Next
The Aggregate Derive — Reducing boilerplate with macros