From 435657b7ebeba91b7ed9d091db3d726bba6e7f47 Mon Sep 17 00:00:00 2001 From: jorn Date: Sun, 27 May 2018 01:22:01 +0200 Subject: [PATCH] Added websockets API is now fully wrapped! --- pom.xml | 11 ++++ .../java/io/yogh/bl3p/api/v1/Bl3pClient.java | 57 +++++++++++++++++++ .../io/yogh/bl3p/api/v1/WebSocketClient.java | 47 +++++++++++++++ .../yogh/bl3p/api/v1/WebSocketEndPoint.java | 40 +++++++++++++ .../api/v1/response/domain/TradeFeedInfo.java | 55 ++++++++++++++++++ .../api/v1/response/parser/CommonParser.java | 6 +- .../v1/response/websocket/FeedForwarder.java | 19 +++++++ .../websocket/OrderBookResponseCallback.java | 17 ++++++ .../websocket/OrderBookResponseParser.java | 42 ++++++++++++++ .../response/websocket/StringJsonParser.java | 5 ++ .../websocket/SubscriptionForwarder.java | 16 ++++++ .../websocket/TradeFeedResponseCallback.java | 17 ++++++ .../websocket/TradeFeedResponseParser.java | 24 ++++++++ .../io/yogh/bl3p/api/v1/Bl3pClientTest.java | 9 +-- 14 files changed, 358 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/yogh/bl3p/api/v1/WebSocketClient.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/WebSocketEndPoint.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/domain/TradeFeedInfo.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/FeedForwarder.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseCallback.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseParser.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/StringJsonParser.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/SubscriptionForwarder.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseCallback.java create mode 100644 src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseParser.java diff --git a/pom.xml b/pom.xml index d3fcf32..2483141 100755 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,17 @@ json 20140107 + + javax.websocket + javax.websocket-api + 1.1 + provided + + + org.glassfish.tyrus.bundles + tyrus-standalone-client + 1.13.1 + junit junit diff --git a/src/main/java/io/yogh/bl3p/api/v1/Bl3pClient.java b/src/main/java/io/yogh/bl3p/api/v1/Bl3pClient.java index 3594438..e42ee4f 100644 --- a/src/main/java/io/yogh/bl3p/api/v1/Bl3pClient.java +++ b/src/main/java/io/yogh/bl3p/api/v1/Bl3pClient.java @@ -1,6 +1,9 @@ package io.yogh.bl3p.api.v1; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.function.Consumer; import io.yogh.bl3p.api.v1.request.ApiCall; import io.yogh.bl3p.api.v1.request.authenticated.CancelOrderCall; @@ -42,6 +45,7 @@ import io.yogh.bl3p.api.v1.response.domain.OrderHistory; import io.yogh.bl3p.api.v1.response.domain.OrderInfo; import io.yogh.bl3p.api.v1.response.domain.TickerInfo; +import io.yogh.bl3p.api.v1.response.domain.TradeFeedInfo; import io.yogh.bl3p.api.v1.response.domain.TradeInfo; import io.yogh.bl3p.api.v1.response.domain.TransactionHistory; import io.yogh.bl3p.api.v1.response.exception.Bl3pException; @@ -59,6 +63,8 @@ import io.yogh.bl3p.api.v1.response.parser.RetrieveAllTradesResponseParser; import io.yogh.bl3p.api.v1.response.parser.TickerResponseParser; import io.yogh.bl3p.api.v1.response.parser.TransactionHistoryParser; +import io.yogh.bl3p.api.v1.response.websocket.OrderBookResponseCallback; +import io.yogh.bl3p.api.v1.response.websocket.TradeFeedResponseCallback; /** * Thin client providing all methods encompassing the authenticated API. @@ -71,6 +77,12 @@ public final class Bl3pClient { private Currency defaultCurrency; private FeeCurrency defaultFeeCurrency; + private final Set> orderbookSubscribers = new HashSet<>(); + private WebSocketClient orderbookClient; + + private final Set> tradesSubscribers = new HashSet<>(); + private WebSocketClient tradesClient; + public static Bl3pClient create() { return new Bl3pClient() .defaultMarket(Market.BTCEUR) @@ -743,6 +755,51 @@ public void getLast1000TradesAsync(final Market market, final AsyncCallback consumer) { + final boolean added = orderbookSubscribers.add(consumer); + if (added && orderbookSubscribers.size() == 1) { + startOrderbookFeed(); + } + } + + public void unsubscribeOrderbookFeed(final Consumer consumer) { + final boolean removed = orderbookSubscribers.remove(consumer); + if (removed && orderbookSubscribers.isEmpty()) { + stopOrderbookFeed(); + } + } + + public void startOrderbookFeed() { + orderbookClient = new WebSocketClient(defaultMarket, "orderbook", OrderBookResponseCallback.create(orderbookSubscribers)); + } + + public void stopOrderbookFeed() { + orderbookClient.terminate(); + } + + public void subscribeTradesFeed(final Consumer consumer) { + final boolean added = tradesSubscribers.add(consumer); + if (added && tradesSubscribers.size() == 1) { + startTradesFeed(); + } + } + + public void unsubscribeTradesFeed(final Consumer consumer) { + final boolean removed = tradesSubscribers.remove(consumer); + if (removed && tradesSubscribers.isEmpty()) { + stopTradesFeed(); + } + } + + public void startTradesFeed() { + System.out.println("Starting..."); + tradesClient = new WebSocketClient(defaultMarket, "trades", TradeFeedResponseCallback.create(tradesSubscribers)); + } + + public void stopTradesFeed() { + tradesClient.terminate(); + } + private T doPublicCall(final Parser parser, final ApiCall call) throws Bl3pException { return parser.parse(Bl3pClientRequestUtil.doPublicCall(call)); } diff --git a/src/main/java/io/yogh/bl3p/api/v1/WebSocketClient.java b/src/main/java/io/yogh/bl3p/api/v1/WebSocketClient.java new file mode 100644 index 0000000..cd527f0 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/WebSocketClient.java @@ -0,0 +1,47 @@ +package io.yogh.bl3p.api.v1; + +import java.net.URI; +import java.util.function.Consumer; + +import javax.websocket.ContainerProvider; +import javax.websocket.MessageHandler; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.yogh.bl3p.api.v1.request.domain.Market; + +public class WebSocketClient implements MessageHandler { + private static final Logger LOG = LoggerFactory.getLogger(Bl3pClientRequestUtil.class); + + private static final String HOST = "wss://api.bl3p.eu/1/"; + + private static Object waitLock = new Object(); + + public WebSocketClient(final Market market, final String channel, final Consumer parser) { + final WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + + final String url = HOST + market.name() + "/" + channel; + + try (final Session session = container.connectToServer(new WebSocketEndPoint(parser), URI.create(url))) { + waitForTerminate(); + } catch (final Exception e) { + LOG.error("Exception during websocket session.", e); + // Squelch error. + } + } + + private static void waitForTerminate() { + synchronized (waitLock) { + try { + waitLock.wait(); + } catch (final InterruptedException e) {} + } + } + + public void terminate() { + waitLock.notifyAll(); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/WebSocketEndPoint.java b/src/main/java/io/yogh/bl3p/api/v1/WebSocketEndPoint.java new file mode 100644 index 0000000..26a6367 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/WebSocketEndPoint.java @@ -0,0 +1,40 @@ +package io.yogh.bl3p.api.v1; + +import java.util.function.Consumer; + +import javax.websocket.ClientEndpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ClientEndpoint +public class WebSocketEndPoint { + private static final Logger LOG = LoggerFactory.getLogger(WebSocketEndPoint.class); + + private final Consumer consumer; + + public WebSocketEndPoint(final Consumer consumer) { + this.consumer = consumer; + + LOG.debug("Created WebSocketEndPoint: {}", this); + } + + @OnOpen + public void onOpen(final EndpointConfig conf) { + LOG.debug("Opened WebSocketEndPoint: {}", conf); + } + + @OnMessage + public void onMessage(final String message) { + consumer.accept(message); + } + + @OnClose + public void onClose(final EndpointConfig conf) { + LOG.debug("Closed WebSocketEndPoint: {}", conf); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/domain/TradeFeedInfo.java b/src/main/java/io/yogh/bl3p/api/v1/response/domain/TradeFeedInfo.java new file mode 100644 index 0000000..80438b9 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/domain/TradeFeedInfo.java @@ -0,0 +1,55 @@ +package io.yogh.bl3p.api.v1.response.domain; + +public class TradeFeedInfo { + private long timestamp; + private String marketplace; + private int price; + private String type; + private int amount; + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } + + public String getMarketplace() { + return marketplace; + } + + public void setMarketplace(final String marketplace) { + this.marketplace = marketplace; + } + + public int getPrice() { + return price; + } + + public void setPrice(final int price) { + this.price = price; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public int getAmount() { + return amount; + } + + public void setAmount(final int amount) { + this.amount = amount; + } + + @Override + public String toString() { + return "TradeFeedInfo [timestamp=" + timestamp + ", marketplace=" + marketplace + ", price=" + price + ", type=" + type + ", amount=" + amount + + "]"; + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/parser/CommonParser.java b/src/main/java/io/yogh/bl3p/api/v1/response/parser/CommonParser.java index 4ebe022..d3b0023 100644 --- a/src/main/java/io/yogh/bl3p/api/v1/response/parser/CommonParser.java +++ b/src/main/java/io/yogh/bl3p/api/v1/response/parser/CommonParser.java @@ -67,7 +67,11 @@ public static SimpleOrder parseSimpleOrder(final JSONObject orderJson) { final SimpleOrder order = new SimpleOrder(); order.setAmount(orderJson.getInt("amount_int")); order.setPrice(orderJson.getInt("price_int")); - order.setCount(orderJson.getInt("count")); + + if (orderJson.has("count")) { + order.setCount(orderJson.getInt("count")); + } + return order; } } diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/FeedForwarder.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/FeedForwarder.java new file mode 100644 index 0000000..e50ebb2 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/FeedForwarder.java @@ -0,0 +1,19 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +public class FeedForwarder extends SubscriptionForwarder implements Consumer { + private final Function parser; + + public FeedForwarder(final Set> subscribers, final Function parser) { + super(subscribers); + this.parser = parser; + } + + @Override + public void accept(final String text) { + forward(parser.apply(text)); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseCallback.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseCallback.java new file mode 100644 index 0000000..cff166d --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseCallback.java @@ -0,0 +1,17 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.yogh.bl3p.api.v1.response.domain.OrderBook; + +public class OrderBookResponseCallback extends FeedForwarder implements Consumer { + public OrderBookResponseCallback(final Set> subscribers, final Function parser) { + super(subscribers, parser); + } + + public static OrderBookResponseCallback create(final Set> subscribers) { + return new OrderBookResponseCallback(subscribers, OrderBookResponseParser.create()); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseParser.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseParser.java new file mode 100644 index 0000000..93fbb7c --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/OrderBookResponseParser.java @@ -0,0 +1,42 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.ArrayList; +import java.util.List; + +import org.json.JSONArray; +import org.json.JSONObject; + +import io.yogh.bl3p.api.v1.response.domain.OrderBook; +import io.yogh.bl3p.api.v1.response.domain.SimpleOrder; +import io.yogh.bl3p.api.v1.response.parser.CommonParser; + +public class OrderBookResponseParser implements StringJsonParser { + @Override + public OrderBook apply(final String text) { + final JSONObject data = new JSONObject(text); + + final OrderBook orderBook = new OrderBook(); + + final List bids = new ArrayList<>(); + final JSONArray bidsJson = data.getJSONArray("bids"); + for (int i = 0; i < bidsJson.length(); i++) { + bids.add(CommonParser.parseSimpleOrder(bidsJson.getJSONObject(i))); + } + + final List asks = new ArrayList<>(); + final JSONArray asksJson = data.getJSONArray("asks"); + for (int i = 0; i < asksJson.length(); i++) { + asksJson.get(i); + asks.add(CommonParser.parseSimpleOrder(asksJson.getJSONObject(i))); + } + + orderBook.setBids(bids); + orderBook.setAsks(asks); + + return orderBook; + } + + public static OrderBookResponseParser create() { + return new OrderBookResponseParser(); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/StringJsonParser.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/StringJsonParser.java new file mode 100644 index 0000000..7a68092 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/StringJsonParser.java @@ -0,0 +1,5 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.function.Function; + +public interface StringJsonParser extends Function {} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/SubscriptionForwarder.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/SubscriptionForwarder.java new file mode 100644 index 0000000..0994cde --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/SubscriptionForwarder.java @@ -0,0 +1,16 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.Set; +import java.util.function.Consumer; + +public class SubscriptionForwarder { + private final Set> subscribers; + + public SubscriptionForwarder(final Set> subscribers) { + this.subscribers = subscribers; + } + + protected void forward(final T item) { + subscribers.forEach(v -> v.accept(item)); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseCallback.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseCallback.java new file mode 100644 index 0000000..63294e2 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseCallback.java @@ -0,0 +1,17 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.yogh.bl3p.api.v1.response.domain.TradeFeedInfo; + +public class TradeFeedResponseCallback extends FeedForwarder implements Consumer { + public TradeFeedResponseCallback(final Set> subscribers, final Function parser) { + super(subscribers, parser); + } + + public static TradeFeedResponseCallback create(final Set> subscribers) { + return new TradeFeedResponseCallback(subscribers, TradeFeedResponseParser.create()); + } +} diff --git a/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseParser.java b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseParser.java new file mode 100644 index 0000000..c897008 --- /dev/null +++ b/src/main/java/io/yogh/bl3p/api/v1/response/websocket/TradeFeedResponseParser.java @@ -0,0 +1,24 @@ +package io.yogh.bl3p.api.v1.response.websocket; + +import org.json.JSONObject; + +import io.yogh.bl3p.api.v1.response.domain.TradeFeedInfo; + +public class TradeFeedResponseParser implements StringJsonParser { + @Override + public TradeFeedInfo apply(final String text) { + final JSONObject data = new JSONObject(text); + final TradeFeedInfo trade = new TradeFeedInfo(); + + trade.setTimestamp(data.getLong("date")); + trade.setMarketplace(data.getString("marketplace")); + trade.setPrice(data.getInt("price_int")); + trade.setAmount(data.getInt("amount_int")); + + return trade; + } + + public static TradeFeedResponseParser create() { + return new TradeFeedResponseParser(); + } +} diff --git a/src/test/java/io/yogh/bl3p/api/v1/Bl3pClientTest.java b/src/test/java/io/yogh/bl3p/api/v1/Bl3pClientTest.java index a1a315b..3dd9ec9 100644 --- a/src/test/java/io/yogh/bl3p/api/v1/Bl3pClientTest.java +++ b/src/test/java/io/yogh/bl3p/api/v1/Bl3pClientTest.java @@ -5,12 +5,9 @@ public class Bl3pClientTest { public static void main(final String[] args) throws Bl3pException { - final Bl3pClient client = Bl3pClient.create(); + final Bl3pClient client = Bl3pClient.create() + .defaultMarket(Market.BTCEUR); - client.getTickerAsync(Market.BTCEUR, System.out::println); - - client.getOrderBookAsync(Market.BTCEUR, System.out::println); - - client.getLast1000TradesAsync(Market.BTCEUR, System.out::println); + client.subscribeTradesFeed(System.out::println); } }