Skip to content

Commit

Permalink
Merge pull request #21 from michaelgrosner/multicurrency-v2
Browse files Browse the repository at this point in the history
Multicurrency Persistence
  • Loading branch information
michaelgrosner committed Aug 25, 2015
2 parents b2dd982 + 617fd54 commit 8658f27
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 100 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ENV EXCHANGE null
ENV TradedPair BTC/USD
ENV WebClientUsername NULL
ENV WebClientPassword NULL
ENV WebClientListenPort 3000
# IP to access mongo instance. If you are on a mac, run `boot2docker ip` and replace `tribeca-mongo`.
ENV MongoDbUrl mongodb://tribeca-mongo:27017/tribeca

Expand Down
1 change: 1 addition & 0 deletions sample-dev-tribeca.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"MongoDbUrl": "mongodb://localhost:27017/tribeca",
"WebClientUsername": "NULL",
"WebClientPassword": "NULL",
"WebClientListenPort": "3000",

"HitBtcPullUrl": "http://demo-api.hitbtc.com",
"HitBtcOrderEntryUrl": "ws://demo-api.hitbtc.com:8080",
Expand Down
1 change: 1 addition & 0 deletions sample-prod-tribeca.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"MongoDbUrl": "mongodb://localhost:27017/tribeca",
"WebClientUsername": "NULL",
"WebClientPassword": "NULL",
"WebClientListenPort": "3000",

"HitBtcPullUrl": "http://api.hitbtc.com",
"HitBtcOrderEntryUrl": "wss://api.hitbtc.com:8080",
Expand Down
2 changes: 1 addition & 1 deletion src/common/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ export class PositionReport {
public value: number,
public quoteValue: number,
public pair: CurrencyPair,
public exch: Exchange,
public exchange: Exchange,
public time: moment.Moment) {}
}

Expand Down
2 changes: 1 addition & 1 deletion src/service/backtest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ export class BacktestParameters {
id: string;
}

export class BacktestPersister<T> implements Persister.ILoadAllByExchangeAndPair<T>, Persister.ILoadLatest<T> {
export class BacktestPersister<T> implements Persister.ILoadAll<T>, Persister.ILoadLatest<T> {
public load = (exchange: Models.Exchange, pair: Models.CurrencyPair, limit: number = null): Q.Promise<T[]> => {
return this.loadAll(limit);
};
Expand Down
6 changes: 0 additions & 6 deletions src/service/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,6 @@ export class OrderBroker implements Interfaces.IOrderBroker {
}
}

export class PositionPersister extends Persister.Persister<Models.PositionReport> {
constructor(db) {
super(db, "pos", Persister.timeLoader, Persister.timeSaver);
}
}

export class PositionBroker implements Interfaces.IPositionBroker {
private _log : Utils.Logger;

Expand Down
30 changes: 15 additions & 15 deletions src/service/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ var backTestSimulationSetup = (inputData : Array<Models.Market | Models.MarketTr

var getReceiver = <T>(topic: string) : Messaging.IReceive<T> => new Messaging.NullReceiver<T>();

var getPersister = <T>(collectionName: string) : Persister.ILoadAllByExchangeAndPair<T> => new Backtest.BacktestPersister<T>();
var getMarketTradePersister = () => getPersister<Models.MarketTrade>("mt");
var getPersister = <T>(collectionName: string) : Persister.ILoadAll<T> => new Backtest.BacktestPersister<T>();

var getRepository = <T>(defValue: T, collectionName: string) : Persister.ILoadLatest<T> => new Backtest.BacktestPersister<T>([defValue]);

Expand All @@ -104,7 +103,6 @@ var backTestSimulationSetup = (inputData : Array<Models.Market | Models.MarketTr
getExch: getExch,
getReceiver: getReceiver,
getPersister: getPersister,
getMarketTradePersister: getMarketTradePersister,
getRepository: getRepository,
getPublisher: getPublisher
};
Expand All @@ -128,7 +126,7 @@ var liveTradingSetup = () => {

app.use(compression());
app.use(express.static(path.join(__dirname, "admin")));
http_server.listen(3000, () => mainLog('Listening to admins on *:3000...'));
http_server.listen(config.GetNumber("WebClientListenPort"), () => mainLog('Listening to admins on *:3000...'));

var getExchange = (): Models.Exchange => {
var ex = config.GetString("EXCHANGE").toLowerCase();
Expand Down Expand Up @@ -167,12 +165,16 @@ var liveTradingSetup = () => {

var db = Persister.loadDb(config);

var getPersister = <T>(collectionName: string) : Persister.ILoadAllByExchangeAndPair<T> =>
new Persister.BasicPersister<T>(db, collectionName);
var getMarketTradePersister = () : Persister.ILoadAllByExchangeAndPair<Models.MarketTrade> => new MarketTrades.MarketTradePersister(db);
var loaderSaver = new Persister.LoaderSaver(exchange, pair);
var mtLoaderSaver = new MarketTrades.MarketTradesLoaderSaver(loaderSaver);

var getPersister = <T>(collectionName: string) : Persister.ILoadAll<T> => {
var ls = collectionName === "mt" ? mtLoaderSaver : loaderSaver;
return new Persister.Persister<T>(db, collectionName, exchange, pair, ls.loader, ls.saver);
};

var getRepository = <T>(defValue: T, collectionName: string) : Persister.ILoadLatest<T> =>
new Persister.RepositoryPersister<T>(db, defValue, collectionName);
new Persister.RepositoryPersister<T>(db, defValue, collectionName, exchange, pair, loaderSaver.loader, loaderSaver.saver);

var classes : SimulationClasses = {
exchange: exchange,
Expand All @@ -182,7 +184,6 @@ var liveTradingSetup = () => {
getExch: getExch,
getReceiver: getReceiver,
getPersister: getPersister,
getMarketTradePersister: getMarketTradePersister,
getRepository: getRepository,
getPublisher: getPublisher
};
Expand All @@ -196,8 +197,7 @@ interface SimulationClasses {
timeProvider: Utils.ITimeProvider;
getExch(orderCache: Broker.OrderStateCache): Interfaces.CombinedGateway;
getReceiver<T>(topic: string) : Messaging.IReceive<T>;
getPersister<T>(collectionName: string) : Persister.ILoadAllByExchangeAndPair<T>;
getMarketTradePersister() : Persister.ILoadAllByExchangeAndPair<Models.MarketTrade>;
getPersister<T>(collectionName: string) : Persister.ILoadAll<T>;
getRepository<T>(defValue: T, collectionName: string) : Persister.ILoadLatest<T>;
getPublisher<T>(topic: string, persister?: Persister.ILoadAll<T>): Messaging.IPublish<T>;
}
Expand All @@ -207,7 +207,7 @@ var runTradingSystem = (classes: SimulationClasses) : Q.Promise<boolean> => {
var orderPersister = getPersister("osr");
var tradesPersister = getPersister("trades");
var fairValuePersister = getPersister("fv");
var mktTradePersister = classes.getMarketTradePersister();
var mktTradePersister = getPersister("mt");
var positionPersister = getPersister("pos");
var messagesPersister = getPersister("msg");
var rfvPersister = getPersister("rfv");
Expand All @@ -222,9 +222,9 @@ var runTradingSystem = (classes: SimulationClasses) : Q.Promise<boolean> => {
var completedSuccessfully = Q.defer<boolean>();

Q.all<any>([
orderPersister.load(exchange, pair, 25000),
tradesPersister.load(exchange, pair, 10000),
mktTradePersister.load(exchange, pair, 100),
orderPersister.loadAll(25000),
tradesPersister.loadAll(10000),
mktTradePersister.loadAll(100),
messagesPersister.loadAll(50),
paramsPersister.loadLatest(),
activePersister.loadLatest(),
Expand Down
48 changes: 15 additions & 33 deletions src/service/markettrades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,22 @@ import Broker = require("./broker");
import mongodb = require('mongodb');
import Web = require("./web");

var loader = (d: Models.ExchangePairMessage<Models.MarketTrade>) => {
if (d instanceof Models.MarketTrade) {
P.timeLoader(d);
return;
export class MarketTradesLoaderSaver {
public loader = (x : Models.MarketTrade) => {
this._wrapped.loader(x);

if (typeof x.quote !== "undefined")
this._wrapped.loader(x.quote);
}

if (d.data === null) return;
P.timeLoader(d.data);

if (d.data.quote === null) return;
P.timeLoader(d.data.quote);
};

var saver = (d: Models.ExchangePairMessage<Models.MarketTrade>) => {
if (d instanceof Models.MarketTrade) {
P.timeSaver(d);
return;
public saver = (x : Models.MarketTrade) => {
this._wrapped.saver(x);

if (typeof x.quote !== "undefined")
this._wrapped.saver(x.quote);
}

if (d.data === null) return;
P.timeSaver(d.data);

if (d.data.quote === null) return;
P.timeSaver(d.data.quote);
};

export class MarketTradePersister extends P.Persister<Models.MarketTrade> {
constructor(db: Q.Promise<mongodb.Db>) {
super(db, "mt", P.timeLoader, P.timeSaver);
}
constructor(private _wrapped: P.LoaderSaver) {}
}

export class MarketTradeBroker implements Interfaces.IMarketTradeBroker {
Expand Down Expand Up @@ -88,13 +74,9 @@ export class MarketTradeBroker implements Interfaces.IMarketTradeBroker {
private _quoteEngine: Agent.QuotingEngine,
private _base: Broker.ExchangeBroker,
private _persister: P.IPersist<Models.MarketTrade>,
initMkTrades: Array<Models.ExchangePairMessage<Models.MarketTrade> | Models.MarketTrade>) {
initMkTrades.forEach(t => {
if (t instanceof Models.MarketTrade)
this.marketTrades.push(t);
else
this.marketTrades.push((<any>t).data);
});
initMkTrades: Array<Models.MarketTrade>) {

initMkTrades.forEach(t => this.marketTrades.push(t));
this._log("loaded %d market trades", this.marketTrades.length);

_marketTradePublisher.registerSnapshot(() => _.last(this.marketTrades, 50));
Expand Down
83 changes: 45 additions & 38 deletions src/service/persister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import moment = require('moment');
import Interfaces = require("./interfaces");
import Config = require("./config");

export function loadDb(config : Config.IConfigProvider) {
export function loadDb(config: Config.IConfigProvider) {
var deferred = Q.defer<mongodb.Db>();
mongodb.MongoClient.connect(config.GetString("MongoDbUrl"), (err, db) => {
if (err) deferred.reject(err);
Expand All @@ -22,14 +22,32 @@ export function loadDb(config : Config.IConfigProvider) {
return deferred.promise;
}

export function timeLoader(x) {
if (typeof x.time !== "undefined")
x.time = moment.isMoment(x.time) ? x.time : moment(x.time);
export interface Persistable {
time?: moment.Moment|Date;
pair?: Models.CurrencyPair;
exchange?: Models.Exchange;
}

export function timeSaver(x) {
if (typeof x.time !== "undefined")
x.time = (moment.isMoment(x.time) ? x.time : moment(x.time)).toDate();
export class LoaderSaver {
public loader = (x: Persistable) => {
if (typeof x.time !== "undefined")
x.time = moment.isMoment(x.time) ? x.time : moment(x.time);
if (typeof x.exchange === "undefined")
x.exchange = this._exchange;
if (typeof x.pair === "undefined")
x.pair = this._pair;
}

public saver = (x: Persistable) => {
if (typeof x.time !== "undefined")
x.time = (moment.isMoment(x.time) ? <moment.Moment>x.time : moment(x.time)).toDate();
if (typeof x.exchange === "undefined")
x.exchange = this._exchange;
if (typeof x.pair === "undefined")
x.pair = this._pair;
}

constructor(private _exchange: Models.Exchange, private _pair: Models.CurrencyPair) { }
}

export interface IPersist<T> {
Expand All @@ -44,18 +62,15 @@ export interface ILoadAll<T> extends IPersist<T> {
loadAll(limit?: number, start_time?: moment.Moment): Q.Promise<T[]>;
}

export interface ILoadAllByExchangeAndPair<T> extends ILoadAll<T> {
load(exchange: Models.Exchange, pair: Models.CurrencyPair, limit: number): Q.Promise<T[]>;
}

export class RepositoryPersister<T> implements ILoadLatest<T> {
export class RepositoryPersister<T extends Persistable> implements ILoadLatest<T> {
_log: Utils.Logger = Utils.log("tribeca:exchangebroker:repopersister");

public loadLatest = (): Q.Promise<T> => {
var deferred = Q.defer<T>();

this.collection.then(coll => {
coll.find({}, { fields: { _id: 0 } }).limit(1).sort({ $natural: -1 }).toArray((err, arr) => {
var selector = { exchange: this._exchange, pair: this._pair };
coll.find(selector, { fields: { _id: 0 } }).limit(1).sort({ $natural: -1 }).toArray((err, arr) => {
if (err) {
deferred.reject(err);
}
Expand All @@ -64,8 +79,7 @@ export class RepositoryPersister<T> implements ILoadLatest<T> {
}
else {
var v = <T>_.defaults(arr[0], this._defaultParameter);
if (v.hasOwnProperty("time"))
timeLoader(v);
this._loader(v);
deferred.resolve(v);
}
});
Expand All @@ -75,11 +89,9 @@ export class RepositoryPersister<T> implements ILoadLatest<T> {
};

public persist = (report: T) => {
this._saver(report);
this.collection.then(coll => {
var v = report;
if (v.hasOwnProperty("time"))
timeSaver(v);
coll.insert(v, err => {
coll.insert(report, err => {
if (err)
this._log("Unable to insert into %s %s; %o", this._dbName, report, err);
});
Expand All @@ -90,25 +102,24 @@ export class RepositoryPersister<T> implements ILoadLatest<T> {
constructor(
db: Q.Promise<mongodb.Db>,
private _defaultParameter: T,
private _dbName: string) {
private _dbName: string,
private _exchange: Models.Exchange,
private _pair: Models.CurrencyPair,
private _loader: (p: Persistable) => void,
private _saver: (p: Persistable) => void) {
this.collection = db.then(db => db.collection(this._dbName));
}
}

export class Persister<T> implements ILoadAllByExchangeAndPair<T> {
export class Persister<T extends Persistable> implements ILoadAll<T> {
_log: Utils.Logger = Utils.log("tribeca:exchangebroker:persister");

public load = (exchange: Models.Exchange, pair: Models.CurrencyPair, limit: number = null): Q.Promise<T[]> => {
var selector = { exchange: exchange, pair: pair };
return this.loadInternal(selector, limit);
};

public loadAll = (limit?: number, start_time?: moment.Moment): Q.Promise<T[]> => {
var selector = {};
var selector = { exchange: this._exchange, pair: this._pair };
if (start_time) {
selector["time"] = {$gte: start_time.toDate()};
selector["time"] = { $gte: start_time.toDate() };
}

return this.loadInternal(selector, limit);
};

Expand All @@ -123,7 +134,7 @@ export class Persister<T> implements ILoadAllByExchangeAndPair<T> {
var options: any = { fields: { _id: 0 } };
if (limit !== null) {
options.limit = limit;
if (count !== 0)
if (count !== 0)
options.skip = Math.max(count - limit, 0);
}

Expand Down Expand Up @@ -161,14 +172,10 @@ export class Persister<T> implements ILoadAllByExchangeAndPair<T> {
constructor(
db: Q.Promise<mongodb.Db>,
private _dbName: string,
private _loader: (any) => void,
private _saver: (T) => void) {
private _exchange: Models.Exchange,
private _pair: Models.CurrencyPair,
private _loader: (p: Persistable) => void,
private _saver: (p: Persistable) => void) {
this.collection = db.then(db => db.collection(this._dbName));
}
}

export class BasicPersister<T> extends Persister<T> {
constructor(db: Q.Promise<mongodb.Db>, collectionName: string) {
super(db, collectionName, timeLoader, timeSaver);
}
}
6 changes: 0 additions & 6 deletions src/service/position-management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import moment = require("moment");
import Interfaces = require("./interfaces");
import QuotingParameters = require("./quoting-parameters");

export class RegularFairValuePersister extends Persister.Persister<Models.RegularFairValue> {
constructor(db: Q.Promise<mongodb.Db>) {
super(db, "rfv", Persister.timeLoader, Persister.timeSaver);
}
}

export class PositionManager {
private _log: Utils.Logger = Utils.log("tribeca:rfv");

Expand Down

0 comments on commit 8658f27

Please sign in to comment.