— originally on blog.arkency.com

One simple trick to make Event Sourcing click

Event Sourcing is like having two methods when previously there was one. There — I've said it.

But it isn't my idea at all.

It was Greg Young who used it first, in a slightly different context. When explaining CQRS he used these exact words:

Starting with CQRS, CQRS is simply the creation of two objects where there was previously only one. The separation occurs based upon whether the methods are a command or a query (the same definition that is used by Meyer in Command and Query Separation, a command is any method that mutates state and a query is any method that returns a value).

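You can make quite a similar statement about an event-sourced aggregate root. The separation occurs based upon whether the method decides that something may happen and states it as a domain event, or applies the consequences of that event to the aggregate's internal state.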

Not convinced yet? Let the examples speak.

Stereotypical aggregate without Event Sourcing

Below is a typical aggregate root. In the scope of this example there are only two actions you can take, via the public register and supply methods.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  def initialize(store_id: nil, sku: nil, quantity_available: 0)
    @store_id = store_id
    @sku = sku
    @quantity_available = quantity_available
  end

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id
    @store_id = store_id
    @sku = sku

    event_store.publish_event(ProductRegistered.new(data: {
      store_id: @store_id,
      sku: @sku,
    }))
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    @quantity_available += quantity

    event_store.publish_event(ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    }))
  end
end

Aggregate with Event Sourcing

In event sourcing it is the domain events that are our source of truth. They state what happened. What we need to do is to make them a bit more useful and convenient for decision making. This is the sourcing part.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  def initialize(store_id: nil, sku: nil, quantity_available: 0)
    @store_id = store_id
    @sku = sku
    @quantity_available = quantity_available
  end

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id

    event = ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    })

    event_store.publish_event(event)
    registered(event)
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    event = ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    })

    event_store.publish_event(event)
    supplied(event)
  end

  private

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end

In this step we've drawn the line between stating that something happened (after first checking that it is allowed to happen) and the side effects it has. Notice the private registered and supplied methods.

Why make such an effort and introduce this indirection? The reason is simple: if events are the source of truth, we can shape internal state not only from the actions we take now but also from the ones that happened in the past.

Instead of loading the current state stored in a database, we can take the collection of events that happened in the scope of this aggregate, that is, its stream.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

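  def initialize(store_id: nil, sku: nil, event_store:)
    # Start from zero so that replayed ProductSupplied events have a number to add to.
    @quantity_available = 0
    stream_name = "Product$#{store_id}-#{sku}"
    events = event_store.read_all_events_forward(stream_name)
    # Rebuild internal state by replaying every event from this aggregate's stream.
    events.each do |event|
      case event
      when ProductRegistered then registered(event)
      when ProductSupplied then supplied(event)
      end
    end
  end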

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id

    event = ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    })

    event_store.publish_event(event)
    registered(event)
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    event = ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    })

    event_store.publish_event(event)
    supplied(event)
  end

  private

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end
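
The replay in the constructor is, in essence, a fold over the stream: start from an empty state and apply each event in order. The sketch below shows just that idea in isolation. It is not the article's aggregate: the `Event` base class, the hash-based state, and the `apply_event`, `rebuild` and `INITIAL_STATE` names are assumptions made for illustration, and no real event store is involved.

```ruby
# Hypothetical stand-in for an event class; it only mirrors the
# `new(data: {...})` shape used by the events in this post.
class Event
  attr_reader :data

  def initialize(data: {})
    @data = data
  end
end

ProductRegistered = Class.new(Event)
ProductSupplied = Class.new(Event)

# The state before any event happened. Note the explicit zero:
# without it there would be nothing to add supplied quantities to.
INITIAL_STATE = { store_id: nil, sku: nil, quantity_available: 0 }.freeze

# Maps one event onto the state and returns the new state.
def apply_event(state, event)
  case event
  when ProductRegistered
    state.merge(store_id: event.data.fetch(:store_id), sku: event.data.fetch(:sku))
  when ProductSupplied
    state.merge(quantity_available: state.fetch(:quantity_available) + event.data.fetch(:quantity))
  else
    state # unknown events leave the state untouched
  end
end

# "Sourcing": fold the whole stream into the current state.
def rebuild(events)
  events.reduce(INITIAL_STATE) { |state, event| apply_event(state, event) }
end

stream = [
  ProductRegistered.new(data: { store_id: 1, sku: "BOOK" }),
  ProductSupplied.new(data: { quantity: 5 }),
  ProductSupplied.new(data: { quantity: 3 }),
]

state = rebuild(stream)
state.fetch(:quantity_available) # => 8
```

Replaying only a prefix of the stream, for example `rebuild(stream.first(2))`, gives the state as it was after the first two events.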

At this point you may have figured out that the event_store dependency we constantly pass as an argument belongs more to the infrastructure layer than to the domain and the business.

What if a layer above passed in the list of events first, so that we could rebuild the state? After an aggregate action happened, the aggregate could in turn provide a list of domain events to be published (unpublished_events):

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  attr_reader :unpublished_events  

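  def initialize(events)
    # Start from zero so that replayed ProductSupplied events have a number to add to.
    @quantity_available = 0
    @unpublished_events = []
    # Rebuild state from past events; these are not re-published.
    events.each { |event| dispatch(event) }
  end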

  def register(store_id:, sku:)
    raise AlreadyRegistered if @store_id

    apply(ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    }))
  end

  def supply(quantity)
    raise CannotSupply unless @store_id && @sku

    apply(ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    }))
  end

  private

  def apply(event)
    dispatch(event)
    @unpublished_events << event
  end

  def dispatch(event)
    case event
    when ProductRegistered then registered(event)
    when ProductSupplied then supplied(event)
    end
  end

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end
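
The "something above" could be a small application service that reads the stream, hands the events to the aggregate, lets the caller perform an action, and then stores whatever ended up in unpublished_events. Below is a minimal sketch of that flow. `InMemoryEventStore`, `ProductService`, `with_product`, the `Event` base class and the extra `quantity_available` reader are all hypothetical names introduced for illustration; a real event store client would take their place. The `Product` class is a condensed copy of the aggregate above so that the sketch runs on its own.

```ruby
# Hypothetical stand-in mirroring the `new(data: {...})` shape of the events.
class Event
  attr_reader :data

  def initialize(data: {})
    @data = data
  end
end

ProductRegistered = Class.new(Event)
ProductSupplied = Class.new(Event)

# Condensed copy of the aggregate; quantity_available is exposed only for inspection.
class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  attr_reader :unpublished_events, :quantity_available

  def initialize(events)
    @quantity_available = 0
    @unpublished_events = []
    events.each { |event| dispatch(event) }
  end

  def register(store_id:, sku:)
    raise AlreadyRegistered if @store_id
    apply(ProductRegistered.new(data: { store_id: store_id, sku: sku }))
  end

  def supply(quantity)
    raise CannotSupply unless @store_id && @sku
    apply(ProductSupplied.new(data: { store_id: @store_id, sku: @sku, quantity: quantity }))
  end

  private

  def apply(event)
    dispatch(event)
    @unpublished_events << event
  end

  def dispatch(event)
    case event
    when ProductRegistered then registered(event)
    when ProductSupplied then supplied(event)
    end
  end

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end

# Hypothetical in-memory store: stream name => ordered list of events.
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, name| hash[name] = [] }
  end

  def read(stream_name)
    @streams[stream_name].dup
  end

  def append(stream_name, events)
    @streams[stream_name].concat(events)
  end
end

# The layer above the aggregate: load, act, store the new events.
class ProductService
  def initialize(event_store)
    @event_store = event_store
  end

  def with_product(store_id, sku)
    stream_name = "Product$#{store_id}-#{sku}"
    product = Product.new(@event_store.read(stream_name))
    yield product
    @event_store.append(stream_name, product.unpublished_events)
    product
  end
end

store = InMemoryEventStore.new
service = ProductService.new(store)
service.with_product(1, "BOOK") { |product| product.register(store_id: 1, sku: "BOOK") }
service.with_product(1, "BOOK") { |product| product.supply(5) }
reloaded = service.with_product(1, "BOOK") { |product| product.supply(3) }
```

Each call builds a fresh `Product` from the stored events, so the state the business rules are checked against always comes from the stream. If the block raises, for example `AlreadyRegistered`, nothing is appended.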

This more or less resembles the aggregate_root gem, which aims to assist you with event-sourced aggregates.

The rule of having two methods where there was previously one still holds, however.

Have a great day!