-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprice_agent.js
273 lines (237 loc) · 8.32 KB
/
price_agent.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//
// Module to get stock price
//
var when = require('when');
var config = require('./config');
var alpha = require('alphavantage')({key: config.alphaVantageKey});
var mqtt = require('mqtt');
var logger = require('./utility').logger;
/*
* Use Alpha Vantage API to return the price snapshot
*/
function getPriceSnapshotSingle(symbol) {
return when.promise(function (resolve, reject) {
alpha.data.daily(symbol).then(function(result) {
var prices = [];
var key = 'Time Series (Daily)';
var priceKey = '4. close';
if (result.hasOwnProperty(key) && result[key]) {
var record = result[key];
for (var dateKey in record) {
if (record.hasOwnProperty(dateKey)) {
var date = new Date(dateKey);
if (date) {
prices.push({
date: date,
price: parseFloat(record[dateKey][priceKey])
})
}
}
}
}
// sort prices array by date
prices.sort(function (p1, p2) {
return p2.date - p1.date;
});
resolve({
price: prices[0].price,
change: prices[0].price - prices[1].price,
changeInPercent: (prices[0].price - prices[1].price)/prices[1].price
});
}).catch(function(err) {
reject(err);
});
});
}
/*
* Since the Alpha Vantage API is very slow, I use an asynchronous design to download the quotes:
* 1. Other applications will tell price agent which symbols to track through mqtt (stock/symbols)
* 2. Stock agent will publish the stock quotes through mqtt (stock/quotes)
* 3. The price agent is always running in the background and download the quotes periodically (every 5 minutes)
* 4. For each iteration, price agent will pace at 1 second interval
*/
var symbolsToTrack = [];
var updateInterval = 5 * 60 * 1000; // 5 minutes
var delay = 15 * 1000; // 15 seconds
function getPriceSnapshot(symbols) {
return when.promise(function (resolve, reject) {
try {
if (symbols.length == 0) {
resolve(null);
return;
}
var snapshot = {};
var index = 0;
var retries = 0;
(function getNextPriceSnapshot() {
var symbol = symbols[index];
getPriceSnapshotSingle(symbol).then(function(result) {
snapshot[symbol] = result;
publishQuote(snapshot);
if (++index == symbols.length) {
resolve(snapshot);
} else {
retries = 0;
setTimeout(getNextPriceSnapshot, delay);
}
}).catch(function(err) {
if (retries++ < 2) {
setTimeout(getNextPriceSnapshot, retries * delay);
} else {
logger.error("Skip fetching symbol " + symbol + ' due to error: ', JSON.stringify(err));
if (++index == symbols.length) {
resolve(snapshot);
} else {
setTimeout(getNextPriceSnapshot, retries * delay);
}
}
});
})();
} catch (error) {
logger.error("Unable to download quote from Yahoo Finance.", JSON.stringify(error, null, 2));
resolve(null);
}
});
}
var mqttClient;
var TOPIC_SYMBOL = config.mqttTopicSymbol;
var TOPIC_QUOTE = config.mqttTopicQutoes;
function downloadQuotes() {
var symbols = [];
// Go through the list of symbol in reverse order, so the latest symbol will get updated first
for (var i = symbolsToTrack.length - 1; i >=0; i--) {
var record = symbolsToTrack[i];
if (record.count) {
symbols.push(record.symbol);
}
}
if (symbols.length) {
getPriceSnapshot(symbols).then(function(snapshot) {
//logger.info('Publish quotes on topic: ' + TOPIC_QUOTE + ' for symbols:' + symbolsToTrack.toString());
//mqttClient.publish(TOPIC_QUOTE, JSON.stringify(snapshot));
logger.info('Refreshed ' + Object.keys(snapshot).length + ' quotes');
});
}
}
function publishQuote(snapshot) {
logger.info('Publish quotes on topic: ' + TOPIC_QUOTE + ' for symbols:' + Object.keys(snapshot).toString());
mqttClient.publish(TOPIC_QUOTE, JSON.stringify(snapshot));
}
function addSymbolToTrack(symbol) {
for (var i = 0; i < symbolsToTrack.length; i++) {
if (symbol == symbolsToTrack[i].symbol) {
symbolsToTrack[i].count++;
return;
}
}
symbolsToTrack.push({
symbol: symbol,
count: 1
});
}
function removeSymbolToTrack(symbol) {
for (var i = 0; i < symbolsToTrack.length; i++) {
if (symbol == symbolsToTrack[i].symbol) {
break;
}
}
if (i < symbolsToTrack.length) {
symbolsToTrack[i].count--;
if (symbolsToTrack[i].count == 0) {
symbolsToTrack.splice(i, 1);
}
}
}
mqttClient = mqtt.connect(config.mqttBrokerURL);
mqttClient.on('connect', function() {
logger.info('Connected to ' + config.mqttBrokerURL);
// subscribe to symbol topic as input
logger.info('Subscribing to topic: ' + TOPIC_SYMBOL);
mqttClient.subscribe(TOPIC_SYMBOL);
// start a timer to download quotes periodically
setInterval(downloadQuotes, updateInterval);
});
mqttClient.on('message', function(topic, message) {
if (topic == TOPIC_SYMBOL) {
try {
var params, action, symbols;
// Parse command parameters
params = JSON.parse(message.toString());
if (params.hasOwnProperty('action')) {
action = params['action'].toUpperCase();
}
if (params.hasOwnProperty('symbols')) {
symbols = params['symbols'];
}
// Validate command parameters
if (action != 'ADD' && action != 'DELETE') {
throw new Error('Invalid action parameter');
}
if (!Array.isArray(symbols) || !symbols.length) {
throw new Error('Invalid symbols parameter');
}
logger.info(action + ' symbols: ' + symbols.toString());
// Update symbolsToTrack
for (var i = 0; i < symbols.length; i++) {
var symbol = symbols[i];
if (!/^[a-z]+$/i.test(symbol)) {
//
// Alpha Vantage API limitation: the symbol has to be all letters
//
logger.info('Ignore symbol that contains non alphabetic: ' + symbol);
continue;
}
if (action == 'ADD') {
addSymbolToTrack(symbol);
} else if (action == 'DELETE') {
removeSymbolToTrack(symbol);
}
}
// Download quotes immediately
if (action == 'ADD') {
downloadQuotes();
}
} catch (err) {
logger.error(topic + ': ' + err.message + ' in: ' + message.toString());
}
}
});
/*
Yahoo Finance API is no longer available
function getPriceSnapshot(symbols) {
return when.promise(function (resolve, reject) {
try {
if (!Array.isArray(symbols)) {
symbols = [symbols]; // make it an array
}
// Get price snapshot from Yahoo Finance
yahooFinance.snapshot({
symbols: symbols,
fields: ['s', 'p', 'l1', 'c1', 'p2']
}, function (err, results) {
if (err) {
reject(err);
} else {
var snapshot = {};
for (var i = 0; i < results.length; i++) {
var record = results[i];
if (record.hasOwnProperty('symbol') &&
record.hasOwnProperty('lastTradePriceOnly') &&
record.lastTradePriceOnly) {
snapshot[record.symbol] = {
price: record.lastTradePriceOnly,
change: record.change,
changeInPercent: record.changeInPercent
};
}
}
resolve(snapshot);
}
});
} catch (error) {
logger.error("Unable to download quote from Yahoo Finance.", JSON.stringify(error, null, 2));
resolve(null);
}
});
}
*/