-
-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Can't stop client without raising an error #82
Comments
This how it looks like : # require_relative '../config/environment'
# Time.current and other time manipulation methods
require 'active_support/all'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
class MexCWebSocketClient
def initialize
@ticker_updates = {}
@log_messages = []
@last_pong = nil
@subscriptions = {} # symbol: status (:subscribing -> :subscribed -> :unsubscribing -> :unsubscribed)
end
def log(message)
puts message unless $stdout.tty?
@log_messages << message
end
def ping(connection)
log("Ping...")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "PING",
params: []
}))
end
def subscribe(symbol, connection)
state = @subscriptions.fetch(symbol, :unsubscribed)
if state == :unsubscribed
@subscriptions[symbol] = :subscribing
log("Subscribe to #{symbol} updates")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "SUBSCRIPTION",
params: [ "[email protected]@#{symbol}@UTC+1" ]
}))
connection.flush
else
log("Can't subscribe to #{symbol} updates (state=#{state})")
end
end
def unsubscribe(symbol, connection)
state = @subscriptions.fetch(symbol, :unsubscribed)
if state == :subscribed
@subscriptions[symbol] = :unsubscribing
log("Unsubscribe from #{symbol} updates")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "UNSUBSCRIPTION",
params: [ "[email protected]@#{symbol}@UTC+1" ]
}))
connection.flush
else
log("Can't unsubscribe from #{symbol} updates (state=#{state})")
end
end
def display_status(connection)
messages = [@log_messages.count, 10].min
(@ticker_updates.keys.length + 4 + messages).times { print "\e[A\e[K" } if $stdout.tty?
puts "MEXC Public Listener Realtime Status"
puts "Last PingPong\t#{@last_pong}"
puts "Last Ticker Updates"
@ticker_updates.each do |symbol, data|
puts " #{'%-12s' % symbol}#{'%16s' % data[:p]} (#{data[:tr]})"
end
puts "Last Log Messages"
@log_messages.last(messages).each do |msg|
puts " #{msg}"
end
end
def process_command(command, connection)
log "Process Command #{command}"
case command.chomp
when /\Aquit\z/
puts("Remove current subscriptions...")
@subscriptions.each do |symbol, state|
unsubscribe(symbol, connection) if state == :subscribed
end
sleep(0.1) while @ticker_updates.count > 0
puts("Shutting down connection...")
connection.shutdown
when /\Astatus\z/
display_status(connection)
when /\Asubscribe (\w+)\z/
symbol = $1
# if Pair.find_by(symbol: symbol).present?
subscribe(symbol, connection)
# else
# log("subscribe error: #{symbol} does not exist")
# end
when /\Aunsubscribe (\w+)\z/
symbol = $1
# if Pair.find_by(symbol: symbol).present?
unsubscribe(symbol, connection)
# else
# log("unsubscribe error: #{symbol} does not exist")
# end
end
end
def process_message(message)
m = message.parse # decode message as JSON
if (s = m[:s]) && (c = m[:c]) && c.start_with?("[email protected]") && (d = m[:d])
@ticker_updates[s] = d
elsif m[:msg]
if m[:msg] == "PONG"
@last_pong = Time.current
else
symbols = m[:msg].scan(/@([A-Z]+)@UTC/).flatten.uniq
symbols.each do |symbol|
if @subscriptions.has_key?(symbol)
case @subscriptions[symbol]
when :subscribing
@subscriptions[symbol] = :subscribed
when :unsubscribing
@ticker_updates.delete(symbol)
@subscriptions[symbol] = :unsubscribed
end
end
end
end
else
log(m.to_h)
end
end
def run
Async do |task|
endpoint = Async::HTTP::Endpoint.parse("wss://wbs.mexc.com/ws",
alpn_protocols: Async::HTTP::Protocol::HTTP11.names
)
Async::WebSocket::Client.connect(endpoint) do |connection|
input_task = task.async do
while command = $stdin.gets
process_command(command, connection)
$stdout.write "> "
end
end
ping_task = task.async do
loop do
ping(connection)
sleep(20)
end
end
$stdout.write("Connected to MexC WebSocket server.\n")
$stdout.write "> "
# Always subscribe to BTCUSDT ticker
subscribe('BTCUSDT', connection)
while message = connection.read
process_message(message)
end
puts "Exiting..."
ensure
puts "Stopping Tasks..."
begin
input_task&.stop
renew_task&.stop
input_task&.wait
renew_task&.wait
rescue
puts "Ignore Task End exceptions"
end
puts "Done."
end
end
rescue
puts "Exit ignoring exceptions..."
end
end
client = MexCWebSocketClient.new
client.run Run using Ruby 3.4.1 with |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I can't find a way to close a client program using async-websocket without errors.
Here's the error I get when I quit using Ctrl+C :
I implemented a simple REPL with a quit command that send
{ method: "unsubscribe", ... }
messages and I wait for the response before shutting down the WebSocket connection using :connection.shutdown
and I get this error :Then I have to Ctrl-C to end the program which then display the same error as above.
Do you have any tips on how to end a WebSocket client properly / gracefullty ?
Many thanks for this amazing piece of code
-- Pierre
The text was updated successfully, but these errors were encountered: