Skip to content

Commit fd85786

Browse files
committed
Merge branch 'dev-webui-redo'
2 parents 53ef6bc + 2e623b3 commit fd85786

File tree

11 files changed

+293
-128
lines changed

11 files changed

+293
-128
lines changed

lib/cli.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,27 @@ const CLI = {
283283
},
284284

285285
async count(opts = {}) {
286-
return CLI.list(Object.assign(evil(opts), { count: true, values: false }));
286+
const args = [...arguments].map(v => evil(v));
287+
if (args.length === 0) {
288+
opts = {};
289+
} else if (typeof(args[0]) === 'string') {
290+
opts = { gte: args[0], lt: args[1] };
291+
} else {
292+
opts = args[0];
293+
}
294+
return CLI.list(Object.assign(opts, { count: true, values: false }));
287295
},
288296

289297
async keys(opts = {}) {
290-
return CLI.list(Object.assign(evil(opts), { values: false }));
298+
const args = [...arguments].map(v => evil(v));
299+
if (args.length === 0) {
300+
opts = {};
301+
} else if (typeof(args[0]) === 'string') {
302+
opts = { gte: args[0], lt: args[1] };
303+
} else {
304+
opts = args[0];
305+
}
306+
return CLI.list(Object.assign(opts, { values: false }));
291307
},
292308

293309
async list(opts = {}) {

lib/client.js

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class NetDB {
1515
#defer = undefined;
1616
#last_key = undefined;
1717
#last_list = undefined;
18+
#trace = false;
1819

1920
/**
2021
* signature overload accepts:
@@ -53,6 +54,9 @@ class NetDB {
5354
conn.on('close', () => this.close());
5455

5556
new LineBuffer(conn, (line) => {
57+
if (this.#trace) {
58+
console.log({ client_recv: line });
59+
}
5660
let json = decode(line);
5761
let call = json.call;
5862
// console.log({ line: line.toString(), json, call, tc: this.calls[call] });
@@ -67,7 +71,7 @@ class NetDB {
6771
if (json.error) {
6872
err?.(json.error);
6973
} else {
70-
fn?.(json);
74+
fn?.(json, call);
7175
}
7276
if (!json.continue) {
7377
delete this.calls[call];
@@ -85,10 +89,17 @@ class NetDB {
8589
}
8690

8791
async #send(obj, fn, err) {
92+
if (this.#trace) {
93+
console.log({ client_send: obj });
94+
}
8895
if (this.#conn) {
89-
let call = this.next++;
90-
this.calls[call] = { fn, err };
91-
this.#conn.write(encode({ call, ...obj }));
96+
if (fn) {
97+
let call = this.next++;
98+
this.calls[call] = { fn, err };
99+
this.#conn.write(encode({ call, ...obj }));
100+
} else {
101+
this.#conn.write(encode(obj));
102+
}
92103
return;
93104
}
94105
if (!(this.#base && this.#host)) {
@@ -293,13 +304,14 @@ class NetDB {
293304
});
294305
}
295306

296-
list(opt, fn, op2 = {}) {
307+
list(opt = {}, fn, op2 = {}) {
297308
this.#last_list = { opt, fn, op2 };
309+
let count = 0;
298310
return new Promise((resolve, reject) => {
299311
const recs = fn ? undefined : [];
300312
this.#send(
301313
{ list: true, opt },
302-
(reply) => {
314+
(reply, call) => {
303315
let { list, error } = reply;
304316
if (list || error) {
305317
if (list.key) {
@@ -316,6 +328,10 @@ class NetDB {
316328
} else {
317329
recs.push(op2.raw ? reply : list);
318330
}
331+
count++;
332+
if (opt.ack && count % opt.ack === 0) {
333+
this.#send({ ack: call, value: count });
334+
}
319335
}
320336
if (!reply.continue) {
321337
resolve(recs);

lib/clone.js

Lines changed: 94 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
#!/usr/bin/env node
22

3-
let args = process.argv.slice(2);
4-
/*
5-
let host = args.host || 'localhost';
6-
let port = parseInt(args.port || 3000);
7-
let user = args.user || undefined;
8-
let pass = args.pass || undefined;
9-
*/
10-
let client = require('./client');
11-
12-
/*
13-
function onerror(error) {
14-
console.log({ db_error: error });
15-
}
16-
*/
3+
const env = process.env;
4+
const args = process.argv.slice(2);
5+
const client = require('./client');
6+
const { log } = require('./util');
177

18-
console.log('[net-level-clone]');
198
if (args.length < 2) {
209
console.log('usage: host/port/user/pass/base host/port/user/pass/base query');
2110
process.exit();
@@ -32,56 +21,99 @@ function parse(tok) {
3221
};
3322
}
3423

35-
function connect(rec) {
36-
return new Promise((resolve, reject) => {
37-
let db = new client();
38-
db.open(rec.host, rec.port)
39-
.then(() => {
40-
return db.auth(rec.user, rec.pass);
41-
})
42-
.then(() => {
43-
return db.use(rec.base);
44-
})
45-
.then(() => {
46-
console.log(rec);
47-
resolve(db);
48-
})
49-
.catch(reject);
50-
});
24+
const retries = parseInt(env['RETRIES'] || 60);
25+
const retry_timeout = parseInt(env['RETRY_TIMEOUT'] || 1) * 1000;
26+
27+
async function connect(rec) {
28+
const db = new client({ retries, retry_timeout });
29+
await db.open(rec.host, rec.port);
30+
await db.auth(rec.user, rec.pass);
31+
await db.use(rec.base);
32+
return db;
5133
}
52-
let b1 = parse(args[0]);
53-
let b2 = parse(args[1]);
54-
let q = eval(`(${args[2] || '{}'})`);
55-
let clone = b1.host !== b2.host || b1.base !== b2.base;
56-
57-
connect(b1)
58-
.then((db) => {
59-
b1 = db;
60-
return connect(b2);
61-
})
62-
.then((db) => {
63-
b2 = db;
64-
})
65-
.then(() => {
66-
b1.list(q, (key, value) => {
67-
console.log({ key, value });
68-
if (clone && key && value) {
69-
b2.put(key, value)
34+
35+
// for use in mode settings eval
36+
const series = 1;
37+
const b1t = parse(args[0]);
38+
const b2t = parse(args[1]);
39+
const q = eval(`(${args[2] || '{}'})`);
40+
const can_clone = b1t.host !== b2t.host || b1t.port !== b2t.port || b1t.base !== b2t.base;
41+
42+
const interval = parseInt(q.interval || 0) * 1000;
43+
const overlap = q.overlap;
44+
const mode = q.mode;
45+
46+
delete q.interval;
47+
delete q.overlap;
48+
delete q.mode;
49+
50+
log('[net-level-clone]', [b1t.host, b1t.port, b1t.base], [b2t.host, b2t.port, b2t.base]);
51+
52+
async function do_clone(b1, b2) {
53+
// (time) series continues after last key from destination base
54+
// with the option to include that record again with overlap:true
55+
// which is useful when the last record will be updated several times
56+
// before being finalized and the next record written (streaming + historical)
57+
if (mode === series) {
58+
const lastkey = await b2.list({ reverse: true, limit: 1, values: false });
59+
if (lastkey && lastkey.length) {
60+
log({ lastkey: lastkey[0].key });
61+
if (overlap) {
62+
q.gte = lastkey[0].key;
63+
} else {
64+
q.gt = lastkey[0].key;
65+
}
66+
}
67+
}
68+
69+
const pro = [];
70+
71+
q.limit = 500;
72+
let cloned;
73+
do {
74+
cloned = 0;
75+
await b1.list(q, (key, value) => {
76+
log({ get: key });
77+
if (key && value) {
78+
pro.push(b2.put(key, value)
7079
.then(() => {
71-
console.log({ put: key });
80+
cloned++;
81+
if (overlap) {
82+
q.gte = key;
83+
} else {
84+
q.gt = key;
85+
}
86+
log({ put: key });
7287
})
7388
.catch((/*error*/) => {
74-
console.log({ put_error: key });
89+
log({ put_error: key });
7590
b1.close();
76-
});
91+
})
92+
);
7793
}
78-
})
79-
.then(() => {
80-
b1.close();
81-
b2.close();
82-
process.exit();
83-
})
84-
.catch((error) => {
85-
console.log(error);
86-
});
87-
});
94+
});
95+
await Promise.all(pro);
96+
console.log({ cloned });
97+
} while (cloned > 0);
98+
99+
}
100+
101+
if (can_clone) (async () => {
102+
const b1 = await connect(b1t);
103+
const b2 = await connect(b2t);
104+
105+
while (true) {
106+
await do_clone(b1, b2);
107+
if (interval) {
108+
await new Promise(resolve => setTimeout(resolve, interval));
109+
continue;
110+
} else {
111+
break;
112+
}
113+
}
114+
115+
await b1.close();
116+
await b2.close();
117+
process.exit();
118+
119+
}) ();

0 commit comments

Comments
 (0)