Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Aggregates

An aggregate is a cluster of domain objects treated as a single unit for data changes. In event sourcing, aggregates rebuild their state by replaying events and validate commands to produce new events.

The Aggregate Trait

pub trait Aggregate: Default + Sized {
    /// Aggregate type identifier used by the event store.
    ///
    /// This is combined with the aggregate ID to create stream identifiers.
    /// Use lowercase, kebab-case for consistency: `"product"`,
    /// `"user-account"`, etc.
    const KIND: &'static str;

    type Event;
    type Error;
    type Id;

    /// Apply an event to update aggregate state.
    ///
    /// This is called during event replay to rebuild aggregate state from
    /// history.
    ///
    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
    /// implementations. For hand-written aggregates, implement this
    /// directly with a match expression.
    fn apply(&mut self, event: &Self::Event);
}

Most of this is generated by #[derive(Aggregate)]. You focus on the behavior.

Snapshots and serde

Snapshots are opt-in. If you enable snapshots (via Repository::with_snapshots), the aggregate state must be serializable (Serialize + DeserializeOwned).

The Apply<E> Trait

When using #[derive(Aggregate)], implement Apply<E> for each event type:

pub trait Apply<E> {
    fn apply(&mut self, event: &E);
}

This is where state mutation happens:

impl Apply<FundsDeposited> for Account {
    fn apply(&mut self, event: &FundsDeposited) {
        self.balance += event.amount;
    }
}

impl Apply<FundsWithdrawn> for Account {
    fn apply(&mut self, event: &FundsWithdrawn) {
        self.balance -= event.amount;
    }
}

Key rules: apply must be infallible (events are facts) and deterministic (same events → same state).

Event Replay

Event Storebalance = 120  apply(FundsDeposited { amount: 100 })apply(FundsWithdrawn { amount: 30 })apply(FundsDeposited { amount: 50 })





The aggregate starts in its Default state. Each event is applied in order. The final state matches what would exist if you had executed all the original commands.

Loading Aggregates

Use AggregateBuilder to load an aggregate’s current state:

let account: Account = repository
    .aggregate_builder()
    .load(&account_id)?;

println!("Current balance: {}", account.balance);

On Repository this replays all events for that aggregate ID. On Repository<S, C, Snapshots<SS>> it loads a snapshot first (when present) and replays only the delta.

Next

Domain Events — Defining events as first-class types