Class: Funes::EventStream

Inherits:
Object
Defined in:
app/event_streams/funes/event_stream.rb

Overview

EventStream manages the append-only sequence of events for a specific entity. Each stream is identified by an idx (entity identifier) and provides methods for appending events and configuring how projections are triggered.

EventStreams implement a three-tier consistency model:

  • Consistency Projection: Validates business rules before persisting the event. If invalid, the event is rejected.
  • Transactional Projections: Execute synchronously in the same database transaction as the event.
  • Async Projections: Execute asynchronously via ActiveJob after the event is committed.
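
The ordering of the three tiers can be sketched in plain Ruby, without Rails or Funes. Everything below (ToyStream, the lambdas, the in-memory arrays) is a hypothetical stand-in rather than the library's implementation. It only illustrates that the consistency check gates the write, that transactional work and the event succeed or fail together, and that async work is queued after the commit.

```ruby
# Hypothetical in-memory model of the three tiers; not Funes code.
class ToyStream
  attr_reader :events, :snapshot, :job_queue

  def initialize(consistency:, transactional:, async:)
    @consistency = consistency     # lambda(events) -> true/false
    @transactional = transactional # lambda(events) -> snapshot, may raise
    @async = async                 # label of the work queued after commit
    @events = []
    @snapshot = nil
    @job_queue = []
  end

  def append(event)
    candidate = @events + [event]

    # Tier 1: reject before anything is written.
    return :rejected unless @consistency.call(candidate)

    # Tier 2: event and snapshot change together or not at all.
    begin
      new_snapshot = @transactional.call(candidate)
    rescue StandardError
      return :rolled_back
    end
    @events = candidate
    @snapshot = new_snapshot

    # Tier 3: queued only after the "commit" above.
    @job_queue << [@async, event]
    :committed
  end
end

stream = ToyStream.new(
  consistency: ->(events) { events.sum >= 0 },
  transactional: ->(events) { events.sum > 100 ? raise("snapshot failed") : events.sum },
  async: :report
)

results = [50, -80, 70].map { |amount| stream.append(amount) }
puts results.inspect        # => [:committed, :rejected, :rolled_back]
puts stream.events.inspect  # => [50]
```

Only the first append leaves a trace: the second is stopped by the consistency check, and the third is undone because the transactional step raised.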

Temporal Queries

EventStreams support temporal queries through the as_of parameter. When an EventStream is created with a specific timestamp, only events created before or at that timestamp are included, enabling point-in-time state reconstruction.
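
As a rough illustration of the "before or at" rule, the filter below keeps events whose creation time is less than or equal to as_of. TemporalEvent and events_as_of are hypothetical names used only for this sketch; how Funes actually queries its event table is not shown in this page.

```ruby
# Hypothetical event record; Funes events are assumed to expose created_at.
TemporalEvent = Struct.new(:name, :created_at)

# Keep only events created at or before the as_of timestamp.
def events_as_of(events, as_of)
  events.select { |event| event.created_at <= as_of }
end

log = [
  TemporalEvent.new("placed",  Time.utc(2024, 1, 10)),
  TemporalEvent.new("paid",    Time.utc(2024, 2, 1)),
  TemporalEvent.new("shipped", Time.utc(2024, 3, 5))
]

# The boundary is inclusive: an event created exactly at as_of is kept.
visible = events_as_of(log, Time.utc(2024, 2, 1))
puts visible.map(&:name).inspect # => ["placed", "paid"]
```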

Concurrency Control

EventStreams use optimistic concurrency control with version numbers. Each event gets an incrementing version number, and a unique constraint on (idx, version) makes the database reject a second event with the same version when multiple processes append to the same stream simultaneously.
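
The effect of the unique constraint can be imitated with a Set of [idx, version] pairs. ToyEventTable and its NotUnique error are hypothetical stand-ins for the database table and for ActiveRecord::RecordNotUnique; this is a sketch of the idea, not of the library's storage code.

```ruby
require "set"

# Hypothetical stand-in for a table with a unique index on (idx, version).
class ToyEventTable
  class NotUnique < StandardError; end

  def initialize
    @keys = Set.new
  end

  def insert(idx, version)
    # Set#add? returns nil when the key is already present.
    raise NotUnique, "duplicate (#{idx}, #{version})" unless @keys.add?([idx, version])

    [idx, version]
  end
end

table = ToyEventTable.new

# Two writers both saw version 1 as the latest and computed version 2.
first = table.insert("order-123", 2)

second = begin
  table.insert("order-123", 2)
rescue ToyEventTable::NotUnique
  :conflict
end

puts first.inspect  # => ["order-123", 2]
puts second.inspect # => :conflict
```

The losing writer learns about the conflict at insert time and can decide whether to reload the stream and try again.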

Examples:

Define an event stream with projections

class OrderEventStream < Funes::EventStream
  consistency_projection OrderValidationProjection
  add_transactional_projection OrderSnapshotProjection
  add_async_projection OrderReportProjection, queue: :reports
end

Append events to a stream

stream = OrderEventStream.for("order-123")
event = stream.append!(Order::Placed.new(total: 99.99))

if event.valid?
  puts "Event persisted with version #{event.version}"
else
  puts "Event rejected: #{event.errors.full_messages}"
end

Temporal query - get stream state as of a specific time

stream = OrderEventStream.for("order-123", 1.month.ago)
stream.events # => only events up to 1 month ago

Instance Attribute Details

#idx ⇒ Object (readonly)

Returns the value of attribute idx.



# File 'app/event_streams/funes/event_stream.rb', line 137

def idx
  @idx
end

Class Method Details

.add_async_projection(projection, as_of: :last_event_time, **options) ⇒ void

This method returns an undefined value.

Register an async projection that executes in a background job after the event is committed.

Async projections are scheduled via ActiveJob after the event transaction commits. You can pass any ActiveJob options (queue, wait, wait_until, priority, etc.) to control job scheduling.

The as_of parameter controls the timestamp used when the projection job executes:

  • :last_event_time (default) - Uses the creation time of the last event
  • :job_time - Uses Time.current when the job executes
  • Proc/Lambda - Custom logic that receives the last event and returns a Time object
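
One way to picture the three strategies is a small resolver that turns the configured value into a timestamp when the job runs. The resolve_as_of helper, the AsOfEvent struct, and the now: keyword are assumptions made for this sketch; the library's job code may resolve the strategy differently.

```ruby
# Hypothetical resolver for the as_of strategy; not the library's job code.
AsOfEvent = Struct.new(:created_at)

def resolve_as_of(strategy, last_event, now: Time.now)
  case strategy
  when :last_event_time then last_event.created_at
  when :job_time        then now
  when Proc             then strategy.call(last_event) # lambdas are Procs too
  else raise ArgumentError, "unknown as_of strategy: #{strategy.inspect}"
  end
end

last_event = AsOfEvent.new(Time.utc(2024, 5, 1, 14, 30))
job_clock  = Time.utc(2024, 5, 1, 14, 35) # when the job happens to run

# Plain-Ruby equivalent of truncating a time to the start of its day.
start_of_day = lambda do |event|
  created = event.created_at
  Time.utc(created.year, created.month, created.day)
end

by_event = resolve_as_of(:last_event_time, last_event, now: job_clock)
by_job   = resolve_as_of(:job_time, last_event, now: job_clock)
by_proc  = resolve_as_of(start_of_day, last_event, now: job_clock)

puts by_event
puts by_job
puts by_proc
```

With :last_event_time the projection sees the same state no matter how long the job waits in the queue, while :job_time also picks up events appended in the meantime.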

Examples:

Schedule with custom queue

class OrderEventStream < Funes::EventStream
  add_async_projection OrderReportProjection, queue: :reports
end

Schedule with delay

class OrderEventStream < Funes::EventStream
  add_async_projection AnalyticsProjection, wait: 5.minutes
end

Use job execution time instead of event time

class OrderEventStream < Funes::EventStream
  add_async_projection RealtimeProjection, as_of: :job_time
end

Custom as_of logic with proc

class OrderEventStream < Funes::EventStream
  add_async_projection EndOfDayProjection, as_of: ->(last_event) { last_event.created_at.beginning_of_day }
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute asynchronously.

  • as_of (Symbol, Proc) (defaults to: :last_event_time)

    Strategy for determining the as_of timestamp (:last_event_time, :job_time, or Proc).

  • options (Hash)

    ActiveJob options for scheduling (queue, wait, wait_until, priority, etc.).



# File 'app/event_streams/funes/event_stream.rb', line 113

def add_async_projection(projection, as_of: :last_event_time, **options)
  @async_projections ||= []
  @async_projections << { class: projection, as_of_strategy: as_of, options: options }
end

.add_transactional_projection(projection) ⇒ void

This method returns an undefined value.

Register a transactional projection that executes synchronously in the same database transaction.

Transactional projections run after the event is persisted but within the same database transaction. If a transactional projection fails, the entire transaction (including the event) is rolled back.

Examples:

class OrderEventStream < Funes::EventStream
  add_transactional_projection OrderSnapshotProjection
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute transactionally.



# File 'app/event_streams/funes/event_stream.rb', line 74

def add_transactional_projection(projection)
  @transactional_projections ||= []
  @transactional_projections << projection
end

.consistency_projection(projection) ⇒ void

This method returns an undefined value.

Register a consistency projection that validates business rules before persisting events.

The consistency projection runs before the event is saved. If the resulting state is invalid, the event is rejected and not persisted to the database.

Examples:

class InventoryEventStream < Funes::EventStream
  consistency_projection InventoryValidationProjection
end
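
To make "the resulting state is invalid" concrete, the sketch below folds a list of events, including the candidate event, into a state and checks one business rule on the result. StockAdded, StockRemoved, and inventory_valid? are hypothetical names for this illustration and do not reflect the Funes::Projection interface.

```ruby
# Hypothetical inventory events; names are illustrative only.
StockAdded   = Struct.new(:quantity)
StockRemoved = Struct.new(:quantity)

# Fold the events into a state, then check the invariant on that state.
def inventory_valid?(events)
  on_hand = events.reduce(0) do |total, event|
    case event
    when StockAdded   then total + event.quantity
    when StockRemoved then total - event.quantity
    else total
    end
  end
  on_hand >= 0 # business rule: stock can never go negative
end

history = [StockAdded.new(5), StockRemoved.new(3)]

# The candidate event is evaluated together with the existing history.
accepted = inventory_valid?(history + [StockRemoved.new(2)])
rejected = inventory_valid?(history + [StockRemoved.new(4)])

puts accepted # => true
puts rejected # => false
```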

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class that will validate the state.



# File 'app/event_streams/funes/event_stream.rb', line 58

def consistency_projection(projection)
  @consistency_projection = projection
end

.for(idx, as_of = nil) ⇒ Funes::EventStream

Create a new EventStream instance for the given entity identifier.

Examples:

Current state

stream = OrderEventStream.for("order-123")

State as of a specific time

stream = OrderEventStream.for("order-123", 1.month.ago)

Parameters:

  • idx (String)

    The entity identifier.

  • as_of (Time, nil) (defaults to: nil)

    Optional timestamp for temporal queries. If provided, only events created at or before this timestamp are included. When omitted (nil), the stream is read as of Time.current.

Returns:

  • (Funes::EventStream)

# File 'app/event_streams/funes/event_stream.rb', line 130

def for(idx, as_of = nil)
  new(idx, as_of)
end

Instance Method Details

#append!(new_event) ⇒ Funes::Event

Append a new event to the stream.

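This method validates the event, runs the consistency projection (if configured), persists the event with an incremented version number, and triggers transactional and async projections. If the event itself is invalid or the consistency projection rejects the resulting state, the event is returned without being persisted. Async projections are scheduled only when the event ends up with no errors.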

Examples:

Successful append

event = stream.append!(Order::Placed.new(total: 99.99))
if event.valid?
  puts "Event persisted with version #{event.version}"
end

Handling validation failure

event = stream.append!(InvalidEvent.new)
unless event.valid?
  puts "Event rejected: #{event.errors.full_messages}"
end

Handling concurrency conflict

event = stream.append!(SomeEvent.new)
if event.errors[:base].present?
  # Version conflict or failed transactional projection (both add a :base error); retry logic here
end
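
A bounded retry around append! might look like the sketch below. FakeEvent and FlakyStream are hypothetical stand-ins that mimic only errors[:base] and a stream that reports a conflict on its first attempts; append_with_retry is likewise an assumed helper, not part of Funes. In a real application a retry would probably also need a fresh stream instance so that the next version number is recomputed, which is an assumption this sketch does not model.

```ruby
# Hypothetical stand-in that mimics only errors[:base].
class FakeEvent
  attr_reader :errors

  def initialize
    @errors = Hash.new { |hash, key| hash[key] = [] }
  end
end

# Hypothetical stream that reports a conflict on the first attempts.
class FlakyStream
  attr_reader :attempts

  def initialize(conflicts:)
    @conflicts = conflicts
    @attempts = 0
  end

  def append!(event)
    @attempts += 1
    event.errors[:base] << "conflict" if @attempts <= @conflicts
    event
  end
end

# Retry with a fresh event from the block each time, up to max_attempts.
def append_with_retry(stream, max_attempts: 3)
  event = nil
  max_attempts.times do
    event = stream.append!(yield)
    break if event.errors[:base].empty?
  end
  event
end

stream = FlakyStream.new(conflicts: 2)
event = append_with_retry(stream) { FakeEvent.new }

puts stream.attempts             # => 3
puts event.errors[:base].empty?  # => true
```

Capping the attempts matters because a :base error can also come from a failing transactional projection, which retrying will not fix.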

Parameters:

  • new_event (Funes::Event)

    The event to append to the stream.

Returns:

  • (Funes::Event)

    The event object (check valid? to see if it was persisted).



# File 'app/event_streams/funes/event_stream.rb', line 164

def append!(new_event)
  return new_event unless new_event.valid?
  return new_event if consistency_projection.present? &&
                      compute_projection_with_new_event(consistency_projection, new_event).invalid?

  ActiveRecord::Base.transaction do
    begin
      @instance_new_events << new_event.persist!(@idx, incremented_version)
      run_transactional_projections
    rescue ActiveRecord::RecordNotUnique, Funes::TransactionalProjectionFailed
      new_event.errors.add(:base, I18n.t("funes.events.racing_condition_on_insert"))
      raise ActiveRecord::Rollback
    end
  end

  schedule_async_projections unless new_event.errors.any?

  new_event
end

#events ⇒ Array<Funes::Event>

Get all events in the stream as event instances.

Returns both previously persisted events (up to the as_of timestamp) and any new events appended through this stream instance.

Examples:

stream = OrderEventStream.for("order-123")
stream.events.each do |event|
  puts "#{event.class.name} at #{event.created_at}"
end

Returns:

  • (Array<Funes::Event>)

# File 'app/event_streams/funes/event_stream.rb', line 203

def events
  (previous_events + @instance_new_events).map(&:to_klass_instance)
end