Dynamic Consistency Boundary in RailsEventStore

Last year I had the pleasure of rambling about Dynamic Consistency Boundary at the DRUG Ruby User Group in Wrocław, followed by a guest appearance on the Better Software Design podcast. The interview was quite a new experience for me and I enjoyed it a lot — kudos go to Mariusz for being such a great host. At the end of the episode I shared a sentiment that RailsEventStore would support DCB soon, perhaps in its next release.

I was being quite aggressive about the timeline, but it wasn't completely baseless. The underlying storage model and query capabilities already positioned RailsEventStore to meet two thirds of the DCB specification. These included querying events, filtered by their type and by a set of tags and tagging events freely at the moment they were appended or later.

RailsEventStore existed long before the discovery of DCB. But our approach to event streams was a bit different from the one criticized in the Kill Aggregate series. Events in RES can belong to multiple streams at the same time. They can be added and removed from streams later on. That supported refactoring of the aggregate boundaries and allowed multiple partitions of events at the same time, each for the particular reader. In that sense, a stream name acted just like a tag would in a DCB specification.

The elephant in the room, and the missing part, was modifying the append method to accept AppendCondition. Without it, the DCB examples ported directly to RES had a significant flaw. For various reasons, this remained unaddressed until now.

Technically, changing the append to support DCB meant remodelling Concurrency Control away from streams. That is, replacing validation constraints on a SQL index with running a query. And all that without a race condition. Usually this involves some form of linearizing writes that share overlapping AppendCondition. The more granular locking, the better performance as the unrelated writes could happen in parallel. Locks can be explicit, just as Orisun implements them. Or more subtle and implicit — in the form of SERIALIZABLE isolation.

This is how Postgres behaves in this isolation level:

To guarantee true serializability PostgreSQL uses predicate locking, which means that it keeps locks which allow it to determine when a write would have had an impact on the result of a previous read from a concurrent transaction, had it run first. In PostgreSQL these locks do not cause any blocking and therefore can not play any part in causing a deadlock. They are used to identify and flag dependencies among concurrent Serializable transactions which in certain combinations can lead to serialization anomalies.

Putting it all together, a single-file implementation and verification, with no change to the storage:

require "bundler/inline"

gemfile do
  source "https://rubygems.org"

  gem "rails_event_store"
  gem "pg"
  gem "minitest",
      require: "minitest/autorun"
  gem "rails", require: false
end

# ActiveRecord::Base.logger = Logger.new(STDOUT)
# DATA_TYPE="jsonb"
# DATABASE_URL="postgres://localhost/rails_event_store_active_record"

::ActiveRecord::Base.establish_connection
::ActiveRecord::Schema.define do
  drop_table(
    :event_store_events_in_streams,
    if_exists: true
  )
  drop_table(
    :event_store_events,
    if_exists: true
  )

  create_table(
    :event_store_events,
    id: :bigserial,
    force: false
  ) do |t|
    t.references :event,
                 null: false,
                 type: :uuid,
                 index: {
                   unique: true
                 }
    t.string :event_type,
             null: false,
             index: true
    t.jsonb :metadata
    t.jsonb :data, null: false
    t.datetime :created_at,
               null: false,
               type: :timestamp,
               precision: 6,
               index: true
    t.datetime :valid_at,
               null: true,
               type: :timestamp,
               precision: 6,
               index: true
  end

  create_table(
    :event_store_events_in_streams,
    id: :bigserial,
    force: false
  ) do |t|
    t.string :stream, null: false
    t.integer :position, null: true
    t.references :event,
                 null: false,
                 type: :uuid,
                 index: true,
                 foreign_key: {
                   to_table:
                     :event_store_events,
                   primary_key:
                     :event_id
                 }
    t.datetime :created_at,
               null: false,
               type: :timestamp,
               precision: 6,
               index: true

    t.index %i[stream position],
            unique: true
    t.index %i[stream event_id],
            unique: true
  end
end

module RubyEventStore
  class DCBClient < ::RailsEventStore::JSONClient
    def append(
      events,
      append_query,
      tags: []
    )
      @repository.append(
        transform(
          enrich_events_metadata(events)
        ),
        append_query,
        tags
      )
      self
    end
  end

  class Specification
    def after(position)
      Specification.new(
        reader,
        result.dup do |r|
          r.after = position
        end
      )
    end

    def of_tags(tags)
      Specification.new(
        reader,
        result.dup do |r|
          r.stream =
            Data
              .define(:name) do
                def global? = false
              end
              .new(name: tags)
        end
      )
    end

    def exists? = reader.exists?(result)
  end

  class SpecificationReader
    def exists?(specification_result)
      repository.exists?(
        specification_result
      )
    end
  end

  class SpecificationResult
    def initialize(
      direction: :forward,
      start: nil,
      stop: nil,
      older_than: nil,
      older_than_or_equal: nil,
      newer_than: nil,
      newer_than_or_equal: nil,
      time_sort_by: nil,
      count: nil,
      stream: Stream.new(GLOBAL_STREAM),
      read_as: :all,
      batch_size: Specification::DEFAULT_BATCH_SIZE,
      with_ids: nil,
      with_types: nil,
      after: nil
    )
      @attributes =
        Struct.new(
          :direction,
          :start,
          :stop,
          :older_than,
          :older_than_or_equal,
          :newer_than,
          :newer_than_or_equal,
          :time_sort_by,
          :count,
          :stream,
          :read_as,
          :batch_size,
          :with_ids,
          :with_types,
          :after
        ).new(
          direction,
          start,
          stop,
          older_than,
          older_than_or_equal,
          newer_than,
          newer_than_or_equal,
          time_sort_by,
          count,
          stream,
          read_as,
          batch_size,
          with_ids,
          with_types,
          after
        )
      freeze
    end

    def after = @attributes.after
  end

  class DCBEventRepository < ActiveRecord::EventRepository
    def append(
      records,
      append_query,
      tags
    )
      return if records.empty?

      event_klass.transaction(
        isolation: :serializable
      ) do
        if append_query.call.exists?
          raise
        end

        event_klass.insert_all!(
          records.map do |record|
            insert_hash(
              record,
              record.serialize(
                @serializer
              )
            )
          end
        )
        stream_klass.insert_all!(
          records
            .map do |record|
              tags.map do |tag|
                {
                  stream: tag,
                  event_id:
                    record.event_id,
                  created_at:
                    Time.now.utc
                }
              end
            end
            .flatten
        )
      end
      self
    rescue ::ActiveRecord::RecordNotUnique,
           ::ActiveRecord::SerializationFailure => e
      raise_error(e)
    end

    def exists?(spec) =
      repo_reader.read_scope(
        spec
      ).exists?

    def initialize(...)
      super

      repo_reader.define_singleton_method(
        :read_scope
      ) do |spec|
        scope = super(spec)

        if spec.after
          scope =
            scope.where(
              [
                %_"#{@event_klass.table_name}"."id" > ?_,
                spec.after
              ]
            )
        end

        unless spec.stream.global?
          scope.unscope(where: :stream)

          Array(
            spec.stream.name
          ).each do |tag|
            scope =
              scope.and(
                scope.where(stream: tag)
              )
          end
        end

        scope
      end
    end
  end
end

TopUpCredits = Data.define(:amount)
CreditsToppedUp =
  Class.new(RailsEventStore::Event)

UseCredits = Data.define(:amount)
CreditsUsed =
  Class.new(RailsEventStore::Event)

class Account
  class << self
    def initial_state = 0

    def evolve(balance, event)
      case event
      when CreditsToppedUp
        balance +
          event.data.fetch(:amount)
      when CreditsUsed
        balance -
          event.data.fetch(:amount)
      end
    end

    def decide(balance, command)
      case command
      when TopUpCredits
        [
          CreditsToppedUp.new(
            data: {
              amount: command.amount
            }
          )
        ]
      when UseCredits
        if command.amount > balance
          raise
        end
        [
          CreditsUsed.new(
            data: {
              amount: command.amount
            }
          )
        ]
      end
    end
  end
end

class Repository
  def initialize(event_store)
    @event_store = event_store
  end

  def with_state(
    event_types,
    tags,
    decider
  )
    state, last_event_id =
      load(event_types, tags, decider)
    position =
      @event_store.global_position(
        last_event_id
      ) + 1 if last_event_id

    @event_store.append(
      (yield state),
      -> do
        @event_store
          .read
          .of_tags(tags)
          .of_type(event_types)
          .after(position)
      end,
      tags: tags
    )
  end

  def load(event_types, tags, decider)
    events =
      @event_store
        .read
        .of_tags(tags)
        .of_type(event_types)
        .to_a

    [
      events.reduce(
        decider.initial_state
      ) do |state, event|
        decider.evolve(state, event)
      end,
      events.last&.event_id
    ]
  end
end

class Scenario
  def initialize(event_store)
    @repository =
      Repository.new(event_store)
    @id = SecureRandom.uuid
  end

  def top_up_credits(amount)
    @repository.with_state(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ) do |state|
      Account.decide(
        state,
        TopUpCredits.new(amount: amount)
      )
    end
  end

  def use_credits(amount)
    @repository.with_state(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ) do |state|
      Account.decide(
        state,
        UseCredits.new(amount: amount)
      )
    end
  end

  def balance =
    @repository.load(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ).first
end

module RubyEventStore
  class DCBTest < Minitest::Test
    def test_happy
      account = mk_scenario
      account.top_up_credits(100)
      account.use_credits(90)

      assert_equal 10, account.balance
    end

    def test_invariant
      account = mk_scenario
      assert_raises do
        account.use_credits(100)
      end

      assert_equal 0, account.balance
    end

    def test_stress
      account = mk_scenario
      account.top_up_credits(100)

      barrier =
        Concurrent::CyclicBarrier.new(
          count =
            ::ActiveRecord::Base
              .connection
              .pool
              .size
        )
      count
        .times
        .map do
          Thread.new do
            barrier.wait
            begin
              account.use_credits(100)
            rescue RuntimeError,
                   WrongExpectedEventVersion
            end
          end
        end
        .map(&:join)

      assert_equal 0, account.balance
    end

    def mk_event_store =
      DCBClient.new(
        repository:
          DCBEventRepository.new(
            serializer: JSON
          )
      )

    def mk_scenario =
      Scenario.new(mk_event_store)
  end
end
require "bundler/inline"

gemfile do
  source "https://rubygems.org"

  gem "rails_event_store"
  gem "pg"
  gem "minitest", require: "minitest/autorun"
  gem "rails", require: false
end

# ActiveRecord::Base.logger = Logger.new(STDOUT)
# DATA_TYPE="jsonb"
# DATABASE_URL="postgres://localhost/rails_event_store_active_record"

::ActiveRecord::Base.establish_connection
::ActiveRecord::Schema.define do
  drop_table(:event_store_events_in_streams, if_exists: true)
  drop_table(:event_store_events, if_exists: true)

  create_table(:event_store_events, id: :bigserial, force: false) do |t|
    t.references :event, null: false, type: :uuid, index: { unique: true }
    t.string :event_type, null: false, index: true
    t.jsonb :metadata
    t.jsonb :data, null: false
    t.datetime :created_at,
               null: false,
               type: :timestamp,
               precision: 6,
               index: true
    t.datetime :valid_at,
               null: true,
               type: :timestamp,
               precision: 6,
               index: true
  end

  create_table(
    :event_store_events_in_streams,
    id: :bigserial,
    force: false
  ) do |t|
    t.string :stream, null: false
    t.integer :position, null: true
    t.references :event,
                 null: false,
                 type: :uuid,
                 index: true,
                 foreign_key: {
                   to_table: :event_store_events,
                   primary_key: :event_id
                 }
    t.datetime :created_at,
               null: false,
               type: :timestamp,
               precision: 6,
               index: true

    t.index %i[stream position], unique: true
    t.index %i[stream event_id], unique: true
  end
end

module RubyEventStore
  class DCBClient < ::RailsEventStore::JSONClient
    def append(events, append_query, tags: [])
      @repository.append(
        transform(enrich_events_metadata(events)),
        append_query,
        tags
      )
      self
    end
  end

  class Specification
    def after(position)
      Specification.new(reader, result.dup { |r| r.after = position })
    end

    def of_tags(tags)
      Specification.new(
        reader,
        result.dup do |r|
          r.stream = Data.define(:name) { def global? = false }.new(name: tags)
        end
      )
    end

    def exists? = reader.exists?(result)
  end

  class SpecificationReader
    def exists?(specification_result)
      repository.exists?(specification_result)
    end
  end

  class SpecificationResult
    def initialize(
      direction: :forward,
      start: nil,
      stop: nil,
      older_than: nil,
      older_than_or_equal: nil,
      newer_than: nil,
      newer_than_or_equal: nil,
      time_sort_by: nil,
      count: nil,
      stream: Stream.new(GLOBAL_STREAM),
      read_as: :all,
      batch_size: Specification::DEFAULT_BATCH_SIZE,
      with_ids: nil,
      with_types: nil,
      after: nil
    )
      @attributes =
        Struct.new(
          :direction,
          :start,
          :stop,
          :older_than,
          :older_than_or_equal,
          :newer_than,
          :newer_than_or_equal,
          :time_sort_by,
          :count,
          :stream,
          :read_as,
          :batch_size,
          :with_ids,
          :with_types,
          :after
        ).new(
          direction,
          start,
          stop,
          older_than,
          older_than_or_equal,
          newer_than,
          newer_than_or_equal,
          time_sort_by,
          count,
          stream,
          read_as,
          batch_size,
          with_ids,
          with_types,
          after
        )
      freeze
    end

    def after = @attributes.after
  end

  class DCBEventRepository < ActiveRecord::EventRepository
    def append(records, append_query, tags)
      return if records.empty?

      event_klass.transaction(isolation: :serializable) do
        raise if append_query.call.exists?

        event_klass.insert_all!(
          records.map do |record|
            insert_hash(record, record.serialize(@serializer))
          end
        )
        stream_klass.insert_all!(
          records
            .map do |record|
              tags.map do |tag|
                {
                  stream: tag,
                  event_id: record.event_id,
                  created_at: Time.now.utc
                }
              end
            end
            .flatten
        )
      end
      self
    rescue ::ActiveRecord::RecordNotUnique,
           ::ActiveRecord::SerializationFailure => e
      raise_error(e)
    end

    def exists?(spec) = repo_reader.read_scope(spec).exists?

    def initialize(...)
      super

      repo_reader.define_singleton_method(:read_scope) do |spec|
        scope = super(spec)

        if spec.after
          scope =
            scope.where([%_"#{@event_klass.table_name}"."id" > ?_, spec.after])
        end

        unless spec.stream.global?
          scope.unscope(where: :stream)

          Array(spec.stream.name).each do |tag|
            scope = scope.and(scope.where(stream: tag))
          end
        end

        scope
      end
    end
  end
end

TopUpCredits = Data.define(:amount)
CreditsToppedUp = Class.new(RailsEventStore::Event)

UseCredits = Data.define(:amount)
CreditsUsed = Class.new(RailsEventStore::Event)

class Account
  class << self
    def initial_state = 0

    def evolve(balance, event)
      case event
      when CreditsToppedUp
        balance + event.data.fetch(:amount)
      when CreditsUsed
        balance - event.data.fetch(:amount)
      end
    end

    def decide(balance, command)
      case command
      when TopUpCredits
        [CreditsToppedUp.new(data: { amount: command.amount })]
      when UseCredits
        raise if command.amount > balance
        [CreditsUsed.new(data: { amount: command.amount })]
      end
    end
  end
end

class Repository
  def initialize(event_store)
    @event_store = event_store
  end

  def with_state(event_types, tags, decider)
    state, last_event_id = load(event_types, tags, decider)
    position = @event_store.global_position(last_event_id) + 1 if last_event_id

    @event_store.append(
      (yield state),
      -> do
        @event_store.read.of_tags(tags).of_type(event_types).after(position)
      end,
      tags: tags
    )
  end

  def load(event_types, tags, decider)
    events = @event_store.read.of_tags(tags).of_type(event_types).to_a

    [
      events.reduce(decider.initial_state) do |state, event|
        decider.evolve(state, event)
      end,
      events.last&.event_id
    ]
  end
end

class Scenario
  def initialize(event_store)
    @repository = Repository.new(event_store)
    @id = SecureRandom.uuid
  end

  def top_up_credits(amount)
    @repository.with_state(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ) { |state| Account.decide(state, TopUpCredits.new(amount: amount)) }
  end

  def use_credits(amount)
    @repository.with_state(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ) { |state| Account.decide(state, UseCredits.new(amount: amount)) }
  end

  def balance =
    @repository.load(
      [CreditsToppedUp, CreditsUsed],
      ["account:#{@id}"],
      Account
    ).first
end

module RubyEventStore
  class DCBTest < Minitest::Test
    def test_happy
      account = mk_scenario
      account.top_up_credits(100)
      account.use_credits(90)

      assert_equal 10, account.balance
    end

    def test_invariant
      account = mk_scenario
      assert_raises { account.use_credits(100) }

      assert_equal 0, account.balance
    end

    def test_stress
      account = mk_scenario
      account.top_up_credits(100)

      barrier =
        Concurrent::CyclicBarrier.new(
          count = ::ActiveRecord::Base.connection.pool.size
        )
      count
        .times
        .map do
          Thread.new do
            barrier.wait
            begin
              account.use_credits(100)
            rescue RuntimeError, WrongExpectedEventVersion
            end
          end
        end
        .map(&:join)

      assert_equal 0, account.balance
    end

    def mk_event_store =
      DCBClient.new(repository: DCBEventRepository.new(serializer: JSON))

    def mk_scenario = Scenario.new(mk_event_store)
  end
end

In this code I've deliberately chosen a few simplifications and omissions:

My intent was to show that supporting Dynamic Consistency Boundary in Rails Event Store is low-hanging fruit, should the maintainers decide to integrate it fully. For the release, introducing new APIs without breaking backward compatibility with a dominant stream-based approach is a must. Lots of existing clients depend on it.

In the long term, it would be ideal to make one implementation (stream-based) a specialization of the other (DCB). Just as Aggregate Roots can be composed of Deciders, I'm quite optimistic that Streams can be re-implemented on top of DCB. Storage-layer compatibility means the APIs can change safely while remaining compatible with existing data. The facade can be polished over time and corner cases can be detected early or with little additional infrastructure.

Whether DCB introduces any performance challenges over the stream-based approach on top of SQL storage engines is a completely different topic for another post.