-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
137 lines (116 loc) · 4.09 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
'use strict';
/**
* @typedef {object} JobData
* @property {object} request Parsing request
* @property {string} request.url URL for parsing.
* @property {object} request.rules Parsing rules. See more https://github.com/redco/goose-parser#parse-rules
* @property {object} request.options Environment options. See more https://github.com/redco/goose-parser#environments
* @property {object} request.actions Actions to execute before parsing. See more https://github.com/redco/goose-parser#actions
* @property {object} request.pagination Pagination rules. See more https://github.com/redco/goose-parser#pagination
*/
/**
* @typedef {object} Job
* @property {JobData} data job data
*/
const debug = require('debug')('parser-node');
const GooseParser = require('goose-parser');
const kue = require('kue');
const _ = require('lodash');
const kueHelper = require('./tools/kueHelper');
const Parser = GooseParser.Parser;
const PhantomEnvironment = GooseParser.PhantomEnvironment;
const domain = require('domain');
const path = require('path');
const url = require('url');
const queue = kueHelper.init();
/**
 * Reads a numeric setting from the process environment.
 *
 * An unset, blank or non-numeric value yields `fallback`, so a typo in the
 * environment cannot turn a limit into NaN.
 *
 * @param {string} name Environment variable name
 * @param {number} fallback Value used when the variable is missing or invalid
 * @returns {number}
 */
function readNumericEnv(name, fallback) {
    const raw = process.env[name];
    if (raw === undefined || raw.trim() === '') {
        return fallback;
    }
    const parsed = Number(raw);
    return Number.isFinite(parsed) ? parsed : fallback;
}

// Name of the queue this worker processes.
const QUEUE_NAME = process.env.QUEUE_NAME || 'parser-default';
// Memory limit in bytes; GOOSE_MEMORY_LIMIT is expressed in megabytes (256 by default).
const MAX_MEMORY_LIMIT = readNumericEnv('GOOSE_MEMORY_LIMIT', 256) * 1024 * 1024;
// Time limit for a single job in milliseconds (2 min by default).
const TIME_LIMIT_FOR_JOB = readNumericEnv('TIME_LIMIT_FOR_JOB', 2 * 60 * 1000);
// Values loaded from the local option modules; their exact shape is defined
// in ./options/viewports and ./options/userAgents.
const viewportOptions = require('./options/viewports');
const userAgentOptions = require('./options/userAgents');

/**
 * Fallback environment options. Any option a job request does not specify
 * itself is taken from here when the environment is created.
 */
const defaultOptions = {
    screen: viewportOptions,
    userAgent: userAgentOptions,
    snapshot: false,
    loadImages: true,
    webSecurity: false
};
debug('Connecting to %s queue', QUEUE_NAME);
queue.process(
    QUEUE_NAME,
    /**
     * Handles one job from the queue: builds the environment from the job's
     * options (falling back to defaultOptions), runs the parser inside a
     * domain so stray errors also finish the job, and reports the outcome
     * through the time-limited finish callback.
     *
     * @param {Job} job
     * @param {function} done
     */
    (job, done) => {
        debug('New task on queue %s with data %o', QUEUE_NAME, job.data);

        const request = job.data.request;
        const requestedOptions = _.clone(request.options) || {};
        const envOptions = _.defaults(requestedOptions, defaultOptions);
        envOptions.url = encodeURI(request.url);

        const env = new PhantomEnvironment(envOptions);
        const finishJob = wrapJobWithTimeLimit(env, job, done);

        checkMemoryUsage();

        const onSuccess = result => {
            debug('Work is done!');
            finishJob(null, {result});
        };
        const onFailure = e => {
            debug('Parsing error: %s %s', e.message, e.stack);
            finishJob(e);
        };

        const domainInstance = domain.create();
        // Errors surfacing in the domain outside the promise chain end the job as failed.
        domainInstance.on('error', e => {
            debug('Error had happened: %s %s', e.message, e.stack);
            finishJob(e);
        });
        domainInstance.run(() => {
            parse(env, job.data.request).then(onSuccess, onFailure);
        });
    });
/**
 * Creates a Parser bound to the given environment and runs it for a job request.
 *
 * Pagination is passed to the parser only when the request defines it.
 *
 * @param {object} env Environment the parser works in
 * @param {object} jobRequest Parsing request (see JobData.request)
 * @returns {*} whatever `Parser#parse` returns (the caller chains `.then` on it)
 */
function parse(env, jobRequest) {
    const pagination = jobRequest.pagination;
    const parserOptions = pagination
        ? {environment: env, pagination: pagination}
        : {environment: env};

    return new Parser(parserOptions).parse({
        actions: jobRequest.actions,
        rules: jobRequest.rules,
        transform: jobRequest.transform,
        rulesParams: jobRequest.rulesParams
    });
}
/**
 * Builds the finish callback for a job and arms its time limit.
 *
 * The returned function may be called many times, but only the first call
 * has an effect: it cleans the environment via cleanEnv and then invokes
 * `done` with the given error/results, regardless of whether the cleanup
 * succeeded. If nothing finishes the job within TIME_LIMIT_FOR_JOB, it is
 * finished with a time-limit error.
 *
 * @param {object} env Environment used by the job
 * @param {Job} job
 * @param {function} done Queue callback, called as done(error, results)
 * @returns {function} finish callback taking (error, results)
 */
function wrapJobWithTimeLimit(env, job, done) {
    let alreadyFinished = false;

    function doneJob(e, results) {
        clearTimeout(timer);
        if (!alreadyFinished) {
            alreadyFinished = true;
            // The job outcome is reported whether or not the cleanup succeeds.
            const report = () => done(e, results);
            cleanEnv(env, e).then(report, report);
        }
    }

    const timer = setTimeout(
        () => doneJob(new Error(`Time limit ${TIME_LIMIT_FOR_JOB} exceeded, killing job`)),
        TIME_LIMIT_FOR_JOB
    );

    return doneJob;
}
/**
 * Cleans up the environment after a job.
 *
 * The environment is torn down here only when the job ended with an error;
 * for a successful job nothing is done and an already-resolved promise is
 * returned.
 *
 * Always returns a promise, so callers can chain `.then` unconditionally:
 * a synchronous throw from `env.tearDown()` becomes a rejection and a
 * non-promise return value is wrapped.
 *
 * @param {object} env Environment exposing `tearDown()`
 * @param {?Error} e Error the job finished with, if any
 * @returns {Promise}
 */
function cleanEnv(env, e) {
    if (!e) {
        // Must be a call: returning the bare `Promise.resolve` function has no `.then`.
        return Promise.resolve();
    }
    try {
        return Promise.resolve(env.tearDown());
    } catch (tearDownError) {
        return Promise.reject(tearDownError);
    }
}
/**
 * Logs the current resident set size of the process and, when it is above
 * MAX_MEMORY_LIMIT, asks the queue to shut down (passing TIME_LIMIT_FOR_JOB
 * as the timeout) and exits the process with code 0 once the shutdown
 * callback fires.
 */
function checkMemoryUsage() {
    const rss = process.memoryUsage().rss;
    debug('Memory used: %o', rss);

    const withinLimit = !(rss > MAX_MEMORY_LIMIT);
    if (withinLimit) {
        return;
    }

    debug('Memory limit exceeded');
    const exitAfterShutdown = err => {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    };
    queue.shutdown(TIME_LIMIT_FOR_JOB, exitAfterShutdown);
}