-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.js
executable file
·103 lines (86 loc) · 2.74 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env node
var {ActivityPoller} = require('aws-swf');
var {hostname} = require('os');
var resolve = require('resolve');
var bluebird = require('bluebird');
var yargs = require('yargs');
var throttle = require('lodash.throttle');
const HEARTBEAT_PERIOD = 10;
var {load} = require('./common');
var options = yargs
.usage('Usage: $0 [options]')
.option('file', {
alias: 'f',
describe: 'Javascript file containing activities object'
})
.option('domain', {
alias: 'd',
describe: 'Name of the SWF Domain to execute activities within'
})
.option('taskList', {
alias: 't',
describe: 'Name of the SWF Task List to execute activities for'
})
.option('identity', {
alias: 'i',
describe: 'Unique identifier of this worker instance',
default: `activity-${hostname()}-${process.pid}`
})
.option('limit', {
alias: 'l',
describe: 'Limit the number of worker processes that can run concurrently'
})
.demand(['file', 'domain', 'taskList'])
.argv;
const parse = str => {
try {
return JSON.parse(str);
} catch(e) {
return str;
}
};
const execute = (file, activityTask) =>
load(file)
.then(activities => {
var {name} = activityTask.config.activityType;
var activity = activities[name];
if (!activity) {
throw Error(`Activity '${name}' not defined in '${file}'`);
}
var input = parse(activityTask.config.input);
var _context = context(activityTask);
console.log(`Executing activity '${name}'`, input);
return bluebird.resolve(activity(input, _context))
.finally(() => _context.heartbeat.cancel());
});
const context = activityTask => ({
Promise: bluebird,
heartbeat: throttle(data => {
console.log('Sending heartbeat', data);
activityTask.recordHeartbeat(data);
}, HEARTBEAT_PERIOD * 1000)
});
var worker = new ActivityPoller({
domain: options.domain,
identity: options.identity,
taskList: {name: options.taskList},
taskLimitation: options.limit
});
worker.on('activityTask', activityTask => {
console.log('Received activity task');
execute(options.file, activityTask)
.tap(result => console.log('Activity execution succeeded', result))
.catch(err => {
console.error('Activity execution failed', err);
activityTask.respondFailed(err.name, err.message);
throw err;
})
.then(result => activityTask.respondCompleted(result));
});
worker.on('poll', () => console.log('Polling for activity tasks...'));
console.log(`Starting activity worker '${options.identity}' for task list '${options.taskList}' in domain '${options.domain}'`);
worker.start();
process.on('SIGINT', () => {
console.log('Caught SIGINT, polling will stop after current request...');
worker.stop();
});