Skip to content

Commit

Permalink
Added websockets
Browse files Browse the repository at this point in the history
API is now fully wrapped!
  • Loading branch information
jorn committed May 26, 2018
1 parent 7094c8f commit 435657b
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@
<artifactId>json</artifactId>
<version>20140107</version>
</dependency>
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
57 changes: 57 additions & 0 deletions src/main/java/io/yogh/bl3p/api/v1/Bl3pClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.yogh.bl3p.api.v1;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

import io.yogh.bl3p.api.v1.request.ApiCall;
import io.yogh.bl3p.api.v1.request.authenticated.CancelOrderCall;
Expand Down Expand Up @@ -42,6 +45,7 @@
import io.yogh.bl3p.api.v1.response.domain.OrderHistory;
import io.yogh.bl3p.api.v1.response.domain.OrderInfo;
import io.yogh.bl3p.api.v1.response.domain.TickerInfo;
import io.yogh.bl3p.api.v1.response.domain.TradeFeedInfo;
import io.yogh.bl3p.api.v1.response.domain.TradeInfo;
import io.yogh.bl3p.api.v1.response.domain.TransactionHistory;
import io.yogh.bl3p.api.v1.response.exception.Bl3pException;
Expand All @@ -59,6 +63,8 @@
import io.yogh.bl3p.api.v1.response.parser.RetrieveAllTradesResponseParser;
import io.yogh.bl3p.api.v1.response.parser.TickerResponseParser;
import io.yogh.bl3p.api.v1.response.parser.TransactionHistoryParser;
import io.yogh.bl3p.api.v1.response.websocket.OrderBookResponseCallback;
import io.yogh.bl3p.api.v1.response.websocket.TradeFeedResponseCallback;

/**
* Thin client providing all methods encompassing the authenticated API.
Expand All @@ -71,6 +77,12 @@ public final class Bl3pClient {
private Currency defaultCurrency;
private FeeCurrency defaultFeeCurrency;

private final Set<Consumer<OrderBook>> orderbookSubscribers = new HashSet<>();
private WebSocketClient orderbookClient;

private final Set<Consumer<TradeFeedInfo>> tradesSubscribers = new HashSet<>();
private WebSocketClient tradesClient;

public static Bl3pClient create() {
return new Bl3pClient()
.defaultMarket(Market.BTCEUR)
Expand Down Expand Up @@ -743,6 +755,51 @@ public void getLast1000TradesAsync(final Market market, final AsyncCallback<List
.build());
}

public void subscribeOrderbookFeed(final Consumer<OrderBook> consumer) {
final boolean added = orderbookSubscribers.add(consumer);
if (added && orderbookSubscribers.size() == 1) {
startOrderbookFeed();
}
}

public void unsubscribeOrderbookFeed(final Consumer<OrderBook> consumer) {
final boolean removed = orderbookSubscribers.remove(consumer);
if (removed && orderbookSubscribers.isEmpty()) {
stopOrderbookFeed();
}
}

public void startOrderbookFeed() {
orderbookClient = new WebSocketClient(defaultMarket, "orderbook", OrderBookResponseCallback.create(orderbookSubscribers));
}

public void stopOrderbookFeed() {
orderbookClient.terminate();
}

public void subscribeTradesFeed(final Consumer<TradeFeedInfo> consumer) {
final boolean added = tradesSubscribers.add(consumer);
if (added && tradesSubscribers.size() == 1) {
startTradesFeed();
}
}

public void unsubscribeTradesFeed(final Consumer<TradeFeedInfo> consumer) {
final boolean removed = tradesSubscribers.remove(consumer);
if (removed && tradesSubscribers.isEmpty()) {
stopTradesFeed();
}
}

public void startTradesFeed() {
System.out.println("Starting...");
tradesClient = new WebSocketClient(defaultMarket, "trades", TradeFeedResponseCallback.create(tradesSubscribers));
}

public void stopTradesFeed() {
tradesClient.terminate();
}

private <T> T doPublicCall(final Parser<T> parser, final ApiCall call) throws Bl3pException {
return parser.parse(Bl3pClientRequestUtil.doPublicCall(call));
}
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/io/yogh/bl3p/api/v1/WebSocketClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.yogh.bl3p.api.v1;

import java.net.URI;
import java.util.function.Consumer;

import javax.websocket.ContainerProvider;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.yogh.bl3p.api.v1.request.domain.Market;

public class WebSocketClient implements MessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(Bl3pClientRequestUtil.class);

private static final String HOST = "wss://api.bl3p.eu/1/";

private static Object waitLock = new Object();

public WebSocketClient(final Market market, final String channel, final Consumer<String> parser) {
final WebSocketContainer container = ContainerProvider.getWebSocketContainer();

final String url = HOST + market.name() + "/" + channel;

try (final Session session = container.connectToServer(new WebSocketEndPoint(parser), URI.create(url))) {
waitForTerminate();
} catch (final Exception e) {
LOG.error("Exception during websocket session.", e);
// Squelch error.
}
}

private static void waitForTerminate() {
synchronized (waitLock) {
try {
waitLock.wait();
} catch (final InterruptedException e) {}
}
}

public void terminate() {
waitLock.notifyAll();
}
}
40 changes: 40 additions & 0 deletions src/main/java/io/yogh/bl3p/api/v1/WebSocketEndPoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.yogh.bl3p.api.v1;

import java.util.function.Consumer;

import javax.websocket.ClientEndpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ClientEndpoint
public class WebSocketEndPoint {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketEndPoint.class);

private final Consumer<String> consumer;

public WebSocketEndPoint(final Consumer<String> consumer) {
this.consumer = consumer;

LOG.debug("Created WebSocketEndPoint: {}", this);
}

@OnOpen
public void onOpen(final EndpointConfig conf) {
LOG.debug("Opened WebSocketEndPoint: {}", conf);
}

@OnMessage
public void onMessage(final String message) {
consumer.accept(message);
}

@OnClose
public void onClose(final EndpointConfig conf) {
LOG.debug("Closed WebSocketEndPoint: {}", conf);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.yogh.bl3p.api.v1.response.domain;

public class TradeFeedInfo {
private long timestamp;
private String marketplace;
private int price;
private String type;
private int amount;

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}

public String getMarketplace() {
return marketplace;
}

public void setMarketplace(final String marketplace) {
this.marketplace = marketplace;
}

public int getPrice() {
return price;
}

public void setPrice(final int price) {
this.price = price;
}

public String getType() {
return type;
}

public void setType(final String type) {
this.type = type;
}

public int getAmount() {
return amount;
}

public void setAmount(final int amount) {
this.amount = amount;
}

@Override
public String toString() {
return "TradeFeedInfo [timestamp=" + timestamp + ", marketplace=" + marketplace + ", price=" + price + ", type=" + type + ", amount=" + amount
+ "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ public static SimpleOrder parseSimpleOrder(final JSONObject orderJson) {
final SimpleOrder order = new SimpleOrder();
order.setAmount(orderJson.getInt("amount_int"));
order.setPrice(orderJson.getInt("price_int"));
order.setCount(orderJson.getInt("count"));

if (orderJson.has("count")) {
order.setCount(orderJson.getInt("count"));
}

return order;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

public class FeedForwarder<T> extends SubscriptionForwarder<T> implements Consumer<String> {
private final Function<String, T> parser;

public FeedForwarder(final Set<Consumer<T>> subscribers, final Function<String, T> parser) {
super(subscribers);
this.parser = parser;
}

@Override
public void accept(final String text) {
forward(parser.apply(text));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

import io.yogh.bl3p.api.v1.response.domain.OrderBook;

public class OrderBookResponseCallback extends FeedForwarder<OrderBook> implements Consumer<String> {
public OrderBookResponseCallback(final Set<Consumer<OrderBook>> subscribers, final Function<String, OrderBook> parser) {
super(subscribers, parser);
}

public static OrderBookResponseCallback create(final Set<Consumer<OrderBook>> subscribers) {
return new OrderBookResponseCallback(subscribers, OrderBookResponseParser.create());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.ArrayList;
import java.util.List;

import org.json.JSONArray;
import org.json.JSONObject;

import io.yogh.bl3p.api.v1.response.domain.OrderBook;
import io.yogh.bl3p.api.v1.response.domain.SimpleOrder;
import io.yogh.bl3p.api.v1.response.parser.CommonParser;

public class OrderBookResponseParser implements StringJsonParser<OrderBook> {
@Override
public OrderBook apply(final String text) {
final JSONObject data = new JSONObject(text);

final OrderBook orderBook = new OrderBook();

final List<SimpleOrder> bids = new ArrayList<>();
final JSONArray bidsJson = data.getJSONArray("bids");
for (int i = 0; i < bidsJson.length(); i++) {
bids.add(CommonParser.parseSimpleOrder(bidsJson.getJSONObject(i)));
}

final List<SimpleOrder> asks = new ArrayList<>();
final JSONArray asksJson = data.getJSONArray("asks");
for (int i = 0; i < asksJson.length(); i++) {
asksJson.get(i);
asks.add(CommonParser.parseSimpleOrder(asksJson.getJSONObject(i)));
}

orderBook.setBids(bids);
orderBook.setAsks(asks);

return orderBook;
}

public static OrderBookResponseParser create() {
return new OrderBookResponseParser();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.function.Function;

public interface StringJsonParser<T> extends Function<String, T> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.Set;
import java.util.function.Consumer;

public class SubscriptionForwarder<T> {
private final Set<Consumer<T>> subscribers;

public SubscriptionForwarder(final Set<Consumer<T>> subscribers) {
this.subscribers = subscribers;
}

protected void forward(final T item) {
subscribers.forEach(v -> v.accept(item));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.yogh.bl3p.api.v1.response.websocket;

import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

import io.yogh.bl3p.api.v1.response.domain.TradeFeedInfo;

public class TradeFeedResponseCallback extends FeedForwarder<TradeFeedInfo> implements Consumer<String> {
public TradeFeedResponseCallback(final Set<Consumer<TradeFeedInfo>> subscribers, final Function<String, TradeFeedInfo> parser) {
super(subscribers, parser);
}

public static TradeFeedResponseCallback create(final Set<Consumer<TradeFeedInfo>> subscribers) {
return new TradeFeedResponseCallback(subscribers, TradeFeedResponseParser.create());
}
}
Loading

0 comments on commit 435657b

Please sign in to comment.