Skip to content

Commit

Permalink
ecsp stream handling
Browse files Browse the repository at this point in the history
  • Loading branch information
leolambo committed May 16, 2024
1 parent ff8a421 commit 51cd3c1
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 24 deletions.
18 changes: 14 additions & 4 deletions packages/bitcore-node/src/providers/chain-state/evm/api/ecsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
}).bind(this);
const txStream = new NodeQueryStream(block?.transactions || [], getTransaction, args);
// stream results into response
await NodeQueryStream.onStream(txStream, req!, res!);
const result = await NodeQueryStream.onStream(txStream, req!, res!);
if (result?.success === false) {
logger.error('Error mid-stream (streamTransactions): %o', result.error);
}
} catch (err) {
logger.error('Error streaming transactions from historical node %o', err);
throw err;
Expand All @@ -220,14 +223,18 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
const { tokenAddress } = args;
try {
// Calculate confirmations with tip height
let result;
const tip = await this.getLocalTip(params);
args.tipHeight = tip ? tip.height : 0;
if (!args.tokenAddress) {
const txStream = MoralisAPI.streamTransactionsByAddress({ chain, network, address, args });
await ExternalApiStream.onStream(txStream, req!, res!);
result = await ExternalApiStream.onStream(txStream, req!, res!);
} else {
const tokenTransfers = MoralisAPI.streamERC20TransactionsByAddress({ chain, network, address, tokenAddress, args });
await ExternalApiStream.onStream(tokenTransfers, req!, res!);
result = await ExternalApiStream.onStream(tokenTransfers, req!, res!);
}
if (result?.success === false) {
logger.error('Error mid-stream (streamAddressTransactions): %o', result.error);
}
} catch (err) {
logger.error('Error streaming address transactions from external provider: %o', err);
Expand Down Expand Up @@ -258,7 +265,10 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
// Pipe all txStreams to the mergedStream
ExternalApiStream.mergeStreams(txStreams, mergedStream);
// Ensure mergeStream resolves
await _mergedStream;
const result = await _mergedStream;
if (result?.success === false) {
logger.error('Error mid-stream (streamWalletTransactions): %o', result.error);
}
} catch (err) {
logger.error('Error streaming wallet transactions from external provider: %o', err);
throw err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
}
}

getAllTouchedAddresses(tx: Partial<IEVMTransaction>): { tos: IEVMCachedAddress[], froms: IEVMCachedAddress[] } {
const {to, from, effects} = tx;
getAllTouchedAddresses(tx: Partial<IEVMTransaction>): { tos: IEVMCachedAddress[], froms: IEVMCachedAddress[] } {
const { to, from, effects } = tx;
let toBatch = new Set<string>();
let fromBatch = new Set<string>();
const addToBatch = (batch: Set<string>, obj: IEVMCachedAddress) => {
Expand All @@ -179,7 +179,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
// Handle ERC20s
addToBatch(toBatch, { address: effect.to, tokenAddress: effect.contractAddress });
addToBatch(fromBatch, { address: effect.from, tokenAddress: effect.contractAddress });
}
}
}
}

Expand Down Expand Up @@ -254,7 +254,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
walletsAddys.map(w => w.wallet),
w => w.toHexString()
);

// If config value is set then only store needed tx properties
let leanTx: IEVMTransaction | IEVMTransactionInProcess = tx;
if ((Config.chainConfig({ chain, network }) as IEVMNetworkConfig).leanTransactionStorage) {
Expand Down Expand Up @@ -325,7 +325,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
...erc20Data
};
}
} catch (e) {}
} catch (e) { }
try {
const erc721Data: IAbiDecodeResponse = getErc721Decoder().decodeMethod(input);
if (erc721Data) {
Expand All @@ -334,7 +334,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
...erc721Data
};
}
} catch (e) {}
} catch (e) { }
try {
const invoiceData: IAbiDecodeResponse = getInvoiceDecoder().decodeMethod(input);
if (invoiceData) {
Expand All @@ -343,7 +343,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
...invoiceData
};
}
} catch (e) {}
} catch (e) { }
try {
const multisendData: IAbiDecodeResponse = getMultisendDecoder().decodeMethod(input);
if (multisendData) {
Expand All @@ -352,7 +352,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
...multisendData
};
}
} catch (e) {}
} catch (e) { }
try {
const multisigData: IAbiDecodeResponse = getMultisigDecoder().decodeMethod(input);
if (multisigData) {
Expand All @@ -361,10 +361,10 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
...multisigData
};
}
} catch (e) {}
} catch (e) { }
return undefined;
}

/**
* Creates an object with param names as keys instead of an array of objects
* @param abi
Expand Down Expand Up @@ -530,7 +530,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
tx: IEVMTransactionInProcess | Partial<MongoBound<IEVMTransactionInProcess>>,
options?: TransformOptions
): EVMTransactionJSON | string {

let transaction: EVMTransactionJSON = {
txid: tx.txid || '',
network: tx.network || '',
Expand All @@ -548,7 +548,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
from: tx.from || '',
effects: tx.effects || []
};

// Add non-lean properties if we aren't excluding them
const config = (Config.chainConfig({ chain: tx.chain as string, network: tx.network as string }) as IEVMNetworkConfig);
if (config && !config.leanTransactionStorage) {
Expand All @@ -558,7 +558,7 @@ export class EVMTransactionModel extends BaseTransaction<IEVMTransaction> {
data: dataStr,
abiType: tx.abiType || valueOrDefault(decodedData, undefined),
internal: tx.internal
? tx.internal.map(t => ({ ...t, decodedData: this.abiDecode(t.action.input || '0x') }))
? tx.internal.map(t => ({ ...t, decodedData: this.abiDecode(t?.action?.input || '0x') }))
: [],
calls: tx.calls ? tx.calls.map(t => ({ ...t, decodedData: this.abiDecode(t.input || '0x') })) : []
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import request = require('request');
import config from '../../../../config';
import { isDateValid } from '../../../../utils/check';
import { EVMTransactionStorage } from '../../evm/models/transaction';
import { ErigonTraceResponse } from '../../evm/p2p/rpcs/erigonRpc';
import { EVMTransactionJSON, Transaction } from '../../evm/types';
import moralisChains from '../defaults';
import { ExternalApiStream as apiStream } from '../streams/apiStream';
Expand Down Expand Up @@ -151,11 +152,36 @@ const transformTransaction = (tx) => {
to: tx.to_address,
from: tx.from_address,
data: tx.input,
internal: tx.internal_transactions,
internal: tx?.internal_transactions.map(t => transformInternalTransaction(t)),
effects: tx.effects,
};
}

const transformInternalTransaction = (tx) => {
return {
action: {
callType: tx.type?.toLowerCase(),
from: tx.from,
gas: tx.gas,
input: tx.input,
to: tx.to,
value: tx.value,
},
blockHash: tx.block_hash,
blockNumber: Number(tx.block_number),
error: tx.error,
result: {
gasUsed: tx.gas_used,
output: tx.output
},
subtraces: tx.subtraces,
traceAddress: tx.traceAddress || [],
transactionHash: tx.transaction_hash,
transactionPosition: tx.transactionPosition || 0,
type: tx.type?.toLowerCase()
} as ErigonTraceResponse;
}

const transformTokenTransfer = (transfer) => {
let _transfer = transformTransaction(transfer);
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ export class ExternalApiStream extends Readable {
}

// handles events emitted by the streamed response, request from client, and response to client
static onStream(stream: Readable, req: Request, res: Response): Promise<void> {
return new Promise<any | void>((resolve, reject) => {
static onStream(stream: Readable, req: Request, res: Response):
Promise<{ success: boolean, error?: any }> {
return new Promise<{ success: boolean, error?: any }>((resolve, reject) => {
let closed = false;
let isFirst = true;

Expand All @@ -88,7 +89,14 @@ export class ExternalApiStream extends Readable {
stream.on('error', function (err) {
if (!closed) {
closed = true;
return reject(err);
if (!isFirst) {
res.write(',\n{"error": "An error occurred during data stream"}\n]');
res.end();
res.destroy();
return resolve({ success: false, error: err });
} else {
return reject(err);
}
}
return;
});
Expand All @@ -115,7 +123,7 @@ export class ExternalApiStream extends Readable {
closed = true;
}
res.end();
resolve(true);
resolve({ success: true });
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ export class NodeQueryStream extends Readable {
}
}

static onStream(stream: Readable, req: Request, res: Response): Promise<void> {
static onStream(stream: Readable, req: Request, res: Response):
Promise<{ success: boolean, error?: any }> {
return ExternalApiStream.onStream(stream, req, res);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/bitcore-wallet-service/src/lib/model/copayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class Copayer {
const x = new Copayer();

x.version = 2;
x.createdOn = Math.floor(Date.now() / 1000);
x.createdOn = Math.floor(Date.now() / 1000);
x.coin = opts.coin;
x.chain = opts.chain || opts.coin;
x.xPubKey = opts.xPubKey;
Expand Down

0 comments on commit 51cd3c1

Please sign in to comment.