-
Notifications
You must be signed in to change notification settings - Fork 11.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
e2e test improvements, and fixes for executor classes #18312
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
'@mysten/sui': minor | ||
--- | ||
|
||
Update parallel executor class to handle gasPrice and budgeting to remove extra rpc calls during execution" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,9 @@ runs: | |
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected] | ||
with: | ||
version: 9.1.1 | ||
- name: Install dependencies | ||
run: pnpm install --frozen-lockfile | ||
shell: bash | ||
- id: changes | ||
name: Detect changes | ||
shell: bash | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,8 +43,10 @@ jobs: | |
- 5432:5432 | ||
steps: | ||
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # Pin v4.1.1 | ||
# Disabled for now as it makes test runs take longer | ||
# - uses: bmwill/rust-cache@v1 # Fork of 'Swatinem/rust-cache' which allows caching additional paths | ||
- uses: Swatinem/rust-cache@v2 | ||
with: | ||
cache-all-crates: true | ||
cache-on-failure: true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this shaves a few minutes off the build, but doesn't seem to cache builds for things in the repo. Probably needs further improvement |
||
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected] | ||
with: | ||
version: 9.1.1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,28 @@ const PARALLEL_EXECUTOR_DEFAULTS = { | |
initialCoinBalance: 200_000_000n, | ||
minimumCoinBalance: 50_000_000n, | ||
maxPoolSize: 50, | ||
epochBoundaryWindow: 1_000, | ||
} satisfies Omit<ParallelTransactionExecutorOptions, 'signer' | 'client'>; | ||
export interface ParallelTransactionExecutorOptions extends Omit<ObjectCacheOptions, 'address'> { | ||
client: SuiClient; | ||
signer: Signer; | ||
/** The number of coins to create in a batch when refilling the gas pool */ | ||
coinBatchSize?: number; | ||
/** The initial balance of each coin created for the gas pool */ | ||
initialCoinBalance?: bigint; | ||
/** The minimum balance of a coin that can be reused for future transactions. If the gasCoin is below this value, it will be used when refilling the gasPool */ | ||
minimumCoinBalance?: bigint; | ||
/** The gasBudget to use if the transaction has not defined it's own gasBudget, defaults to `minimumCoinBalance` */ | ||
defaultGasBudget?: bigint; | ||
/** | ||
* Time to wait before/after the expected epoch boundary before re-fetching the gas pool (in milliseconds). | ||
* Building transactions will be paused for up to 2x this duration around each epoch boundary to ensure the | ||
* gas price is up-to-date for the next epoch. | ||
* */ | ||
epochBoundaryWindow?: number; | ||
/** The maximum number of transactions that can be execute in parallel, this also determines the maximum number of gas coins that will be created */ | ||
maxPoolSize?: number; | ||
/** An initial list of coins used to fund the gas pool, uses all owned SUI coins by default */ | ||
sourceCoins?: string[]; | ||
} | ||
|
||
|
@@ -41,13 +55,22 @@ export class ParallelTransactionExecutor { | |
#coinBatchSize: number; | ||
#initialCoinBalance: bigint; | ||
#minimumCoinBalance: bigint; | ||
#epochBoundaryWindow: number; | ||
#defaultGasBudget: bigint; | ||
#maxPoolSize: number; | ||
#sourceCoins: Map<string, SuiObjectRef | null> | null; | ||
#coinPool: CoinWithBalance[] = []; | ||
#cache: CachingTransactionExecutor; | ||
#objectIdQueues = new Map<string, (() => void)[]>(); | ||
#buildQueue = new SerialQueue(); | ||
#executeQueue: ParallelQueue; | ||
#lastDigest: string | null = null; | ||
#cacheLock: Promise<void> | null = null; | ||
#pendingTransactions = 0; | ||
#gasPrice: null | { | ||
price: bigint; | ||
expiration: number; | ||
} = null; | ||
|
||
constructor(options: ParallelTransactionExecutorOptions) { | ||
this.#signer = options.signer; | ||
|
@@ -57,6 +80,9 @@ export class ParallelTransactionExecutor { | |
options.initialCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.initialCoinBalance; | ||
this.#minimumCoinBalance = | ||
options.minimumCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.minimumCoinBalance; | ||
this.#defaultGasBudget = options.defaultGasBudget ?? this.#minimumCoinBalance; | ||
this.#epochBoundaryWindow = | ||
options.epochBoundaryWindow ?? PARALLEL_EXECUTOR_DEFAULTS.epochBoundaryWindow; | ||
this.#maxPoolSize = options.maxPoolSize ?? PARALLEL_EXECUTOR_DEFAULTS.maxPoolSize; | ||
this.#cache = new CachingTransactionExecutor({ | ||
client: options.client, | ||
|
@@ -69,7 +95,8 @@ export class ParallelTransactionExecutor { | |
} | ||
|
||
resetCache() { | ||
return this.#cache.reset(); | ||
this.#gasPrice = null; | ||
return this.#updateCache(() => this.#cache.reset()); | ||
} | ||
|
||
async executeTransaction(transaction: Transaction) { | ||
|
@@ -145,20 +172,36 @@ export class ParallelTransactionExecutor { | |
async #execute(transaction: Transaction, usedObjects: Set<string>) { | ||
let gasCoin!: CoinWithBalance; | ||
try { | ||
const bytes = await this.#buildQueue.runTask(async () => { | ||
transaction.setSenderIfNotSet(this.#signer.toSuiAddress()); | ||
|
||
await this.#buildQueue.runTask(async () => { | ||
const data = transaction.getData(); | ||
|
||
if (!data.gasData.price) { | ||
transaction.setGasPrice(await this.#getGasPrice()); | ||
} | ||
|
||
if (!data.gasData.budget) { | ||
transaction.setGasBudget(this.#defaultGasBudget); | ||
} | ||
|
||
await this.#updateCache(); | ||
gasCoin = await this.#getGasCoin(); | ||
this.#pendingTransactions++; | ||
transaction.setGasPayment([ | ||
{ | ||
objectId: gasCoin.id, | ||
version: gasCoin.version, | ||
digest: gasCoin.digest, | ||
}, | ||
]); | ||
transaction.setSenderIfNotSet(this.#signer.toSuiAddress()); | ||
|
||
return this.#cache.buildTransaction({ transaction: transaction }); | ||
// Resolve cached references | ||
await this.#cache.buildTransaction({ transaction, onlyTransactionKind: true }); | ||
}); | ||
|
||
const bytes = await transaction.build({ client: this.#client }); | ||
|
||
const { signature } = await this.#signer.signTransaction(bytes); | ||
|
||
const results = await this.#cache.executeTransaction({ | ||
|
@@ -197,6 +240,8 @@ export class ParallelTransactionExecutor { | |
} | ||
} | ||
|
||
this.#lastDigest = results.digest; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The executor tracks a list of gas coins to use to refill the pool, this contains the coins from failed executions and coins who's balance was too low. we don't track the versions for these coins, because in error cases, they might become invalid. By tracking the digest of the last executed transaction, we can wait for that digest to be indexed before executing the refill transaction, which should ensure the index is up-to-date for all coins used to refill the pool. |
||
|
||
return { | ||
digest: results.digest, | ||
effects: toB64(effectsBytes), | ||
|
@@ -210,7 +255,13 @@ export class ParallelTransactionExecutor { | |
this.#sourceCoins.set(gasCoin.id, null); | ||
} | ||
|
||
await this.#cache.cache.deleteObjects([...usedObjects]); | ||
await this.#updateCache(async () => { | ||
await Promise.all([ | ||
this.#cache.cache.deleteObjects([...usedObjects]), | ||
this.#waitForLastDigest(), | ||
]); | ||
}); | ||
|
||
throw error; | ||
} finally { | ||
usedObjects.forEach((objectId) => { | ||
|
@@ -221,11 +272,35 @@ export class ParallelTransactionExecutor { | |
this.#objectIdQueues.delete(objectId); | ||
} | ||
}); | ||
this.#pendingTransactions--; | ||
} | ||
} | ||
|
||
/** Helper for synchronizing cache updates, by ensuring only one update happens at a time. This can also be used to wait for any pending cache updates */ | ||
async #updateCache(fn?: () => Promise<void>) { | ||
if (this.#cacheLock) { | ||
await this.#cacheLock; | ||
} | ||
|
||
this.#cacheLock = | ||
fn?.().then( | ||
() => { | ||
this.#cacheLock = null; | ||
}, | ||
() => {}, | ||
) ?? null; | ||
} | ||
|
||
async #waitForLastDigest() { | ||
const digest = this.#lastDigest; | ||
if (digest) { | ||
this.#lastDigest = null; | ||
await this.#client.waitForTransaction({ digest }); | ||
} | ||
} | ||
|
||
async #getGasCoin() { | ||
if (this.#coinPool.length === 0 && this.#executeQueue.activeTasks <= this.#maxPoolSize) { | ||
if (this.#coinPool.length === 0 && this.#pendingTransactions <= this.#maxPoolSize) { | ||
await this.#refillCoinPool(); | ||
} | ||
|
||
|
@@ -237,10 +312,40 @@ export class ParallelTransactionExecutor { | |
return coin; | ||
} | ||
|
||
async #getGasPrice(): Promise<bigint> { | ||
const remaining = this.#gasPrice | ||
? this.#gasPrice.expiration - this.#epochBoundaryWindow - Date.now() | ||
: 0; | ||
|
||
if (remaining > 0) { | ||
return this.#gasPrice!.price; | ||
} | ||
|
||
if (this.#gasPrice) { | ||
const timeToNextEpoch = Math.max( | ||
this.#gasPrice.expiration + this.#epochBoundaryWindow - Date.now(), | ||
1_000, | ||
); | ||
|
||
await new Promise((resolve) => setTimeout(resolve, timeToNextEpoch)); | ||
} | ||
|
||
const state = await this.#client.getLatestSuiSystemState(); | ||
|
||
this.#gasPrice = { | ||
price: BigInt(state.referenceGasPrice), | ||
expiration: | ||
Number.parseInt(state.epochStartTimestampMs, 10) + | ||
Number.parseInt(state.epochDurationMs, 10), | ||
}; | ||
|
||
return this.#getGasPrice(); | ||
} | ||
|
||
async #refillCoinPool() { | ||
const batchSize = Math.min( | ||
this.#coinBatchSize, | ||
this.#maxPoolSize - (this.#coinPool.length + this.#executeQueue.activeTasks) + 1, | ||
this.#maxPoolSize - (this.#coinPool.length + this.#pendingTransactions) + 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using the number of transaction in the executeQueue was slightly wrong, because they may not have borrowed a coin yet. Instead we should have been counting the coins in the pool + the transactions that are actively borrowing a gas coin |
||
); | ||
|
||
if (batchSize === 0) { | ||
|
@@ -289,6 +394,8 @@ export class ParallelTransactionExecutor { | |
} | ||
txb.transferObjects(coinResults, address); | ||
|
||
await this.#updateCache(() => this.#waitForLastDigest()); | ||
|
||
const result = await this.#client.signAndExecuteTransaction({ | ||
transaction: txb, | ||
signer: this.#signer, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was defaulting to using a new version of turbo that removed commands this job depends on, so e2e test were only triggered on rust changes