Skip to content

Commit

Permalink
Update demos
Browse files Browse the repository at this point in the history
  • Loading branch information
hopsoft committed Dec 5, 2024
1 parent 17b8844 commit ae6675b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 29 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,10 @@ bin/demo-bus # demonstrates Bus performance
bin/demo-station # demonstrates Station performance
```

Both demos simulate I/O-bound operations _(1 second latency per subscriber)_ to show how LocalBus handles concurrent processing. For example, on an 10-core system:
Both demos simulate I/O-bound operations _(1 second latency per subscriber)_ to show how LocalBus handles concurrent processing.
For example, on an 10-core system:

- The Bus processes a message with 10 I/O-bound subscribers in ~1 second instead of 10 seconds
- The Station processes 10 messages with 10 I/O-bound subscribers each in ~1 second instead of 100 seconds

This demonstrates how LocalBus offers high throughput for I/O-bound operations. :raised_hands:
LocalBus can process 10 messages with 10 I/O-bound subscribers each in **~1 second instead of 100 seconds**

## See Also

Expand Down
39 changes: 30 additions & 9 deletions bin/demo-bus
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,40 @@ require_relative "../lib/local_bus"
AmazingPrint.pry!

bus = LocalBus.instance.bus
subscriber_count = bus.concurrency
subscribe_count = 10
publish_count = 10

subscriber_count.times do |num|
bus.subscribe "bus-demo" do |message|
sleep 1 # sleep 1 second per subscriber
# setup multiple subscribers per/message
publish_count.times do |i|
subscribe_count.times do
bus.subscribe "demo-#{i}" do |message|
sleep 1 # sleep 1 second per subscriber
end
end
end

start = Time.now
puts "Publishing 1 message with #{subscriber_count} subscribers."
message = bus.publish("bus-demo", demo: :bus)

# NOTE: The Bus blocks until all subscribers complete even if we don't explicitly wait
# Wrap in an Async task since we're publishing multiple messages to the Bus.
# This prevents blocking as the Bus implicitly waits for each published message to complete.
# Note that this is not necessary when publishing via the Station.
task = Async do
publish_count.times.map do |i|
bus.publish("demo-#{i}")
end
end
messages = task.result

duration = Time.now - start
subscribers = message.subscribers
ap subscribers: subscribers.map(&:to_h), processed: subscribers.size, duration: duration
subscribers = messages.map(&:subscribers).flatten

puts

ap(
subscribers: subscribers.map(&:to_h),
total_messages: messages.size,
total_subscribers: subscribers.size,
duration: duration
)

puts "Published #{messages.size} messages with #{bus.concurrency} subscribers each."
39 changes: 24 additions & 15 deletions bin/demo-station
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,37 @@ AmazingPrint.pry!

station = LocalBus.instance.station
bus = station.bus
subscriber_count = bus.concurrency
publish_count = station.threads
subscribe_count = 10
publish_count = 10

subscriber_count.times do |num|
station.subscribe "station-demo" do |message|
sleep 1 # sleep 1 second per subscriber
# setup multiple subscribers per/message
publish_count.times do |i|
subscribe_count.times do
bus.subscribe "demo-#{i}" do |message|
sleep 1 # sleep 1 second per subscriber
end
end
end

station.start
start = Time.now
puts "Calling Station#publish with #{subscriber_count} subscribers that all sleep for 1 second each."
messages = publish_count.times.map do
station.publish "station-demo"
messages = publish_count.times.map do |i|
station.publish "demo-#{i}"
end
puts "Published #{messages.size} messages with #{subscriber_count} subscribers each."
puts "Got here fast because the Station runs on a separate thread. Now we wait..."
messages.flatten!

# NOTE: The Station will block until all subscribers complete if we wait or access subscribers.
# Also, the process will attempt a safe shutdown (allowing some time for subscribers to complete)
# even if we don't explicitly wait for or collect the value.
puts "Got here fast because the Station runs on a thread pool. Now we wait..."
messages.each(&:wait)

duration = Time.now - start
subscribers = messages.map(&:subscribers).flatten
ap subscribers: subscribers.map(&:to_h), processed: subscribers.size, duration: duration

puts

ap(
subscribers: subscribers.map(&:to_h),
total_messages: messages.size,
total_subscribers: subscribers.size,
duration: duration
)

puts "Published #{messages.size} messages with #{bus.concurrency} subscribers each."

0 comments on commit ae6675b

Please sign in to comment.