Aggregates
An aggregate rebuilds state from events and validates commands to produce new events.
Basic Usage
For most aggregates:
- Derive with #[derive(Aggregate)]
- Implement Apply<E> per event
- Implement Handle<C> per command
#[derive(Default, sourcery::Aggregate)]
#[aggregate(id = String, error = AccountError, events(FundsDeposited, FundsWithdrawn))]
pub struct Account {
    balance: i64,
}
impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}
impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}
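The listing above covers Apply<E> only. The third step, Handle<C>, might look roughly like the sketch below. The shape of the Handle trait here (a handle method that returns either new events or an error), the Withdraw command, and the InsufficientFunds variant are assumptions made for illustration. The traits are local stand-ins so the snippet compiles without the crate, so check the crate documentation for the real signatures.

```rust
// Stand-in traits so this sketch compiles without the crate.
// The `Handle` shape below is an assumption, not the library's definition.
pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

pub trait Handle<C> {
    type Event;
    type Error;
    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
}

#[derive(Debug, PartialEq)]
pub struct FundsDeposited {
    pub amount: i64,
}

#[derive(Debug, PartialEq)]
pub struct FundsWithdrawn {
    pub amount: i64,
}

// Hypothetical error variant and command type.
#[derive(Debug, PartialEq)]
pub enum AccountError {
    InsufficientFunds,
}

pub struct Withdraw {
    pub amount: i64,
}

#[derive(Default)]
pub struct Account {
    balance: i64,
}

impl Account {
    pub fn balance(&self) -> i64 {
        self.balance
    }
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}

impl Handle<Withdraw> for Account {
    type Event = FundsWithdrawn;
    type Error = AccountError;

    // Validate against current state; state only changes when events are applied.
    fn handle(&self, command: &Withdraw) -> Result<Vec<Self::Event>, Self::Error> {
        if command.amount > self.balance {
            return Err(AccountError::InsufficientFunds);
        }
        Ok(vec![FundsWithdrawn { amount: command.amount }])
    }
}
```

In this sketch handle takes &self, so a rejected command leaves the aggregate untouched and an accepted one only changes state once its events are applied.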
Loading Aggregates
let account: Account = repository
    .load(&account_id)
    .await?;
load replays the stored events for that aggregate ID to rebuild its state. If snapshots are configured, replay starts from the latest snapshot rather than from the first event.
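Conceptually, loading is a fold over the event history, optionally seeded from a snapshot. The sketch below illustrates that idea in plain Rust. The Snapshot struct, its version field, and the free function load_from_history are hypothetical names for illustration and do not reflect how the repository is actually implemented.

```rust
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Account {
    pub balance: i64,
}

pub enum AccountEvent {
    FundsDeposited { amount: i64 },
    FundsWithdrawn { amount: i64 },
}

impl Account {
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::FundsDeposited { amount } => self.balance += amount,
            AccountEvent::FundsWithdrawn { amount } => self.balance -= amount,
        }
    }
}

/// Hypothetical snapshot: saved state plus the number of events already folded into it.
pub struct Snapshot {
    pub state: Account,
    pub version: usize,
}

/// Rebuild state from history, skipping events the snapshot already covers.
pub fn load_from_history(history: &[AccountEvent], snapshot: Option<&Snapshot>) -> Account {
    let (mut state, start) = match snapshot {
        Some(s) => (s.state.clone(), s.version),
        None => (Account::default(), 0),
    };
    for event in history.iter().skip(start) {
        state.apply(event);
    }
    state
}
```

With or without the snapshot, the resulting state should be the same. The snapshot only shortens the replay.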
Trait Reference
The Aggregate Trait
pub trait Aggregate: Default {
    /// Aggregate type identifier used by the event store.
    ///
    /// This is combined with the aggregate ID to create stream identifiers.
    /// Use lowercase, kebab-case for consistency: `"product"`,
    /// `"user-account"`, etc.
    const KIND: &'static str;
    type Event;
    type Error;
    type Id;
    /// Apply an event to update aggregate state.
    ///
    /// This is called during event replay to rebuild aggregate state from
    /// history.
    ///
    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
    /// implementations. For hand-written aggregates, implement this
    /// directly with a match expression.
    fn apply(&mut self, event: &Self::Event);
}
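For a hand-written aggregate, the match-based apply mentioned in the doc comment could look like the sketch below. The trait is copied locally from the listing above so the snippet compiles on its own. The AccountEvent enum, its variant names, and the "account" kind string are assumptions for illustration.

```rust
// Local copy of the trait shown above, so the sketch is self-contained.
pub trait Aggregate: Default {
    const KIND: &'static str;
    type Event;
    type Error;
    type Id;
    fn apply(&mut self, event: &Self::Event);
}

pub struct FundsDeposited {
    pub amount: i64,
}

pub struct FundsWithdrawn {
    pub amount: i64,
}

// Hypothetical sum type over the aggregate's events.
pub enum AccountEvent {
    Deposited(FundsDeposited),
    Withdrawn(FundsWithdrawn),
}

#[derive(Debug)]
pub enum AccountError {
    InsufficientFunds,
}

#[derive(Default)]
pub struct Account {
    balance: i64,
}

impl Account {
    pub fn balance(&self) -> i64 {
        self.balance
    }
}

impl Aggregate for Account {
    const KIND: &'static str = "account";
    type Event = AccountEvent;
    type Error = AccountError;
    type Id = String;

    // One match arm per event variant, in place of per-event Apply<E> impls.
    fn apply(&mut self, event: &Self::Event) {
        match event {
            AccountEvent::Deposited(e) => self.balance += e.amount,
            AccountEvent::Withdrawn(e) => self.balance -= e.amount,
        }
    }
}
```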
#[derive(Aggregate)] generates most of this implementation, including the apply method that dispatches to your Apply<E> implementations.
The Apply<E> Trait
pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}
apply should be:
- Infallible (events are facts)
- Deterministic (same events -> same state)
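Both properties can be illustrated with a small replay check. In the sketch below, apply never rejects an event, even a withdrawal that overdraws the account, because that decision belongs to command handling. Replaying the same events twice yields equal state. The AccountEvent enum and the replay function are hypothetical helpers, and the Apply trait is a local stand-in.

```rust
// Local stand-in for the trait shown above.
pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

pub struct FundsDeposited {
    pub amount: i64,
}

pub struct FundsWithdrawn {
    pub amount: i64,
}

#[derive(Debug, Default, PartialEq)]
pub struct Account {
    balance: i64,
}

impl Account {
    pub fn balance(&self) -> i64 {
        self.balance
    }
}

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Apply<FundsWithdrawn> for Account {
    // No validation here: the event already happened, so it is simply recorded.
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}

// Hypothetical wrapper so a mixed history can be stored in one list.
pub enum AccountEvent {
    Deposited(FundsDeposited),
    Withdrawn(FundsWithdrawn),
}

/// Fold a history into fresh state, using only the events themselves.
pub fn replay(events: &[AccountEvent]) -> Account {
    let mut account = Account::default();
    for event in events {
        match event {
            AccountEvent::Deposited(e) => account.apply(e),
            AccountEvent::Withdrawn(e) => account.apply(e),
        }
    }
    account
}
```

Reading the clock, generating random values, or calling external services inside apply would break this property, since a later replay could produce different state from the same events.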
Event Replay Model
Snapshots and serde
Snapshots are opt-in via Repository::with_snapshots().
If enabled, aggregate state must implement Serialize + DeserializeOwned.
Next
Domain Events — Defining events as first-class types