-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.js
148 lines (118 loc) · 4.3 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env node -r panik -r dotenv/config
const assert = require('assert');
const { URL } = require('url');
const redis = require('redis');
const bitcoin = require('bitcoin');
const safync = require('./safync');
const { urlToBitcoinOptions } = require('./utils');
const Promise = require('bluebird');
const { pMemoize } = require('./pmr');
const pgp = require('pg-promise');
const createRedisMemCache = require('p-memoize-redis');
const delay = require('delay');
const config = require('./config');
// `Promise` is bluebird here (see the require above); promisifyAll adds
// promise-returning `*Async` counterparts to the redis module's callback API.
// Nothing in the code visible here calls them yet.
Promise.promisifyAll(redis);
// Connection settings are read from the environment at load time.
// REDIS_URL is only referenced by the disabled memoized variant below.
const { BITCOIND_RPC_URL, REDIS_URL } = process.env;
// RPC client for the node; `new URL(...)` throws at load time when
// BITCOIND_RPC_URL is missing or is not a valid URL.
const bitcoinRpc = new bitcoin.Client(urlToBitcoinOptions(new URL(BITCOIND_RPC_URL)));
// NOTE(review): presumably this installs the promise-returning
// `bitcoinRpc.cmdAsync` (wrapping `cmd`) that the rest of the file calls —
// confirm in ./safync.
safync.applyTo(bitcoinRpc, 'cmd');
// Disabled variant that would memoize RPC results in redis:
// const memBitcoinRpcCmdAsync = pMemoize(bitcoinRpc.cmdAsync, { cache: createRedisMemCache(REDIS_URL, 'drivenetRpc') });
// Active, un-memoized stand-in with the same name: calls go straight to
// `cmdAsync`, bound so that `this` is still the client.
const memBitcoinRpcCmdAsync = bitcoinRpc.cmdAsync.bind(bitcoinRpc);
/**
 * Generator producing the single insert that stores one transaction output.
 *
 * Nothing is executed until the generator is iterated; each yielded value is
 * whatever `t.none` returns for the query.
 *
 * @param {object} t - Query context exposing `none(sql, params)`.
 * @param {object} block - Block containing the transaction (not used here).
 * @param {object} tx - Transaction owning the output; only `txid` is read.
 * @param {object} vout - Output; `scriptPubKey` and `value` are stored.
 * @param {number} index - Position of the output, stored in column `n`.
 */
function* applyVout(t, block, tx, vout, index) {
  const insertVoutSql = `insert into vout (tx_id, n, script_pub_key, value) values ($/txId/, $/n/, $/scriptPubKey/, $/value/)`;

  const { txid: txId } = tx;
  const { scriptPubKey, value } = vout;

  yield t.none(insertVoutSql, { txId, n: index, scriptPubKey, value });
}
/**
 * Generator producing the single insert that stores one transaction input.
 *
 * A falsy `vin.coinbase` is stored as `null`; `txid`, `vout` and `scriptSig`
 * are passed through unchanged (so they are `undefined` when absent).
 *
 * @param {object} t - Query context exposing `none(sql, params)`.
 * @param {object} block - Block containing the transaction (not used here).
 * @param {object} tx - Transaction owning the input; only `txid` is read.
 * @param {object} vin - Input to store.
 * @param {number} index - Position of the input, stored in column `n`.
 */
function* applyVin(t, block, tx, vin, index) {
  const insertVinSql = `insert into vin (tx_id, n, coinbase, prev_tx_id, vout, script_sig) values ($/txId/, $/n/, $/coinbase/, $/prevTxId/, $/vout/, $/scriptSig/)`;

  const row = {
    txId: tx.txid,
    n: index,
    coinbase: vin.coinbase || null,
    prevTxId: vin.txid,
    vout: vin.vout,
    scriptSig: vin.scriptSig,
  };

  yield t.none(insertVinSql, row);
}
/**
 * Generator producing every insert needed to store one transaction: first the
 * `tx` row itself, then all of its inputs (via applyVin) in order, then all of
 * its outputs (via applyVout) in order.
 *
 * @param {object} t - Query context exposing `none(sql, params)`.
 * @param {object} block - Block containing the transaction; `hash` is stored
 *   as the transaction's `block_hash`.
 * @param {object} tx - Transaction with `txid`, `hash`, `locktime`, `version`
 *   and the `vin` / `vout` arrays.
 * @param {number} index - Position of the transaction, stored in column `n`.
 */
function* applyTx(t, block, tx, index) {
  const insertTxSql = `insert into tx (tx_id, hash, block_hash, n, locktime, version) values ($/id/, $/hash/, $/blockHash/, $/index/, $/locktime/, $/version/)`;

  const row = {
    id: tx.txid,
    hash: tx.hash,
    blockHash: block.hash,
    index,
    locktime: tx.locktime,
    version: tx.version,
  };
  yield t.none(insertTxSql, row);

  // Inputs first, then outputs; each keeps its array position as `n`.
  for (const [position, input] of tx.vin.entries()) {
    yield* applyVin(t, block, tx, input, position);
  }

  for (const [position, output] of tx.vout.entries()) {
    yield* applyVout(t, block, tx, output, position);
  }
}
/**
 * Generator producing every insert needed to store one block: the `block` row
 * first, followed by everything applyTx yields for each transaction, in order.
 *
 * `block.time` is multiplied by 1000 before being turned into a Date, and a
 * falsy `block.blindhash` is stored as `null`.
 *
 * @param {object} t - Query context exposing `none(sql, params)`.
 * @param {object} block - Block with `hash`, `height`, `time`, optional
 *   `blindhash`, and a `tx` array of transaction objects.
 */
function* applyBlock(t, block) {
  const insertBlockSql = `insert into block (hash, height, time, blindhash) values ($/hash/, $/height/, $/time/, $/blindHash/)`;

  const { hash, height } = block;
  const minedAt = new Date(block.time * 1000);
  const blindHash = block.blindhash || null;

  yield t.none(insertBlockSql, { hash, height, time: minedAt, blindHash });

  for (const [position, transaction] of block.tx.entries()) {
    yield* applyTx(t, block, transaction, position);
  }
}
/**
 * Worker entry point: mirrors the chain served by the bitcoind RPC node into
 * the local database, polling forever.
 *
 * Every tick:
 *   1. reads the highest local block height (-1 when the `block` table is
 *      empty) and the node's current height;
 *   2. walks down from the local tip and deletes each local block that is not
 *      on the node's chain (different hash, or above the node's tip), which
 *      rewinds re-orgs;
 *   3. fetches every block from the first missing height up to the node's
 *      height and stores each one in its own database transaction.
 *
 * @returns {Promise<never>} Never fulfils, because the polling loop does not
 *   end; rejects as soon as a database or RPC call fails.
 */
const main = async () => {
  // Pause between two ticks, in milliseconds.
  const POLL_INTERVAL_MS = 5e3;

  console.log('Starting');

  const db = pgp()(config.databaseUrl);

  const tick = async () => {
    let { height: localHeight } = await db.one('select coalesce(max(height), -1) height from block');
    const { blocks: remoteHeight } = await bitcoinRpc.cmdAsync('getblockchaininfo');

    console.log(`Ticking. Local height: ${localHeight}; remote height: ${remoteHeight}`);

    // Rewind: drop local blocks from the tip downwards until one is found
    // that the node also has at the same height.
    for (; localHeight >= 0; localHeight--) {
      const { hash: localHash } = await db.one(`select hash from block where height = $/localHeight/`, {
        localHeight,
      });

      // `getblockhash` is rejected by the node for heights above its tip, so
      // asking would make every tick fail for as long as the local chain is
      // taller. Such a block cannot be on the node's chain: treat it as a
      // mismatch without querying.
      const isAboveRemoteTip = localHeight > remoteHeight;
      const remoteHash = isAboveRemoteTip ? null : await bitcoinRpc.cmdAsync('getblockhash', localHeight);

      if (!isAboveRemoteTip && localHash === remoteHash) {
        break;
      }

      if (isAboveRemoteTip) {
        console.log(`${localHeight}: Local hash (${localHash}) is above remote height (${remoteHeight})`);
      } else {
        console.log(`${localHeight}: Local hash (${localHash}) <> remote hash (${remoteHash})`);
      }

      // NOTE(review): only the `block` row is deleted here; presumably its
      // tx/vin/vout rows are removed by cascading foreign keys — confirm
      // against the schema.
      const { rowCount } = await db.result(`delete from block where height = $/height/`, { height: localHeight });

      // Exactly one block is expected per height.
      assert.strictEqual(rowCount, 1);
    }

    // Append: localHeight is now the last height shared with the node
    // (or -1), so start at the next one.
    for (localHeight++; localHeight <= remoteHeight; localHeight++) {
      console.log('Appending block at height', localHeight);

      const blockHash = await bitcoinRpc.cmdAsync('getblockhash', localHeight);

      // Verbosity 2: `block.tx` must hold decoded transactions, because
      // applyBlock reads fields such as `txid`, `vin` and `vout` from them.
      const block = await memBitcoinRpcCmdAsync('getblock', blockHash, 2);

      // One transaction per block, so a block is stored completely or not at all.
      await db.tx(t => t.batch(Array.from(applyBlock(t, block))));
    }
  };

  while (true) {
    await tick();
    await delay(POLL_INTERVAL_MS);
  }
};
// Start the worker. If main() ever fulfils, exit with the value it resolved
// to. No rejection handler is attached here.
main().then((exitCode) => process.exit(exitCode));