From 4d8f19d49f7e10f7b0e477b037bfbc2656286016 Mon Sep 17 00:00:00 2001 From: James Keane Date: Thu, 14 Dec 2023 16:24:03 -0500 Subject: [PATCH] Use more modern JS, and fix typescript errors. --- .github/workflows/ci.yml | 1 + package-lock.json | 10 + package.json | 5 +- src/index.js | 900 ++++++++++++++++++++------------------- src/krpc.js | 413 +++++++++--------- src/routing.js | 7 +- src/security.js | 15 +- src/storage.js | 538 ++++++++++++----------- src/token-store.js | 189 ++++---- src/types.d.ts | 50 ++- src/util.js | 37 +- test/krpc.js | 1 + test/storage.js | 4 +- tsconfig.json | 4 +- 14 files changed, 1171 insertions(+), 1003 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 51c1464..7ad1639 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,4 +20,5 @@ jobs: node-version: ${{ matrix.node-version }} cache: 'npm' - run: npm install + - run: npm run type-check - run: npm test diff --git a/package-lock.json b/package-lock.json index 9544c9f..50e891f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@types/bencode": "^2.0.0", "@types/debug": "^4.1.5", "@types/node": "^20.10.4", + "@types/sse4_crc32": "^7.0.3", "ed25519-supercop": "^2.0.1", "mocha": "^10.2.0", "sinon": "^17.0.1", @@ -104,6 +105,15 @@ "undici-types": "~5.26.4" } }, + "node_modules/@types/sse4_crc32": { + "version": "7.0.3", + "resolved": "https://registry.npmjs.org/@types/sse4_crc32/-/sse4_crc32-7.0.3.tgz", + "integrity": "sha512-5D1oXIZk2qBZXPuYYE6bEIoSya/bAk56TmyEU61kRmyZBeyBy8tSTtIRf7NKtVgGu2h2EYdRXohW+B+iFD98jg==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/ansi-colors": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.1.tgz", diff --git a/package.json b/package.json index 040a52e..b6c6097 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,9 @@ "description": "", "main": "src/index.js", "scripts": { - "test": "mocha test/**/*.js" + "test": "mocha test/**/*.js", + "watch": "mocha -w --parallel test/", + "type-check": "tsc" }, "type": "module", "author": "", @@ -21,6 +23,7 @@ "@types/bencode": "^2.0.0", "@types/debug": "^4.1.5", "@types/node": "^20.10.4", + "@types/sse4_crc32": "^7.0.3", "ed25519-supercop": "^2.0.1", "mocha": "^10.2.0", "sinon": "^17.0.1", diff --git a/src/index.js b/src/index.js index b376385..1ca8027 100644 --- a/src/index.js +++ b/src/index.js @@ -1,14 +1,13 @@ -import util from 'util'; import dgram from 'dgram'; import crypto from 'crypto'; import fs from 'fs'; import EventEmitter from 'events'; -import { RoutingTable, distance } from '#root/src/routing'; -import { KRPCSocket } from '#root/src/krpc'; -import { PromiseSelector, PQueue } from '#root/src/util'; -import TokenStore from '#root/src/token-store'; -import bep44 from '#root/src/storage'; +import { RoutingTable, distance } from './routing.js'; +import { KRPCSocket } from './krpc.js'; +import { PromiseSelector, PQueue } from './util.js'; +import TokenStore from './token-store.js'; +import bep44 from './storage.js'; import debugLogger from 'debug'; const debug = debugLogger('dht'); @@ -18,26 +17,27 @@ const debug = debugLogger('dht'); const ROUTING_REFRESH_INTERVAL = 1000 * 60 * 15; // 15 minutes - -/** - * @typedef {{ - * address: string, - * port: number, - * family?: string - * }} - */ -var PeerInfo; - - /** * @typedef {{ * id?: Buffer|string, * K?: number, * nodes?: Array., - * bootstrapNodes?: Array. - * }} + * bootstrapNodes?: Array., + * socket?: dgram.Socket + * }} DHTOptions + * @typedef {{ + * id: Buffer, + * implied_port: 0|1, + * info_hash: Buffer, + * port: number, + * token: Buffer + * }} AnnouncePeerRequest + * @typedef {{ id: Buffer }} AnnoucePeerResponse + * @typedef {{ id: Buffer, info_hash: Buffer }} GetPeersRequest + * @typedef {{ + * id: Buffer, token: Buffer, values?: Buffer[], nodes?: NodeInfo[] + * }} GetPeersResponse */ -var DHTOptions; /** @@ -54,489 +54,529 @@ const BOOTSTRAP_NODES = [ /** * Implements: * * [BEP 0005](http://www.bittorrent.org/beps/bep_0005.html) - * @constructor - * @param {DHTOptions=} opt_options Optional initialization options - * @extends {EventEmitter} */ -export default function DHT(opt_options) { - EventEmitter.call(this); - - // Initialize the options - opt_options = opt_options || {}; +export default class DHT extends EventEmitter { /** - * Default value of closest nodes to query. - * @define {number} + * @param {DHTOptions=} opt_options Optional initialization options */ - this.K_ = opt_options['K'] || 8; + constructor(opt_options) { + super(); + + // Initialize the options + opt_options = opt_options || {}; + + /** + * Default value of closest nodes to query. + * @define {number} + */ + this.K_ = opt_options['K'] || 8; + + /** + * @type {Buffer} + */ + this.id = opt_options.id ? ( + Buffer.isBuffer(opt_options.id) ? + opt_options.id : Buffer.from(opt_options.id, 'hex')) : + crypto.randomBytes(20); + + /** + * @type {!Array.} + * @private + */ + this.bootstrapNodes_ = opt_options.bootstrapNodes || BOOTSTRAP_NODES; + + /** + * @type {boolean} + * @private + */ + this.isBootstrapping_ = false; + + /** + * @type {RoutingTable} + * @todo make private + */ + this.nodes_ = new RoutingTable(this.id, { K: this.K_ }); + this.nodes_.on('ping', this.handleBucketPing_.bind(this)); + this.nodes_.on('refresh', this.handleBucketRefresh_.bind(this)); + if (opt_options.nodes) this.nodes_.loadState(opt_options.nodes); + + this.refreshTimer_ = setInterval( + this.handleRoutingRefresh_.bind(this), ROUTING_REFRESH_INTERVAL); + + /** + * @type {dgram.Socket} + * @private + */ + this.socket_ = opt_options.socket || + dgram.createSocket({ type: 'udp4', reuseAddr: true }); + + /** + */ + this.rpc_ = new KRPCSocket(this.socket_); + this.rpc_.on('query', this.handleQuery_.bind(this)); + this.rpc_.on('response', this.handleNodeResponse_.bind(this)); + this.rpc_.on('timeout', this.handleNodeTimeout_.bind(this)); + + // this.socket_.on('message', this.handleSocketMessage_.bind(this)); + // this.socket_.on('error', this.handleSocketError_.bind(this)); + + /** + * Whether the socket is currently bound + * @type {boolean} + * @private + */ + this.isBound_ = false; + + /** + * To prevent ping spamming when travelling far, we track pending pings. + * @type {!Object.>} + * @private + */ + this.pendingPings_ = {}; + + /** + * Storage for the DHT tracker. + * @type {!TokenStore} + * @private + */ + this.announcedPeers_ = new TokenStore(); + + // initialize the extensions + /** + * DHT implementors, i.e. method providers + * @type {IDHTExtension[]} + * @private + */ + this.extensions_ = []; // = [bep5, /*bep44*/]; + this.use(bep44); + } - /** - * @type {Buffer} - */ - this.id = opt_options.id ? ( - Buffer.isBuffer(opt_options.id) ? - opt_options.id : Buffer.from(opt_options.id, 'hex')) : - crypto.randomBytes(20); /** - * @type {!Array.} - * @private + * Instantiate a DHT from a serialized state file. + * @param {string} filename The filename to load from. */ - this.bootstrapNodes_ = opt_options.bootstrapNodes || BOOTSTRAP_NODES; + static load(filename) { + if (filename == undefined || !fs.existsSync(filename)) return new DHT(); + + let state = JSON.parse(fs.readFileSync(filename).toString('utf-8')); + return new DHT({ + K: state.K, + id: Buffer.from(state.id, 'hex'), + bootstrapNodes: [], + nodes: state.nodes + }); + } + /** - * @type {boolean} - * @private + * @param {number=} opt_port Optional port to listen on. + * @param {string=} opt_host Optional listening address to listen on. */ - this.isBootstrapping_ = false; + async listen(opt_port, opt_host) { + debug('Starting DHT on port %s', opt_port); + const listen = () => new Promise((resolve, reject) => + this.socket_.bind(opt_port, opt_host, () => resolve(null))); + + await listen(); + this.isBound_ = true; + const addr = this.socket_.address(); + debug('DHT listening on %s:%s', addr.address, addr.port); + + // start bootstrapping + this.isBootstrapping_ = true; + await Promise.all(this.bootstrapNodes_.map( + async (peer) => { + debug('Bootstrapping node %s.', peer.address + ':' + peer.port); + await this.ping(peer); + } + )); + // collect nodes near to us to populate our bucket + await this.find_node(this.id); + debug('Bootstrapping done, with %s nodes in routing table', this.nodes_.length); + this.isBootstrapping_ = false; + } + /** - * @type {RoutingTable} - * @private */ - this.nodes_ = new RoutingTable(this.id, { K: this.K_ }); - this.nodes_.on('ping', this.handleBucketPing_.bind(this)); - this.nodes_.on('refresh', this.handleBucketRefresh_.bind(this)); - if (opt_options.nodes) this.nodes_.loadState(opt_options.nodes); + dispose() { + clearInterval(this.refreshTimer_); + this.rpc_.dispose(); + this.extensions_.forEach((e) => e.dispose()); + this.announcedPeers_.dispose(); + this.nodes_.dispose(); + this.socket_.close(); + + // this.announcedPeers_ = null; + // this.nodes_ = null; + // this.socket_ = null; + // this.rpc_ = null; + } - this.refreshTimer_ = setInterval( - this.handleRoutingRefresh_.bind(this), ROUTING_REFRESH_INTERVAL); /** - * @type {dgram.Socket} - * @private + * Save the state of this DHT to a file. + * @param {string} filepath The path to the file to save the state into. */ - this.socket_ = opt_options.socket || - dgram.createSocket({ type: 'udp4', reuseAddr: true }); + save(filepath) { + let state = { + K: this.K_, + id: this.id.toString('hex'), + nodes: this.nodes_.getState() + }; + fs.writeFileSync(filepath, JSON.stringify(state)); + } + /** + * Configure the DHT to use an extension. + * @param {IDHTExtensionConstructor} extension The extension constructor to + * initialize. */ - this.rpc_ = new KRPCSocket(this.socket_); - this.rpc_.on('query', this.handleQuery_.bind(this)); - this.rpc_.on('response', this.handleNodeResponse_.bind(this)); - this.rpc_.on('timeout', this.handleNodeTimeout_.bind(this)); + use(extension) { + const inst = new extension(this); + + /** @type {any} */ + const instany = inst; + inst.provides.forEach((method) => { + Object.defineProperty(this, method, { // @todo this doesn't work well for typing + value: instany[method].bind(inst) + }); + }); + this.extensions_.push(inst); + } - // this.socket_.on('message', this.handleSocketMessage_.bind(this)); - // this.socket_.on('error', this.handleSocketError_.bind(this)); /** - * Whether the socket is currently bound - * @type {boolean} - * @private + * Recursively traverse the network, starting from my peer list and moving + * n closer each time. + * @param {Buffer} target + * @param {string} method + * @param {any} args + * @param {((a: any, n: NodeInfo)=>T)=} opt_rescb + * @template T */ - this.isBound_ = false; + async closest_(target, method, args, opt_rescb) { + // use a priority queue to track closest responding, a set and hashing fn to + // track nodes we've already visited + const closest = new PQueue(this.K_); + const seen = new Set(); + const hash = (/** @type {NodeInfo} */ node) => + `${node.id.toString('hex')}:${node.address}:${node.port}`; + + // the 'promise selector' allows us to simulate a more traditional networking + // api. i.e. loop and 'block' until any outstanding request resolves + /** @type {PromiseSelector} */ + const selector = new PromiseSelector([ + Promise.resolve({ + r: { nodes: this.nodes_.closest(target, this.K_) } + })]); + + // loop over responses waiting to be processed + // will block waiting for a response or timeout + let res = null; + while (res = await selector.next()) { + // if (res.error) continue; // @todo why was this here? + const { node, r } = res; + + if (node) { + const res_v = opt_rescb && opt_rescb(r, node); + if (res_v) return res_v; + + // Add the responder to the bucket of closest + closest.push(distance(target, node.id), node); + } + if (!r.nodes) continue; + + // Candidates are nodes we haven't queried before, and that are closer + // than the furthest known node + let candidates = r.nodes + .filter((p) => !seen.has(hash(p))) + .filter((p) => distance(p.id, target) < closest.max); + + for (let p of candidates) { + seen.add(hash(p)); + selector.add(this.rpc_.query(p, method, args)); + } + } + + debug('Closest \'%s\' query returned without a value.', method); + return undefined; + } + /** - * To prevent ping spamming when travelling far, we track pending pings. - * @type {!Object.} - * @private */ - this.pendingPings_ = {}; + isBound() { + return this.isBound_; + } + /** - * Storage for the DHT tracker. - * @type {!TokenStore} - * @private + * @param {NodeInfo} node The node that needs to be pinged. + * @param {function(boolean): void} callback Callback to whether the ping + * succeeded or not. */ - this.announcedPeers_ = new TokenStore(); + async handleBucketPing_(node, callback) { + const res = await this.ping(node); + callback(!!res.error); + } + - // initialize the extensions /** - * DHT implementors, i.e. method providers + * When a bucket needs to be refreshed, we call find_node on a random id in the + * range. + * > "Buckets that have not been changed in 15 minutes should be "refreshed." + * > This is done by picking a random ID in the range of the bucket and + * > performing a find_nodes search on it." + * - BEP-5 + * @param {Buffer} rangeId a random node id in the range of the bucket. */ - this.extensions_ = []; // = [bep5, /*bep44*/]; - this.use(bep44); -} -util.inherits(DHT, EventEmitter); + async handleBucketRefresh_(rangeId) { + this.find_node(rangeId); + } -/** - * Instantiate a DHT from a serialized state file. - * @param {string} filename The filename to load from. - */ -DHT.load = function(filename) { - if (filename == undefined || !fs.existsSync(filename)) return new DHT(); - - let state = JSON.parse(fs.readFileSync(filename).toString('utf-8')); - return new DHT({ - K: state.K, - id: Buffer.from(state.id, 'hex'), - bootstrapNodes: [], - nodes: state.nodes - }); -}; + /** + */ + handleRoutingRefresh_() { + this.nodes_.refresh(); + } -/** - */ -DHT.prototype.listen = async function(opt_port, opt_host) { - debug('Starting DHT on port %s', opt_port); - const listen = () => new Promise((resolve, reject) => - this.socket_.bind(opt_port, opt_host, resolve)); - - await listen(); - this.isBound_ = true; - const addr = this.socket_.address(); - debug('DHT listening on %s:%s', addr.address, addr.port); - - // start bootstrapping - this.isBootstrapping_ = true; - await Promise.all(this.bootstrapNodes_.map( - async (peer) => { - debug('Bootstrapping node %s.', peer.address + ':' + peer.port); - await this.ping(peer); + /** + * Dispatches the core queries. + * @param {string} method The query method. + * @param {any} args The arguments. + * @param {NodeInfo} node The sending node. + * @param {function} respond The callback to respond to the query. + */ + handleQuery_(method, args, node, respond) { + this.nodes_.recordQuery(node); + try { + if (method === 'ping') respond( this.handlePing_(args, node) ); + if (method === 'find_node') respond( this.handleFindNode_(args.target, node) ); + if (method === 'get_peers') respond( this.handleGetPeers_(args, node) ); + if (method === 'announce_peer') respond( this.handleAnnouncePeer_(args, node) ); + } catch (e) { + // todo send error + console.error(e); } - )); - // collect nodes near to us to populate our bucket - await this.find_node(this.id); - debug('Bootstrapping done, with %s nodes in routing table', this.nodes_.length); - this.isBootstrapping_ = false; -}; - - -/** - */ -DHT.prototype.dispose = function() { - clearInterval(this.refreshTimer_); - this.rpc_.dispose(); - this.extensions_.forEach((e) => e.dispose()); - this.announcedPeers_.dispose(); - this.nodes_.dispose(); - this.socket_.close(); - - this.announcedPeers_ = null; - this.nodes_ = null; - this.socket_ = null; - this.rpc_ = null; -}; - - -/** - * Save the state of this DHT to a file. - */ -DHT.prototype.save = function(filepath) { - let state = { - K: this.K_, - id: this.id.toString('hex'), - nodes: this.nodes_.getState() - }; - fs.writeFileSync(filepath, JSON.stringify(state)); -}; + } -/** - */ -DHT.prototype.use = function(extension) { - const inst = new extension(this); - inst.provides.forEach((method) => { - Object.defineProperty(this, method, { - value: inst[method].bind(inst) - }); - }); - this.extensions_.push(inst); -}; + /** + * Record a node response to the routing table. + * @param {NodeInfo} node The node that responded. + */ + handleNodeResponse_(node) { + this.nodes_.recordResponse(node); + } -/** - */ -DHT.prototype.closest_ = async function(target, method, args, opt_rescb) { - // use a priority queue to track closest responding, a set and hashing fn to - // track nodes we've already visited - const closest = new PQueue(this.K_); - const seen = new Set(); - const hash = (node) => - `${node.id.toString('hex')}:${node.address}:${node.port}`; - - // the 'promise selector' allows us to simulate a more traditional networking - // api. i.e. loop and 'block' until any outstanding request resolves - const selector = new PromiseSelector(); - selector.add(Promise.resolve({ - node: undefined, - r: { nodes: this.nodes_.closest(target, this.K_) } - })); - - // loop over responses waiting to be processed - // will block waiting for a response or timeout - let res = null; - while (res = await selector.next()) { - if (res.error) continue; - const { node, r } = res; - - if (node) { - const res_v = opt_rescb && opt_rescb(r, node); - if (res_v) return res_v; - - // Add the responder to the bucket of closest - closest.push(distance(target, node.id), node); - } - if (!r.nodes) continue; + /** + * Record a no response to the routing table. + * @param {NodeInfo} node The node that didn't respond. + */ + handleNodeTimeout_(node) { + if (!node.id) return; + debug('Recording timeout for node %s', node.id.toString('hex')); + this.nodes_.recordNoResponse(node); + } - // Candidates are nodes we haven't queried before, and that are closer - // than the furthest known node - let candidates = r.nodes - .filter((p) => !seen.has(hash(p))) - .filter((p) => distance(p.id, target) < closest.max); - for (let p of candidates) { - seen.add(hash(p)); - selector.add(this.rpc_.query(p, method, args)); + /** + * @param {!PeerInfo} peer The peer to ping. + */ + ping(peer) { + let key = `${peer.address}:${peer.port}`; + + if (!(key in this.pendingPings_)) { + this.pendingPings_[key] = this.rpc_.query(peer, 'ping', { 'id': this.id }) + .then((res) => { + delete this.pendingPings_[key]; + return res; + }); } + return this.pendingPings_[key]; } - debug('Closest \'%s\' query returned without a value.', method); - return undefined; -}; - - -/** - */ -DHT.prototype.isBound = function() { - return this.isBound_; -}; - -/** - * @param {NodeInfo} node The node that needs to be pinged. - * @param {function(boolean): void} callback Callback to whether the ping - * succeeded or not. - */ -DHT.prototype.handleBucketPing_ = async function(node, callback) { - const res = await this.ping(node); - callback(!!res.error); -}; - - -/** - * When a bucket needs to be refreshed, we call find_node on a random id in the - * range. - * > "Buckets that have not been changed in 15 minutes should be "refreshed." - * > This is done by picking a random ID in the range of the bucket and - * > performing a find_nodes search on it." - * - BEP-5 - * @param {Buffer} rangeId a random node id in the range of the bucket. - */ -DHT.prototype.handleBucketRefresh_ = async function(rangeId) { - this.find_node(rangeId); -}; - - -/** - */ -DHT.prototype.handleRoutingRefresh_ = function() { - this.nodes_.refresh(); -}; - - -/** - * Dispatches the core queries. - */ -DHT.prototype.handleQuery_ = function(method, args, node, respond) { - this.nodes_.recordQuery(node); - try { - if (method === 'ping') respond( this.handlePing_(args, node) ); - if (method === 'find_node') respond( this.handleFindNode_(args.target, node) ); - if (method === 'get_peers') respond( this.handleGetPeers_(args, node) ); - if (method === 'announce_peer') respond( this.handleAnnouncePeer_(args, node) ); - } catch (e) { - // todo send error - console.error(e); + /** + * @param {{id: Buffer}} args + * @param {NodeInfo} node + */ + handlePing_(args, node) { + return { id: this.id }; } -}; - - -/** - * Record a node response to the routing table. - * @param {NodeInfo} node The node that responded. - */ -DHT.prototype.handleNodeResponse_ = function(node) { - this.nodes_.recordResponse(node); -}; -/** - * Record a no response to the routing table. - * @param {NodeInfo} node The node that didn't respond. - */ -DHT.prototype.handleNodeTimeout_ = function(node) { - if (!node.id) return; - debug('Recording timeout for node %s', node.id.toString('hex')); - this.nodes_.recordNoResponse(node); -}; + /** + * Returns the result of a 'find_node' query, if opt_node is given this returns + * the result of a single query otherwise it performs a recursive lookup + * @param {Buffer} id Find the closest node to id. + * @return {Promise} opt_node Optional node. + */ + find_node(id) { + return this.closest_(id, 'find_node', { + 'id': this.id, + 'target': id + }); + } -/** - * @param {!PeerInfo} peer The peer to ping. - */ -DHT.prototype.ping = function(peer) { - let key = `${peer.address}:${peer.port}`; - - if (!(key in this.pendingPings_)) { - this.pendingPings_[key] = this.rpc_.query(peer, 'ping', { 'id': this.id }) - .then((res) => { - delete this.pendingPings_[key]; - return res; - }); + /** + * @param {any} target + * @param {NodeInfo} node + */ + handleFindNode_(target, node) { + return { + 'id': this.id, + 'nodes': this.nodes_.closest(target, this.K_) + }; } - return this.pendingPings_[key]; -}; - -/** - */ -DHT.prototype.handlePing_ = function(args, node) { - return { id: this.id }; -}; + /** + * Get peers that have announced on the target hash. + * @param {Buffer|string} target The target hash. + * @return {Promise} The announced peers. + */ + async get_peers(target) { + target = (typeof target === 'string') ? Buffer.from(target, 'hex') : target; + debug('Looking up peers for \'%s\'.', target.toString('hex')); + + // maintain a set of peers + /** @type {Object} */ + const peers = {}; + const hash = (/** @type {PeerInfo} */ peer) => + `${peer.address}:${peer.family}:${peer.port}`; + const push_peer = (/** @type {PeerInfo} */peer) => peers[hash(peer)] = peer; + + await this.closest_(target, 'get_peers', { + 'id': this.id, + 'info_hash': target + }, (/** @type {GetPeersResponse} **/ r) => { + if (r.values) { + r.values.forEach((cp) => push_peer(decodeCompactPeerInfo(cp))); + } + }); -/** - * Returns the result of a 'find_node' query, if opt_node is given this returns - * the result of a single query otherwise it performs a recursive lookup - * @param {Buffer} id Find the closest node to id. - * @param {NodeInfo} opt_node Optional node. - */ -DHT.prototype.find_node = function(id) { - return this.closest_(id, 'find_node', { - 'id': this.id, - 'target': id - }); -}; + return Object.values(peers); + } + /** + * Handle an incoming `get_peers` query. + * @param {GetPeersRequest} args The args object that was sent. + * @param {NodeInfo} node The requester. + * @return {GetPeersResponse} The response. + */ + handleGetPeers_(args, node) { + /** @type {GetPeersResponse} */ + const r = { + 'id': this.id, + 'token': this.announcedPeers_.getWriteToken(args.info_hash, node), + + // according to BEP5, if we find peers locally we should *not* return nodes + // but that is stupid, so we are going to send nodes no matter what + 'nodes': this.nodes_.closest(args.info_hash, this.K_) + }; + + const peers = this.announcedPeers_.get(args.info_hash); + if (peers) { + debug('Node %s:%s \'get_peers\' query found local peers for \'%s\'.', + node.address, node.port, args.info_hash.toString('hex')); + + r['values'] = Array.from(peers).map( (p) => Buffer.from(p, 'hex') ); + } else { + debug('Node %s:%s \'get_peers\' query did not find local peers for \'%s\'.', + node.address, node.port, args.info_hash.toString('hex')); + } -/** - */ -DHT.prototype.handleFindNode_ = function(target, node) { - return { - 'id': this.id, - 'nodes': this.nodes_.closest(target, this.K_) - }; -}; + return r; + } -/** - * Get peers that have announced on the target hash. - * @param {Buffer|string} target The target hash. - * @param {PeerInfo} The announced peers. - */ -DHT.prototype.get_peers = async function(target) { - target = (typeof target === 'string') ? Buffer.from(target, 'hex') : target; - debug('Looking up peers for \'%s\'.', target.toString('hex')); - - // maintain a set of peers - const peers = {}; - const hash = (peer) => - `${peer.address}:${peer.family}:${peer.port}`; - const push_peer = (peer) => peers[hash(peer)] = peer; - - await this.closest_(target, 'get_peers', { - 'id': this.id, - 'info_hash': target - }, (r) => { - if (r.values) { - r.values.forEach((cp) => push_peer(decodeCompactPeerInfo(cp))); - } - }); + /** + * Announce to the DHT. + * @param {Buffer|string} target The target hash that we are announcing on. + * @param {number=} opt_port The port to announce, if this is not supplied + * the port will be implied. + */ + async announce_peer(target, opt_port) { + const targetID = (typeof target === 'string') ? Buffer.from(target, 'hex') : target; + + debug('Announcing \'%s\'%s.', targetID.toString('hex'), + opt_port ? (' on port ' + opt_port) : ''); + + // create a bucket to store nodes with write tokens + const writeable = new PQueue(this.K_); + await this.closest_(targetID, 'get_peers', { + 'id': this.id, + 'info_hash': targetID + }, (r, node) => { + if (node.token) writeable.push(distance(targetID, node.id), node); + }); - return Object.values(peers); -} + // write to the K closest + const closest = writeable.items(); + await this.rpc_.query(closest, 'announce_peer', { + 'id': this.id, + 'info_hash': targetID, + 'implied_port': opt_port === undefined ? 1 : 0, + 'port': opt_port || this.socket_.address().port, + 'token': (/** @type {NodeInfo} */ node) => node.token + }); -/** - * Handle an incoming `get_peers` query. - * @param {GetPeersRequest} args The args object that was sent. - * @param {NodeInfo} node The requester. - * @return {GetPeersResponse} The response. - */ -DHT.prototype.handleGetPeers_ = function(args, node) { - const r = { - 'id': this.id, - 'token': this.announcedPeers_.getWriteToken(args.info_hash, node), - - // according to BEP5, if we find peers locally we should *not* return nodes - // but that is stupid, so we are going to send nodes no matter what - 'nodes': this.nodes_.closest(args.info_hash, this.K_) - }; - - const peers = this.announcedPeers_.get(args.info_hash); - if (peers) { - debug('Node %s:%s \'get_peers\' query found local peers for \'%s\'.', - node.address, node.port, args.info_hash.toString('hex')); - - r['values'] = Array.from(peers).map( (p) => Buffer.from(p, 'hex') ); - } else { - debug('Node %s:%s \'get_peers\' query did not find local peers for \'%s\'.', - node.address, node.port, args.info_hash.toString('hex')); + return targetID; } - return r; -}; - - -/** - * Announce to the DHT. - * @param {Buffer|string} target The target hash that we are announcing on. - * @param {=number} opt_port The port to announce, if this is not supplied - * the port will be implied. - */ -DHT.prototype.announce_peer = async function(target, opt_port) { - target = (typeof target === 'string') ? Buffer.from(target, 'hex') : target; - debug('Announcing \'%s\'%s.', target.toString('hex'), - opt_port ? (' on port ' + opt_port) : ''); - - // create a bucket to store nodes with write tokens - const writeable = new PQueue(this.K_); - await this.closest_(target, 'get_peers', { - 'id': this.id, - 'info_hash': target - }, (r, node) => { - if (node.token) writeable.push(distance(target, node.id), node); - }); - - // write to the K closest - const closest = writeable.items(); - await this.rpc_.query(closest, 'announce_peer', { - 'id': this.id, - 'info_hash': target, - 'implied_port': opt_port === undefined ? 1 : 0, - 'port': opt_port || this.socket_.address().port, - 'token': (node) => node.token - }); - - return target; -}; + /** + * Handle the 'announce_peer' query. + * @param {!AnnouncePeerRequest} args The request args. + * @param {!NodeInfo} node The requester. + * @return {!AnnoucePeerResponse} The response. + */ + handleAnnouncePeer_(args, node) { + const target = args.info_hash; + + // verify that the token owner is the requesting node first, since it is + // much cheaper to check that before verifying signatures or hashes. + if (!this.announcedPeers_.verifyToken(args.token, target, node)) { + debug('Node %s:%s \'announce_peer\' to \'%s\' failed with a bad token.', + node.address, node.port, target.toString('hex')); + throw new Error('Bad token'); + } -/** - * Handle the 'announce_peer' query. - * @params {!AnnouncePeerRequest} args The request args. - * @params {!NodeInfo} node The requester. - * @return {!AnnoucePeerResponse} The response. - */ -DHT.prototype.handleAnnouncePeer_ = function(args, node) { - const target = args.info_hash; + let peers = this.announcedPeers_.get(target); + if (!peers) { + peers = new Set(); // we use a set to ensure we aren't duplicating + this.announcedPeers_.set(target, peers, node, args.token); + } - // verify that the token owner is the requesting node first, since it is - // much cheaper to check that before verifying signatures or hashes. - if (!this.announcedPeers_.verifyToken(args.token, target, node)) { - debug('Node %s:%s \'announce_peer\' to \'%s\' failed with a bad token.', + debug('Node %s:%s announced to \'%s\'.', node.address, node.port, target.toString('hex')); - throw new Error('Bad token'); + peers.add(encodeCompactNodeInfo({ + address: node.address, + family: 'ipv4', + port: args.implied_port ? node.port : args.port + }).toString('hex')); + return { 'id': this.id }; } - let peers = this.announcedPeers_.get(target); - if (!peers) { - peers = new Set(); // we use a set to ensure we aren't duplicating - this.announcedPeers_.set(target, peers, node, args.token); - } - debug('Node %s:%s announced to \'%s\'.', - node.address, node.port, target.toString('hex')); - const peer = { - address: node.address, - family: 'ipv4', - port: args.implied_port ? node.port : args.port - }; - peers.add(encodeCompactNodeInfo(peer).toString('hex')); - return { 'id': this.id }; -}; + /** + * @param {Buffer} id The target node or key id. + * @param {number=} opt_n How many to return, i.e. n-closest. + * @return {NodeInfo[]} The n closest nodes + */ + closestNodes(id, opt_n) { + return this.nodes_.closest(id, opt_n); + } +} /** diff --git a/src/krpc.js b/src/krpc.js index d30be00..83eaafd 100644 --- a/src/krpc.js +++ b/src/krpc.js @@ -1,5 +1,3 @@ -import util from 'util'; - import { EventEmitter } from 'events'; import bencode from 'bencode'; import crypto from 'crypto'; @@ -8,6 +6,10 @@ import debugLogger from 'debug'; const debug = debugLogger('dht:rpc'); +/** + * @typedef {import('dgram').Socket} UDPSocket + */ + /** * Implements the KRPC Protocol, as defined in [BEP 0005]. @@ -17,232 +19,255 @@ const debug = debugLogger('dht:rpc'); * dictionaries sent over UDP. A single query packet is sent out and a single * packet is sent in response. There is no retry. * There are three message types: query, response, and error. - * - * @constructor - * @param {import('dgram').Socket} socket The socket to run krpc over. - * @param {any=} opt_options - * @extends {EventEmitter} */ -export function KRPCSocket(socket, opt_options) { - EventEmitter.call(this); - opt_options = opt_options || {}; +export class KRPCSocket extends EventEmitter { /** - * Timeout for query responses in ms - * @define {number} + * @param {UDPSocket} socket The socket to run krpc over. + * @param {any=} opt_options */ - this.RESPONSE_TIMEOUT_ = opt_options['timeout'] === undefined ? - 2000 : opt_options['timeout']; - - /** - * Outstanding queries' resolve functions - * @type {!Object.} - * @private - */ - this.outstandingTransactions_ = {}; + constructor(socket, opt_options) { + super(); + opt_options = opt_options || {}; + + /** + * Timeout for query responses in ms + * @type {number} + * @private + */ + this.RESPONSE_TIMEOUT_ = opt_options['timeout'] === undefined ? + 2000 : opt_options['timeout']; + + /** + * Outstanding queries' resolve functions + * @type {!Object.} + * @private + */ + this.outstandingTransactions_ = {}; + + /** + * @type {UDPSocket} + * @private + */ + this.socket_ = socket; + + this.boundHandleMessage_ = this.handleMessage_.bind(this); + this.boundHandleError = this.handleError_.bind(this); + this.socket_.addListener('message', this.boundHandleMessage_); + this.socket_.addListener('error', this.boundHandleError); + } /** - * @type {import('dgram').Socket} - * @private + * Dispose of the current socket and listeners. */ - this.socket_ = socket; - - this.boundHandleMessage_ = this.handleMessage_.bind(this); - this.boundHandleError = this.handleError_.bind(this); - this.socket_.addListener('message', this.boundHandleMessage_); - this.socket_.addListener('error', this.boundHandleError); -}; -util.inherits(KRPCSocket, EventEmitter); - - -KRPCSocket.prototype.dispose = function() { - this.removeAllListeners(); - this.socket_.removeListener('message', this.boundHandleMessage_); - this.socket_.removeListener('error', this.boundHandleError); - this.socket_ = null; - - // clear outstanding transactions - for (let [_, reject, timeout] of Object.values(this.outstandingTransactions_)) { - clearTimeout(timeout); - reject('Socket is disposing'); + dispose() { + this.removeAllListeners(); + this.socket_.removeListener('message', this.boundHandleMessage_); + this.socket_.removeListener('error', this.boundHandleError); + this.socket_.unref(); + + // clear outstanding transactions + for (let [_, reject, timeout] of Object.values(this.outstandingTransactions_)) { + clearTimeout(timeout); + reject('Socket is disposing'); + } + this.outstandingTransactions_ = {}; } - this.outstandingTransactions_ = {}; -}; -/** - * - */ -KRPCSocket.prototype.query = function(peer, method, opt_args) { - // Accept and map multiple peers - if (Array.isArray(peer)) { - return Promise.all(peer.map((p) => this.query(p, method, opt_args))); - } + /** + * @param {PeerInfo|Array.} peer + * @param {string} method + * @param {KRPCQueryArgument=} opt_args + * @return {Promise.} The response from the peer to the query + */ + query(peer, method, opt_args) { + opt_args = opt_args || {}; - // Copy the args object, since we allow functions to provide values - const args = {}; - for (let arg in (opt_args || {})) { - if (typeof opt_args[arg] === 'function') { - args[arg] = opt_args[arg](peer, method, opt_args); - } else { - args[arg] = opt_args[arg]; + // Accept and map multiple peers + if (Array.isArray(peer)) { + return Promise.all(peer.map((p) => this.query(p, method, opt_args))); } - } - return this.transact_(peer, (tid) => { - const buf = bencode.encode({ - 't': tid, // transaction id - 'y': 'q', // message type, 'q' is 'query' - 'q': method, // method name of the query - 'a': args, // named arguments to the query - // 'v': '' BEP 0005 specifies we include this, but ... - }); - - // send the request - const tstr = tid.toString('hex'); - debug('Sending \'%s\' query [%s] to %s', method, tstr, peer.address + ':' + peer.port); - this.socket_.send(buf, 0, buf.length, peer.port, peer.address); - }).catch((err) => { - return { error: err }; - }) -}; + // Copy the args object, since we allow functions to provide values + /** @type {{[k: string]: any}} */ + const args = {}; + for (let arg in opt_args) { + if (typeof opt_args[arg] === 'function') { + args[arg] = opt_args[arg](peer, method, opt_args); + } else { + args[arg] = opt_args[arg]; + } + } + return this.transact_(peer, (tid) => { + const buf = bencode.encode({ + 't': tid, // transaction id + 'y': 'q', // message type, 'q' is 'query' + 'q': method, // method name of the query + 'a': args, // named arguments to the query + // 'v': '' BEP 0005 specifies we include this, but ... + }); -/** - */ -KRPCSocket.prototype.handleMessage_ = function(msg, rinfo) { - let bmsg, tid, msgtype; - try { - bmsg = bencode.decode(msg); - tid = bmsg.t.toString('hex') - msgtype = bmsg.y.toString('utf8'); - } catch(e) { - debug('Unrecognized socket message', msg.toString(), e.message); - return; + // send the request + const tstr = tid.toString('hex'); + debug('Sending \'%s\' query [%s] to %s', method, tstr, peer.address + ':' + peer.port); + this.socket_.send(buf, 0, buf.length, peer.port, peer.address); + }).catch((err) => { + return { error: err }; + }) } - // if the incoming message is a reply we handle things differently - if (msgtype === 'r' || msgtype === 'e') { - if (!(tid in this.outstandingTransactions_)) { - debug('Unexpected transaction id %s.', tid); + + /** + * @param {Buffer} msg + * @param {PeerInfo} rinfo + */ + handleMessage_(msg, rinfo) { + + let /** @type {any} */ bmsg, /** @type {string} */ tid, /** @type {string} */ msgtype; + try { + bmsg = bencode.decode(msg); + tid = bmsg.t.toString('hex') + msgtype = bmsg.y.toString('utf8'); + } catch(e) { + const message = (e && typeof e == 'object' && 'message' in e) ? e.message : ''; + debug('Unrecognized socket message', msg.toString(), message); return; } - const [resolve, reject] = this.outstandingTransactions_[tid]; - if (msgtype === 'r') { - debug('Received response for transaction: %s.', tid); - const r = bmsg.r; - if (r.nodes) { - r.nodes = decodeRecievedNodes(r.nodes); + // if the incoming message is a reply we handle things differently + if (msgtype === 'r' || msgtype === 'e') { + if (!(tid in this.outstandingTransactions_)) { + debug('Unexpected transaction id %s.', tid); + return; } - let node = makeNodeInfo(r.id, rinfo, r.token); - this.emit('response', node, bmsg.r); - resolve({ - node: node, - r: bmsg.r + const [resolve, reject] = this.outstandingTransactions_[tid]; + if (msgtype === 'r') { + debug('Received response for transaction: %s.', tid); + const r = bmsg.r; + if (r.nodes) { + r.nodes = decodeRecievedNodes(r.nodes); + } + + let node = makeNodeInfo(r.id, rinfo, r.token); + this.emit('response', node, bmsg.r); + resolve({ + node: node, + r: bmsg.r + }); + } else { + const [code, desc] = bmsg.e; + debug('Error response for transaction %s: %s %s', tid, code, desc); + const err = new KRPCError(code, desc.toString(), rinfo); + + reject(err); + // this.emit('error', err); + } + } else if (msgtype === 'q') { + const method = bmsg.q.toString(); + debug('Received incoming query with method \'%s\' from %s:%s', + method, rinfo.address, rinfo.port); + + const node = makeNodeInfo(bmsg.a.id, rinfo); + this.emit('query', method, bmsg.a, node, (/** @type {any} */ r) => { + // todo clean this up + if (r.nodes) { + r.nodes = encodeCompactNodeSet(r.nodes); + } + + const buf = bencode.encode({ + 't': bmsg.t, // transaction id + 'y': 'r', // message type, 'r' is 'response' + 'r': r // the response + }); + + // send the response + debug('Sending response to \'%s\' query [%s] to %s', method, tid, + rinfo.address + ':' + rinfo.port); + this.socket_.send(buf, 0, buf.length, rinfo.port, rinfo.address); }); } else { - const [code, desc] = bmsg.e; - debug('Error response for transaction %s: %s %s', tid, code, desc); - const err = new KRPCError(code, desc.toString(), rinfo); - - reject(err); - // this.emit('error', err); + debug('Unexpected krpc message type \'%s\'.', msgtype); } - } else if (msgtype === 'q') { - const method = bmsg.q.toString(); - debug('Received incoming query with method \'%s\' from %s:%s', - method, rinfo.address, rinfo.port); - - const node = makeNodeInfo(bmsg.a.id, rinfo); - this.emit('query', method, bmsg.a, node, (r) => { - // todo clean this up - if (r.nodes) { - r.nodes = encodeCompactNodeSet(r.nodes); - } - - const buf = bencode.encode({ - 't': bmsg.t, // transaction id - 'y': 'r', // message type, 'r' is 'response' - 'r': r // the response - }); - - // send the response - debug('Sending response to \'%s\' query [%s] to %s', method, tid, - rinfo.address + ':' + rinfo.port); - this.socket_.send(buf, 0, buf.length, rinfo.port, rinfo.address); - }); - } else { - debug('Unexpected krpc message type \'%s\'.', msgtype); } -}; - -/** - */ -KRPCSocket.prototype.handleError_ = function(err) { - console.error(err); -}; + /** + * @param {Error} err The error to handle + */ + handleError_(err) { + console.error(err); + } -/** - * Handles the transaction logic, since we want to present a nice promise based - * send -> response API, we need to do some special wrapping. - * @param {!function(number)} inner The inner function that will be transacted. - * @return {!Promise.} The transaction resolution. - */ -KRPCSocket.prototype.transact_ = function(node, inner) { - // Return the promise that will resolve on a response with - // the correct transaction id. - return new Promise((resolve, reject) => { - // Generate a random transaction id for responses - let tid; - do { - tid = crypto.randomBytes(4); - } while (tid.toString('hex') in this.outstandingTransactions_); - let tstr = tid.toString('hex'); - - let timeout = null; - const cleanUp = () => { - delete this.outstandingTransactions_[tstr]; - clearTimeout(timeout); - }; - - // set the timeout - if (this.RESPONSE_TIMEOUT_ !== 0) { - timeout = setTimeout(() => { - this.emit('timeout', node); - - cleanUp(); - debug('Timeout exceeded for transaction: ' + tstr); - reject(new Error('Timeout exceeded for transaction: ' + tstr)); - }, this.RESPONSE_TIMEOUT_); - } - // Register the transaction - this.outstandingTransactions_[tstr] = [ - (res) => { cleanUp(); resolve(res); }, - (err) => { cleanUp(); reject(err); }, - timeout - ]; + /** + * Handles the transaction logic, since we want to present a nice promise based + * send -> response API, we need to do some special wrapping. + * @param {!PeerInfo} peer + * @param {!function(Buffer):void} inner The inner function that will be transacted. + * @return {!Promise.} The transaction resolution. + */ + transact_(peer, inner) { + // Return the promise that will resolve on a response with + // the correct transaction id. + return new Promise((resolve, reject) => { + // Generate a random transaction id for responses + let tid; + do { + tid = crypto.randomBytes(4); + } while (tid.toString('hex') in this.outstandingTransactions_); + let tstr = tid.toString('hex'); + + /** @type {NodeJS.Timeout=} */ + let timeout = undefined; + const cleanUp = () => { + delete this.outstandingTransactions_[tstr]; + if (timeout) clearTimeout(timeout); + }; + + // set the timeout + if (this.RESPONSE_TIMEOUT_ !== 0) { + timeout = setTimeout(() => { + this.emit('timeout', peer); + + cleanUp(); + debug('Timeout exceeded for transaction: ' + tstr); + reject(new Error('Timeout exceeded for transaction: ' + tstr)); + }, this.RESPONSE_TIMEOUT_); + } - // call the inner fn - inner(tid); - }); -}; + // Register the transaction + this.outstandingTransactions_[tstr] = [ + (/** @type {any} */ res) => { cleanUp(); resolve(res); }, + (/** @type {any} */ err) => { cleanUp(); reject(err); }, + timeout + ]; + // call the inner fn + inner(tid); + }); + } +} /** */ -function KRPCError(code, description, peer) { - this.code = code; - this.description = description; - this.peer = peer; -}; -util.inherits(KRPCError, Error); - +class KRPCError extends Error { + /** + * @param {number} code + * @param {string} description + * @param {PeerInfo} peer + */ + constructor(code, description, peer) { + super(`[${code}]: ${description}`); + + this.code = code; + this.description = description; + this.peer = peer; + } +} /** @@ -257,11 +282,12 @@ function decodeCompactNodeInfo(buf) { buf.readUInt8(22) + '.' + buf.readUInt8(23); const port = buf.readUInt16BE(24); - return { id: id, address: ip, port: port }; + return { id: id, address: ip, port: port, family: 'ipv4' }; // todo support ipv6 }; /** + * @param {Buffer} buffer */ function decodeRecievedNodes(buffer) { const len = buffer.length / 26; @@ -312,6 +338,7 @@ function encodeCompactNodeInfo(node) { /** + * @param {Array.} nodes */ function encodeCompactNodeSet(nodes) { const buf = Buffer.alloc(nodes.length * 26); diff --git a/src/routing.js b/src/routing.js index 0320e9e..f3d7b6d 100644 --- a/src/routing.js +++ b/src/routing.js @@ -18,7 +18,7 @@ class Node { this.id = (typeof node.id === 'string') ? Buffer.from(node.id, 'hex') : node.id; - /** @type {AddressInfo} */ + /** @type {PeerInfo} */ this.address = { address: node.address, port: node.port, family: node.family }; this.token = node.token; @@ -168,6 +168,7 @@ export class RoutingTable extends EventEmitter { /** * @param {Buffer} id The id to search for. + * @return {NodeInfo[]} The node info */ closest(id, n=10) { id = (typeof id === 'string') ? Buffer.from(id, 'hex') : id; @@ -262,7 +263,7 @@ export class RoutingTable extends EventEmitter { /** * @param {Node} node The node to insert. - * @param {boolean} relaxed, whether to use relaxed mode... todo + * @param {boolean=} relaxed, whether to use relaxed mode... todo */ async _insertNode(node, relaxed) { // todo protect against rogue actor flooding ids using ip address checks @@ -285,7 +286,7 @@ export class RoutingTable extends EventEmitter { // split the bucket if it contains the localId: if (bucket.min <= this.localId && this.localId < bucket.max) { - if (this._split(bucket, relaxed)) { + if (this._split(bucket, !!relaxed)) { if (bucket.left === null || bucket.right === null) throw new Error('Bucket could not be split.'); bucket = (node.id < bucket.left.max) ? bucket.left : bucket.right; diff --git a/src/security.js b/src/security.js index 29001e0..80afe54 100644 --- a/src/security.js +++ b/src/security.js @@ -6,16 +6,16 @@ import crc32c from "sse4_crc32"; * Calculate the secure node id given the external ip address. * See: http://www.bittorrent.org/beps/bep_0042.html * @param {!string} ipaddr The ip address to compute the node ip for. - * @param {=number} opt_rand Optionally specify the random parameter. + * @param {number=} opt_rand Optionally specify the random parameter. * @return {!Buffer} The secure node id. */ export function computeSecureNodeId(ipaddr, opt_rand) { - const rand = () => parseInt(Math.random() * 255, 10); + const rand = () => Math.floor(Math.random() * 255); // todo support ipv6 const id = Buffer.alloc(20); const buf = Buffer.alloc(4); - const ip = Buffer.from(ipaddr.split('.')).readUInt32BE(); + const ip = ip2int(ipaddr); const r = opt_rand === undefined ? rand() : opt_rand; buf.writeUInt32BE(((ip & 0x030f3fff) | (r << 29)) >>> 0); @@ -32,3 +32,12 @@ export function computeSecureNodeId(ipaddr, opt_rand) { return id; }; + +/** + * Convert a string representation of an IP address to the uint32. + * @param {string} ip The IP address to convert. + * @return {number} The resulting UInt32. + */ +function ip2int(ip) { + return ip.split('.').reduce(function(ipInt, octet) { return (ipInt<<8) + parseInt(octet, 10)}, 0) >>> 0; +} diff --git a/src/storage.js b/src/storage.js index 1840c02..c3ca438 100644 --- a/src/storage.js +++ b/src/storage.js @@ -1,315 +1,335 @@ import ed25519 from 'ed25519-supercop'; import bencode from 'bencode'; -import TokenStore from '#root/src/token-store'; -import { distance } from '#root/src/routing'; -import { sha1, PQueue } from '#root/src/util'; +import TokenStore from './token-store.js'; +import { distance } from './routing.js'; +import { sha1, PQueue } from './util.js'; import debugLogger from 'debug'; const debug = debugLogger('dht:storage'); +/** + * @typedef {{ k?: Buffer|string, salt?: Buffer|string }} DHTGetOptions + * @typedef {{ id?: Buffer, token?: Buffer|((n: NodeInfo)=>Buffer|undefined), v?: Buffer, k?: Buffer, sig?: Buffer, seq?: number, salt?: Buffer }} DHTStoreRecord + * @typedef {function(function(DHTStoreRecord, Buffer): void, DHTStoreRecord=): DHTStoreRecord} DHTSignatureCallback + * @typedef { { id: Buffer, target: Buffer } } GetRequest + * @typedef {any} GetResponse todo + * @typedef {{ + * id: Buffer, k: Buffer, salt?: Buffer, sig: Buffer, + * cas?: number, seq?: number, + * token: Buffer, + * v: Buffer + * }} PutRequest + * @typedef {any} PutResponse todo + */ + /** * Storage Extension for the DHT. * Implements [BEP-44](http://www.bittorrent.org/beps/bep_0044.html) - * @constructor - * @param {IDHT} dht The DHT instance that this extension is extending */ -export default function DHTStorage(dht) { - this.provides = ['get', 'put']; +export default class DHTStorage { /** - */ - this.dht_ = dht; + * @param {IDHT} dht The DHT instance that this extension is extending + */ + constructor(dht) { + this.provides = ['get', 'put']; + + /** + * @private + */ + this.dht_ = dht; + + /** + * @private + */ + this.rpc_ = dht.rpc_; + this.rpc_.on('query', this.handleQuery_.bind(this)); + + /** + * @type {!TokenStore} + * @private + */ + this.store_ = new TokenStore(); + } - /** - */ - this.rpc_ = dht.rpc_; - this.rpc_.on('query', this.handleQuery_.bind(this)); /** - * @type {!TokenStore} - * @private + * Dispose this object. */ - this.store_ = new TokenStore(); -}; - - -/** - * Dispose this object. - */ -DHTStorage.prototype.dispose = function() { - this.store_.dispose(); -}; - - -/** - * @typedef {{ k?: Buffer|string, salt?: Buffer|string }} DHTGetOptions - * @typedef {{ v?: Buffer, k?: Buffer, sig?: Buffer, seq?: number, salt?: Buffer }} DHTStoreRecord - */ - -/** - * Lookup a value on the DHT. - * @param {Buffer|string|DHTGetOptions} args Either the SHA1 target - * hash of the value or the options dictionary. - */ -DHTStorage.prototype.get = async function(args) { - // decode the arguments - let opts = Buffer.isBuffer(args) || typeof args === 'string' ? null : args; - let salt = !(opts && 'salt' in opts) ? undefined : - (typeof opts.salt === 'string' ? Buffer.from(opts.salt) : opts.salt); - let k = !(opts && 'k' in opts) ? undefined : - (typeof opts.k === 'string' ? Buffer.from(opts.k, 'hex') : opts.k); - /** @type {Buffer} */ - let target; - if (Buffer.isBuffer(args)) { - target = args; - } else if (typeof args === 'string') { - target = Buffer.from(args, 'hex'); - } else { - if (!k) throw new Error('A target key _must_ be provided.'); - target = sha1(salt ? Buffer.concat([k, salt]) : k); + dispose() { + this.store_.dispose(); } - // first check if we have it locally - const stored = this.store_.get(target); - if (stored) { - debug('Value for \'%s\' found locally.', target.toString('hex')); - return stored; - } - // todo opts.seq can be sent in the query - debug('Asking network for value for \'%s\'.', target.toString('hex')); - const res = await this.dht_.closest_(target, 'get', { - 'target': target, - 'id': this.dht_.id - }, GetResponseValidator(target, salt)); - - if (res) { - debug('Found %s value for \'%s\'.', - res.sig ? 'mutable' : 'immutable', target.toString('hex')); - } else { - debug('No value found for \'%s\'.', target.toString('hex')); - } - return res; -}; + /** + * Lookup a value on the DHT. + * @param {Buffer|string|DHTGetOptions} args Either the SHA1 target + * hash of the value or the options dictionary. + */ + async get(args) { + // decode the arguments + let opts = Buffer.isBuffer(args) || typeof args === 'string' ? null : args; + let salt = !(opts && 'salt' in opts) ? undefined : + (typeof opts.salt === 'string' ? Buffer.from(opts.salt) : opts.salt); + let k = !(opts && 'k' in opts) ? undefined : + (typeof opts.k === 'string' ? Buffer.from(opts.k, 'hex') : opts.k); + /** @type {Buffer} */ + let target; + if (Buffer.isBuffer(args)) { + target = args; + } else if (typeof args === 'string') { + target = Buffer.from(args, 'hex'); + } else { + if (!k) throw new Error('A target key _must_ be provided.'); + target = sha1(salt ? Buffer.concat([k, salt]) : k); + } + // first check if we have it locally + const stored = this.store_.get(target); + if (stored) { + debug('Value for \'%s\' found locally.', target.toString('hex')); + return stored; + } -/** - * @typedef {function(function(DHTStoreRecord, Buffer): void, DHTStoreRecord=): DHTStoreRecord} DHTSignatureCallback + // todo opts.seq can be sent in the query + debug('Asking network for value for \'%s\'.', target.toString('hex')); + const res = await this.dht_.closest_(target, 'get', { + 'target': target, + 'id': this.dht_.id + }, GetResponseValidator(target, salt)); -/** - * @param {Buffer|string} key_or_v Either the value (if immutable) or public key - * if mutable. - * @param {(DHTSignatureCallback|string|Buffer)=} opt_salt Optional salt for - * mutable puts, or if no salt the signature callback. - * @param {DHTSignatureCallback=} cb If using salt, the signature callback. - */ -DHTStorage.prototype.put = async function(key_or_v, opt_salt, cb) { - // if immutable - if (opt_salt === undefined) return this.putImmutable_(key_or_v); - if (typeof key_or_v === 'string') throw new Error('Invalid public key'); - if (typeof opt_salt === 'function') { - cb = opt_salt; - opt_salt = undefined; - } - if (typeof opt_salt === 'string') { - opt_salt = Buffer.from(opt_salt); + if (res) { + debug('Found %s value for \'%s\'.', + res.sig ? 'mutable' : 'immutable', target.toString('hex')); + } else { + debug('No value found for \'%s\'.', target.toString('hex')); + } + return res; } - if (opt_salt && opt_salt.length > 64) throw new Error('Salt must be less than 64 bytes.'); - if (key_or_v.length !== 32) throw new Error('ed25519 public key must be 32 bytes.'); - if (!cb) throw new Error('A signing function must be provided'); - - const target = sha1(opt_salt ? - Buffer.concat([key_or_v, opt_salt]) : key_or_v); - debug('Writing mutable data for key: \'%s%s\'.', - key_or_v.toString('hex'), opt_salt ? '::' + opt_salt.toString('hex') : ''); - // prepare write tokens - /** @type {DHTStoreRecord} */ - let prev = { - k: key_or_v, - v: undefined, - sig: undefined, - seq: 0 - }; - if (opt_salt) prev.salt = opt_salt; - - // create a bucket to store nodes with write tokens - const writeable = new PQueue(this.dht_.K_); - await this.dht_.closest_(target, 'get', { - 'target': target, - 'id': this.dht_.id - }, (r, node) => { - if (node.token) { - debug('found writable node', node.address); - writeable.push(distance(target, node.id), node); + /** + * @param {Buffer|string} key_or_v Either the value (if immutable) or public key + * if mutable. + * @param {(DHTSignatureCallback|string|Buffer)=} opt_salt Optional salt for + * mutable puts, or if no salt the signature callback. + * @param {DHTSignatureCallback=} cb If using salt, the signature callback. + */ + async put(key_or_v, opt_salt, cb) { + // if immutable + if (opt_salt === undefined) return this.putImmutable_(key_or_v); + if (typeof key_or_v === 'string') throw new Error('Invalid public key'); + if (typeof opt_salt === 'function') { + cb = opt_salt; + opt_salt = undefined; } - if (r.v) { - // todo exit if the seq is higher than what we are trying to put - // todo create a list of nodes that have old data we need to update - // console.log('------', r); + if (typeof opt_salt === 'string') { + opt_salt = Buffer.from(opt_salt); } - }); - // call for value - const signed = cb((r, secretKey) => { - r.id = this.dht_.id; - - r.k = r.k || prev.k; - r.v = r.v || prev.v; - r.v = typeof r.v === 'string' ? Buffer.from(r.v) : r.v; - r.seq = r.seq || prev.seq; - if (r.salt || prev.salt) r.salt = r.salt || prev.salt; + if (opt_salt && opt_salt.length > 64) throw new Error('Salt must be less than 64 bytes.'); + if (key_or_v.length !== 32) throw new Error('ed25519 public key must be 32 bytes.'); + if (!cb) throw new Error('A signing function must be provided'); + + const target = sha1(opt_salt ? + Buffer.concat([key_or_v, opt_salt]) : key_or_v); + debug('Writing mutable data for key: \'%s%s\'.', + key_or_v.toString('hex'), opt_salt ? '::' + opt_salt.toString('hex') : ''); + + // prepare write tokens + /** @type {DHTStoreRecord} */ + let prev = { + k: key_or_v, + v: undefined, + sig: undefined, + seq: 0 + }; + if (opt_salt) prev.salt = opt_salt; + + // create a bucket to store nodes with write tokens + const writeable = new PQueue(this.dht_.K_); + await this.dht_.closest_(target, 'get', { + 'target': target, + 'id': this.dht_.id + }, (r, node) => { + if (node.token) { + debug('found writable node', node.address); + writeable.push(distance(target, node.id), node); + } + if (r.v) { + // todo exit if the seq is higher than what we are trying to put + // todo create a list of nodes that have old data we need to update + // console.log('------', r); + } + }); - r.sig = ed25519.sign(encodeSigData(r), key_or_v, secretKey); - return r; - }, prev); - signed.token = (node) => node.token; + // call for value + const signed = cb((r, secretKey) => { + r.id = this.dht_.id; - if (signed.v.length > 1000) throw new Error('v must be less than 1000 bytes'); + r.k = r.k || prev.k; + r.v = r.v || prev.v; + r.v = typeof r.v === 'string' ? Buffer.from(r.v) : r.v; + r.seq = r.seq || prev.seq; + if (r.salt || prev.salt) r.salt = r.salt || prev.salt; - // write to the K closest - const closest = writeable.items(); - await this.rpc_.query(closest, 'put', signed); - return target; -}; + r.sig = ed25519.sign(encodeSigData(r), key_or_v, secretKey); + return r; + }, prev); + signed.token = (node) => node.token; + if (!signed.v || signed.v.length > 1000) throw new Error('v must be less than 1000 bytes'); -/** - */ -DHTStorage.prototype.putImmutable_ = async function(data) { - const v = (typeof data === 'string') ? - Buffer.from(data) : data; - const target = sha1(bencode.encode(v)); - - if (v.length > 1000) throw new Error('v must be less than 1000 bytes'); - debug('Writing immutable data as \'%s\'.', target.toString('hex')); - - // create a bucket to store nodes with write tokens - const writeable = new PQueue(this.dht_.K_); - await this.dht_.closest_(target, 'get', { - 'target': target, - 'id': this.dht_.id - }, (r, node) => { - if (node.token) writeable.push(distance(target, node.id), node); - }); - - // write to the K closest - const closest = writeable.items(); - await this.rpc_.query(closest, 'put', { - 'id': this.dht_.id, - 'v': v, - 'token': (node) => node.token - }); - - return target; -}; + // write to the K closest + const closest = writeable.items(); + await this.rpc_.query(closest, 'put', signed); + return target; + } -/** - */ -DHTStorage.prototype.handleQuery_ = function(method, args, node, respond) { - try { - if (method === 'get') respond( this.handleGetQuery_(args, node) ); - if (method === 'put') respond( this.handlePutQuery_(args, node) ); - } catch (e) { - // todo figure out how to respond with an error - console.error('todo: implement error response', e); + /** + * Put immutable data to the DHT. + * @param {string|Buffer} data The immutable data to put. + * @return {Promise.} The sha key the data was stored at. + */ + async putImmutable_(data) { + const v = (typeof data === 'string') ? + Buffer.from(data) : data; + const target = sha1(bencode.encode(v)); + + if (v.length > 1000) throw new Error('v must be less than 1000 bytes'); + debug('Writing immutable data as \'%s\'.', target.toString('hex')); + + // create a bucket to store nodes with write tokens + const writeable = new PQueue(this.dht_.K_); + await this.dht_.closest_(target, 'get', { + 'target': target, + 'id': this.dht_.id + }, (r, node) => { + if (node.token) writeable.push(distance(target, node.id), node); + }); + + // write to the K closest + const closest = writeable.items(); + await this.rpc_.query(closest, 'put', { + 'id': this.dht_.id, + 'v': v, + 'token': (/** @type {NodeInfo} */ node) => node.token + }); + + return target; } -}; -/** - * Handle a get query. - * @param {GetRequest} args The get query args. - * @param {NodeInfo} node The requesting node. - * @return {GetResponse} The response. - * @private - */ -DHTStorage.prototype.handleGetQuery_ = function(args, node) { - const stored = this.store_.get(args.target); - const nodes = this.dht_.nodes_.closest(args.target, this.dht_.K_); - - if (stored) { - debug('Node %s:s \'get\' query found local value for \'%s\'.', - node.address, node.port, args.target.toString('hex')); - // todo + /** + * @param {string} method The query method we are handling. + * @param {any} args The arguments. + * @param {NodeInfo} node The sending node + * @param {function(any):void} respond + */ + handleQuery_(method, args, node, respond) { + try { + if (method === 'get') respond( this.handleGetQuery_(args, node) ); + if (method === 'put') respond( this.handlePutQuery_(args, node) ); + } catch (e) { + // todo figure out how to respond with an error + console.error('todo: implement error response', e); + } } - return Object.assign({ - id: this.dht_.id, - token: this.store_.getWriteToken(args.target, node), - nodes: nodes - }, stored || {}); -}; + /** + * Handle a get query. + * @param {GetRequest} args The get query args. + * @param {NodeInfo} node The requesting node. + * @return {GetResponse} The response. + * @private + */ + handleGetQuery_(args, node) { + const stored = this.store_.get(args.target); + const nodes = this.dht_.closestNodes(args.target, this.dht_.K_); + + if (stored) { + debug('Node %s:s \'get\' query found local value for \'%s\'.', + node.address, node.port, args.target.toString('hex')); + // todo + } -/** - * Handle a put query. - * @param {PutRequest} args The put query args. - * @param {NodeInfo} node The requesting node. - * @return {PutResponse} The response. - * @private - */ -DHTStorage.prototype.handlePutQuery_ = function(args, node) { - const isMutable = args.sig !== undefined; - const target = isMutable ? - (sha1(args.salt ? Buffer.concat([args.k, args.salt]) : args.k)) : - sha1(bencode.encode(args.v)); - - // verify that the token owner is the requesting node first, since it is - // much cheaper to check that before verifying signatures or hashes. - if (!this.store_.verifyToken(args.token, target, node)) { - debug('Node %s:%s \'put\' to \'%s\' failed with a bad token.', - node.address, node.port, target.toString('hex')); - throw new Error('Bad token'); + return Object.assign({ + id: this.dht_.id, + token: this.store_.getWriteToken(args.target, node), + nodes: nodes + }, stored || {}); } - // todo validate args.v.length < 1000 - let success = false; - if (isMutable) { - // check the signature - if (!ed25519.verify(args.sig, encodeSigData(args), args.k)) { - // todo respond with an error - debug('Node %s:%s bad \'put\' query: signature does not match.', - node.address, node.port); - throw new Error('Bad signature'); - } - const last = this.store_.get(target); - if (last) { - // todo verify seq and cas + /** + * Handle a put query. + * @param {PutRequest} args The put query args. + * @param {NodeInfo} node The requesting node. + * @return {PutResponse} The response. + * @private + */ + handlePutQuery_(args, node) { + const isMutable = args.sig !== undefined; + const target = isMutable ? + (sha1(args.salt ? Buffer.concat([args.k, args.salt]) : args.k)) : + sha1(bencode.encode(args.v)); + + // verify that the token owner is the requesting node first, since it is + // much cheaper to check that before verifying signatures or hashes. + if (!this.store_.verifyToken(args.token, target, node)) { + debug('Node %s:%s \'put\' to \'%s\' failed with a bad token.', + node.address, node.port, target.toString('hex')); + throw new Error('Bad token'); } - success = this.store_.set(target, { - k: Buffer.from(args.k), - seq: args.seq || 0, - sig: args.sig, - salt: args.salt, - v: Buffer.from(args.v) - }, node, args.token); - - if (success) { - debug('Storing mutable data for key: \'%s%s\'.', - args.k.toString('hex'), args.salt ? '::' + args.salt.toString() : ''); - } - } else { - success = this.store_.set(target, { - v: Buffer.from(args.v) - }, node, args.token); + // todo validate args.v.length < 1000 + let success = false; + if (isMutable) { + // check the signature + if (!ed25519.verify(args.sig, encodeSigData(args), args.k)) { + // todo respond with an error + debug('Node %s:%s bad \'put\' query: signature does not match.', + node.address, node.port); + throw new Error('Bad signature'); + } - if (success) { - debug('Storing immutable value with id: \'%s\'.', target.toString('hex')); + const last = this.store_.get(target); + if (last) { + // todo verify seq and cas + } + + success = this.store_.set(target, { + k: Buffer.from(args.k), + seq: args.seq || 0, + sig: args.sig, + salt: args.salt, + v: Buffer.from(args.v) + }, node, args.token); + + if (success) { + debug('Storing mutable data for key: \'%s%s\'.', + args.k.toString('hex'), args.salt ? '::' + args.salt.toString() : ''); + } + } else { + success = this.store_.set(target, { + v: Buffer.from(args.v) + }, node, args.token); + + if (success) { + debug('Storing immutable value with id: \'%s\'.', target.toString('hex')); + } } - } - // node was good, so let's refresh it in the routing table - if (!success) throw new Error('Could not write, token validation failed.'); - return { id: this.dht_.id }; -}; + // node was good, so let's refresh it in the routing table + if (!success) throw new Error('Could not write, token validation failed.'); + return { id: this.dht_.id }; + } +} /** diff --git a/src/token-store.js b/src/token-store.js index 5713e62..4740a21 100644 --- a/src/token-store.js +++ b/src/token-store.js @@ -8,113 +8,110 @@ const SECRET_REFRESH_INTERVAL = 10 * 60 * 1000; // 10 minutes /** - * @constructor + * Token storage for peer requests. + * The BitTorrent implementation uses the SHA1 hash of the IP address + * concatenated onto a secret that changes every five minutes and tokens + * up to ten minutes old are accepted. + * @todo Why are we using `any` in the LRU? */ -export default function TokenStore() { - // The BitTorrent implementation uses the SHA1 hash of the IP address - // concatenated onto a secret that changes every five minutes and tokens - // up to ten minutes old are accepted. +export default class TokenStore { + constructor() { + /** + * The secret used to compute the tokens. + * @type {!Buffer} + * @private + */ + this.secret_ = crypto.randomBytes(10); + + /** + * The interval used to update the secret. + * @type {NodeJS.Timeout} + * @private + */ + this.refreshInterval_ = setInterval(() => { + this.secret_ = crypto.randomBytes(10); + }, SECRET_REFRESH_INTERVAL); + + /** + * The data store. + * @type {LRUCache} + * @private + */ + this.store_ = new LRUCache({ + max: 500, + ttl: 7.2e+6 // 2 hours + }); + } /** - * The secret used to compute the tokens. - * @type {!Buffer} - * @private + * Dispose of this object. */ - this.secret_ = crypto.randomBytes(10); + dispose() { + clearInterval(this.refreshInterval_); + this.store_.clear(); + } /** - * The interval used to update the secret. - * @type {NodeJS.Timeout} - * @private + * Lookup the value(s) stored under the target hash. + * @param {Buffer} target The target hash to lookup. + * @return {any} The stored value or undefined if not found. */ - this.refreshInterval_ = setInterval(() => { - this.secret_ = crypto.randomBytes(10); - }, SECRET_REFRESH_INTERVAL); + get(target) { + return this.store_.get(target.toString('hex')); + } /** - * The data store. + * Set the value for a hash. + * @param {!Buffer} target The target hash. + * @param {any} value The value to store. + * @param {!Buffer} token The write token to verify. + * @param {!NodeInfo} node The requesting node. + * @return {boolean} Whether the value was set, if false it implies the write + * token didn't validate. */ - this.store_ = new LRUCache({ - max: 500, - maxAge: 7.2e+6 // 2 hours - }); -}; - - -/** - * Dispose of this object. - */ -TokenStore.prototype.dispose = function() { - clearInterval(this.refreshInterval_); - this.store_.clear(); -}; - - -/** - * Lookup the value(s) stored under the target hash. - * @param {Buffer} target The target hash to lookup. - * @return {any} The stored value or undefined if not found. - */ -TokenStore.prototype.get = function(target) { - return this.store_.get(target.toString('hex')); -}; - - -/** - * Set the value for a hash. - * @param {!Buffer} target The target hash. - * @param {any} value The value to store. - * @param {!Buffer} token The write token to verify. - * @param {!NodeInfo} node The requesting node. - * @return {boolean} Whether the value was set, if false it implies the write - * token didn't validate. - */ -TokenStore.prototype.set = function(target, value, node, token) { - if (!this.verifyToken(token, target, node)) return false; - this.store_.set(target.toString('hex'), value); - return true; -}; - - -/** - * Verify the token owner. - * @param {!Buffer} token The write token to verify. - * @param {!Buffer} target If provided also verify the target hash. - * @param {!NodeInfo} node The requesting node. - * @return {boolean} Whether the token is owner by the node. - */ -TokenStore.prototype.verifyToken = function(token, target, node) { - return token.equals(this.getWriteToken(target, node)); -}; - - -/** - * Create a new write token, bound to a specific node & target. - * @param {!Buffer} target The target hash. - * @param {!NodeInfo} node The requesting node. - * @return {!Buffer} The write token. - */ -TokenStore.prototype.getWriteToken = function(target, node) { - return crypto.createHash('sha1') - .update(target) - .update(node.address) - .digest(); -}; + set(target, value, node, token) { + if (!this.verifyToken(token, target, node)) return false; + this.store_.set(target.toString('hex'), value); + return true; + } + /** + * Verify the token owner. + * @param {!Buffer} token The write token to verify. + * @param {!Buffer} target If provided also verify the target hash. + * @param {!NodeInfo} node The requesting node. + * @return {boolean} Whether the token is owner by the node. + */ + verifyToken(token, target, node) { + return token.equals(this.getWriteToken(target, node)); + } -/** - * Get the stored keys. - * @return {!Array.} - */ -TokenStore.prototype.keys = function() { - return this.store_.keys(); -}; + /** + * Create a new write token, bound to a specific node & target. + * @param {!Buffer} target The target hash. + * @param {!NodeInfo} node The requesting node. + * @return {!Buffer} The write token. + */ + getWriteToken(target, node) { + return crypto.createHash('sha1') + .update(target) + .update(node.address) + .digest(); + } + /** + * Get the stored keys. + * @return {!Array.} + */ + keys() { + return Array.from(this.store_.keys()); + } -/** - * Get the stored size. - * @return {!number} - */ -TokenStore.prototype.size = function() { - return this.store_.size; -}; + /** + * Get the stored size. + * @return {!number} + */ + size() { + return this.store_.size; + } +} diff --git a/src/types.d.ts b/src/types.d.ts index 4322263..5bb7e00 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -1,13 +1,10 @@ -interface PeerInfo { - address: string, - port: number, - family: 'ipv4'|'ipv6' -} +// import { EventEmitter } from 'events'; -interface AddressInfo { + +interface PeerInfo { address: string, port: number, - family: 'ipv4'|'ipv6' + family?: 'ipv4'|'ipv6' } @@ -30,3 +27,42 @@ declare module 'ed25519-supercop' { function sign(message: Buffer, publicKey: Buffer, secretKey: Buffer): Buffer; } +type EventEmitter = import('events').EventEmitter; + +type KRPCQueryArgument = { [key: string]: any|((p: PeerInfo, m: string, a: KRPCQueryArgument )=>any)}; + +interface IKRPC extends EventEmitter { + query(peer:PeerInfo|PeerInfo[], method: string, opt_args: KRPCQueryArgument): Promise; +} + +interface IRoutingTable { + +} + +type ClosestCallback = (r: any, n: NodeInfo) => T; + +interface IDHT { + id: Buffer, + K_: number, // Default number of closest nodes to query + rpc_: IKRPC, + nodes_: IRoutingTable, + + closest_(target: Buffer, method: string, args: any, opt_rescb:ClosestCallback): Promise | undefined>; + + closestNodes(id: Buffer, n?:number): Array; +} + +interface IDHTExtension { + provides: string[]; + dispose(): void; +} + +interface IDHTExtensionConstructor { + new(dht: IDHT): IDHTExtension; +} + + +interface ITraversableQueryBase { + node?: NodeInfo; + r: { nodes: NodeInfo[] }; +} diff --git a/src/util.js b/src/util.js index d96e6f3..2c6604c 100644 --- a/src/util.js +++ b/src/util.js @@ -1,18 +1,36 @@ -import crypto from 'crypto'; +import * as crypto from 'crypto'; +/** + * The 'promise selector' allows us to simulate a more traditional networking + * api. i.e. loop and 'block' until any outstanding request resolves. + * @template T + */ export class PromiseSelector { - constructor() { - /** @type {Array} */ + /** + * @typedef {Promise<[Error|undefined, T|undefined, ()=>void]>} WrappingPromise + */ + + /** + * @param {Array>=} opt_init Optionally initialize the selector. + */ + constructor(opt_init) { + /** @type {Array} */ this.promises = []; + + if (opt_init) this.add(opt_init); } + /** + * The number of unresolved promises in the select queue. + * @return {number} + */ get length() { return this.promises.length; } /** * Add promises to the selector. - * @param {Array.|Promise} promises + * @param {Array.>|Promise} promises */ add(promises) { if (!Array.isArray(promises)) promises = [promises]; @@ -20,22 +38,27 @@ export class PromiseSelector { promises // .filter((p) => this.promises.indexOf(p) === -1) .forEach((p) => { - /** @type {Promise} */ + /** @type {WrappingPromise} */ let pr; const remove = () => this.promises.splice(this.promises.indexOf(pr), 1); pr = p.then( - (res) => [null, res, remove], - (err) => [err, null, remove]); + (res) => [undefined, res, remove], + (err) => [err, undefined, remove]); this.promises.push(pr); }); } + /** + * Get the next promise that resolves or rejects. + * @return {Promise|null} + */ next() { if (this.promises.length === 0) return null; return Promise.race(this.promises) .then(([err, res, remove]) => { remove(); if (err) throw err; + if (res === undefined) throw new Error("Result is undefined"); return res; }); } diff --git a/test/krpc.js b/test/krpc.js index b0e3ae8..ff96018 100644 --- a/test/krpc.js +++ b/test/krpc.js @@ -6,6 +6,7 @@ import bencode from 'bencode'; import { KRPCSocket } from '#root/src/krpc'; + describe('KRPC Protocol', () => { let socketMock = null; let krpc = null; diff --git a/test/storage.js b/test/storage.js index 9cb2907..c643e98 100644 --- a/test/storage.js +++ b/test/storage.js @@ -6,10 +6,10 @@ import { createCluster, destroyCluster } from '#root/test/util'; import bencode from 'bencode'; - const ED_SEED = Buffer.from( 'ae460d331b6707d14af2b11315b490178a649c7bb39e075009b5e7d304d9ecf8', 'hex'); + describe('BEP44 - DHT Storage Extension', () => { let cluster = null; @@ -36,7 +36,7 @@ describe('BEP44 - DHT Storage Extension', () => { '58187355b6fc57c8d167d20d9dce06b3cb6a9da6', '0f02498b891023f4f68814d2ee861bdaf7c44fe6', '50487e979b7406d35d45f0c377f808f02f3afcda' - ], 30000); + ], 40000); }); afterEach(() => { diff --git a/tsconfig.json b/tsconfig.json index f73e4c0..9d11ad5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,10 +16,10 @@ "noFallthroughCasesInSwitch": true, "preserveConstEnums": true, "preserveWatchOutput": true, - "target": "ES2017" + "module": "node16" }, "include": [ "src/types.d.ts", "src/index.js", ] -} \ No newline at end of file +}