Skip to content

Commit

Permalink
Run RBS
Browse files Browse the repository at this point in the history
  • Loading branch information
hopsoft committed Nov 4, 2024
1 parent 92592cc commit 90391aa
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 1 deletion.
4 changes: 4 additions & 0 deletions bin/rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

exec "bundle exec rbs-inline --output lib #{ARGV.join(" ")}".strip
2 changes: 2 additions & 0 deletions lib/local_bus.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

require "zeitwerk"
loader = Zeitwerk::Loader.for_gem
loader.setup
Expand Down
2 changes: 2 additions & 0 deletions lib/local_bus/bus.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

class LocalBus
# Local in-process single threaded "message bus" with non-blocking I/O
class Bus
Expand Down
2 changes: 2 additions & 0 deletions lib/local_bus/message.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

class LocalBus
# Represents a message in the LocalBus system
class Message
Expand Down
2 changes: 2 additions & 0 deletions lib/local_bus/pledge.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

class LocalBus
# A promise-like object that wraps an Async::Barrier and a list of Subscribers.
# Delegates #wait to the barrier and all other methods to the subscriber list.
Expand Down
1 change: 1 addition & 0 deletions lib/local_bus/station.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

# rbs_inline: enabled
# rubocop:disable Lint/MissingCopEnableDirective
# rubocop:disable Style/ArgumentsForwarding

Expand Down
2 changes: 2 additions & 0 deletions lib/local_bus/subscriber.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

class LocalBus
# Wraps a Callable (Proc) and Message intended for asynchronous execution.
class Subscriber
Expand Down
2 changes: 2 additions & 0 deletions lib/local_bus/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

# rbs_inline: enabled

class LocalBus
VERSION = "0.1.0"
end
13 changes: 13 additions & 0 deletions sig/generated/local_bus.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Generated from lib/local_bus.rb with RBS::Inline

class LocalBus
include Singleton

attr_reader bus: untyped

attr_reader station: untyped

private

def initialize: () -> untyped
end
72 changes: 72 additions & 0 deletions sig/generated/local_bus/bus.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Generated from lib/local_bus/bus.rb with RBS::Inline

class LocalBus
# Local in-process single threaded "message bus" with non-blocking I/O
class Bus
include MonitorMixin

# Constructor
# @note Creates a new Bus instance with specified concurrency
# @rbs concurrency: Integer -- maximum number of concurrent tasks (default: Concurrent.processor_count)
def initialize: (?concurrency: Integer) -> untyped

# Maximum number of concurrent tasks that can run in "parallel"
# @rbs return: Integer -- current concurrency value
def concurrency: () -> Integer

# Sets the concurrency
# @rbs concurrency: Integer -- max number of concurrent tasks that can run in "parallel"
# @rbs return: Integer -- new concurrency value
def concurrency=: (untyped value) -> Integer

# Registered topics that have subscribers
# @rbs return: Array[String] -- list of topic names
def topics: () -> Array[String]

# Registered subscriptions
# @rbs return: Hash[String, Array[callable]] -- mapping of topics to callables
def subscriptions: () -> Hash[String, Array[callable]]

# Subscribes a callable to a topic
# @rbs topic: String -- topic name
# @rbs callable: (Message) -> untyped -- callable that will process messages published to the topic
# @rbs &block: (Message) -> untyped -- alternative way to provide a callable
# @rbs return: self
# @raise [ArgumentError] if neither callable nor block is provided
def subscribe: (String topic, ?callable: Message) { (Message) -> untyped } -> self

# Unsubscribes a callable from a topic
# @rbs topic: String -- topic name
# @rbs callable: (Message) -> untyped -- subscriber that should no longer receive messages
# @rbs return: self
def unsubscribe: (String topic, callable: Message) -> self

# Unsubscribes all subscribers from a topic and removes the topic
# @rbs topic: String -- topic name
# @rbs return: self
def unsubscribe_all: (String topic) -> self

# Executes a block and unsubscribes all subscribers from the topic afterwards
# @rbs topic: String -- topic name
# @rbs block: (String) -> void -- block to execute (yields the topic)
def with_topic: (String topic) ?{ (?) -> untyped } -> untyped

# Publishes a message to a topic
#
# @note If subscribers are rapidly created/destroyed mid-publish, there's a theoretical
# possibility of object_id reuse. However, this is extremely unlikely in practice.
#
# * If subscribers are added mid-publish, they will not receive the message
# * If subscribers are removed mid-publish, they will still receive the message
#
# @note If the timeout is exceeded, the task will be cancelled before all subscribers have completed.
#
# Check the Subscriber for any errors.
#
# @rbs topic: String -- topic name
# @rbs timeout: Float -- seconds to wait before cancelling (default: 300)
# @rbs payload: Hash -- message payload
# @rbs return: Array[Subscriber] -- list of performed subscribers (empty if no subscribers)
def publish: (String topic, ?timeout: Float, **untyped payload) -> Array[Subscriber]
end
end
46 changes: 46 additions & 0 deletions sig/generated/local_bus/message.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Generated from lib/local_bus/message.rb with RBS::Inline

class LocalBus
# Represents a message in the LocalBus system
class Message
# Constructor
# @note Creates a new Message instance with the given topic and payload
# @rbs topic: String -- the topic of the message
# @rbs timeout: Float? -- optional timeout for message processing (in seconds)
# @rbs payload: Hash -- the message payload
def initialize: (String topic, ?timeout: Float?, **untyped payload) -> untyped

# Unique identifier for the message
# @rbs return: String
attr_reader id: untyped

# Message topic
# @rbs return: String
attr_reader topic: untyped

# Message payload
# @rbs return: Hash
attr_reader payload: untyped

# Time when the message was created or published
# @rbs return: Time
attr_reader created_at: untyped

# ID of the thread that created the message
# @rbs return: Integer
attr_reader thread_id: untyped

# Timeout for message processing (in seconds)
# @rbs return: Float
attr_reader timeout: untyped

# Metadata for the message
# @rbs return: Hash[Symbol, untyped]
attr_reader metadata: untyped

# Allows pattern matching on message attributes
# @rbs keys: Array[Symbol] -- keys to extract from the message
# @rbs return: Hash[Symbol, untyped]
def deconstruct_keys: (Array[Symbol] keys) -> Hash[Symbol, untyped]
end
end
20 changes: 20 additions & 0 deletions sig/generated/local_bus/pledge.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated from lib/local_bus/pledge.rb with RBS::Inline

class LocalBus
# A promise-like object that wraps an Async::Barrier and a list of Subscribers.
# Delegates #wait to the barrier and all other methods to the subscriber list.
class Pledge
# Constructor
# @rbs barrier: Async::Barrier -- barrier used to wait for all tasks
# @rbs subscribers: Array[Subscriber]
def initialize: (Async::Barrier barrier, *untyped subscribers) -> untyped

# Blocks and waits for the barrier... all subscribers to complete
# @rbs return: void
def wait: () -> void

# Blocks and waits then returns all subscribers
# @rbs return: Array[Subscriber]
def value: () -> Array[Subscriber]
end
end
111 changes: 111 additions & 0 deletions sig/generated/local_bus/station.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Generated from lib/local_bus/station.rb with RBS::Inline

class LocalBus
# An in-process message queuing system that buffers and publishes messages to Bus.
# This class acts as an intermediary, queuing messages internally before publishing them to the Bus.
#
# @note Station shares the same interface as Bus and is thus a message bus.
# The key difference is that Stations are multi-threaded and will not block the main thread.
#
# Three fallback policies are supported:
# 1. `abort` - Raises an exception and discards the task when the queue is full (default)
# 2. `discard` - Discards the task when the queue is full
# 3. `caller_runs` - Executes the task on the calling thread when the queue is full,
# This effectively jumps the queue (and blocks the main thread) but ensures the task is performed
#
# IMPORTANT: Be sure to release resources like database connections in subscribers when publishing via Station.
class Station
include MonitorMixin

class TimeoutError < StandardError
end

# Default options for Concurrent::FixedThreadPool (can be overridden via the constructor)
# @see https://ruby-concurrency.github.io/concurrent-ruby/1.3.4/Concurrent/ThreadPoolExecutor.html
THREAD_POOL_OPTIONS: untyped

# Constructor
# @rbs bus: Bus -- local message bus (default: Bus.new)
# @rbs threads: Integer -- number of threads (default: Concurrent.processor_count)
# @rbs default_timeout: Float -- seconds to wait for a future to complete
# @rbs shutdown_timeout: Float -- seconds to wait for all futures to complete on process exit
# @rbs options: Hash[Symbol, untyped] -- Concurrent::FixedThreadPool options
# @rbs return: void
def initialize: (?bus: Bus, ?threads: Integer, ?default_timeout: Float, ?shutdown_timeout: Float, **untyped options) -> void

# Bus instance
# @rbs return: Bus
attr_reader bus: untyped

# Number of threads used to process messages
# @rbs return: Integer
attr_reader threads: untyped

# Default timeout for message processing (in seconds)
# @rbs return: Float
attr_reader default_timeout: untyped

# Timeout for graceful shutdown (in seconds)
# @rbs return: Float
attr_reader shutdown_timeout: untyped

# Starts the broker
# @rbs options: Hash[Symbol, untyped] -- Concurrent::FixedThreadPool options
# @rbs return: void
def start: (**untyped options) -> void

# Stops the broker
# @rbs timeout: Float -- seconds to wait for all futures to complete
# @rbs return: void
def stop: (?timeout: Float) -> void

# Indicates if the broker is running
# @rbs return: bool
def running?: () -> bool

# Subscribe to a topic
# @rbs topic: String -- topic name
# @rbs callable: (Message) -> untyped -- callable that will process messages published to the topic
# @rbs &block: (Message) -> untyped -- alternative way to provide a callable
# @rbs return: self
def subscribe: (String topic, ?callable: Message) { (Message) -> untyped } -> self

# Unsubscribe from a topic
# @rbs topic: String -- topic name
# @rbs return: self
def unsubscribe: (String topic) -> self

# Unsubscribes all subscribers from a topic and removes the topic
# @rbs topic: String -- topic name
# @rbs return: self
def unsubscribe_all: (String topic) -> self

# Publishes a message to Bus on a separate thread keeping the main thread free for additional work.
#
# @note This allows you to publish messages when performing operations like handling web requests
# without blocking the main thread and slowing down the response.
#
# @see https://ruby-concurrency.github.io/concurrent-ruby/1.3.4/Concurrent/Promises/Future.html
#
# @rbs topic: String | Symbol -- topic name
# @rbs timeout: Float -- seconds to wait before cancelling
# @rbs payload: Hash[Symbol, untyped] -- message payload
# @rbs return: Concurrent::Promises::Future
def publish: (String | Symbol topic, ?timeout: Float, **untyped payload) -> Concurrent::Promises::Future

private

# Thread pool used for asynchronous operations
# @rbs return: Concurrent::FixedThreadPool
attr_reader pool: untyped

# Starts the shutdown handler thread
# @rbs return: void
def start_shutdown_handler: () -> void

# Enables safe shutdown on process exit by trapping specified signals
# @rbs on: Array[String] -- signals to trap
# @rbs return: void
def enable_safe_shutdown: (on: Array[String]) -> void
end
end
Loading

0 comments on commit 90391aa

Please sign in to comment.