forked from RafaelVidaurre/yakuza
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.js
245 lines (197 loc) · 5.33 KB
/
task.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/**
* @author Rafael Vidaurre
* @module Task
*/
'use strict';
var Http, Q, _;
Q = require('q');
Http = require('./http');
_ = require('lodash');
/**
 * A built task: bundles the main method to run together with its parameters, its HTTP client,
 * its configuration and the bookkeeping fields describing its execution
 * @class
 * @param taskId Id of the task definition this task belongs to
 * @param {function} main Main method of the task
 * @param params Parameters stored for the main method
 * @param defaultCookies When truthy it is handed to the Http constructor, otherwise Http is built
 * without arguments
 * @param config Configuration object
 * @param job Job that instanced this task
 */
function Task (taskId, main, params, defaultCookies, config, job) {
  var http, runningDeferred;

  runningDeferred = Q.defer();

  if (defaultCookies) {
    http = new Http(defaultCookies);
  } else {
    http = new Http();
  }

  /** Id of the task definition this task belongs to */
  this.taskId = taskId;

  /** Moment the task started running, null until then */
  this.startTime = null;

  /** Moment the task finished running, null until then */
  this.endTime = null;

  /** Difference between endTime and startTime, null until the task finishes */
  this.elapsedTime = null;

  /**
  * Configuration object
  * @private
  */
  this._config = config;

  /**
  * Retry counter of the built task, starts at zero
  * @private
  */
  this._retries = 0;

  /**
  * Deferred used to settle _runningPromise
  * @private
  */
  this._runningDeferred = runningDeferred;

  /**
  * Promise of the deferred above, exposes the task's running state
  * @private
  */
  this._runningPromise = runningDeferred.promise;

  /**
  * Parameters handed to the main method
  * @private
  */
  this._params = params;

  /**
  * Main method to be run
  * @private
  */
  this._main = main;

  /**
  * Per-instance storage object, starts out empty
  * @private
  */
  this._sharedStorage = {};

  /**
  * Http instance used by this task
  * @private
  */
  this._http = http;

  /**
  * Cookie jar saved by the task, null until cookies are saved
  * @private
  */
  this._savedJar = null;

  /**
  * Reference to the job that instanced this task
  * @private
  */
  this._job = job;
}
/**
 * Stamps the moment the task stopped running and derives how long it ran from startTime. Invoked
 * on both the success and the failure path
 * @private
 */
Task.prototype._onFinish = function () {
  var finishedAt = Date.now();

  this.endTime = finishedAt;
  this.elapsedTime = finishedAt - this.startTime;
};
/**
 * Called by the task's emitter object, it exposes a key with its value to be used in another task
 * later on. The value currently shared under the key is read from the job, combined with the new
 * value by the share method, and the result is written back to the job
 * @param {string} key Key by which the value will be shared, must not be undefined or null
 * @param value A value which will be shared, must not be undefined
 * @param {object} [options] Object of options for sharing
 * @param {string|function} [options.method='default'] Either the name of a share method looked up
 * in the job's scraper, or a function. It is called with (current, value) and its return value
 * becomes the new shared value
 * @throws {Error} If key or value is missing, or the share method can't be resolved to a function
 * @private
 */
Task.prototype._onShare = function (key, value, options) {
  var current, shareMethod, shareMethodFunction;

  // Both halves of the pair are mandatory: without this check a missing key would be forwarded
  // to the job's shared storage as-is
  if (key === undefined || key === null || value === undefined) {
    throw new Error('Missing key/value in share method call');
  }

  shareMethod = (options && options.method) || 'default';

  // Names are resolved against the scraper's share methods, anything else is used directly
  if (_.isString(shareMethod)) {
    shareMethodFunction = this._job._scraper._shareMethods[shareMethod];
  } else {
    shareMethodFunction = shareMethod;
  }

  if (!shareMethodFunction) {
    throw new Error('Share method doesn\'t exist.');
  }

  if (!_.isFunction(shareMethodFunction)) {
    throw new Error('Share method is not a function');
  }

  current = this._job._getShared(this.taskId, key);
  this._job._setShared(this.taskId, key, shareMethodFunction(current, value));
};
/**
 * Called in the task's main method when the task ended successfully. Runs the onSuccess hook of
 * the configuration when one is defined, records the finish time and then settles the running
 * deferred: it is resolved with the response object, or rejected with it if the hook called
 * stopJob()
 * @param data Data retrieved by the task
 * @private
 */
Task.prototype._onSuccess = function (data) {
  var hookMessage, hooks, response, stopJob;

  stopJob = false;

  // Response object to be provided to the promise
  response = {
    data: data,
    task: this,
    status: 'success',
    savedCookieJar: this._savedJar
  };

  // Object passed to the hook for execution control and providing useful data
  hookMessage = {
    stopJob: function () {
      stopJob = true;
    },
    data: response.data
  };

  // A configuration without hooks means there is nothing to call; it must not keep the task
  // from finishing and settling its promise
  hooks = this._config ? this._config.hooks : undefined;

  if (hooks && _.isFunction(hooks.onSuccess)) {
    hooks.onSuccess(hookMessage);
  }

  this._onFinish();

  if (stopJob) {
    this._runningDeferred.reject(response);
  } else {
    this._runningDeferred.resolve(response);
  }
};
/**
 * Called in the task's main method, it takes the cookie jar currently held by the task's Http
 * instance and keeps it as the task's saved jar. Note that the saved jar is only handed over (as
 * savedCookieJar) in the response of a successful task
 * @private
 */
Task.prototype._onSaveCookies = function () {
  // TODO: Accept custom jar as parameter
  this._savedJar = this._http.getCookieJar();
};
/**
 * Called by the task's main method when an error occurred. Records the finish time and rejects
 * the running deferred with a failure response carrying the error, the message, the task itself
 * and the request log of the task's Http instance
 * @param {Error} error Error object with stacktrace and everything
 * @param {string} message Message explaining what failed
 * @private
 */
Task.prototype._onFail = function (error, message) {
  // The log is captured before the finish bookkeeping runs
  var requestLog = this._http.getLog();

  this._onFinish();

  this._runningDeferred.reject({
    error: error,
    message: message,
    task: this,
    status: 'fail',
    requestLog: requestLog
  });
};
/**
 * Starts the task: stamps the start time and calls the main method with an emitter, the task's
 * Http instance and its parameters. The emitter exposes success, fail, share and saveCookies,
 * each bound to this task. This is where the scraping spends most of its time
 * @private
 */
Task.prototype._run = function () {
  var emitter, task;

  task = this;
  emitter = {};
  emitter.success = task._onSuccess.bind(task);
  emitter.fail = task._onFail.bind(task);
  emitter.share = task._onShare.bind(task);
  emitter.saveCookies = task._onSaveCookies.bind(task);

  task.startTime = Date.now();

  // TODO: Maybe handle the exception thrown by the onError method to control crashes
  task._main(emitter, task._http, task._params);
};
// Expose the Task constructor as this module's export
module.exports = Task;