-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathindex.js
145 lines (128 loc) · 4.56 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
var args = require('electron').argv();
var async = require('async');
var redis = require('redis');
require('colors');
var Models = require('telepat-models');
// Worker type taken from the `t` parameter of the parsed arguments; it selects
// which worker class gets instantiated during startup.
var workerType = args.params.t;
// Worker index taken from the `i` parameter of the parsed arguments; it is
// handed to the worker constructor as its first argument.
var workerIndex = args.params.i;
/**
* The worker instance running in this process. It stays null until the
* startup sequence has resolved the worker type and constructed it.
*
* @type {Base_Worker}
*/
var theWorker = null;
// Models.ConfigurationManager instance; created by the first startup step.
var configManager = null;
async.series([
function(callback) {
configManager = new Models.ConfigurationManager('./config.spec.json', './config.json');
configManager.load(function(err) {
if (err) return callback(err);
var testResult = configManager.test();
callback(testResult !== true ? testResult : undefined);
});
},
function(callback) {
switch (workerType) {
case 'aggregation': {
var AggregationWorker = require('./lib/aggregation_worker');
theWorker = new AggregationWorker(workerIndex, configManager.config);
break;
}
case 'write': {
var WriterWorker = require('./lib/writer_worker');
theWorker = new WriterWorker(workerIndex, configManager.config);
break;
}
case 'transport_manager': {
var TransportManagerWorker = require('./lib/transport_manager');
theWorker = new TransportManagerWorker(workerIndex, configManager.config);
break;
}
default: {
var workerTypeParts = workerType.split('_');
if (workerTypeParts[1] === 'transport') {
var ClientTransportWorker = require('./lib/client_transport/'+workerTypeParts[0]);
theWorker = new ClientTransportWorker(workerIndex, configManager.config);
} else {
console.log('Invalid worker type "'+workerType+'"');
process.exit(1);
}
}
}
callback();
},
function(callback) {
theWorker.config.subscribe_limit = theWorker.config.subscribe_limit || 64;
theWorker.config.get_limit = theWorker.config.get_limit || 384;
if (theWorker.config.logger) {
theWorker.config.logger.name = theWorker.name;
Models.Application.logger = new Models.TelepatLogger(theWorker.config.logger);
} else {
Models.Application.logger = new Models.TelepatLogger({
type: 'Console',
name: theWorker.name,
settings: {level: 'info'}
});
}
if (!Models[theWorker.config.main_database]) {
Models.Application.logger.emergency('Unable to load "'+theWorker.config.main_database+
'" main database: not found. Aborting...');
process.exit(2);
}
Models.Application.datasource = new Models.Datasource();
Models.Application.datasource.setMainDatabase(new Models[theWorker.config.main_database](theWorker.config[theWorker.config.main_database]));
callback();
},
function(callback) {
Models.Application.datasource.dataStorage.onReady(function() {
callback();
});
},
function(callback) {
if (Models.Application.redisClient)
Models.Application.redisClient = null;
Models.Application.redisClient = redis.createClient(theWorker.config.redis.port, theWorker.config.redis.host);
Models.Application.redisClient.on('error', function(err) {
Models.Application.logger.error('Failed connecting to Redis "'+
theWorker.config.redis.host+'": '+err.message+'. Retrying...');
});
Models.Application.redisClient.on('ready', function() {
Models.Application.logger.info('Client connected to Redis.');
callback();
});
},
function(callback) {
if (Models.Application.redisCacheClient)
Models.Application.redisCacheClient = null;
Models.Application.redisCacheClient = redis.createClient(theWorker.config.redisCache.port, theWorker.config.redisCache.host);
Models.Application.redisCacheClient.on('error', function(err) {
Models.Application.logger.error('Failed connecting to Redis Cache "'+theWorker.config.redisCache.host+'": '+
err.message+'. Retrying...');
});
Models.Application.redisCacheClient.on('ready', function() {
Models.Application.logger.info('Client connected to Redis Cache.');
callback();
});
},
function(callback) {
if (!Models[theWorker.config.message_queue]) {
Models.Application.logger.emergency('Unable to load "'+theWorker.config.message_queue+
'" messaging queue: not found. Aborting...');
process.exit(-1);
}
var messageQueueConfig = theWorker.config[theWorker.config.message_queue];
if (messageQueueConfig === undefined) {
messageQueueConfig = {broadcast: theWorker.broadcast, exclusive: theWorker.exclusive};
} else {
messageQueueConfig.broadcast = theWorker.broadcast;
messageQueueConfig.exclusive = theWorker.exclusive;
}
var messagingClient = new Models[theWorker.config.message_queue](messageQueueConfig, theWorker.name, workerType);
theWorker.messagingClient = messagingClient;
messagingClient.onReady(callback);
}
], function(err) {
if (err) {
throw err;
}
theWorker.ready();
});