Process Managers revisited
Process Managers revisited
I've been telling my story with process managers some time ago. In short I've explored there a way to source state of the process using nothing more but domain events. However I've encountered an issue which led to a workaround I wasn't quite happy about. With the release of RailsEventStore v0.22.0 this is no longer a case!
Let's remind ourselves what was the original problem to solve:
You’re an operations manager. Your task is to suggest your customer a menu they’d like to order and at the same time you have to confirm that caterer can deliver this particular menu (for given catering conditions). In short you wait for
CustomerConfirmedMenu
andCatererConfirmedMenu
. Only after both happened you can proceed further. You’ll likely offer several menus to the customer and each of them will need a confirmation from corresponding caterers. If there’s a match ofCustomerConfirmedMenu
andCatererConfirmedMenu
for the sameorder_id
you cheer and triggerConfirmOrder
command to push things forward.
The issue manifested when I was about to "publish" events that process manager subscribed to and eventually received:
ActiveRecord::RecordNotUnique:
PG::UniqueViolation: ERROR: duplicate key value violates unique constraint "index_event_store_events_on_event_id"
DETAIL: Key (event_id)=(bddeffe8-7188-4004-918b-2ef77d94fa65) already exists.
I wanted to group those events in a short, dedicated stream from which they could be read from on each process manager invocation. Within limitations of RailsEventStore version at that time I wasn't able to do so and resorted to looking for past domain events in streams they were originally published to. That involved filtering them from irrelevant events (in the light of the process) and most notably knowing and depending on such streams (coupling).
Linking events to the rescue
Recently released RailsEventStore v0.22.0 finally brings long-awaited link_to_stream
API. This method would simply make reference to a published event in a given stream. It does not duplicate the domain event — it is the same fact but indexed in another collection.
From the outside it looks quite similar to publish_event
you may already know. It accepts stream name and expected version of an event in that stream. The difference is that you can only link published events so it takes event ids instead of events as a first argument:
TestEvent = Class.new(RubyEventStore::Event)
specify 'link events' do
client = RubyEventStore::Client.new(repository: InMemoryRepository.new)
first_event = TestEvent.new
second_event = TestEvent.new
client.append_to_stream(
[first_event, second_event],
stream_name: 'stream'
)
client.link_to_stream(
[first_event.event_id, second_event.event_id],
stream_name: 'flow',
expected_version: -1
)
client.link_to_stream(
[first_event.event_id],
stream_name: 'cars',
)
expect(client.read_stream_events_forward('flow')).to eq([first_event, second_event])
expect(client.read_stream_events_forward('cars')).to eq([first_event])
end
Just like when publishing, you cannot link same event twice in a stream.
Now you may be wondering why is that this API wasn't present before and just now became possible. From the inside we've changed how events are persisted — the layout of database tables in RailsEventStoreActiveRecord
is a bit different. There's a single table for domain events (event_store_events
) and another table to maintain links in streams (event_store_events_in_streams
).
It was quite a big change along v0.19.0 release and a challenging one to do it right. Overall, our goal was to make streams cheap. This opens a range of possibilities:
Generally when people are wanting only a few streams its because they want to read things out in a certain way for a particular type of reader.
What you can do is repartition your streams utilizing projections to help provide for a specific reader. As an example let’s say that a reader was interested in all the InventoryItemCreated and InventoryItemDeactivated events but was not interested in all the other events in the system.
Its important to remember that the way you write to your streams does not have to match the way you want to read from your streams. You can quite easily choose a different partitioning for a given reader.
How would our process manager look like with link_to_stream
then? Below you'll find store
method which takes advantage of it.
class CateringMatch
class State
def initialize
@caterer_confirmed = false
@customer_confirmed = false
@version = -1
@event_ids_to_link = []
end
def apply_caterer_confirmed_menu
@caterer_confirmed = true
end
def apply_customer_confirmed_menu
@customer_confirmed = true
end
def complete?
caterer_confirmed? && customer_confirmed?
end
def apply(*events)
events.each do |event|
case event
when CatererConfirmedMenu then apply_caterer_confirmed_menu
when CustomerConfirmedMenu then apply_customer_confirmed_menu
end
@event_ids_to_link << event.id
end
end
def load(stream_name, event_store:)
events = event_store.read_stream_events_forward(stream_name)
events.each do |event|
apply(event)
end
@version = events.size - 1
@event_ids_to_link = []
self
end
def store(stream_name, event_store:)
event_store.link_to_stream(
@event_ids_to_link,
stream_name: stream_name,
expected_version: @version
)
@version += @event_ids_to_link.size
@event_ids_to_link = []
end
end
private_constant :State
def initialize(command_bus:, event_store:)
@command_bus = command_bus
@event_store = event_store
end
def call(event)
order_id = event.data(:order_id)
stream_name = "CateringMatch$#{order_id}"
state = State.new
state.load(stream_name, event_store: @event_store)
state.apply(event)
state.store(stream_name, event_store: @event_store)
command_bus.(ConfirmOrder.new(data: {
order_id: order_id
})) if state.complete?
end
end
Whenever process manager receives new domain event it is processed and linked to the corresponding CateringMatch$
stream. If the process is complete, we trigger a command. Otherwise we have to wait for more events.
Inspecting Process Manager state
Processes like this happen over time and it's nice addition to be able to inspect their state. Is it nearly done? Or what are we waiting for? It's not uncommon that some processes may never complete.
A stream browser that now ships with RailsEventStore helps with that. Mount it in your application, launch the app and navigate to the stream you're interested in:
<%= img_fit("process-managers-revisited/stream-browser.png") %>
Tell us your RailsEventStore story
Isn't it funny that as creators we mostly learn about new people using what we've created from github issues when we break something or make it harder than necessary?
We'd love to hear from you when things are going well too 😅