Dynamic Consistency Boundary in RailsEventStore
Last year I had the pleasure of rambling about Dynamic Consistency Boundary at the DRUG Ruby User Group in Wrocław, followed by a guest appearance on the Better Software Design podcast. The interview was quite a new experience for me and I enjoyed it a lot — kudos go to Mariusz for being such a great host. At the end of the episode I shared a sentiment that RailsEventStore would support DCB soon, perhaps in its next release.
I was quite aggressive about that timeline, but it wasn't completely baseless. The underlying storage model and query capabilities already positioned RailsEventStore to meet two thirds of the DCB specification: querying events filtered by their type and by a set of tags, and tagging events freely, either at the moment they were appended or later.
RailsEventStore existed long before the discovery of DCB, but our approach to event streams was a bit different from the one criticized in the Kill Aggregate series. Events in RES can belong to multiple streams at the same time, and they can be added to and removed from streams later on. That supported refactoring of aggregate boundaries and allowed multiple simultaneous partitions of events, each for a particular reader. In that sense, a stream name acted just like a tag does in the DCB specification.
The elephant in the room, and the missing part, was modifying the append method to accept AppendCondition. Without it, the DCB examples ported directly to RES had a significant flaw. For various reasons, this remained unaddressed until now.
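For context, the DCB specification describes an AppendCondition roughly as a query (event types plus tags) paired with a position: the append must be rejected when any event matching the query exists after that position. A minimal Ruby sketch of that shape (the names below are illustrative, not taken from RES or the spec):

```ruby
# Illustrative shape of a DCB append condition; names are made up.
Query = Data.define(:types, :tags)
AppendCondition = Data.define(:fail_if_events_match, :after)

# The append must fail when any stored event matches the query
# and sits after the remembered position.
def violated?(condition, stored_events)
  query = condition.fail_if_events_match
  stored_events.any? do |event|
    event[:position] > (condition.after || 0) &&
      query.types.include?(event[:type]) &&
      (query.tags & event[:tags]).any?
  end
end
```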
Technically, changing the append to support DCB meant remodelling concurrency control away from streams: replacing uniqueness constraints enforced by a SQL index with running a query, and all that without introducing a race condition. Usually this involves some form of linearizing the writes whose AppendConditions overlap. The more granular the locking, the better the performance, as unrelated writes can happen in parallel.
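To illustrate the granularity point: two appends only need to be linearized when their conditions could observe each other's events. A toy predicate (my own sketch, not RES code) could decide that by intersecting the tag and type sets:

```ruby
# Two appends conflict only if both their tag sets and their event
# type sets intersect; otherwise they can safely run in parallel.
Condition = Data.define(:tags, :types)

def must_linearize?(a, b)
  (a.tags & b.tags).any? && (a.types & b.types).any?
end
```

Under SERIALIZABLE isolation the database effectively makes this decision for us, at the level of the rows each transaction reads and writes.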
Locks can be explicit, as Orisun implements them, or more subtle and implicit, in the form of the SERIALIZABLE transaction isolation level.
This is how Postgres behaves under this isolation level, per its documentation:
To guarantee true serializability PostgreSQL uses predicate locking, which means that it keeps locks which allow it to determine when a write would have had an impact on the result of a previous read from a concurrent transaction, had it run first. In PostgreSQL these locks do not cause any blocking and therefore can not play any part in causing a deadlock. They are used to identify and flag dependencies among concurrent Serializable transactions which in certain combinations can lead to serialization anomalies.
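A practical consequence is that the losing transaction is aborted with a serialization failure instead of waiting on a lock, so the caller is expected to retry. A generic retry wrapper might look like this (a sketch; with ActiveRecord the error to rescue would be ActiveRecord::SerializationFailure):

```ruby
# Retry the block a bounded number of times when it fails with a
# serialization conflict; re-raise once the attempts are exhausted.
def with_serialization_retry(error_class, attempts: 3)
  yield
rescue error_class
  attempts -= 1
  retry if attempts.positive?
  raise
end
```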
Putting it all together: a single-file implementation and verification, with no change to the storage schema.
require "bundler/inline"
gemfile do
source "https://rubygems.org"
gem "rails_event_store"
gem "pg"
gem "minitest", require: "minitest/autorun"
gem "rails", require: false
end
# ActiveRecord::Base.logger = Logger.new(STDOUT)
# DATA_TYPE="jsonb"
# DATABASE_URL="postgres://localhost/rails_event_store_active_record"
::ActiveRecord::Base.establish_connection
::ActiveRecord::Schema.define do
drop_table(:event_store_events_in_streams, if_exists: true)
drop_table(:event_store_events, if_exists: true)
create_table(:event_store_events, id: :bigserial, force: false) do |t|
t.references :event, null: false, type: :uuid, index: { unique: true }
t.string :event_type, null: false, index: true
t.jsonb :metadata
t.jsonb :data, null: false
t.datetime :created_at,
null: false,
type: :timestamp,
precision: 6,
index: true
t.datetime :valid_at,
null: true,
type: :timestamp,
precision: 6,
index: true
end
create_table(
:event_store_events_in_streams,
id: :bigserial,
force: false
) do |t|
t.string :stream, null: false
t.integer :position, null: true
t.references :event,
null: false,
type: :uuid,
index: true,
foreign_key: {
to_table: :event_store_events,
primary_key: :event_id
}
t.datetime :created_at,
null: false,
type: :timestamp,
precision: 6,
index: true
t.index %i[stream position], unique: true
t.index %i[stream event_id], unique: true
end
end
module RubyEventStore
class DCBClient < ::RailsEventStore::JSONClient
def append(events, append_query, tags: [])
@repository.append(
transform(enrich_events_metadata(events)),
append_query,
tags
)
self
end
end
class Specification
def after(position)
Specification.new(reader, result.dup { |r| r.after = position })
end
def of_tags(tags)
Specification.new(
reader,
result.dup do |r|
r.stream = Data.define(:name) { def global? = false }.new(name: tags)
end
)
end
def exists? = reader.exists?(result)
end
class SpecificationReader
def exists?(specification_result)
repository.exists?(specification_result)
end
end
class SpecificationResult
def initialize(
direction: :forward,
start: nil,
stop: nil,
older_than: nil,
older_than_or_equal: nil,
newer_than: nil,
newer_than_or_equal: nil,
time_sort_by: nil,
count: nil,
stream: Stream.new(GLOBAL_STREAM),
read_as: :all,
batch_size: Specification::DEFAULT_BATCH_SIZE,
with_ids: nil,
with_types: nil,
after: nil
)
@attributes =
Struct.new(
:direction,
:start,
:stop,
:older_than,
:older_than_or_equal,
:newer_than,
:newer_than_or_equal,
:time_sort_by,
:count,
:stream,
:read_as,
:batch_size,
:with_ids,
:with_types,
:after
).new(
direction,
start,
stop,
older_than,
older_than_or_equal,
newer_than,
newer_than_or_equal,
time_sort_by,
count,
stream,
read_as,
batch_size,
with_ids,
with_types,
after
)
freeze
end
def after = @attributes.after
end
class DCBEventRepository < ActiveRecord::EventRepository
def append(records, append_query, tags)
return if records.empty?
event_klass.transaction(isolation: :serializable) do
raise if append_query.call.exists?
event_klass.insert_all!(
records.map do |record|
insert_hash(record, record.serialize(@serializer))
end
)
stream_klass.insert_all!(
records
.map do |record|
tags.map do |tag|
{
stream: tag,
event_id: record.event_id,
created_at: Time.now.utc
}
end
end
.flatten
)
end
self
rescue ::ActiveRecord::RecordNotUnique,
::ActiveRecord::SerializationFailure => e
raise_error(e)
end
def exists?(spec) = repo_reader.read_scope(spec).exists?
def initialize(...)
super
repo_reader.define_singleton_method(:read_scope) do |spec|
scope = super(spec)
if spec.after
scope =
scope.where([%_"#{@event_klass.table_name}"."id" > ?_, spec.after])
end
unless spec.stream.global?
scope = scope.unscope(where: :stream)
Array(spec.stream.name).each do |tag|
scope = scope.and(scope.where(stream: tag))
end
end
scope
end
end
end
end
TopUpCredits = Data.define(:amount)
CreditsToppedUp = Class.new(RailsEventStore::Event)
UseCredits = Data.define(:amount)
CreditsUsed = Class.new(RailsEventStore::Event)
class Account
class << self
def initial_state = 0
def evolve(balance, event)
case event
when CreditsToppedUp
balance + event.data.fetch(:amount)
when CreditsUsed
balance - event.data.fetch(:amount)
end
end
def decide(balance, command)
case command
when TopUpCredits
[CreditsToppedUp.new(data: { amount: command.amount })]
when UseCredits
raise if command.amount > balance
[CreditsUsed.new(data: { amount: command.amount })]
end
end
end
end
class Repository
def initialize(event_store)
@event_store = event_store
end
def with_state(event_types, tags, decider)
state, last_event_id = load(event_types, tags, decider)
position = @event_store.global_position(last_event_id) + 1 if last_event_id
@event_store.append(
(yield state),
-> do
@event_store.read.of_tags(tags).of_type(event_types).after(position)
end,
tags: tags
)
end
def load(event_types, tags, decider)
events = @event_store.read.of_tags(tags).of_type(event_types).to_a
[
events.reduce(decider.initial_state) do |state, event|
decider.evolve(state, event)
end,
events.last&.event_id
]
end
end
class Scenario
def initialize(event_store)
@repository = Repository.new(event_store)
@id = SecureRandom.uuid
end
def top_up_credits(amount)
@repository.with_state(
[CreditsToppedUp, CreditsUsed],
["account:#{@id}"],
Account
) { |state| Account.decide(state, TopUpCredits.new(amount: amount)) }
end
def use_credits(amount)
@repository.with_state(
[CreditsToppedUp, CreditsUsed],
["account:#{@id}"],
Account
) { |state| Account.decide(state, UseCredits.new(amount: amount)) }
end
def balance =
@repository.load(
[CreditsToppedUp, CreditsUsed],
["account:#{@id}"],
Account
).first
end
module RubyEventStore
class DCBTest < Minitest::Test
def test_happy
account = mk_scenario
account.top_up_credits(100)
account.use_credits(90)
assert_equal 10, account.balance
end
def test_invariant
account = mk_scenario
assert_raises { account.use_credits(100) }
assert_equal 0, account.balance
end
def test_stress
account = mk_scenario
account.top_up_credits(100)
barrier =
Concurrent::CyclicBarrier.new(
count = ::ActiveRecord::Base.connection.pool.size
)
count
.times
.map do
Thread.new do
barrier.wait
begin
account.use_credits(100)
rescue RuntimeError, WrongExpectedEventVersion
end
end
end
.map(&:join)
assert_equal 0, account.balance
end
def mk_event_store =
DCBClient.new(repository: DCBEventRepository.new(serializer: JSON))
def mk_scenario = Scenario.new(mk_event_store)
end
end
In this code I've deliberately chosen a few simplifications and omissions:
- not caring about MySQL or SQLite, which RES also supports
- not detecting the mixing of tag- and stream-based approaches, which could lead to broken ordering in streams
- breaking backwards compatibility of the public Client interface
My intent was to show that supporting Dynamic Consistency Boundary in Rails Event Store is low-hanging fruit, should the maintainers decide to integrate it fully. For a release, introducing the new APIs without breaking backward compatibility with the dominant stream-based approach is a must; lots of existing clients depend on it.
In the long term, it would be ideal to make one implementation (stream-based) a specialization of the other (DCB). Just as Aggregate Roots can be composed of Deciders, I'm quite optimistic that Streams can be re-implemented on top of DCB. Storage-layer compatibility means the APIs can change safely while remaining compatible with existing data. The facade can be polished over time, and corner cases can be detected early with little additional infrastructure.
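To make that idea concrete, here is a hedged sketch (my own mapping, not a proposed RES API) of how a classic stream append could be expressed as a DCB append: the stream name becomes a single tag and the expected version becomes the position guard.

```ruby
# Hypothetical mapping: a stream-based append with an expected version,
# expressed as a DCB append condition over a single "stream" tag.
StreamAppend = Data.define(:stream, :expected_version)
DCBAppend = Data.define(:tags, :types, :after)

def to_dcb(stream_append)
  DCBAppend.new(
    tags: ["stream:#{stream_append.stream}"],
    types: :any, # a stream conflicts on any event type
    after: stream_append.expected_version
  )
end
```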
Whether DCB introduces any performance challenges over the stream-based approach on top of SQL storage engines is a completely different topic for another post.