Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't stop client without raising an error #82

Open
zedalaye opened this issue Jan 27, 2025 · 1 comment
Open

Can't stop client without raising an error #82

zedalaye opened this issue Jan 27, 2025 · 1 comment

Comments

@zedalaye
Copy link

zedalaye commented Jan 27, 2025

I can't find a way to close a client program using async-websocket without errors.

Here's the error I get when I quit using Ctrl+C :

/home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'IO::Event::Selector::EPoll#select': Interrupt
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'Async::Scheduler#run_once!'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:425:in 'Async::Scheduler#run_once'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:498:in 'block in Async::Scheduler#run'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:461:in 'block in Async::Scheduler#run_loop'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Thread.handle_interrupt'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Async::Scheduler#run_loop'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:497:in 'Async::Scheduler#run'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/kernel/async.rb:34:in 'Kernel#Async'
        from lib/listener.rb:224:in 'MexCWebSocketClient#run'
        from lib/listener.rb:282:in '<main>'

I implemented a simple REPL with a quit command that send { method: "unsubscribe", ... } messages and I wait for the response before shutting down the WebSocket connection using : connection.shutdown and I get this error :

 4.04s     warn: Async::Task [oid=0x1ce0] [ec=0x1ce8] [pid=423165] [2025-01-28 00:03:03 +0100]
               | Task may have ended with unhandled exception.
               |   FrozenError: can't modify frozen IO::Stream::StringBuffer: "\x88\x02\x03\xE8"
               |   → /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::SSL::SSLSocket#sysread_nonblock'
               |     /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::Buffering#read_nonblock'

Then I have to Ctrl-C to end the program which then display the same error as above.

Do you have any tips on how to end a WebSocket client properly / gracefullty ?

Many thanks for this amazing piece of code

-- Pierre

@zedalaye
Copy link
Author

This how it looks like :

# require_relative '../config/environment'

# Time.current and other time manipulation methods
require 'active_support/all' 

require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

class MexCWebSocketClient
  def initialize
    @ticker_updates = {}
    @log_messages = []
    @last_pong = nil
    @subscriptions = {} # symbol: status (:subscribing -> :subscribed -> :unsubscribing -> :unsubscribed)
  end

  def log(message)
    puts message unless $stdout.tty?
    @log_messages << message
  end

  def ping(connection)
    log("Ping...")
    connection.write(Protocol::WebSocket::TextMessage.generate({
      method: "PING", 
      params: []
    }))    
  end

  def subscribe(symbol, connection)
    state = @subscriptions.fetch(symbol, :unsubscribed)
    if state == :unsubscribed
      @subscriptions[symbol] = :subscribing

      log("Subscribe to #{symbol} updates")
      connection.write(Protocol::WebSocket::TextMessage.generate({
        method: "SUBSCRIPTION", 
        params: [ "[email protected]@#{symbol}@UTC+1" ]
      }))
      connection.flush
    else
      log("Can't subscribe to #{symbol} updates (state=#{state})")
    end
  end

  def unsubscribe(symbol, connection)
    state = @subscriptions.fetch(symbol, :unsubscribed)
    if state == :subscribed
      @subscriptions[symbol] = :unsubscribing

      log("Unsubscribe from #{symbol} updates")
      connection.write(Protocol::WebSocket::TextMessage.generate({
        method: "UNSUBSCRIPTION", 
        params: [ "[email protected]@#{symbol}@UTC+1" ]
      }))  
      connection.flush
    else
      log("Can't unsubscribe from #{symbol} updates (state=#{state})")
    end      
  end

  def display_status(connection)
    messages = [@log_messages.count, 10].min
    (@ticker_updates.keys.length + 4 + messages).times { print "\e[A\e[K" } if $stdout.tty?
    puts "MEXC Public Listener Realtime Status"
    puts "Last PingPong\t#{@last_pong}"
    puts "Last Ticker Updates"
    @ticker_updates.each do |symbol, data|
      puts "  #{'%-12s' % symbol}#{'%16s' % data[:p]} (#{data[:tr]})"
    end
    puts "Last Log Messages"
    @log_messages.last(messages).each do |msg|
      puts "  #{msg}"
    end    
  end

  def process_command(command, connection)
    log "Process Command #{command}"

    case command.chomp
    when /\Aquit\z/
      puts("Remove current subscriptions...")
      @subscriptions.each do |symbol, state| 
        unsubscribe(symbol, connection) if state == :subscribed
      end
      sleep(0.1) while @ticker_updates.count > 0
      puts("Shutting down connection...")
      connection.shutdown

    when /\Astatus\z/
      display_status(connection)
    
    when /\Asubscribe (\w+)\z/
      symbol = $1
      # if Pair.find_by(symbol: symbol).present?        
        subscribe(symbol, connection)
      # else
      #   log("subscribe error: #{symbol} does not exist") 
      # end

    when /\Aunsubscribe (\w+)\z/
      symbol = $1
      # if Pair.find_by(symbol: symbol).present?
        unsubscribe(symbol, connection)
      # else
      #   log("unsubscribe error: #{symbol} does not exist") 
      # end
    end
  end

  def process_message(message)
    m = message.parse # decode message as JSON

    if (s = m[:s]) && (c = m[:c]) && c.start_with?("[email protected]") && (d = m[:d])
      @ticker_updates[s] = d
    elsif m[:msg]
      if m[:msg] == "PONG"
        @last_pong = Time.current
      else
        symbols = m[:msg].scan(/@([A-Z]+)@UTC/).flatten.uniq
        symbols.each do |symbol|
          if @subscriptions.has_key?(symbol)
            case @subscriptions[symbol]
            when :subscribing
              @subscriptions[symbol] = :subscribed
            when :unsubscribing
              @ticker_updates.delete(symbol)
              @subscriptions[symbol] = :unsubscribed
            end
          end
        end
      end
    else
      log(m.to_h)
    end
  end

  def run
    Async do |task|
      endpoint = Async::HTTP::Endpoint.parse("wss://wbs.mexc.com/ws",
        alpn_protocols: Async::HTTP::Protocol::HTTP11.names
      )

      Async::WebSocket::Client.connect(endpoint) do |connection|
        input_task = task.async do
          while command = $stdin.gets
            process_command(command, connection)
            $stdout.write "> "
          end
        end

        ping_task = task.async do
          loop do
            ping(connection)
            sleep(20)
          end
        end

        $stdout.write("Connected to MexC WebSocket server.\n")
        $stdout.write "> "

        # Always subscribe to BTCUSDT ticker
        subscribe('BTCUSDT', connection)

        while message = connection.read
          process_message(message)
        end

        puts "Exiting..."
      ensure
        puts "Stopping Tasks..."
        begin  
          input_task&.stop
          renew_task&.stop
          input_task&.wait
          renew_task&.wait
        rescue
          puts "Ignore Task End exceptions"
        end  
        puts "Done."
      end
    end
  rescue
    puts "Exit ignoring exceptions..."
  end
end

client = MexCWebSocketClient.new
client.run

Run using Ruby 3.4.1 with async, async-websocket and rails installed (or at least active-support)
On prompt > try commands : status, subscribe ETHUSDT, unsubscribe BTCUSDT and quit or Ctrl+C

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant