WORK IN PROGRESS

Event Sourcing / CQRS library for Ruby.
There are many ES gems available already. The objectives here are:

A small demo app here.

The programming model

If you’re unfamiliar with Event Sourcing, you can read this first: Event Sourcing from the ground up, with Ruby examples
For a high-level overview of the mental model, read this. Or the video version, here.

The entire behaviour of an event-sourced app is described via commands, events and reactions.

sourced-arch-diagram

  • Commands are intents to effect some change in the state of the system. Ex. Add cart item, Place order, Update email, etc.
  • Events are produced after handling a command and they describe facts or state changes in the system. Ex. Item added to cart, order placed, email updated. Events are stored and you can use them to build views (“projections”), caches and reports to support UIs, or other artifacts.
  • Reactions are blocks of code that run after an event has been processed and can dispatch new commands in a workflow or automation.
  • State is whatever object you need to hold the current state of a part of the system. It’s usually derived from past events, and it’s just enough to interrogate the state of the system and make the next decision.
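The relationship between events and state can be sketched in plain Ruby. This is illustrative only, not Sourced's API: state is a fold (reduce) over past events, and `evolve` is a hypothetical name for the step function.

```ruby
# Plain-Ruby sketch (not Sourced API): state is a fold over past events.
CartState = Struct.new(:items) do
  def total = items.sum { |i| i[:price] * i[:quantity] }
end

# Apply one event to the state and return the updated state
def evolve(state, event)
  state.items << event[:payload] if event[:type] == 'cart.item_added'
  state
end

history = [
  { type: 'cart.item_added', payload: { price: 1000, quantity: 2 } },
  { type: 'cart.item_added', payload: { price: 500, quantity: 1 } }
]

# Current state is derived by replaying the event history
state = history.reduce(CartState.new([])) { |s, e| evolve(s, e) }
state.total # => 2500
```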

Actors

Overview

Actors are classes that encapsulate the full life-cycle of a concept in your domain, backed by an event stream. This includes loading state from past events and handling commands for a part of your system. They can also define reactions to their own events, or events emitted by other actors. This is a simple shopping cart actor.

class Cart < Sourced::Actor
  # Define what cart state looks like.
  # This is the initial state which will be updated by applying events.
  # The state holds whatever data is relevant to decide how to handle a command.
  # It can be any object you need. A custom class instance, a Hash, an Array, etc.
  CartState = Struct.new(:id, :status, :items) do
    def total = items.sum { |it| it.price * it.quantity }
  end
    
  CartItem = Struct.new(:product_id, :price, :quantity)
    
  # This factory is called to initialise a blank cart.
  state do |id|
    CartState.new(id:, status: 'open', items: [])
  end
  
  # Define a command and its handling logic.
  # The command handler will be passed the current state of the cart,
  # and the command instance itself.
  # Its main job is to validate business rules and decide whether new events
  # can be emitted to update the state
  command :add_item, product_id: String, price: Integer, quantity: Integer do |cart, cmd|
    # Validate that this command can run
    raise "cart is not open!" unless cart.status == 'open'
    # Produce a new event with the same attributes as the command
    event :item_added, cmd.payload
  end
  
  # Define an event handler that will "evolve" the state of the cart by adding an item to it.
  # These handlers are also used to "hydrate" the initial state from Sourced's storage backend
  # when first handling a command
  event :item_added, product_id: String, price: Integer, quantity: Integer do |cart, event|
    cart.items << CartItem.new(**event.payload.to_h)
  end
  
  # Optionally, define how this actor reacts to the event above.
  # .reaction blocks can dispatch new commands that will be routed to their handlers.
  # This allows you to build workflows.
  reaction :item_added do |event|
    # Evaluate whether we should dispatch the next command.
    # Here we could fetch some external data or query that might be needed
    # to populate the new commands.
    # Here we dispatch a command to the same stream_id present in the event
    dispatch(:send_admin_email, product_id: event.payload.product_id)
  end
  
  # Handle the :send_admin_email dispatched by the reaction above
  command :send_admin_email, product_id: String do |cart, cmd|
    # maybe produce new events
  end
end

Using the Cart actor in an IRB console. This will use Sourced’s in-memory backend by default.

cart = Cart.new(id: 'test-cart')
cart.state.total # => 0
# Instantiate a command and handle it
cmd = Cart::AddItem.build('test-cart', product_id: 'p123', price: 1000, quantity: 2)
events = cart.decide(cmd)
# => [Cart::ItemAdded.new(...)]
cmd.valid? # true
# Inspect state
cart.state.total # 2000
cart.state.items.size # 1
# Inspect the actor's sequence number
cart.seq # => 2, the sequence number or "version" in storage, i.e. how many commands/events exist for this cart
# Append new messages to the backend
Sourced.config.backend.append_to_stream('test-cart', events)
# Load events for cart
events = Sourced.history_for(cart)
# => an array with instances of [Cart::AddItem, Cart::ItemAdded]
events.map(&:type) # ['cart.add_item', 'cart.item_added']

Try loading a new cart instance from recorded events

cart2, events = Sourced.load(Cart, 'test-cart')
cart2.seq # 2
cart2.state.total # 2000
cart2.state.items.size # 1

Registering actors

Invoking commands directly on an actor instance works in an IRB console or a synchronous-only web handler, but for actors to be available to background workers, and to react to other actors’ events, you need to register them.

Sourced.register(Cart)

This achieves two things:

  1. Messages can be routed to this actor by background processes, using Sourced.dispatch(message).
  2. The actor can react to other events in the system (more on event choreography later), via its low-level .handle(event) Reactor Interface.

These two properties are what enables asynchronous, eventually-consistent systems in Sourced.
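The routing a registry enables can be sketched in plain Ruby. This is illustrative only (not Sourced internals); `Registry` and `reactors_for` are hypothetical names, and `handled_messages` mirrors the Reactor Interface shown later in this document.

```ruby
# Plain-Ruby sketch: once registered, a reactor can be found by the message
# classes it declares, so a background process can route messages to it.
class Registry
  def initialize
    @reactors = []
  end

  def register(reactor)
    @reactors << reactor
  end

  # All registered reactors that handle this message class
  def reactors_for(message_class)
    @reactors.select { |r| r.handled_messages.include?(message_class) }
  end
end

class CartReactor
  def self.handled_messages = [:item_added]
end

registry = Registry.new
registry.register(CartReactor)
registry.reactors_for(:item_added) # => [CartReactor]
registry.reactors_for(:other)      # => []
```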

Expanded message syntax

Commands and event structs can also be defined separately as Sourced::Command and Sourced::Event sub-classes.

These definitions include a message type (for storage) and payload attributes schema, if any.

module Carts
  # A command to add an item to the cart
  # Commands may come from HTML forms, so we use Types::Lax to coerce attributes
  AddItem = Sourced::Command.define('carts.add_item') do
    attribute :product_id, Types::Lax::Integer
    attribute :quantity, Types::Lax::Integer.default(1)
    attribute :price, Types::Lax::Integer.default(0)
  end
  
  # An event to track items added to the cart
  # Events are only produced by valid commands, so we don't 
  # need validations or coercions
  ItemAdded = Sourced::Event.define('carts.item_added') do
    attribute :product_id, Integer
    attribute :quantity, Integer
    attribute :price, Integer
  end
  
  ## Now define command and event handlers in an Actor
  class Cart < Sourced::Actor
    # Initial state, etc...
    
    command AddItem do |cart, cmd|
      # logic here
      event ItemAdded, cmd.payload
    end
    
    event ItemAdded do |cart, event|
      cart.items << CartItem.new(**event.payload.to_h)
    end
  end
end

.command block

The class-level .command block defines a command handler. Its job is to take a command (from a user, an automation, etc), validate it, and apply state changes by publishing new events.

sourced-command-handler

command AddItem do |cart, cmd|
  # logic here...
  # apply and publish one or more new events
  # using instance-level #event(event_type, **payload)
  event ItemAdded, product_id: cmd.payload.product_id
end

.event block

The class-level .event block registers an event handler used to evolve the actor’s internal state.

These blocks are used both to load the initial state when handling a command, and to apply new events to the state in command handlers.

sourced-evolve-handler

event ItemAdded do |cart, event|
  cart.items << CartItem.new(**event.payload.to_h)
end

These handlers are pure: given the same state and event, they should always update the state in the same exact way. They should never reach out to the outside (API calls, current time, etc), and they should never run validations. They work on events already committed to history, which by definition are assumed to be valid.
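The purity requirement can be demonstrated in plain Ruby (illustrative, not Sourced API): a pure evolve step makes replay deterministic, so folding the same history twice always yields the same state.

```ruby
# Plain-Ruby sketch: a pure evolve step has no side-effects, so
# replaying the same events always produces the same state.
ItemAdded = Struct.new(:price, :quantity)

def evolve(state, event)
  { total: state[:total] + event.price * event.quantity }
end

history = [ItemAdded.new(1000, 2), ItemAdded.new(500, 1)]

a = history.reduce({ total: 0 }) { |s, e| evolve(s, e) }
b = history.reduce({ total: 0 }) { |s, e| evolve(s, e) }

a == b    # => true: replay is repeatable
a[:total] # => 2500
```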

.before_evolve block

The class-level .before_evolve block registers a callback that runs before each registered event handler during state evolution. This is useful for common logic that should run before all event handlers, such as updating timestamps or recording metadata.

class CartListings < Sourced::Projector::StateStored
  state do |id|
    { id:, items: [], updated_at: nil, seq: 0 }
  end

  # This block runs before any .event handler
  before_evolve do |state, event|
    state[:updated_at] = event.created_at
    state[:seq] = event.seq
  end

  event Cart::ItemAdded do |state, event|
    state[:items] << event.payload.to_h
  end

  event Cart::Placed do |state, event|
    state[:status] = :placed
  end
end

The before_evolve callback only runs for events that have a registered handler via the .event macro. If an event is not handled by this class, the callback is skipped for that event.
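That dispatch rule can be sketched in plain Ruby. This is illustrative only (not Sourced internals); `HANDLERS`, `BEFORE_EVOLVE` and `evolve` are hypothetical names standing in for the registered handlers and callback.

```ruby
# Plain-Ruby sketch: the before-callback fires only for events
# that have a registered handler.
HANDLERS = { 'cart.item_added' => ->(state, e) { state[:items] += 1 } }
BEFORE_EVOLVE = ->(state, e) { state[:seq] = e[:seq] }

def evolve(state, event)
  handler = HANDLERS[event[:type]]
  return state unless handler # unhandled event: the before-callback is skipped too
  BEFORE_EVOLVE.call(state, event)
  handler.call(state, event)
  state
end

state = { items: 0, seq: 0 }
evolve(state, { type: 'cart.item_added', seq: 5 })
state # => { items: 1, seq: 5 }

evolve(state, { type: 'cart.discarded', seq: 9 })
state[:seq] # => 5 (unchanged: no handler registered for this event)
```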

.reaction block

The class-level .reaction block registers an event handler that reacts to events already published by this or other Actors.

.reaction blocks can dispatch the next command in a workflow with the instance-level #dispatch helper.

sourced-react-handler

reaction ItemAdded do |cart, event|
  # dispatch the next command to the event's stream_id
  dispatch(
    CheckInventory, 
    product_id: event.payload.product_id,
    quantity: event.payload.quantity
  )
end

You can also dispatch commands to other streams, for example to start concurrent workflows.

# dispatch a command to a new custom-made stream_id
dispatch(CheckInventory, event.payload).to("cart-#{Time.now.to_i}")

# Or use Sourced.new_stream_id
dispatch(CheckInventory, event.payload).to(Sourced.new_stream_id)

# Or start a new stream and dispatch commands to another actor
dispatch(:notify, message: 'hello!').to(NotifierActor)

.reaction block with actor state

.reaction blocks receive the actor state, which is derived by applying past events to it (same as when handling commands).

# Define an event handler to evolve state
event ItemAdded do |state, event|
  state[:item_count] += 1
end

# Now react to it and check state
reaction ItemAdded do |state, event|
  if state[:item_count] > 30
    dispatch NotifyBigCart
  end
end

.reaction with state for all events

If the event name or class is omitted, the .reaction macro registers reaction handlers for all events already registered for the actor with the .event macro, minus events that have specific reaction handlers defined.

# wildcard reaction for all evolved events
reaction do |state, event|
  if state[:item_count] > 30
    dispatch NotifyBigCart
  end
end

.reaction for multiple events

reaction ItemAdded, InventoryChecked do |state, event|
  # etc
end

It also works with symbols, for messages that have been defined with symbols (e.g. event :item_added).

reaction :item_added, InventoryChecked do |state, event|
  # etc
end

Causation and correlation

When a command produces events, or when an event makes a reactor dispatch a new command, the cause-and-effect relationship between these messages is tracked by Sourced in the form of correlation_id and causation_id properties in each message’s metadata.

sourced-causation-correlation

This helps the system keep a full audit trail of the cause-and-effect behaviour of the entire system.
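The shape of that chain can be sketched in plain Ruby. Sourced tracks these fields automatically; this sketch only illustrates the convention, and `Message` and `caused_by` are hypothetical names.

```ruby
# Plain-Ruby sketch: causation_id points at the direct cause,
# correlation_id groups the whole cause-and-effect chain.
require 'securerandom'

Message = Struct.new(:id, :type, :correlation_id, :causation_id)

def caused_by(cause, type)
  Message.new(SecureRandom.uuid, type, cause.correlation_id, cause.id)
end

cmd = Message.new(SecureRandom.uuid, 'cart.add_item')
cmd.correlation_id = cmd.id # a root message correlates with itself

event    = caused_by(cmd, 'cart.item_added')
followup = caused_by(event, 'cart.send_admin_email')

event.causation_id == cmd.id      # => true: the command caused the event
followup.causation_id == event.id # => true: the event caused the follow-up
followup.correlation_id == cmd.id # => true: the whole chain shares one correlation
```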


Background vs. foreground execution

By default Sourced processes commands and events asynchronously through background workers. Each reactor picks up messages at its own pace — the system is eventually consistent.

Sometimes you need synchronous, all-or-nothing execution: a web request handler that must know the full outcome before responding, or a test that needs deterministic behaviour. Sourced::Unit provides this.

Sourced::Unit

A Unit wires one or more reactors together and runs the full command → event → reaction → command chain inside a single backend transaction, using breadth-first traversal of the message graph. If any step raises, the entire transaction rolls back.

unit = Sourced::Unit.new(
  OrderActor,
  PaymentActor,
  InventoryProjector,
  backend: Sourced.config.backend
)

cmd = PlaceOrder.new(stream_id: 'order-1', payload: { amount: 100 })
results = unit.handle(cmd)

Messages produced by one reactor are immediately routed to any other reactor in the unit that handles them — no background workers needed.
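The breadth-first traversal (including the iteration cap described under Loop detection below) can be sketched in plain Ruby. This is illustrative only, not Sourced internals; `run_chain` and the symbolic handler map are hypothetical.

```ruby
# Plain-Ruby sketch of breadth-first message routing: each handled message
# may produce follow-up messages, which are queued and processed in order
# until the queue drains or an iteration cap is hit.
def run_chain(initial, handlers, max_iterations: 100)
  queue = [initial]
  processed = []
  iterations = 0
  until queue.empty?
    raise 'loop detected' if (iterations += 1) > max_iterations
    msg = queue.shift
    processed << msg
    queue.concat(handlers.fetch(msg, []))
  end
  processed
end

handlers = {
  place_order: [:order_placed],
  order_placed: [:charge_payment],
  charge_payment: [:payment_charged]
}
run_chain(:place_order, handlers)
# => [:place_order, :order_placed, :charge_payment, :payment_charged]
```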

Extracting results

Unit#handle returns a Results object you can query per reactor class.

results = unit.handle(cmd)

# Hash of { instance => [events] } for a given reactor
results[OrderActor].each do |instance, events|
  puts "#{instance.id}: #{events.map(&:type)}"
end

# Flat list of events
results.events_for(OrderActor)
# => [OrderPlaced, ...]

Skipping command persistence

By default every message (commands and events) is written to the store. Pass persist_commands: false to write only events.

unit = Sourced::Unit.new(OrderActor, backend: backend, persist_commands: false)
unit.handle(cmd) # only events are persisted; commands still flow through the chain

This is useful when commands are transient intents that don’t need an audit trail.

Loop detection

The BFS traversal is capped at 100 iterations by default. If a reaction dispatches a command whose event triggers the same reaction again, the unit raises Sourced::Unit::InfiniteLoopError instead of looping forever.

unit = Sourced::Unit.new(LoopyActor, backend: backend, max_iterations: 10)
unit.handle(cmd)
# => raises Sourced::Unit::InfiniteLoopError after 10 steps

ACK tracking

After the BFS completes, the unit ACKs every handled message for each reactor’s consumer group. This means background workers won’t re-process messages that the unit already handled.

When to use Unit vs. background workers

               Sourced::Unit                         Background workers
Consistency    Immediate (single transaction)        Eventual
Failure mode   All-or-nothing rollback               Per-message retry / stop
Concurrency    Single-threaded BFS                   Concurrent across streams
Use case       Web request handlers, tests, scripts  Long-running workflows, side-effects

You can combine both: use a Unit for the synchronous core of a request, while other reactors pick up the same events asynchronously in the background.

Actions

Actions are the return values of reactor .handle methods. They tell the runtime (Unit or background worker) what side-effects to perform. Each persistable action class implements an #execute(backend, source_message) method that correlates messages and persists them via the backend.

Action                Description                                                  #execute behaviour
Actions::AppendAfter  Append to a stream with optimistic locking (sequence check)  Correlate → backend.append_to_stream
Actions::AppendNext   Append to stream(s), auto-incrementing sequence              Correlate → backend.append_next_to_stream per stream
Actions::Schedule     Schedule messages for future delivery                        Correlate → backend.schedule_messages
Actions::Sync         Run a synchronous side-effect (cache write, API call)        Call the work block, return nil
Actions::OK           Acknowledge the message (no persistence)
Actions::RETRY        Tell the runtime to retry this message later
Actions::Ack          ACK an arbitrary message by ID

OK, RETRY, and Ack are caller-specific signals — they don’t implement #execute.

# Inside a reactor's .handle method:
def self.handle(message)
  started = Order::Started.build(message.stream_id)
  [Sourced::Actions::AppendNext.new([started])]
end

Projectors

Projectors react to events published by actors and update views, search indices, caches, or other representations of current state useful to the app. They can both react to events as they happen in the system, and also “catch up” to past events. Sourced keeps track of where in the global event stream each projector is.

From the outside-in, projectors are classes that implement the Reactor interface.

Sourced ships with two ready-to-use projectors, but you can also build your own.

State-stored projector

A state-stored projector fetches initial state from storage somewhere (DB, files, API), and then after reacting to events and updating state, it can save it back to the same or different storage.

class CartListings < Sourced::Projector::StateStored
  # Fetch listing record from DB, or new one.
  state do |id|
    CartListing.find_or_initialize(id)
  end

  # Evolve listing record from events
  event Carts::ItemAdded do |listing, event|
    listing.total += event.payload.price
  end

  # Sync listing record back to DB
  sync do |state:, events:, replaying:|
    state.save!
  end
end

Event-sourced projector

An event-sourced projector fetches initial state from past events in the event store, and then after reacting to events and updating state, it can save it to a DB table, a file, etc.

class CartListings < Sourced::Projector::EventSourced
  # Initial in-memory state
  state do |id|
    { id:, total: 0 }
  end

  # Evolve listing record from events
  event Carts::ItemAdded do |listing, event|
    listing[:total] += event.payload.price
  end

  # Sync listing record to a file
  sync do |state:, events:, replaying:|
    File.write("/listings/#{state[:id]}.json", JSON.dump(state)) 
  end
end

Registering projectors

Like any other reactor, projectors need to be registered for background workers to route events to them.

# In your app's configuration
Sourced.register(CartListings)

Reacting to events and scheduling the next command from projectors

Sourced projectors can define .reaction handlers that will be called after evolving state via their .event handlers, in the same transaction.

This can be useful to implement TODO List patterns where a projector persists projected data, and then reacts to the data update using the data to schedule the next command in a workflow.


class ReadyOrders < Sourced::Projector::StateStored
  # Fetch listing record from DB, or new one.
  state do |id|
    OrderListing.find_or_initialize(id)
  end

  event Orders::ItemAdded do |listing, event|
    listing.line_items << event.payload
  end
  
  # Evolve listing record from events
  event Orders::PaymentConfirmed do |listing, event|
    listing.payment_confirmed = true
  end

  event Orders::BuildConfirmed do |listing, event|
    listing.build_confirmed = true
  end
  
  # Sync listing record back to DB
  sync do |state:, events:, replaying:|
    state.save!
  end
  
  # If a listing has both the build and payment confirmed,
  # automate dispatching the next command in the workflow
  reaction do |listing, event|
    if listing.payment_confirmed? && listing.build_confirmed?
      dispatch Orders::Release, **listing.attributes
    end
  end
end

Projectors can also define .reaction event_class do |state, event| to react to specific events, or reaction event1, event2 to react to more than one event with the same block.

Skipping projector reactions when replaying events

When a projector’s offsets are reset (so that it starts re-processing events and rebuilding projections), Sourced skips invoking the projector’s .reaction handlers. This is because building projections should be deterministic, and rebuilding them should not trigger side-effects such as automations (we don’t want to call 3rd-party APIs, send emails, or dispatch the same commands over and over when rebuilding projections).

To do this, Sourced keeps track of each consumer group’s highest acknowledged event sequence. When a consumer group is reset and starts re-processing past events, this sequence number is compared with each event’s sequence, which tells us whether the event has been processed before.
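That check can be sketched in plain Ruby (illustrative only; `replaying?` is a hypothetical name, not Sourced's API):

```ruby
# Plain-Ruby sketch: events at or below the group's highest ACKed sequence
# have been processed before, so reaction side-effects are skipped for them.
def replaying?(event_seq, highest_acked_seq)
  event_seq <= highest_acked_seq
end

highest_acked = 10
replaying?(7, highest_acked)  # => true  (replay: evolve state only, skip reactions)
replaying?(11, highest_acked) # => false (new event: evolve and react)
```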

Concurrency model

Concurrency in Sourced is achieved by modelling it explicitly.

Sourced workers process messages by acquiring locks on [reactor group ID]:[stream ID] pairs. For example, "CartActor:cart-123".

This means that all events for a given reactor/stream are processed in order, but events for different streams can be processed concurrently. You can define workflows where some work is done concurrently by modeling them as a collaboration of streams.
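A minimal sketch of the lock-key scheme, in plain Ruby (illustrative only; `lock_key` is a hypothetical name, though the "CartActor:cart-123" key format is from the text above):

```ruby
# Plain-Ruby sketch: one lock per (reactor group, stream) pair, so messages
# within a stream stay ordered while different streams proceed concurrently.
def lock_key(group_id, stream_id)
  "#{group_id}:#{stream_id}"
end

lock_key('CartActor', 'cart-123') # => "CartActor:cart-123"
# Different streams produce different keys, so they never contend:
lock_key('CartActor', 'cart-123') == lock_key('CartActor', 'cart-456') # => false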

Single-stream sequential execution

In the following (simplified!) example, a Holiday Booking workflow is modelled as a single stream (“Actor”). The infrastructure makes sure these steps are run sequentially.

sourced-concurrency-single-lane

The Actor glues its steps together by reacting to events emitted by the previous step, and dispatching the next command.

class HolidayBooking < Sourced::Actor
  # State and details omitted...
  
  command :start_booking do |state, cmd|
    event :booking_started
  end
  
  reaction :booking_started do |event|
    dispatch :book_flight
  end
  
  command :book_flight do |state, cmd|
    event :flight_booked
  end
  
  reaction :flight_booked do |event|
    dispatch :book_hotel
  end
  
  command :book_hotel do |state, cmd|
    event :hotel_booked
  end
  
  # Define event handlers if you haven't...
  event :booking_started # ..etc
  event :flight_booked # ..etc
end

Multi-stream concurrent execution

In this other example, the same workflow is split into separate streams/actors, so that Flight and Hotel bookings can run concurrently. When completed, they each notify the parent Holiday actor, so the whole process coalesces into a sequential operation again.

sourced-concurrency-multi-lane

# An actor dispatches a message to different stream
# messages for different streams are processed concurrently
reaction BookingStarted do |state, event|
  dispatch(BookHotel).to("#{event.stream_id}-hotel")
end

Units of work

[units-of-work diagram]

The diagram shows the units of work in an example Sourced workflow. The operations within each of the red boxes are protected by a combination of transactions and locking strategies on the consumer group + stream ID, so they are isolated from other concurrent processing. They can be said to be immediately consistent.
The data-flow between these boxes is propagated asynchronously by Sourced’s infrastructure so, relative to each other, the entire system is eventually consistent.

These transactional boundaries are guarded by the same locks that enforce the concurrency model, so that for example the same message can’t be processed twice by the same Reactor (workflow, projector, etc).

Durable workflows

There’s a Sourced::DurableWorkflow class that can be subclassed to define Reactors with a synchronous-looking API. This is work in progress.

class BookHoliday < Sourced::DurableWorkflow
  # This method can be called like a regular method
  # The methods inside also have blocking semantics
  # but they're in fact event-sourced, and will be
  # retried on failure until the booking completes.
  # Methods that were successful will be idempotent on retry
  def execute(flight_info, hotel_info)
    flight = book_flight(flight_info)
    hotel = book_hotel(hotel_info)
    confirm_booking(flight, hotel)
  end
  
  # The .durable macro turns a regular method
  # into an event-sourced workflow
  durable def book_flight(info)
    FlightsAPI.book(info)
  end
  
  durable def book_hotel(info)
    HotelsAPI.book(info)
  end
  
  durable def confirm_booking(flight, hotel)
    # etc,
  end
end

These executions will be handed off to the runtime to be run by one or more workers, while preserving ordering. You can optionally wait for a result.

result = BookHoliday.execute(flight_info, hotel_info).wait.output
# Confirmed booking, or whatever error result your code returns

Events for the full execution are recorded to the backend.

Durable workflows must be registered with the runtime, like any other Reactor.

Sourced.register BookHoliday

Handler DSL

The Sourced::Handler mixin provides a lighter-weight DSL for simple reactors.

class OrderTelemetry
  include Sourced::Handler
  
  # Handle these Order events
  # and log them
  on Order::Started do |event|
    Logger.info ['order started', event.stream_id]
    []
  end
  
  on Order::Placed do |event|
    Logger.info ['order placed', event.stream_id]
    []
  end
end

# Register it
Sourced.register OrderTelemetry

Handlers can optionally declare a :history keyword argument; the runtime will pass the full message history for the stream ID being handled.

on Order::Placed do |event, history:|
  total = history
    .filter { |e| Order::ProductAdded === e }
    .reduce(0) { |n, e| n + e.payload.price }
  
  if total > 10000
    return [Order::AddDiscount.build(event.stream_id, amount: 100)]
  end
  
  []
end

It also supports multiple event types, for generic handling.

on Order::Placed, Order::Complete do |event|
  Logger.info "received event #{event.inspect}"
  []
end

Command methods for Actors

The optional Sourced::CommandMethods mixin allows invoking an Actor’s commands as regular methods.

CommandMethods automatically generates instance methods from command definitions,
allowing you to invoke commands in two ways:

  1. In-memory version (e.g., actor.start(name: 'Joe'))
    • Validates the command and executes the decision handler
    • Returns a tuple of [cmd, new_events]
    • Does NOT persist events to backend
  2. Durable version (e.g., actor.start!(name: 'Joe'))
    • Same as in-memory, but also appends events to backend
    • Raises FailedToAppendMessagesError if backend fails

Include the module in an Actor and define commands normally:

class MyActor < Sourced::Actor
  include Sourced::CommandMethods

  command :create_item, name: String do |state, cmd|
    event :item_created, cmd.payload
  end
end

actor = MyActor.new(id: 'actor-123')
cmd, events = actor.create_item(name: 'Widget')  # In-memory
cmd, events = actor.create_item!(name: 'Widget') # Persists to backend

Orchestration and choreography

Orchestration

Orchestration is when the flow control of a multi-collaborator workflow is centralised into a single entity. This can be achieved by having one Actor coordinate the communication by reacting to events and sending commands to other actors.

class HolidayBooking < Sourced::Actor
  state do |id|
    BookingState.new(id)
  end
  
  command StartBooking do |booking, cmd|
    # validations, etc
    event BookingStarted, cmd.payload
  end
  
  event BookingStarted
  
  # React to BookingStarted and start sub-workflows
  reaction BookingStarted do |booking, event|
    dispatch(HotelBooking::Start)
  end
  
  # React to events emitted by sub-workflows
  reaction HotelBooking::Started do |booking, event|
    dispatch(ConfirmHotelBooking, event.payload)
  end
  
  command ConfirmHotelBooking do |booking, cmd|
    unless booking.hotel.booked?
      event HotelBookingConfirmed, cmd.payload
    end
  end
  
  event HotelBookingConfirmed do |booking, event|
    # update booking state
    booking.confirm_hotel(event.payload)
  end
end

This is a verbose, step-by-step orchestration, but it can be made more succinct by omitting the mirroring of commands/events where not needed (or by using the Reactor Interface directly).

TODO: a way for Actors to initialise their internal state with event attributes other than the stream_id. For example, events may carry a booking_id for the overall workflow.

Choreography

Choreography is when each component reacts to other components’ events without centralised control. The overall workflow “emerges” from this collaboration.

class HotelBooking < Sourced::Actor
  # The HotelBooking defines its own
  # reactions to booking events
  reaction HolidayBooking::StartBooking do |state, event|
    # dispatch a command to itself to start its own life-cycle
    dispatch Start, event.payload
  end
  
  command Start do |state, cmd|
    # validations, etc
    # other Actors in the choreography
    # can choose to react to events emitted here
    event Started, cmd.payload
  end
  
  event Started do |state, event|
    # update state, etc
  end
end

Appending and reading messages

Appending messages without optimistic locking

Use Backend#append_next_to_stream to append messages to a stream, with no questions asked.

message = ProductAdded.build('order-123', product_id: 123, price: 100)
Sourced.config.backend.append_next_to_stream('order-123', [message])

# Shortcut:
Sourced.dispatch(message)

Appending messages with optimistic locking

Using Backend#append_to_stream, the backend expects each new message’s seq property (sequence number) to be greater than that of the last message in storage for the same stream. This catches concurrent writes, where a different client or thread may have appended to the stream while your code was preparing its own write.

# Your code must make sure to increment sequence numbers
past_events = Sourced.config.backend.read_stream('order-123')
last_known_seq = past_events.last&.seq # ex. 10
# Instantiate new messages and make sure to increment their sequences
message = ProductAdded.new(
  stream_id: 'order-123', 
  seq: last_known_seq + 1, # <== incremented sequence
  payload: { product_id: 123, price: 100 }
)

# This will raise an exception if there's already a message
# for this stream with this sequence number in storage.
Sourced.config.backend.append_to_stream('order-123', [message])

Sourced::Actor classes do this incrementing automatically when they produce new messages.
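The check a backend performs on append can be sketched in plain Ruby. This is illustrative only, not Sourced's storage code; `append_with_check` is a hypothetical name.

```ruby
# Plain-Ruby sketch of the optimistic-locking check: each new message's
# seq must be greater than the highest seq already in the stream.
def append_with_check(stream, new_messages)
  last_seq = stream.map { |m| m[:seq] }.max || 0
  new_messages.each do |m|
    raise 'concurrent append detected' if m[:seq] <= last_seq
    stream << m
    last_seq = m[:seq]
  end
  stream
end

stream = [{ seq: 1 }, { seq: 2 }]
append_with_check(stream, [{ seq: 3 }]) # fine: 3 > 2

begin
  append_with_check(stream, [{ seq: 3 }]) # 3 <= 3: another writer got there first
rescue RuntimeError => e
  e.message # => "concurrent append detected"
end
```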

Scheduling messages in the future

You can append messages to a separate log, with a schedule time. Sourced workers will periodically poll this log and move these messages into the main log at the right time.

message = ProductAdded.build('order-123', product_id: 123, price: 100)
Sourced.config.backend.schedule_messages([message], at: Time.now + 20)

Actor reactions can use the #dispatch and #at helpers to schedule commands to run at a future time.

reaction ProductAdded do |order, event|
  dispatch(NotifyNewProduct).at(Time.now + 20)
end

Replaying messages

You can use the backend API to reset offsets for a specific consumer group, which will cause workers to start replaying messages for that group.

Sourced.config.backend.reset_consumer_group(ReadyOrders)

See below for other consumer lifecycle methods.

The Reactor Interface

All built-in Reactors (Actors, Projections) build on the low-level Reactor Interface.

The runtime invokes .handle_batch on each reactor, passing a batch of [message, replaying] pairs from the same stream. The Sourced::Consumer module provides a default handle_batch that delegates to a per-message .handle method, so simple reactors only need to implement .handle.

class MyReactor
  extend Sourced::Consumer
  
  # The runtime will poll and hand over messages of this type
  def self.handled_messages = [Order::Started, Order::Placed]
  
  # The default handle_batch (from Consumer) calls this per message.
  # Return an Array of one or more Actions.
  def self.handle(new_message)
    actions = []
    
    # Just acknowledge new_message
    actions << Sourced::Actions::OK
    
    # Append these new messages to the event store
    # Sourced will automatically increment the stream's sequence number
    # (ie. no optimistic locking)
    started = Order::Started.build(new_message.stream_id)
    actions << Sourced::Actions::AppendNext.new([started])
    
    # Append these new messages to the event store.
    # The messages are expected to have a :seq incremented after new_message.seq
    # Messages will fail to append if other messages have been appended
    # with overlapping sequence numbers (optimistic locking)
    started = Order::Started.new(stream_id: new_message.stream_id, seq: new_message.seq + 1)
    actions << Sourced::Actions::AppendAfter.new(new_message.stream_id, [started])
    
    # Tell the runtime to retry this message
    actions << Sourced::Actions::RETRY
    
    actions
  end
end

You can implement your own low-level reactors following the interface above. Then register them as normal.

Sourced.register MyReactor

Batch processing

Workers fetch up to worker_batch_size messages per lock cycle (default: 50). Built-in reactors optimize batch processing automatically:

  • Projector::StateStored: loads state once, evolves all batch messages, syncs once (instead of N state loads + N syncs).
  • Projector::EventSourced: evolves history once, syncs once (instead of N x O(H) evolves + N syncs).
  • Actor: replaying messages return OK immediately; live messages are handled individually.

For custom reactors, you can override handle_batch directly for full control:

class MyBatchReactor
  extend Sourced::Consumer

  def self.handled_messages = [Order::Started, Order::Placed]

  # Override handle_batch for custom batch processing.
  # Must return Array of [actions, source_message] pairs.
  def self.handle_batch(batch)
    batch.map do |message, _replaying|
      actions = process(message)
      [actions, message]
    end
  end
end

Individual reactors can override the global worker_batch_size via the consumer DSL:

class OrderProjector < Sourced::Projector::StateStored
  consumer do |c|
    c.batch_size = 100
  end
end

When set, the reactor’s batch_size takes precedence over the global worker_batch_size. When not set (default), the global value is used.

Partial ACK on failure

When a message raises mid-batch, Sourced ACKs up to the last successfully processed message instead of retrying the entire batch. The failed message and any subsequent messages are retried in the next batch. This is handled automatically by each_with_partial_ack in the Consumer module, which all built-in reactor types use.

Actors and plain Consumer reactors process each message independently (a new instance per message), so partial ACK is straightforward and safe with any batch size.

Projectors use an evolve-all-sync-once optimization: all batch messages are evolved into a single instance’s state, then synced once. Reactions are processed one by one — if a reaction fails mid-batch, all previously successful messages (including their reactions and a correct partial sync) are ACKed, and only the failed message onward is retried. On partial failure, the projector automatically rebuilds a fresh instance with only the successfully processed messages to produce a correct sync (via the on_partial_sync block in sync_and_react).
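The prefix-ACK mechanics can be sketched in a few lines of plain Ruby. This is an illustration of the semantics only, not the library’s actual each_with_partial_ack implementation:

```ruby
# Simplified sketch: process messages in order and, on failure,
# acknowledge only the successfully processed prefix. The failed
# message and everything after it are left to be retried.
def process_with_partial_ack(messages)
  acked = []
  messages.each do |msg|
    begin
      yield(msg)
      acked << msg
    rescue StandardError
      break # stop here; this message onward will be retried
    end
  end
  retried = messages[acked.size..] || []
  [acked, retried]
end

acked, retried = process_with_partial_ack([1, 2, 3, 4, 5]) do |n|
  raise 'boom' if n == 4
end
acked   # => [1, 2, 3]
retried # => [4, 5]
```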

class PaymentProcessor < Sourced::Projector::StateStored
  consumer do |c|
    c.batch_size = 10
  end

  reaction PaymentStarted do |state, evt|
    # If this HTTP call succeeds for messages 1-3 but fails on message 4,
    # messages 1-3 are fully ACKed (reactions + sync). Message 4 onward is retried.
    response = PaymentGateway.post(:process_payment, state[:payment_info])
    if response.ok?
      dispatch ConfirmPayment, payment_id: response.body[:payment_id]
    else
      dispatch RejectPayment, errors: response.body[:errors]
    end
  end
end

Reactors that require message history

Reactors that declare the :history keyword in their .handle_batch (or .handle) signature will be provided the full message history for the stream being handled.

This is how event-sourced Actors are implemented.

def self.handle(new_message, history:)
  # evolve state from history,
  # handle command, return new events, etc
  []
end

:replaying flag

Your .handle method can also declare a :replaying boolean, which tells the reactor whether the stream is replaying events, or handling new messages. Reactors use this to run or omit side-effects (for example, replaying Projectors don’t run reaction blocks).

def self.handle(new_message, history:, replaying:)
  if replaying
    # Omit side-effects
  else
    # Trigger side-effects
  end
end

Testing

There are a couple of experimental RSpec helpers that allow testing Sourced reactors in GIVEN, WHEN, THEN style.

GIVEN existing events A, B, C
WHEN new command D is sent
THEN I expect new events E and F

Single reactor

Use with_reactor to unit-test the life-cycle of a single reactor.

require 'sourced/testing/rspec'

RSpec.describe Order do
  include Sourced::Testing::RSpec

  it 'adds product to order' do
    with_reactor(Order, 'order-123')
      .when(Order::AddProduct, product_id: 1, price: 100)
      .then(Order::ProductAdded.build('order-123', product_id: 1, price: 100))
  end

  it 'is a noop if product already in order' do
    with_reactor(Order, 'order-123')
      .given(Order::ProductAdded, product_id: 1, price: 100)
      .when(Order::AddProduct, product_id: 1, price: 100)
      .then([])
  end
end

#then can also take a block, which will be given the low-level Sourced::Actions objects returned by your .handle() interface.

You can use this block to test reactors that trigger side effects.

with_reactor(Webhooks, 'webhook-1')
  .when(Webhooks::Dispatch, name: 'Joe')
  .then do |actions|
    expect(api_request).to have_been_requested
  end

You can mix argument and block assertions with .then()

with_reactor(Webhooks, 'webhook-1')
  .when(Webhooks::Dispatch, name: 'Joe')
  .then do |_|
    expect(api_request).to have_been_requested
  end
  .then(Webhooks::Dispatched, reference: 'webhook-abc')

For reactors that have sync blocks for side-effects (ex. Projectors), use #then! to trigger those side-effects and assert their results.

with_reactor(PlacedOrders, 'order-123')
  .given(Order::Started)
  .given(Order::ProductAdded, product_id: 1, price: 100, units: 2)
  .given(Order::Placed)
  .then! do |_|
    expect(OrderRecord.find('order-123').total).to eq(200)
  end

Multiple reactors (A.K.A “Sagas”)

Use with_reactors to test the collaboration of multiple reactors sending and picking up each other’s messages.

it 'tests collaboration of reactors' do
  order_stream = 'actor-1'
  payment_stream = 'actor-1-payment'
  telemetry_stream = Testing::Telemetry::STREAM_ID

  # With these reactors
  with_reactors(Order, Payment, Telemetry)
    # GIVEN that these events exist in history
    .given(Order::Started.build(order_stream, name: 'foo'))
    # WHEN I dispatch this new command
    .when(Order::StartPayment.build(order_stream))
    # Then I expect
    .then do |stage|
      # The different reactors collaborated and
      # left this message trail behind
      # Backend#messages is only available in the TestBackend
      expect(stage.backend.messages).to match_sourced_messages([
        Order::Started.build(order_stream, name: 'foo'), 
        Order::StartPayment.build(order_stream), 
        Order::PaymentStarted.build(order_stream), 
        Telemetry::Logged.build(telemetry_stream, source_stream: order_stream),
        Payment::Process.build(payment_stream), 
        Payment::Processed.build(payment_stream),
        Telemetry::Logged.build(telemetry_stream, source_stream: payment_stream),
      ])
    end
end

with_reactors sets up its own in-memory backend, so you can test multi-reactor workflows in terms of the messages they produce, with no database or network requests and no setup or tear-down. Just test the behaviour!

The .then block can take an optional second argument, which receives only the new messages produced by the reactors: those appended after any messages set up with given.

.then do |stage, new_messages|
  expect(new_messages).to match_sourced_messages([...])
end

Setup

You’ll need the pg and sequel gems.

gem 'sourced', github: 'ismasan/sourced'
gem 'pg'
gem 'sequel'

Create a Postgres database.
For now Sourced uses the Sequel gem. In the future there’ll be an ActiveRecord adapter with migrations support.

Configure and migrate the database.

Sourced.configure do |config|
  config.backend = Sequel.connect(ENV.fetch('DATABASE_URL'))

  # Worker and housekeeping options (shown with defaults)
  config.worker_count = 2                       # Number of worker fibers
  config.worker_batch_size = 50                 # Messages fetched per lock cycle (batch processing)
  config.housekeeping_count = 1                 # Number of housekeeper fibers
  config.housekeeping_interval = 3              # Seconds between scheduling cycles
  config.housekeeping_heartbeat_interval = 5    # Seconds between worker heartbeats
  config.housekeeping_claim_ttl_seconds = 120   # Seconds before stale claims are reaped
end

Sourced.config.backend.install unless Sourced.config.backend.installed?

These options are used by both Sourced::Supervisor and the Falcon integration. When running workers alongside a web server (Falcon, or any other Async-compatible server), these control how many worker and housekeeper fibers are spawned per OS process. The worker_batch_size controls how many messages from the same stream are fetched and processed in a single lock cycle (see Batch processing).

Generating Sequel migrations

If your app already uses Sequel’s migrator, you can copy Sourced’s migration into your migrations directory instead of using backend.install.

backend = Sourced.config.backend
backend.copy_migration_to("db/migrations")
# => writes db/migrations/001_create_sourced_tables.rb

Or use a block to control the file name (e.g. timestamped migrations):

backend.copy_migration_to do
  "db/migrations/#{Time.now.strftime('%Y%m%d%H%M%S')}_create_sourced_tables.rb"
end

The generated file is a standard Sequel.migration { change { ... } } that works with Sequel::Migrator. It respects the prefix and schema options passed when configuring the backend:

Sourced.configure do |config|
  db = Sequel.connect(ENV.fetch('DATABASE_URL'))
  config.backend = Sourced::Backends::SequelBackend.new(db, prefix: 'myapp', schema: 'events')
end

# Migration will create tables like events.myapp_messages, events.myapp_streams, etc.
Sourced.config.backend.copy_migration_to("db/migrations")

Register your Actors and Reactors.

Sourced.register(Leads::Actor)
Sourced.register(Leads::Listings)
Sourced.register(Webhooks::Dispatcher)

Running workers as a separate process

When using a web server that doesn’t share Sourced’s Async event loop (e.g. Puma), or in non-web applications, run workers as a standalone process using Sourced::Supervisor:

# worker.rb
require_relative 'config/environment'
# start workers with 10 worker fibers or threads per OS process
# depending on Sourced.config.executor (:async, :thread, or custom)
Sourced::Supervisor.start(count: 10)

This requires managing two processes in deployment: one for your web server, one for workers.

Running workers with Falcon

If you use Falcon as your web server, you can run Sourced workers in the same process. Both Falcon and Sourced use the Async gem, so workers run as lightweight fibers alongside web requests — no separate worker process needed.

This requires Sourced.config.executor = :async (the default). Do not change it to :thread when using Falcon, as workers must run as fibers to share Falcon’s event loop.

Add a ./falcon.rb file to the root of your app, which requires sourced/falcon (no hard dependency on Falcon in sourced.gemspec):

#!/usr/bin/env falcon-host
# falcon.rb
require 'bundler/setup'
require 'sourced/falcon'
require_relative 'config/environment' # <= YOUR app setup, Sourced.configure, register reactors, etc.

service "my-app" do
  include Sourced::Falcon::Environment
  include Falcon::Environment::Rackup    # loads config.ru

  # -- Falcon / Async options --
  url "http://[::]:9292"                 # Server bind URL (default: "http://[::]:9292")
  count 2                                # Number of OS processes to fork (default: Etc.nprocessors)
  timeout 30                             # Connection timeout in seconds (default: nil)
  verbose false                          # Enable verbose logging (default: false)
  cache true                             # Enable HTTP response caching (default: false)

  # Sourced worker options default to Sourced.config values.
  # Override per-service if needed:
  # sourced_worker_count 4
  # sourced_worker_batch_size 50
  # sourced_housekeeping_count 1
  # sourced_housekeeping_interval 3
  # sourced_housekeeping_heartbeat_interval 5
  # sourced_housekeeping_claim_ttl_seconds 120
end

Run with:

falcon host

Total Sourced workers = count * sourced_worker_count. For example, count 2 and sourced_worker_count 4 gives 8 worker fibers across 2 OS processes, all competing for events via database locks (same as running multiple Supervisors).

Set config.worker_count = 0 to run Falcon as a web-only process with no Sourced workers. This is useful if you want to run workers separately via Sourced::Supervisor while still using Falcon for HTTP, or if you explicitly don’t want workers adding unnecessary pressure on the database.

On shutdown (Ctrl-C / SIGTERM), Falcon signals workers to stop. Their poll loops exit gracefully with no stale claims.

How worker dispatch works

Instead of each worker polling the database independently, Sourced uses a signal-driven dispatch model. Workers block on a shared WorkQueue waiting for signals, and two sources feed that queue:

  1. Backend notifier (real-time): The backend exposes a generic pub/sub notifier (PGNotifier for PostgreSQL, InlineNotifier for others). When messages are appended, the backend publishes a messages_appended event with the message types. When a stopped reactor is resumed, it publishes a reactor_resumed event with the consumer group ID. For PostgreSQL, these are delivered over PG LISTEN/NOTIFY; for other backends, they fire synchronously.

  2. CatchUpPoller (safety net): A single fiber pushes all registered reactors into the WorkQueue every few seconds (default 5). This covers startup catch-up, missed notifications, offset resets, and PG reconnections.

The Dispatcher subscribes to the backend notifier and maps these events to reactor classes, pushing them onto the WorkQueue. For messages_appended, it resolves message types to interested reactors via an eager lookup table. For reactor_resumed, it resolves the group ID directly to the reactor class.

When a worker pops a reactor from the queue, it enters a bounded drain loop: it processes up to max_drain_rounds batches (default 10) for that reactor, then re-enqueues the reactor if it hit the cap. This ensures no single reactor monopolizes workers, and multiple workers can drain the same reactor concurrently on different streams (via SKIP LOCKED).

The WorkQueue caps pending entries per reactor (equal to the worker count), so notification bursts are coalesced without queue bloat.

Backend notifier ────┐
  (PG LISTEN or       ├──▶ WorkQueue (capped/reactor) ──▶ Worker fibers
   inline pub/sub)    │         │                           │
CatchUpPoller (5s) ──┘         │◀── re-enqueue ────────────┘
                                     (if max_drain_rounds hit)

This design preserves natural back-pressure (workers only fetch when ready), eliminates polling-interval lag for new messages, and handles both real-time and catch-up work in a single operating mode.
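As a rough illustration, the bounded drain loop can be sketched in plain Ruby. The names here (drain, FakeReactor, the fetch lambda) are hypothetical simplifications for this sketch, not Sourced’s actual classes:

```ruby
MAX_DRAIN_ROUNDS = 10

# Simplified, single-threaded sketch of a worker's bounded drain loop.
# `fetch_batch` stands in for the backend call that locks and returns
# up to worker_batch_size messages for the given reactor.
def drain(reactor, queue, fetch_batch)
  MAX_DRAIN_ROUNDS.times do
    batch = fetch_batch.call(reactor)
    return if batch.empty? # fully drained: don't re-enqueue

    reactor.handle_batch(batch)
  end
  # Hit the cap with work possibly remaining: re-enqueue the reactor
  # so this (or another) worker picks it up again later.
  queue << reactor
end

# Demo: a reactor with 12 pending batches gets at most 10 rounds,
# then is put back on the queue for the remaining 2.
FakeReactor = Struct.new(:pending, :handled) do
  def handle_batch(_batch)
    self.handled += 1
  end
end

reactor = FakeReactor.new(12, 0)
queue = []
fetch = ->(r) { r.pending.positive? ? [(r.pending -= 1)] : [] }
drain(reactor, queue, fetch)
reactor.handled # => 10
queue.size      # => 1
```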

Custom attribute types and coercions.

Define a module to hold your attribute types, using Plumb:

module Types
  include Plumb::Types
  
  # Your own types here.
  CorporateEmail = Email[/@apple\.com$/]
end

Then you can use any built-in Plumb types, as well as your own, when defining command or event structs (or any other data structures for your app).

UpdateEmail = Sourced::Command.define('accounts.update_email') do
  attribute :email, Types::CorporateEmail
end

Error handling

Sourced workflows are eventually-consistent by default. This means that commands and events are handled in background processes, and any exceptions raised can’t be immediately surfaced back to the user (and there might not be a user anyway!).

Most “domain errors” in command handlers should be handled by the developer and recorded as domain events, so that the domain can react and/or compensate for them.

To handle true exceptions (code or data bugs, network or IO exceptions) Sourced provides a default error strategy that will “stop” the affected consumer group (the Postgres backend will log the exception and offending message in the consumer_groups table).

You can configure the error strategy with retries and exponential backoff, as well as on_retry and on_stop callbacks.

Sourced.configure do |config|
  # config.backend = Sequel.connect(ENV.fetch('DATABASE_URL'))
  config.error_strategy do |s|
    s.retry(
      # Retry up to 3 times
      times: 3,
      # Wait 5 seconds before retrying
      after: 5, 
      # Custom backoff: given after=5, retries in 5, 10 and 15 seconds before stopping
      backoff: ->(retry_after, retry_count) { retry_after * retry_count }
    )
    
    # Trigger this callback on each retry
    s.on_retry do |n, exception, message, later|
      LOGGER.info("Retrying #{n} times")
    end

    # Finally, trigger this callback
    # after all retries have failed and the consumer group is stopped.
    s.on_stop do |exception, message|
      Sentry.capture_exception(exception)
    end
  end
end

Custom error strategy

You can also configure your own error strategy. It must respond to #call(exception, message, group).

CUSTOM_STRATEGY = proc do |exception, message, group|
  case exception
  when Faraday::Error
    group.retry(Time.now + 10)
  else
    group.stop(exception)
  end
end

Sourced.configure do |config|
  # Configure backend, etc
  config.error_strategy = CUSTOM_STRATEGY
end

Stopping and starting consumer groups.

Sourced.config.backend provides an API for stopping and starting consumer groups, for example to resume groups that were stopped by raised exceptions after the error has been corrected.

Sourced.config.backend.stop_consumer_group('Carts::Listings')
Sourced.config.backend.start_consumer_group('Carts::Listings')

Topology

Sourced.topology returns a flat array of node structs describing the message flow graph of all registered reactors. This is useful for building visualizations, documentation, or tooling that needs to understand how commands, events, automations and read models connect.

Sourced.register(Cart)
Sourced.register(CartListings)

nodes = Sourced.topology
# => [CommandNode, EventNode, AutomationNode, ReadModelNode, ...]

The result is memoized. Call Sourced.reset_topology to clear the cache after registering new reactors.
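As a sketch of what tooling on top of the topology might look like, the following plain-Ruby helper derives graph edges from nodes shaped like the hashes shown in the node-type examples. Both topology_edges and the sample data are hypothetical, not part of Sourced’s API:

```ruby
# Build [from_id, to_id] edges from topology nodes, where each node
# is a Hash with :id and :produces keys.
def topology_edges(nodes)
  nodes.flat_map do |node|
    (node[:produces] || []).map { |target_id| [node[:id], target_id] }
  end
end

nodes = [
  { type: "command", id: "carts.add_item", produces: ["carts.item_added"] },
  { type: "event", id: "carts.item_added", produces: [] }
]
topology_edges(nodes) # => [["carts.add_item", "carts.item_added"]]
```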

Node types

CommandNode

Represents a command handled by an actor. produces lists the event type strings that the command handler can emit (extracted via static analysis).

# Fields: type, id, name, group_id, produces, schema
{ type: "command", id: "carts.add_item", name: "Carts::AddItem",
  group_id: "Carts::Cart", produces: ["carts.item_added"],
  schema: { "type" => "object", "properties" => { ... } } }

EventNode

Represents an event type. Deduplicated across reactors — the first reactor to reference an event owns its group_id.

# Fields: type, id, name, group_id, produces, schema
{ type: "event", id: "carts.item_added", name: "Carts::ItemAdded",
  group_id: "Carts::Cart", produces: [],
  schema: { "type" => "object", "properties" => { ... } } }

AutomationNode

Represents a .reaction block. consumes lists what triggers the reaction (event types for actors, readmodel IDs for projectors). produces lists the command type strings dispatched by the reaction (extracted via static analysis).

# Fields: type, id, name, group_id, consumes, produces
# Actor reaction — consumes an event directly:
{ type: "automation", id: "carts.item_added-Carts::Cart-aut",
  name: "reaction(Carts::ItemAdded)", group_id: "Carts::Cart",
  consumes: ["carts.item_added"], produces: ["carts.check_inventory"] }

# Projector reaction — consumes the readmodel:
{ type: "automation", id: "carts.item_added-Carts::CartListings-aut",
  name: "reaction(Carts::ItemAdded)", group_id: "Carts::CartListings",
  consumes: ["carts.cart_listings-rm"], produces: ["carts.notify_admin"] }

ReadModelNode

Represents a projector as a consumer of events. consumes lists the event types the projector evolves. produces lists the IDs of any automation nodes derived from the projector’s reactions.

# Fields: type, id, name, group_id, consumes, produces, schema
{ type: "readmodel", id: "carts.cart_listings-rm",
  name: "Carts::CartListings", group_id: "Carts::CartListings",
  consumes: ["carts.item_added", "carts.placed"],
  produces: ["carts.item_added-Carts::CartListings-aut"],
  schema: {} }

Catch-all reactions

When a reactor uses a catch-all reaction do ... end (no event argument), the topology collapses all covered events into a single automation node named after the reactor, instead of one automation per event.

class ReadyOrders < Sourced::Projector::StateStored
  event Orders::PaymentConfirmed do |state, event|
    # ...
  end

  event Orders::BuildConfirmed do |state, event|
    # ...
  end

  # Catch-all: reacts to all evolved events
  reaction do |state, event|
    if state[:ready]
      dispatch Orders::Release
    end
  end
end

This produces a single automation node:

{ type: "automation", id: "ready_orders-aut",
  name: "reaction(ReadyOrders)", group_id: "ReadyOrders",
  consumes: ["ready_orders-rm"], produces: ["orders.release"] }

This replaces what would otherwise be separate automation nodes for PaymentConfirmed and BuildConfirmed.

Rails integration

Soon.

Sourced vs. ActiveJob

ActiveJob is a great way to handle background jobs in Rails. It’s simple and easy to use. However, it’s not designed for event sourcing.
ActiveJob backends (and other job queues) are optimised for parallel processing of jobs. This means that multiple jobs for the same business entity may be processed in parallel, without any ordering guarantees.

sourced-job-queue-diagram

Sourced’s concurrency model is designed to process events for the same entity in order, while allowing for parallel processing of events for different entities.

sourced-ordered-streams-diagram

Gotchas

By default Sourced processes commands and events asynchronously through
background workers. This can be confusing if you expect reactions to run
automatically when you issue commands.

For synchronous, all-or-nothing execution use Sourced::Unit,
which runs the full command → event → reaction chain inside a single transaction.

# Synchronous execution with Unit
unit = Sourced::Unit.new(Chat, backend: Sourced.config.backend)
results = unit.handle(SendMessage.new(stream_id: 'chat-123', payload: { content: query }))
results.events_for(Chat) # => [MessageSent, ...]

If you’re using the Sourced::CommandMethods mixin directly (without a Unit),
note that it persists events but does not trigger reactions. You’d need to
explicitly call #react after issuing commands.

chat = Sourced.load(Chat, 'chat-123')
# Persists but does not call reactions
_cmd, events = chat.send_message!(content: query)
# Have to react manually
commands = chat.react(events)

Installation

Install the gem and add to the application’s Gemfile by executing:

$ bundle add sourced

Note: this gem is under active development, so you probably want to install from GitHub.
In your Gemfile:

gem 'sourced', github: 'ismasan/sourced'

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/ismasan/sourced.