diff --git a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb index 18d697ec..2760b3ce 100644 --- a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb +++ b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb @@ -1,6 +1,6 @@ module Processes class DetermineVatRatesOnOrderPlaced - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::OfferAccepted, @@ -21,29 +21,35 @@ def determine_vat_rates end end - def apply(event) - case event - when Pricing::OfferAccepted - state.with( - offer_accepted: true, - order_lines: event.data.fetch(:order_lines), - order_id: event.data.fetch(:order_id) - ) - when Fulfillment::OrderRegistered - state.with(order_placed: true) - end - end - def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do - def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) - super + class StateProjector + ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do + def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) + super + end + + def placed? = offer_accepted && order_placed end - def placed? = offer_accepted && order_placed + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + offer_accepted: true, + order_lines: event.data.fetch(:order_lines), + order_id: event.data.fetch(:order_id) + ) + when Fulfillment::OrderRegistered + state_instance.with(order_placed: true) + end + end end end end diff --git a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb index 410a8a9a..82c36fcd 100644 --- a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb +++ b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb @@ -1,7 +1,6 @@ module Processes class OrderItemInvoicingProcess - include Infra::ProcessManager.with_state { ProcessState } - + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::PriceItemValueCalculated, Taxes::VatRateDetermined @@ -26,35 +25,9 @@ def act end end - def apply(event) - case event - when Pricing::PriceItemValueCalculated - state.with( - order_id: event.data.fetch(:order_id), - product_id: event.data.fetch(:product_id), - quantity: event.data.fetch(:quantity), - discounted_amount: event.data.fetch(:discounted_amount) - ) - when Taxes::VatRateDetermined - state.with( - vat_rate: event.data.fetch(:vat_rate) - ) - end - end - def fetch_id(event) "#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}" end - - ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do - def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) - super - end - - def can_create_invoice_item? - order_id && product_id && quantity && vat_rate && discounted_amount - end - end end class MoneySplitter @@ -82,4 +55,36 @@ def call distributed_amounts end end + + class StateProjector + ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do + def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) + super + end + + def can_create_invoice_item? + order_id && product_id && quantity && vat_rate && discounted_amount + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::PriceItemValueCalculated + state_instance.with( + order_id: event.data.fetch(:order_id), + product_id: event.data.fetch(:product_id), + quantity: event.data.fetch(:quantity), + discounted_amount: event.data.fetch(:discounted_amount) + ) + when Taxes::VatRateDetermined + state_instance.with( + vat_rate: event.data.fetch(:vat_rate) + ) + end + end + end end diff --git a/ecommerce/processes/lib/processes/release_payment_process.rb b/ecommerce/processes/lib/processes/release_payment_process.rb index ee126665..e0f4d095 100644 --- a/ecommerce/processes/lib/processes/release_payment_process.rb +++ b/ecommerce/processes/lib/processes/release_payment_process.rb @@ -1,7 +1,6 @@ module Processes class ReleasePaymentProcess - include Infra::ProcessManager.with_state { ProcessState } - + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Payments::PaymentAuthorized, Payments::PaymentReleased, @@ -16,24 +15,6 @@ def act release_payment if state.release? end - def apply(event) - case event - when Payments::PaymentAuthorized - state.with(payment: :authorized) - when Payments::PaymentReleased - state.with(payment: :released) - when Fulfillment::OrderRegistered - state.with( - order: :placed, - order_id: event.data.fetch(:order_id) - ) - when Pricing::OfferExpired - state.with(order: :expired) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def release_payment command_bus.call(Payments::ReleasePayment.new(order_id: state.order_id)) end @@ -42,13 +23,37 @@ def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:order, :payment, :order_id) do - def initialize(order: :draft, payment: :none, order_id: nil) - super + class StateProjector + ProcessState = Data.define(:order, :payment, :order_id) do + def initialize(order: :draft, payment: :none, order_id: nil) + super + end + + def release? + payment.eql?(:authorized) && order.eql?(:expired) + end + end + + def self.initial_state_instance + ProcessState.new end - def release? - payment.eql?(:authorized) && order.eql?(:expired) + def self.apply(state_instance, event) + case event + when Payments::PaymentAuthorized + state_instance.with(payment: :authorized) + when Payments::PaymentReleased + state_instance.with(payment: :released) + when Fulfillment::OrderRegistered + state_instance.with( + order: :placed, + order_id: event.data.fetch(:order_id) + ) + when Pricing::OfferExpired + state_instance.with(order: :expired) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end end end end diff --git a/ecommerce/processes/lib/processes/reservation_process.rb b/ecommerce/processes/lib/processes/reservation_process.rb index 402c1876..c657e986 100644 --- a/ecommerce/processes/lib/processes/reservation_process.rb +++ b/ecommerce/processes/lib/processes/reservation_process.rb @@ -1,6 +1,6 @@ module Processes class ReservationProcess - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::OfferAccepted, @@ -28,20 +28,6 @@ def act end end - def apply(event) - case event - when Pricing::OfferAccepted - state.with( - order: :accepted, - order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h - ) - when Fulfillment::OrderCancelled - state.with(order: :cancelled) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def reserve_stock unavailable_products = [] reserved_products = [] @@ -84,12 +70,34 @@ def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:order, :order_lines) do - def initialize(order: nil, order_lines: []) - super(order:, order_lines: order_lines.freeze) + class StateProjector + ProcessState = Data.define(:order, :order_lines) do + def initialize(order: nil, order_lines: []) + super(order: order, order_lines: order_lines.freeze) + end + + def reserved_product_ids + order_lines.keys + end end - def reserved_product_ids = order_lines.keys + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + order: :accepted, + order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h + ) + when Fulfillment::OrderCancelled + state_instance.with(order: :cancelled) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end end class SomeInventoryNotAvailable < StandardError @@ -99,5 +107,6 @@ def initialize(unavailable_products) @unavailable_products = unavailable_products end end + end end diff --git a/ecommerce/processes/lib/processes/shipment_process.rb b/ecommerce/processes/lib/processes/shipment_process.rb index 67f0d432..b658bcb7 100644 --- a/ecommerce/processes/lib/processes/shipment_process.rb +++ b/ecommerce/processes/lib/processes/shipment_process.rb @@ -1,6 +1,6 @@ module Processes class ShipmentProcess - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Shipping::ShippingAddressAddedToShipment, @@ -21,17 +21,6 @@ def act end end - def apply(event) - case event - when Shipping::ShippingAddressAddedToShipment - state.with(shipment: :address_set) - when Fulfillment::OrderRegistered - state.with(order: :placed) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def submit_shipment command_bus.call(Shipping::SubmitShipment.new(order_id: id)) end @@ -44,8 +33,25 @@ def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:order, :shipment) do - def initialize(order: nil, shipment: nil) = super + class StateProjector + ProcessState = Data.define(:order, :shipment) do + def initialize(order: nil, shipment: nil) = super + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Shipping::ShippingAddressAddedToShipment + state_instance.with(shipment: :address_set) + when Fulfillment::OrderRegistered + state_instance.with(order: :placed) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end end end end diff --git a/ecommerce/processes/lib/processes/three_plus_one_free.rb b/ecommerce/processes/lib/processes/three_plus_one_free.rb index 4a8c9b92..670d5fd9 100644 --- a/ecommerce/processes/lib/processes/three_plus_one_free.rb +++ b/ecommerce/processes/lib/processes/three_plus_one_free.rb @@ -1,6 +1,6 @@ module Processes class ThreePlusOneFree - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::PriceItemAdded, @@ -22,24 +22,6 @@ def act end end - def apply(event) - product_id = event.data.fetch(:product_id) - case event - when Pricing::PriceItemAdded - lines = (state.lines + [{ product_id:, price: event.data.fetch(:price) }]) - state.with(lines:) - when Pricing::PriceItemRemoved - lines = state.lines.dup - index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } - lines.delete_at(index_of_line_to_remove) - state.with(lines:) - when Pricing::ProductMadeFreeForOrder - state.with(free_product: product_id) - when Pricing::FreeProductRemovedFromOrder - state.with(free_product: nil) - end - end - def remove_old_free_product(product_id) command_bus.call(Pricing::RemoveFreeProductFromOrder.new(order_id: id, product_id:)) end @@ -52,16 +34,40 @@ def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:lines, :free_product) do - def initialize(lines: [], free_product: nil) - super(lines: lines.freeze, free_product:) + class StateProjector + ProcessState = Data.define(:lines, :free_product) do + def initialize(lines: [], free_product: nil) + super(lines: lines.freeze, free_product:) + end + + MIN_ORDER_LINES_QUANTITY = 4 + + def eligible_free_product + if lines.size >= MIN_ORDER_LINES_QUANTITY + lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) + end + end end - MIN_ORDER_LINES_QUANTITY = 4 + def self.initial_state_instance + ProcessState.new + end - def eligible_free_product - if lines.size >= MIN_ORDER_LINES_QUANTITY - lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) + def self.apply(state_instance, event) + product_id = event.data.fetch(:product_id) + case event + when Pricing::PriceItemAdded + lines = (state_instance.lines + [{ product_id:, price: event.data.fetch(:price) }]) + state_instance.with(lines:) + when Pricing::PriceItemRemoved + lines = state_instance.lines.dup + index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } + lines.delete_at(index_of_line_to_remove) + state_instance.with(lines:) + when Pricing::ProductMadeFreeForOrder + state_instance.with(free_product: product_id) + when Pricing::FreeProductRemovedFromOrder + state_instance.with(free_product: nil) end end end diff --git a/infra/lib/infra/process_manager.rb b/infra/lib/infra/process_manager.rb index 0b211a58..90fd3121 100644 --- a/infra/lib/infra/process_manager.rb +++ b/infra/lib/infra/process_manager.rb @@ -15,14 +15,38 @@ def call(event) private - attr_reader :event_store, :command_bus, :id + attr_reader :event_store, :command_bus, :id, :state def build_state(event) + projector_class_block = self.class.instance_variable_get(:@projector_class_definition_block) + unless projector_class_block + raise "State projector class definition block not found for #{self.class}. "\ + "Ensure it's configured via Infra::ProcessManager.with_state { YourProjectorClass }." + end + + projector_class = projector_class_block.call + unless projector_class.is_a?(Class) && + projector_class.respond_to?(:apply) && + projector_class.respond_to?(:initial_state_instance) + raise ArgumentError, + "The block provided to with_state must return a valid Projector class " \ + "that responds to :apply and :initial_state_instance. Got: #{projector_class.inspect}" + end + with_retry do past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name:, expected_version: last_stored) - (past_events + [event]).each { |ev| @state = apply(ev) } + last_stored_idx = past_events.empty? ? -1 : past_events.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored_idx) + + current_projected_state = projector_class.initial_state_instance + all_events_to_apply = past_events + [event] + + unique_events = all_events_to_apply.uniq(&:event_id) + + unique_events.each do |ev| + current_projected_state = projector_class.apply(current_projected_state, ev) + end + @state = current_projected_state end end @@ -43,30 +67,32 @@ def subscribes_to(*events) attr_reader :subscribed_events end - def self.with_state(&state_class_block) + def self.with_state(&projector_class_block) unless block_given? - raise ArgumentError, "A block returning the state class is required." + raise ArgumentError, "A block returning the projector class is required for with_state." end Module.new do - @state_definition_block = state_class_block + @projector_class_definition_block_config = projector_class_block define_method(:initial_state) do - block = self.class.instance_variable_get(:@state_definition_block) - raise "State definition block not found on #{self.class}" unless block - - state_class = block.call - raise "State definition block did not return a Class" unless state_class.is_a?(Class) - - state_class.new - end - - define_method(:state) do - @state ||= initial_state + block = self.class.instance_variable_get(:@projector_class_definition_block) + unless block + raise "Projector class definition block not found on #{self.class}. " \ + "Was Infra::ProcessManager.with_state called with a block?" + end + + projector_class = block.call + unless projector_class.is_a?(Class) && projector_class.respond_to?(:initial_state_instance) + raise "The block provided to with_state did not return a Class responding to :initial_state_instance. " \ + "Got: #{projector_class.inspect}" + end + projector_class.initial_state_instance end def self.included(host_class) - host_class.instance_variable_set(:@state_definition_block, @state_definition_block) + projector_block_to_set = @projector_class_definition_block_config + host_class.instance_variable_set(:@projector_class_definition_block, projector_block_to_set) host_class.include(ProcessMethods) host_class.include(Infra::Retry)