Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Steth batch unbuffered #608

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ethereum-steth/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@grpc/grpc-js": "^1.10.2",
"@types/lodash": "^4.14.202",
"@types/node": "^22.4.1",
"@types/uuid": "^10.0.0",
"async-mutex": "^0.5.0",
"bignumber.js": "^9.1.2",
"dotenv": "^16.4.5",
Expand Down
73 changes: 50 additions & 23 deletions ethereum-steth/src/clients/eth_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import BigNumber from 'bignumber.js'
import { ethers } from 'ethers'
import { keccak256, toUtf8Bytes } from 'ethers/lib/utils'
import { either as E } from 'fp-ts'
import { randomUUID } from 'node:crypto'
import { retryAsync } from 'ts-retry'
import { Logger } from 'winston'
import { BlockDto } from '../entity/events'
Expand Down Expand Up @@ -114,25 +115,47 @@ export class ETHProvider
public async getUnbufferedEvents(startBlock: number, endBlock: number): Promise<E.Either<Error, UnbufferedEvent[]>> {
const end = this.metrics.etherJsDurationHistogram.labels({ method: this.getUnbufferedEvents.name }).startTimer()

try {
const out = await retryAsync<UnbufferedEvent[]>(
async (): Promise<UnbufferedEvent[]> => {
return await this.lidoRunner.queryFilter(this.lidoRunner.filters.Unbuffered(), startBlock, endBlock)
},
{ delay: DELAY_IN_500MS, maxTry: ATTEMPTS_5 },
)
const fetchUnbufferedEvents = async (startBlock: number, endBlock: number): Promise<UnbufferedEvent[]> => {
try {
const out = await retryAsync<UnbufferedEvent[]>(
async (): Promise<UnbufferedEvent[]> => {
return await this.lidoRunner.queryFilter(this.lidoRunner.filters.Unbuffered(), startBlock, endBlock)
},
{ delay: DELAY_IN_500MS, maxTry: ATTEMPTS_5 },
)

out.sort((a, b) => a.blockNumber - b.blockNumber)
this.metrics.etherJsRequest.labels({ method: this.getUnbufferedEvents.name, status: StatusOK }).inc()
end({ status: StatusOK })

this.metrics.etherJsRequest.labels({ method: this.getUnbufferedEvents.name, status: StatusOK }).inc()
end({ status: StatusOK })
return out
} catch (e) {
this.metrics.etherJsRequest.labels({ method: this.getUnbufferedEvents.name, status: StatusFail }).inc()
end({ status: StatusFail })

return E.right(out)
throw new NetworkError(e, `Could not call UnbufferedEvents`)
}
}

const unbufferedBatchSize = 500
const promises = []
for (let i = startBlock; i <= endBlock; i += unbufferedBatchSize) {
const start = i
const end = Math.min(i + unbufferedBatchSize - 1, endBlock)

promises.push(fetchUnbufferedEvents(start, end))
}

try {
const events = (await Promise.all(promises)).flat()
events.sort((a, b) => a.blockNumber - b.blockNumber)

return E.right(events)
} catch (e) {
this.metrics.etherJsRequest.labels({ method: this.getUnbufferedEvents.name, status: StatusFail }).inc()
end({ status: StatusFail })
if (e instanceof NetworkError) {
return E.left(e)
}

throw new NetworkError(e, `Could not call StakingRouterETHDeposited`)
return E.left(new Error(`Could not fetch unbuffered events`))
}
}

Expand Down Expand Up @@ -189,7 +212,7 @@ export class ETHProvider
}

public async getBalanceByBlockHash(address: string, blockHash: string): Promise<E.Either<Error, BigNumber>> {
const end = this.metrics.etherJsDurationHistogram.startTimer({ method: 'getBalanceByBlockHash' })
const end = this.metrics.etherJsDurationHistogram.startTimer({ method: this.getBalanceByBlockHash.name })

try {
const out = await retryAsync<string>(
Expand All @@ -202,20 +225,20 @@ export class ETHProvider
{ delay: DELAY_IN_500MS, maxTry: ATTEMPTS_5 },
)

this.metrics.etherJsRequest.labels({ method: 'getBalanceByBlockHash', status: StatusOK }).inc()
this.metrics.etherJsRequest.labels({ method: this.getBalanceByBlockHash.name, status: StatusOK }).inc()
end({ status: StatusOK })

return E.right(new BigNumber(out))
} catch (e) {
this.metrics.etherJsRequest.labels({ method: 'getBalanceByBlockHash', status: StatusFail }).inc()
this.metrics.etherJsRequest.labels({ method: this.getBalanceByBlockHash.name, status: StatusFail }).inc()
end({ status: StatusFail })

return E.left(new NetworkError(e, `Could not fetch balance by address ${address} and blockHash ${blockHash}`))
}
}

public async getStakingLimitInfo(blockNumber: number): Promise<E.Either<Error, StakingLimitInfo>> {
const end = this.metrics.etherJsDurationHistogram.labels({ method: 'getStakingLimitInfo' }).startTimer()
const end = this.metrics.etherJsDurationHistogram.labels({ method: this.getStakingLimitInfo.name }).startTimer()

try {
const out = await retryAsync<StakingLimitInfo>(
Expand All @@ -233,12 +256,12 @@ export class ETHProvider
{ delay: DELAY_IN_500MS, maxTry: ATTEMPTS_5 },
)

this.metrics.etherJsRequest.labels({ method: 'getStakeLimitFullInfo', status: StatusOK }).inc()
this.metrics.etherJsRequest.labels({ method: this.getStakingLimitInfo.name, status: StatusOK }).inc()
end({ status: StatusOK })

return E.right(out)
} catch (e) {
this.metrics.etherJsRequest.labels({ method: 'getStakeLimitFullInfo', status: StatusFail }).inc()
this.metrics.etherJsRequest.labels({ method: this.getStakingLimitInfo.name, status: StatusFail }).inc()
end({ status: StatusFail })

return E.left(new NetworkError(e, `Could not call lidoContract.getStakeLimitFullInfo`))
Expand Down Expand Up @@ -314,7 +337,7 @@ export class ETHProvider
}

const chunkPromises: Promise<void>[] = []
const MAX_REQUESTS_CHUNK_SIZE = 875
const MAX_REQUESTS_CHUNK_SIZE = 500
const out = new DataRW<WithdrawalRequest>([])

for (let i = 0; i < requestIds.length; i += MAX_REQUESTS_CHUNK_SIZE) {
Expand Down Expand Up @@ -950,6 +973,7 @@ export class ETHProvider
jsonrpc: string
method: string
params: Array<any>
id: string
}

type RpcResponse = {
Expand All @@ -968,12 +992,13 @@ export class ETHProvider
jsonrpc: '2.0',
method: `eth_getStorageAt`,
params: [address, slot, blockTag],
id: randomUUID().toLowerCase(),
}

try {
const data = await retryAsync<string>(
async (): Promise<string> => {
const response: Response = await fetch(`https://eth.drpc.org`, {
const response: Response = await fetch(this.jsonRpcProvider.connection.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand Down Expand Up @@ -1024,6 +1049,7 @@ export class ETHProvider
jsonrpc: string
method: string
params: Array<any>
id: string
}

type RpcResponse = {
Expand All @@ -1040,12 +1066,13 @@ export class ETHProvider
jsonrpc: '2.0',
method: `eth_getStorageAt`,
params: [address, slotAddr, blockTag],
id: randomUUID().toLowerCase(),
}

try {
const data = await retryAsync<string>(
async (): Promise<string> => {
const response: Response = await fetch(`https://eth.drpc.org`, {
const response: Response = await fetch(this.jsonRpcProvider.connection.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand Down
5 changes: 5 additions & 0 deletions ethereum-steth/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@
resolved "https://registry.yarnpkg.com/@types/triple-beam/-/triple-beam-1.3.5.tgz#74fef9ffbaa198eb8b588be029f38b00299caa2c"
integrity sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==

"@types/uuid@^10.0.0":
version "10.0.0"
resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-10.0.0.tgz#e9c07fe50da0f53dc24970cca94d619ff03f6f6d"
integrity sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==

"@types/uuid@^8.3.4":
version "8.3.4"
resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-8.3.4.tgz#bd86a43617df0594787d38b735f55c805becf1bc"
Expand Down
2 changes: 1 addition & 1 deletion l2-bridge-arbitrum/src/clients/l2_client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('l2Client', () => {

expect(l2Logs.right.length).toBe(16)
},
TEST_TIMEOUT,
TEST_TIMEOUT * 5,
)

test(
Expand Down
2 changes: 1 addition & 1 deletion l2-bridge-arbitrum/src/clients/l2_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export class L2Client implements IL2BridgeBalanceClient {
}

public async fetchL2Logs(startBlock: number, endBlock: number, address: string[]): Promise<E.Either<Error, Log[]>> {
const batchSize = 2_000
const batchSize = 500
const batchRequests: RpcRequest[] = []
for (let i = startBlock; i <= endBlock; i += batchSize) {
const from = i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,6 @@ describe('monitor_withdrawals', () => {
expect(result.right.amount).toBe(expectedAmount)
expect(result.right.total).toBe(5)
},
TEST_TIMEOUT,
TEST_TIMEOUT * 5,
)
})
Loading