Skip to content

Commit

Permalink
add fallback for web3 json-rpc...
Browse files Browse the repository at this point in the history
Jagden committed Dec 18, 2024
1 parent f40fec4 commit 7e05827
Showing 18 changed files with 251 additions and 51 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/config.yml
Original file line number Diff line number Diff line change
@@ -40,8 +40,8 @@ jobs:
steps:
- name: Check out repository
uses: actions/checkout@v3
with:
fetch-depth: 0
# with:
# fetch-depth: 0
- name: Setup node
uses: actions/setup-node@v3
with:
15 changes: 14 additions & 1 deletion boyar/create-version-file.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
#!/bin/bash -e

set -x

git status

#git fetch --tags
git pull --tags

echo boo

git tag

if [[ ! -z "$CIRCLE_TAG" ]]; then
echo "This is a release run - Updating the .version file to indicate the correct Semver"
echo "For this release ($CIRCLE_TAG)..."
@@ -13,7 +24,9 @@ if [[ ! -z "$CIRCLE_TAG" ]]; then

echo "$CIRCLE_TAG" > .version
else
LATEST_SEMVER=$(git describe --tags --abbrev=0)
#LATEST_SEMVER=$(git describe --tags --abbrev=0)
LATEST_SEMVER=$(git tag --sort=-v:refname | head -n 1)
SHORT_COMMIT=$(git rev-parse HEAD | cut -c1-8)
echo "$LATEST_SEMVER-$SHORT_COMMIT" > .version
echo "$LATEST_SEMVER-$SHORT_COMMIT"
fi
8 changes: 2 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
"clean": "rimraf ./dist/",
"prebuild": "npm run clean",
"build": "tsc --skipLibCheck -p ./tsconfig.prod.json && ./boyar/create-version-file.sh && docker build -t local/management-service .",
"buildonly": "tsc --skipLibCheck -p ./tsconfig.prod.json && ./boyar/create-version-file.sh",
"test": "ava --verbose --timeout=10m --serial --fail-fast",
"test:quick": "echo '-- TEST --' && ava --verbose --timeout=10m --serial",
"test:e2e": "ava --verbose --timeout=10m --serial --config ./ava.config.e2e.js",
@@ -44,8 +45,8 @@
"@types/node": "^14.14.16",
"@types/node-fetch": "^2.5.5",
"@types/yargs": "^15.0.4",
"@typescript-eslint/eslint-plugin": "^2.25.0",
"@typescript-eslint/parser": "^2.25.0",
"@typescript-eslint/eslint-plugin": "^2.34.0",
"@typescript-eslint/parser": "^2.34.0",
"ava": "^3.5.1",
"docker-compose-mocha": "^1.2.0",
"eslint": "^6.8.0",
2 changes: 1 addition & 1 deletion src/api/render-node.test.ts
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ test.serial('[integration] getNodeManagement responds according to Ethereum and
t.timeout(5 * 60 * 1000);

const ethereum = new EthereumTestDriver(true);
const ethereumEndpoint = 'http://localhost:7545';
const ethereumEndpoint = ['http://localhost:7545'];
const maticEndpoint = 'mock-endpoint';
const finalityBufferBlocks = 5;

6 changes: 3 additions & 3 deletions src/cli-args.test.ts
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ const configPath = 'some/path/config.json';

const minimalConfigValue = {
EthereumGenesisContract: 'bar',
EthereumEndpoint: 'http://localhost:7545',
EthereumEndpoint: ['http://localhost:7545'],
'node-address': 'ecfcccbc1e54852337298c7e90f5ecee79439e67',
};
const inputConfigValue = {
@@ -71,7 +71,7 @@ test('parseOptions: environment variables and no config', (t) => {

t.assert((output.ExternalLaunchConfig = {}));
t.assert((output.StatusJsonPath = './status/status.json'));
t.assert((output.EthereumEndpoint = mockEthereumEndpoint));
t.assert((output.EthereumEndpoint = [mockEthereumEndpoint]));
t.assert((output['node-address'] = mockNodeAddress));
});

@@ -87,7 +87,7 @@ test('parseOptions: env vars take precedence', (t) => {

const output = parseArgs(['--config', configPath]);

t.assert((output.EthereumEndpoint = mockEthereumEndpoint));
t.assert((output.EthereumEndpoint = [mockEthereumEndpoint]));
t.assert((output['node-address'] = mockNodeAddress));
});

10 changes: 10 additions & 0 deletions src/cli-args.ts
Original file line number Diff line number Diff line change
@@ -5,6 +5,14 @@ import * as Logger from './logger';

import { setConfigEnvVars } from './env-var-args';

function ensureEthereumEndpointIsArray(obj: ServiceConfiguration): void {
if (!obj.EthereumEndpoint) {
obj.EthereumEndpoint = []; // Initialize as an empty array if the field is undefined or null
} else if (!Array.isArray(obj.EthereumEndpoint)) {
obj.EthereumEndpoint = [obj.EthereumEndpoint]; // Convert to array if it's not already one
}
}

export function parseArgs(argv: string[]): ServiceConfiguration {
const options = yargs(argv)
.option('config', {
@@ -28,6 +36,8 @@ export function parseArgs(argv: string[]): ServiceConfiguration {
// Support passing required config values via environment variables
setConfigEnvVars(config);

ensureEthereumEndpointIsArray(config);

const validationErrors = validateServiceConfiguration(config);
if (validationErrors) {
Logger.error(`Invalid JSON config: '${JSON.stringify(config)}'.`);
2 changes: 1 addition & 1 deletion src/config.example.ts
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ export const exampleConfig: ServiceConfiguration = {
Port: 8080,
EthereumGenesisContract: '0xD859701C81119aB12A1e62AF6270aD2AE05c7AB3',
EthereumFirstBlock: 11191390,
EthereumEndpoint: 'http://ganache:7545',
EthereumEndpoint: ['http://ganache:7545'],
DeploymentDescriptorUrl: 'https://deployment.orbs.network/mainnet.json',
ElectionsAuditOnly: false,
StatusJsonPath: './status/status.json',
8 changes: 4 additions & 4 deletions src/config.test.ts
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ test('accepts legal config', (t) => {
BootstrapMode: false,
Port: 2,
EthereumGenesisContract: 'foo',
EthereumEndpoint: 'http://localhost:7545',
EthereumEndpoint: ['http://localhost:7545'],
EthereumPollIntervalSeconds: 0.1,
EthereumRequestsPerSecondLimit: 0,
ElectionsStaleUpdateSeconds: 7 * 24 * 60 * 60,
@@ -34,7 +34,7 @@ test('declines illegal config (1)', (t) => {
BootstrapMode: false,
Port: 2,
EthereumGenesisContract: 'foo',
EthereumEndpoint: 'http://localhost:7545',
EthereumEndpoint: ['http://localhost:7545'],
EthereumPollIntervalSeconds: 0.1,
EthereumRequestsPerSecondLimit: 0,
ElectionsStaleUpdateSeconds: 7 * 24 * 60 * 60,
@@ -60,7 +60,7 @@ test('declines illegal config (2)', (t) => {
BootstrapMode: false,
Port: 2,
EthereumGenesisContract: 'foo',
EthereumEndpoint: 'foo-bar:123',
EthereumEndpoint: ['foo-bar:123'],
EthereumPollIntervalSeconds: 0.1,
EthereumRequestsPerSecondLimit: 0,
ElectionsStaleUpdateSeconds: 7 * 24 * 60 * 60,
@@ -77,6 +77,6 @@ test('declines illegal config (2)', (t) => {
Verbose: true,
'node-address': 'ecfcccbc1e54852337298c7e90f5ecee79439e67',
}),
['Ethereum endpoint is not a valid url']
['Ethereum endpoint Item 1: must be a valid URL']
);
});
55 changes: 48 additions & 7 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ export interface ServiceConfiguration {
BootstrapMode: boolean;
Port: number;
EthereumGenesisContract: string;
EthereumEndpoint: string;
EthereumEndpoint: string[];
/** @deprecated Use `EthereumEndpoint` instead */
MaticEndpoint?: string;
DeploymentDescriptorUrl: string;
@@ -47,6 +47,39 @@ export const defaultServiceConfiguration = {
Verbose: false,
};

// Define the types for the custom validator function
validate.validators.array = function (
value: unknown,
options: { item?: validate.ValidateOption },
key: string
): string | undefined {
// Check if the value is an array
if (!Array.isArray(value)) {
return `${key} must be an array.`;
}

// If there are item-level validation options, validate each item
if (options && options.item) {
const errors = value
.map((item, index) => {
const error = validate.single(item, options.item);
if (error) {
return `Item ${index + 1}: ${error.join(', ')}`;
}
return undefined;
})
.filter((error): error is string => !!error); // Narrow the type to strings

// If there are errors, return them as a single string
if (errors.length > 0) {
return errors.join('; ');
}
}

// Return undefined if there are no errors
return undefined;
};

export function validateServiceConfiguration(c: Partial<ServiceConfiguration>): string[] | undefined {
const serviceConfigConstraints = {
BootstrapMode: {
@@ -101,12 +134,20 @@ export function validateServiceConfiguration(c: Partial<ServiceConfiguration>):
numericality: { noStrings: true },
},
EthereumEndpoint: {
presence: { allowEmpty: false },
type: 'string',
url: {
allowLocal: true,
},
},
presence: true, // Ensure the attribute is present
type: "array", // Ensure it's an array
array: {
item: {
presence: true, // Ensure each item is not empty
type: "string", // Ensure each item in the array is a string
format: {
pattern: /^(https?:\/\/[^\s$.?#].[^\s]*)$/i, // URL regex pattern
message: "must be a valid URL"
}
}
}
},

EthereumGenesisContract: {
presence: { allowEmpty: false },
type: 'string',
6 changes: 6 additions & 0 deletions src/env-var-args.test.ts
Original file line number Diff line number Diff line change
@@ -63,6 +63,12 @@ test('setConfigEnvVars uses environment variables when set', (t) => {
continue;
}

if (key == 'EthereumEndpoint') {
t.deepEqual(input[key as keyof ServiceConfiguration], [mockEnv.ETHEREUM_ENDPOINT]);
continue;
}

//console.log (key, input[key as keyof ServiceConfiguration], mockEnv[camelCaseToSnakeCase(key) as keyof typeof mockEnv]);
t.assert(input[key as keyof ServiceConfiguration] === mockEnv[camelCaseToSnakeCase(key) as keyof typeof mockEnv]);
}
});
3 changes: 2 additions & 1 deletion src/env-var-args.ts
Original file line number Diff line number Diff line change
@@ -12,7 +12,8 @@ export function setConfigEnvVars(config: ServiceConfiguration): void {
config.BootstrapMode = process.env.BOOTSTRAP_MODE ? process.env.BOOTSTRAP_MODE === 'true' : config.BootstrapMode;
config.Port = process.env.PORT ? Number(process.env.PORT) : config.Port;
config.EthereumGenesisContract = process.env.ETHEREUM_GENESIS_CONTRACT ?? config.EthereumGenesisContract;
config.EthereumEndpoint = process.env.ETHEREUM_ENDPOINT ?? config.EthereumEndpoint;
// parse ETHEREUM_ENDPOINT, if it has multiple values, split by comma
config.EthereumEndpoint = process.env.ETHEREUM_ENDPOINT ? process.env.ETHEREUM_ENDPOINT.split(',') : config.EthereumEndpoint;
config.DeploymentDescriptorUrl = process.env.DEPLOYMENT_DESCRIPTOR_URL ?? config.DeploymentDescriptorUrl;
config.ElectionsAuditOnly = process.env.ELECTIONS_AUDIT_ONLY
? process.env.ELECTIONS_AUDIT_ONLY === 'true'
2 changes: 1 addition & 1 deletion src/ethereum/block-sync.test.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ test.serial('[integration] BlockSync reads registry for contract addresses', asy
t.timeout(5 * 60 * 1000);

const ethereum = new EthereumTestDriver(true);
const ethereumEndpoint = 'http://localhost:7545';
const ethereumEndpoint = ['http://localhost:7545'];
const finalityBufferBlocks = 5;

// setup Ethereum state
33 changes: 31 additions & 2 deletions src/ethereum/block-sync.ts
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ export class BlockSync {
private eventFetchers: { [T in EventName]: EventFetcher };

constructor(private state: StateManager, private config: BlockSyncConfiguration) {
this.reader = new EthereumReader(config);
this.reader = new EthereumReader(config, () => this.resetAllContracts());
this.lastProcessedBlock = config.EthereumFirstBlock;
this.eventFetchers = {
ContractAddressUpdated: new LookaheadEventFetcher('ContractAddressUpdated', this.reader),
@@ -63,6 +63,14 @@ export class BlockSync {
// we read blocks 1-1000 from old address and blocks 1001+ from the new address.
// This simplification is ok because contracts will be locked from emitting events during transition.

resetAllContracts() {
console.log('resetAllContracts');
for (const eventName of eventNames) {
console.log ('resetContract', eventName);
this.eventFetchers[eventName].resetContract();
}
}

async processEventsInBlock(blockNumber: number, latestAllowedBlock: number) {
// update all contract addresses according to state to track changes in contract registry
for (const eventName of eventNames) {
@@ -82,7 +90,28 @@ export class BlockSync {
const blockTime = await this.reader.getRefTime(blockNumber);
this.state.applyNewEvents(blockNumber, blockTime, sorted);
this.state.applyNewTimeRef(blockTime, blockNumber);
Logger.log(`BlockSync: processed ${sorted.length} events in block ${blockNumber} with time ${blockTime}.`);
Logger.log(`BlockSync: processed ${sorted.length} events in block ${blockNumber} with time ${blockTime}, (${this.secondsDeltaToTimeString(blockTime)}).`);
}

secondsDeltaToTimeString(totalSeconds : number) : string {
// Get the current timestamp in seconds
const nowInSeconds = Math.floor(Date.now() / 1000);

// Calculate the delta (difference)
const deltaSeconds = Math.abs(totalSeconds - nowInSeconds);

// Calculate days, hours, minutes, and seconds
const days = Math.floor(deltaSeconds / (24 * 60 * 60));
let remainingSeconds = deltaSeconds % (24 * 60 * 60);

const hours = Math.floor(remainingSeconds / (60 * 60));
remainingSeconds %= 60 * 60;

const minutes = Math.floor(remainingSeconds / 60);
const seconds = remainingSeconds % 60;

// Pad with leading zeros and format as DD:HH:MM:SS
return `${String(days).padStart(2, '0')}d:${String(hours).padStart(2, '0')}:${String(minutes).padStart(2, '0')}:${String(seconds).padStart(2, '0')}`;
}

getRequestStats(): DailyStatsData {
107 changes: 93 additions & 14 deletions src/ethereum/ethereum-reader.ts
Original file line number Diff line number Diff line change
@@ -9,41 +9,88 @@ import https from 'https';
const HTTP_TIMEOUT_SEC = 20;

const subDomain = 'eth-api'
const domain = 'orbs.network'
const domain = 'orbs.network'
let timer: NodeJS.Timeout | null = null;

export type EthereumConfiguration = {
EthereumEndpoint: string;
EthereumEndpoint: string[];
EthereumRequestsPerSecondLimit: number;
};

export class EthereumReader {
private web3: Web3;
private lastSwitchTime = 0;
private currentWeb3Index = 0;
private web3s: Web3[];
private throttled?: pThrottle.ThrottledFunction<[], void>;
private agent: https.Agent;
private blockTimeSinceFail: number;
private resetContracts : Function;

public requestStats = new DailyStats();

constructor(config: EthereumConfiguration) {
constructor(config: EthereumConfiguration, resetContracts: Function) {
this.agent = new https.Agent({
maxSockets: 5,
});
this.resetContracts = resetContracts;
this.blockTimeSinceFail = 0;
this.web3 = new Web3(
new Web3.providers.HttpProvider(config.EthereumEndpoint, {

this.web3s = config.EthereumEndpoint.map(endpoint => new Web3(
new Web3.providers.HttpProvider(endpoint, {
keepAlive: true,
timeout: HTTP_TIMEOUT_SEC * 1000,
})
);
));

if (config.EthereumRequestsPerSecondLimit > 0) {
this.throttled = pThrottle(() => Promise.resolve(), config.EthereumRequestsPerSecondLimit, 1000);
}
}

getWeb3(): Web3 {
//console.log ('getWeb3: returning web3 to ' + this.web3s[this.currentWeb3Index], 'index:', this.currentWeb3Index);
return this.web3s[this.currentWeb3Index];
}

switchWeb3() {
if (this.lastSwitchTime > Date.now() - 10000) {
console.log('switchWeb3: ignoring switch request, too soon.');
return;
}

this.lastSwitchTime = Date.now();
this.currentWeb3Index = (this.currentWeb3Index + 1) % this.web3s.length;

const currentProvider = this.getWeb3().eth.currentProvider;
if (currentProvider instanceof Web3.providers.HttpProvider) {
console.log ('switchWeb3: switching to web3 to ' + currentProvider.host);
}
this.resetContracts();

if (this.currentWeb3Index != 0) {
if (timer!=null) { // clear any old timer if exist.
clearTimeout (timer);
}

timer = setTimeout(() => { // set a timer to switch back to the first provider.
this.currentWeb3Index = 0;
console.log('switchWeb3: switching to web3 to first provider.');
this.resetContracts();
}, 60000 * 10); // after 10 minutes, return to the first web3
}
}

async getBlockNumber(): Promise<number> {
if (this.throttled) await this.throttled();
this.requestStats.add(1);
return this.web3.eth.getBlockNumber();

try {
return await this.getWeb3().eth.getBlockNumber();
} catch (error) {
console.error("Error fetching block number:", error);
this.switchWeb3();
return await this.getWeb3().eth.getBlockNumber();
}
}

// orbs GET api dediated to serve block time from cache
@@ -75,15 +122,20 @@ export class EthereumReader {
return null;
}
}
async getRefTime(blockNumber: number | 'latest'): Promise<number> {

calcSecondsAgo (time: number): number {
return Math.floor((Date.now() / 1000) - time)
}

async getRefTime(blockNumber: number | 'latest'): Promise<number> {
// get from cache first
const shouldTry = (this.blockTimeSinceFail == 0 || this.blockTimeSinceFail > 5)
if (blockNumber !== 'latest' && shouldTry){
if (blockNumber !== 'latest' && shouldTry) {
this.blockTimeSinceFail = 0;
const blocktime = await this.getBlockTime(blockNumber)
if(blocktime)
if (blocktime) {
return blocktime
}
}
console.log('getBlockTime failed', blockNumber)
// count calls web3 provider
@@ -92,7 +144,16 @@ export class EthereumReader {
// fallback to web3
if (this.throttled) await this.throttled();
this.requestStats.add(1);
const block = await this.web3.eth.getBlock(blockNumber);

let block
try {
block = await this.getWeb3().eth.getBlock(blockNumber);
} catch (error) {
console.error("Error fetching block number:", error);
this.switchWeb3();
block = await this.getWeb3().eth.getBlock(blockNumber);
}

if (!block) {
throw new Error(`web3.eth.getBlock for ${blockNumber} return empty block.`);
}
@@ -102,17 +163,35 @@ export class EthereumReader {
getContractForEvent(eventName: EventName, address: string): Contract {
const contractName = contractByEventName(eventName);
const abi = getAbiForContract(address, contractName);
return new this.web3.eth.Contract(abi, address);

try {
const web3instance = this.getWeb3();
return new web3instance.eth.Contract(abi, address);
} catch (error) {
console.error("Error fetching contract:", error);
this.switchWeb3();
const web3instance = this.getWeb3();
return new web3instance.eth.Contract(abi, address);
//return new this.getWeb3().eth.Contract(abi, address);
}
}

// throws error if fails, caller needs to decrease page size if needed
async getPastEvents(eventName: EventName, { fromBlock, toBlock }: PastEventOptions, contract?: Contract) {
if (!contract) return [];
if (this.throttled) await this.throttled();
this.requestStats.add(1);

return contract.getPastEvents(eventName, {
fromBlock,
toBlock,
toBlock
}).catch((e) => {
console.error("Error fetching past events:", e);
this.switchWeb3();
return contract.getPastEvents(eventName, {
fromBlock,
toBlock
});
});
}
}
6 changes: 5 additions & 1 deletion src/ethereum/event-fetcher.ts
Original file line number Diff line number Diff line change
@@ -15,8 +15,12 @@ export abstract class EventFetcher {
return true;
}

resetContract() {
this.contract = undefined;
}

// every fetcher instance should override this function
abstract async fetchBlock(blockNumber: number, latestAllowedBlock: number): Promise<EventData[]>;
abstract fetchBlock(blockNumber: number, latestAllowedBlock: number): Promise<EventData[]>;
}

// the simplest fetcher, yet inefficient, good for testing
4 changes: 2 additions & 2 deletions src/ethereum/test-driver.ts
Original file line number Diff line number Diff line change
@@ -155,8 +155,8 @@ export class EthereumTestDriver {
return await d.web3.eth.getBlockNumber();
}

async getCurrentBlockPreDeploy(ethereumEndpoint: string): Promise<number> {
const web3 = new Web3(ethereumEndpoint);
async getCurrentBlockPreDeploy(ethereumEndpoint: string[]): Promise<number> {
const web3 = new Web3(ethereumEndpoint[0]);
return await web3.eth.getBlockNumber();
}

26 changes: 23 additions & 3 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -10,13 +10,15 @@ process.on('uncaughtException', function (err) {
});

function censorConfig(conf: ServiceConfiguration) {
const censoredEthEndpointArray = conf.EthereumEndpoint.map((endpoint) => endpoint.slice(0, 30) + "**********");

const external: {[key: string]: any} = {
...conf.ExternalLaunchConfig,
EthereumEndpoint: conf.EthereumEndpoint.slice(0, -10) + "**********",
EthereumEndpoint: censoredEthEndpointArray,
}
const censored = {
...conf,
EthereumEndpoint: conf.EthereumEndpoint.slice(0, -10) + "**********",
EthereumEndpoint: censoredEthEndpointArray,
ExternalLaunchConfig: external
}

@@ -25,9 +27,27 @@ function censorConfig(conf: ServiceConfiguration) {
return censored;
}

function enrichConfig(conf: ServiceConfiguration) : ServiceConfiguration {
const enriched = {
...conf,
}

// if the EthereumEndpoint is a pointing to the Matic network, do not do the fallback enrichment.

if (enriched.EthereumEndpoint[0].includes('matic') || conf.EthereumEndpoint[0].includes('polygon')) {
console.log ('Matic network detected, not adding a fallback endpoint.');
return enriched;
}

const ethEndPoint = [enriched.EthereumEndpoint[0], 'https://rpcman-fastly.orbs.network/rpc?chain=ethereum&appId=guardian&key=888798GHWJ759843GFDSJK759843'];
enriched.EthereumEndpoint = ethEndPoint;

return enriched;
}

try {
Logger.log('Management service started.');
const config = parseArgs(process.argv);
const config = enrichConfig (parseArgs(process.argv));
const censoredConfig = censorConfig(config)

Logger.log(`Input config: '${JSON.stringify(censoredConfig)}'.`);

0 comments on commit 7e05827

Please sign in to comment.