-
-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathetcd.orchestrator.ts
71 lines (64 loc) · 2.62 KB
/
etcd.orchestrator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import { Injectable, Logger, OnApplicationShutdown } from '@nestjs/common';
import { KeyValueOptions, KeyValueMetadata, setValue } from '@nestcloud/common';
import { Etcd } from './etcd';
import { InjectEtcd } from './decorators/inject-etcd.decorator';
import { ETCD_KEY_VALUE_ERROR } from './etcd.messages';
import * as RPC from 'etcd3/lib/rpc';
import { Watcher } from 'etcd3/lib/watch';
interface KeyValue {
name: string;
property: string;
target: Function;
options: KeyValueOptions;
watcher?: Watcher;
}
@Injectable()
export class EtcdOrchestrator implements OnApplicationShutdown {
private readonly keyValues = new Map<string, KeyValue>();
private logger = new Logger(EtcdOrchestrator.name);
constructor(
@InjectEtcd() private readonly etcd: Etcd,
) {
}
public addKeyValue(target: Function, keyValues: KeyValueMetadata[]) {
keyValues.forEach(({ name, property, options }) => {
const key = `${name}__${property}__${target.constructor.name}`;
this.keyValues.set(key, { name, property, target, options });
});
}
onApplicationShutdown(signal?: string): any {
this.keyValues.forEach(item => item.watcher ? item.watcher.cancel() : '');
}
public async mountKeyValues() {
for (const item of this.keyValues.values()) {
const { name, property, target, options = {} } = item;
try {
const value = await this.etcd.get(name).string();
setValue(target, value, property, options);
} catch (e) {
this.logger.error(ETCD_KEY_VALUE_ERROR(name), e);
}
const watcher = await this.etcd.watch().key(name).create();
watcher.on('data', (res: RPC.IWatchResponse) => {
res.events.forEach(evt => {
if (evt.type === 'Put') {
const value = evt.kv.value.toString();
setValue(target, value, property, options);
} else if (evt.type === 'Delete') {
setValue(target, '', property, options);
}
});
});
watcher.on('connected', async () => {
try {
const value = await this.etcd.get(name).string();
setValue(target, value, property, options);
} catch (e) {
this.logger.error(ETCD_KEY_VALUE_ERROR(name), e);
}
});
watcher.on('error', e => this.logger.error(ETCD_KEY_VALUE_ERROR(name), e.message));
item.watcher = watcher;
}
}
}