Stores & Codecs
The crate separates storage concerns into three traits: EventStore for event persistence, SnapshotStore for aggregate snapshots, and Codec for serialization.
The EventStore Trait
pub trait EventStore: Send + Sync {
/// Aggregate identifier type.
///
/// This type must be clonable so repositories can reuse IDs across calls.
/// Common choices: `String`, `Uuid`, or custom ID types.
type Id: Clone + Send + Sync + 'static;
/// Position type used for ordering events and version checking.
///
/// Must be `Copy + PartialEq` to support optimistic concurrency.
/// Use `()` if ordering is not needed.
type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;
/// Store-specific error type.
type Error: std::error::Error + Send + Sync + 'static;
/// Serialization codec.
type Codec: Codec + Clone + Send + Sync + 'static;
/// Metadata type for infrastructure concerns.
type Metadata: Send + Sync + 'static;
fn codec(&self) -> &Self::Codec;
/// Get the current version (latest position) for an aggregate stream.
///
/// Returns `None` for streams with no events.
///
/// # Errors
///
/// Returns a store-specific error when the operation fails.
fn stream_version<'a>(
&'a self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;
/// Begin a transaction for appending events to an aggregate.
///
/// The transaction type is determined by the concurrency strategy `C`.
///
/// # Arguments
/// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
/// * `aggregate_id` - The aggregate instance identifier
/// * `expected_version` - The version expected for optimistic concurrency
fn begin<C: ConcurrencyStrategy>(
&mut self,
aggregate_kind: &str,
aggregate_id: Self::Id,
expected_version: Option<Self::Position>,
) -> Transaction<'_, Self, C>
where
Self: Sized;
/// Append events with optional version checking.
///
/// If `expected_version` is `Some`, the append fails with a concurrency
/// conflict if the current stream version doesn't match.
/// If `expected_version` is `None`, no version checking is performed.
///
/// # Errors
///
/// Returns [`AppendError::Conflict`] if the version doesn't match, or
/// [`AppendError::Store`] if persistence fails.
fn append<'a>(
&'a mut self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
expected_version: Option<Self::Position>,
events: Vec<PersistableEvent<Self::Metadata>>,
) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
/// Load events matching the specified filters.
///
/// Each filter describes an event kind and optional aggregate identity:
/// - [`EventFilter::for_event`] loads every event of the given kind
/// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
///
/// The store optimizes based on its storage model and returns events
/// merged by position (if positions are available).
///
/// # Errors
///
/// Returns a store-specific error when loading fails.
fn load_events<'a>(
&'a self,
filters: &'a [EventFilter<Self::Id, Self::Position>],
) -> impl Future<
Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
> + Send
+ 'a;
/// Append events expecting an empty stream.
///
/// This method is used by optimistic concurrency when creating new
/// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
/// already has events.
///
/// # Errors
///
/// Returns [`AppendError::Conflict`] if the stream is not empty,
/// or [`AppendError::Store`] if persistence fails.
fn append_expecting_new<'a>(
&'a mut self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
events: Vec<PersistableEvent<Self::Metadata>>,
) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
}
Built-in: inmemory::Store
For testing and prototyping:
use sourcery::store::{inmemory, JsonCodec};
// With unit metadata
let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
// With custom metadata
let store: inmemory::Store<String, JsonCodec, MyMetadata> = inmemory::Store::new(JsonCodec);
Features:
- Uses
u64positions (global sequence number) - Events stored in memory (lost on drop)
- Deduplicates overlapping filters when loading
Transactions
Events are appended within a transaction for atomicity:
use sourcery::concurrency::Unchecked;
let mut tx = store.begin::<Unchecked>("account", "ACC-001".to_string(), None);
tx.append(event1, metadata.clone())?;
tx.append(event2, metadata.clone())?;
tx.commit().await?; // Events visible only after commit
If the transaction is dropped without committing, no events are persisted.
Event Filters
Control which events to load:
// All events of a specific kind (across all aggregates)
EventFilter::for_event("account.deposited")
// All events for a specific aggregate instance
EventFilter::for_aggregate("account.deposited", "account", "ACC-001")
// Events after a position (for incremental loading)
EventFilter::for_event("account.deposited").after(100)
The Codec Trait
pub trait Codec {
type Error: std::error::Error + Send + Sync + 'static;
/// Serialize a value for persistence.
///
/// # Errors
///
/// Returns an error from the codec if the value cannot be serialized.
fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
where
T: serde::Serialize;
/// Deserialize a value from stored bytes.
///
/// # Errors
///
/// Returns an error from the codec if the bytes cannot be decoded.
fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
where
T: serde::de::DeserializeOwned;
}
Built-in: JsonCodec
Uses serde_json for human-readable storage:
use sourcery::JsonCodec;
let codec = JsonCodec;
For production, consider implementing codecs for:
- Protobuf — Compact, schema-enforced
- MessagePack — Compact, schema-less
- Avro — Schema evolution built-in
The SnapshotStore Trait
pub trait SnapshotStore: Send + Sync {
/// Aggregate identifier type.
///
/// Must match the `EventStore::Id` type used in the same repository.
type Id: Send + Sync + 'static;
/// Position type for tracking snapshot positions.
///
/// Must match the `EventStore::Position` type used in the same repository.
type Position: Send + Sync + 'static;
/// Error type for snapshot operations.
type Error: std::error::Error + Send + Sync + 'static;
/// Load the most recent snapshot for an aggregate.
///
/// Returns `Ok(None)` if no snapshot exists.
///
/// # Errors
///
/// Returns an error if the underlying storage fails.
fn load<'a>(
&'a self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;
/// Whether to store a snapshot, with lazy snapshot creation.
///
/// The repository calls this after successfully appending new events,
/// passing `events_since_last_snapshot` and a `create_snapshot`
/// callback. Implementations may decline without invoking
/// `create_snapshot`, avoiding unnecessary snapshot creation cost
/// (serialization, extra I/O, etc.).
///
/// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
/// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
/// snapshot was stored.
///
/// # Errors
///
/// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
/// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
fn offer_snapshot<'a, CE, Create>(
&'a mut self,
aggregate_kind: &'a str,
aggregate_id: &'a Self::Id,
events_since_last_snapshot: u64,
create_snapshot: Create,
) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
where
CE: std::error::Error + Send + Sync + 'static,
Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
}
See Snapshots for details.
The StoredEvent Type
Events loaded from the store:
pub struct StoredEvent<Id, Pos, M> {
pub aggregate_kind: String,
pub aggregate_id: Id,
pub kind: String, // Event type (e.g., "account.deposited")
pub position: Pos, // Global or stream position
pub data: Vec<u8>, // Serialized event payload
pub metadata: M, // Store-provided metadata
}
The PersistableEvent Type
Events ready to be stored:
pub struct PersistableEvent<M> {
pub kind: String, // Event type
pub data: Vec<u8>, // Serialized payload
pub metadata: M, // Caller-provided metadata
}
The SerializableEvent trait (generated by the derive macro) converts domain events into this form.
Implementing a Custom Store
See Custom Stores for a guide on implementing EventStore for your database.
Next
The Aggregate Derive — Reducing boilerplate with macros