Class: Funes::EventStream
- Inherits:
-
Object
- Object
- Funes::EventStream
- Defined in:
- app/event_streams/funes/event_stream.rb
Overview
EventStream manages the append-only sequence of events for a specific entity.
Each stream is identified by an idx (entity identifier) and provides methods for appending
events and configuring how projections are triggered.
EventStreams implement a three-tier consistency model:
- Consistency Projection: Validates business rules before persisting the event. If invalid, the event is rejected.
- Transactional Projections: Execute synchronously in the same database transaction as the event.
- Async Projections: Execute asynchronously via ActiveJob after the event is committed.
Temporal Queries
EventStreams support temporal queries through the as_of parameter. When an EventStream is created
with a specific timestamp, only events created before or at that timestamp are included, enabling
point-in-time state reconstruction.
Concurrency Control
EventStreams use optimistic concurrency control with version numbers. Each event gets an incrementing
version number with a unique constraint on (idx, version), preventing race conditions when multiple
processes append to the same stream simultaneously.
Instance Attribute Summary collapse
-
#idx ⇒ Object
readonly
Returns the value of attribute idx.
Class Method Summary collapse
-
.add_async_projection(projection, as_of: :last_event_time, **options) ⇒ void
Register an async projection that executes in a background job after the event is committed.
-
.add_transactional_projection(projection) ⇒ void
Register a transactional projection that executes synchronously in the same database transaction.
-
.consistency_projection(projection) ⇒ void
Register a consistency projection that validates business rules before persisting events.
-
.for(idx, as_of = nil) ⇒ Funes::EventStream
Create a new EventStream instance for the given entity identifier.
Instance Method Summary collapse
-
#append!(new_event) ⇒ Funes::Event
Append a new event to the stream.
-
#events ⇒ Array<Funes::Event>
Get all events in the stream as event instances.
Instance Attribute Details
#idx ⇒ Object (readonly)
Returns the value of attribute idx.
137 138 139 |
# File 'app/event_streams/funes/event_stream.rb', line 137 def idx @idx end |
Class Method Details
.add_async_projection(projection, as_of: :last_event_time, **options) ⇒ void
This method returns an undefined value.
Register an async projection that executes in a background job after the event is committed.
Async projections are scheduled via ActiveJob after the event transaction commits. You can pass any ActiveJob options (queue, wait, wait_until, priority, etc.) to control job scheduling.
The as_of parameter controls the timestamp used when the projection job executes:
:last_event_time(default) - Uses the creation time of the last event:job_time- Uses Time.current when the job executes- Proc/Lambda - Custom logic that receives the last event and returns a Time object
113 114 115 116 |
# File 'app/event_streams/funes/event_stream.rb', line 113 def add_async_projection(projection, as_of: :last_event_time, **) @async_projections ||= [] @async_projections << { class: projection, as_of_strategy: as_of, options: } end |
.add_transactional_projection(projection) ⇒ void
This method returns an undefined value.
Register a transactional projection that executes synchronously in the same database transaction.
Transactional projections run after the event is persisted but within the same database transaction. If a transactional projection fails, the entire transaction (including the event) is rolled back.
74 75 76 77 |
# File 'app/event_streams/funes/event_stream.rb', line 74 def add_transactional_projection(projection) @transactional_projections ||= [] @transactional_projections << projection end |
.consistency_projection(projection) ⇒ void
This method returns an undefined value.
Register a consistency projection that validates business rules before persisting events.
The consistency projection runs before the event is saved. If the resulting state is invalid, the event is rejected and not persisted to the database.
58 59 60 |
# File 'app/event_streams/funes/event_stream.rb', line 58 def consistency_projection(projection) @consistency_projection = projection end |
.for(idx, as_of = nil) ⇒ Funes::EventStream
Create a new EventStream instance for the given entity identifier.
130 131 132 |
# File 'app/event_streams/funes/event_stream.rb', line 130 def for(idx, as_of = nil) new(idx, as_of) end |
Instance Method Details
#append!(new_event) ⇒ Funes::Event
Append a new event to the stream.
This method validates the event, runs the consistency projection (if configured), persists the event with an incremented version number, and triggers transactional and async projections.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'app/event_streams/funes/event_stream.rb', line 164 def append!(new_event) return new_event unless new_event.valid? return new_event if consistency_projection.present? && compute_projection_with_new_event(consistency_projection, new_event).invalid? ActiveRecord::Base.transaction do begin @instance_new_events << new_event.persist!(@idx, incremented_version) run_transactional_projections rescue ActiveRecord::RecordNotUnique, Funes::TransactionalProjectionFailed new_event.errors.add(:base, I18n.t("funes.events.racing_condition_on_insert")) raise ActiveRecord::Rollback end end schedule_async_projections unless new_event.errors.any? new_event end |
#events ⇒ Array<Funes::Event>
Get all events in the stream as event instances.
Returns both previously persisted events (up to as_of timestamp) and any new events
appended in this session.
203 204 205 |
# File 'app/event_streams/funes/event_stream.rb', line 203 def events (previous_events + @instance_new_events).map(&:to_klass_instance) end |