Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e test improvements, and fixes for executor classes #18312

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rude-cooks-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@mysten/sui': minor
---

Update parallel executor class to handle gasPrice and budgeting to remove extra rpc calls during execution"
3 changes: 3 additions & 0 deletions .github/actions/turbo-diffs/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ runs:
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
- name: Install dependencies
run: pnpm install --frozen-lockfile
shell: bash
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was defaulting to using a new version of turbo that removed commands this job depends on, so e2e test were only triggered on rust changes

- id: changes
name: Detect changes
shell: bash
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ jobs:
- 5432:5432
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # Pin v4.1.1
# Disabled for now as it makes test runs take longer
# - uses: bmwill/rust-cache@v1 # Fork of 'Swatinem/rust-cache' which allows caching additional paths
- uses: Swatinem/rust-cache@v2
with:
cache-all-crates: true
cache-on-failure: true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shaves a few minutes off the build, but doesn't seem to cache builds for things in the repo. Probably needs further improvement

- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
Expand Down
7 changes: 6 additions & 1 deletion sdk/docs/pages/typescript/executors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ way that avoids conflicts between transactions using the same object ids.
`200_000_000n`),
- `minimumCoinBalance`: After executing a transaction, the the gasCoin will be reused unless it's
balance is below this value (default `50_000_000n`),
- `defaultBudget`: The default budget for transactions, which will be used if the transaction does
not specify a budget (default `minimumCoinBalance`),
- `maxPoolSize`: The maximum number of gas coins to keep in the gas pool, which also limits the
maximum number of concurrent transactions (default 50),
- sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
- `sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
by the signer.
- `epochBoundaryWindow` Time to wait before/after the expected epoch boundary before re-fetching the
gas pool (in milliseconds). Building transactions will be paused for up to 2x this duration around
each epoch boundary to ensure the gas price is up-to-date for the next epoch. (default `1000`)

```ts
import { getFullnodeUrl, SuiClient } from '@mysten/sui/client';
Expand Down
3 changes: 3 additions & 0 deletions sdk/docs/pages/typescript/transaction-building/intents.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import { coinWithBalance, Transaction } from '@mysten/sui/transactions';

const tx = new Transaction();

// Setting the sender is required for the CoinWithBalance intent to resolve coins when not using the gas coin
tx.setSender(keypair.toSuiAddress());

tx.transferObjects(
[
// Create a SUI coin (balance is in MIST)
Expand Down
14 changes: 7 additions & 7 deletions sdk/typescript/src/transactions/ObjectCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ export class ObjectCache {

if (cached.initialSharedVersion && !input.UnresolvedObject.initialSharedVersion) {
input.UnresolvedObject.initialSharedVersion = cached.initialSharedVersion;
}

if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}
} else {
if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}

if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion sdk/typescript/src/transactions/executor/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { bcs } from '../../bcs/index.js';
import type { ExecuteTransactionBlockParams, SuiClient } from '../../client/index.js';
import type { Signer } from '../../cryptography/keypair.js';
import type { BuildTransactionOptions } from '../json-rpc-resolver.js';
import type { ObjectCacheOptions } from '../ObjectCache.js';
import { ObjectCache } from '../ObjectCache.js';
import type { Transaction } from '../Transaction.js';
Expand Down Expand Up @@ -32,10 +33,14 @@ export class CachingTransactionExecutor {
await this.cache.clearCustom();
}

async buildTransaction({ transaction }: { transaction: Transaction }) {
async buildTransaction({
transaction,
...options
}: { transaction: Transaction } & BuildTransactionOptions) {
transaction.addBuildPlugin(this.cache.asPlugin());
return transaction.build({
client: this.#client,
...options,
});
}

Expand Down
121 changes: 114 additions & 7 deletions sdk/typescript/src/transactions/executor/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,28 @@ const PARALLEL_EXECUTOR_DEFAULTS = {
initialCoinBalance: 200_000_000n,
minimumCoinBalance: 50_000_000n,
maxPoolSize: 50,
epochBoundaryWindow: 1_000,
} satisfies Omit<ParallelTransactionExecutorOptions, 'signer' | 'client'>;
export interface ParallelTransactionExecutorOptions extends Omit<ObjectCacheOptions, 'address'> {
client: SuiClient;
signer: Signer;
/** The number of coins to create in a batch when refilling the gas pool */
coinBatchSize?: number;
/** The initial balance of each coin created for the gas pool */
initialCoinBalance?: bigint;
/** The minimum balance of a coin that can be reused for future transactions. If the gasCoin is below this value, it will be used when refilling the gasPool */
minimumCoinBalance?: bigint;
/** The gasBudget to use if the transaction has not defined it's own gasBudget, defaults to `minimumCoinBalance` */
defaultGasBudget?: bigint;
/**
* Time to wait before/after the expected epoch boundary before re-fetching the gas pool (in milliseconds).
* Building transactions will be paused for up to 2x this duration around each epoch boundary to ensure the
* gas price is up-to-date for the next epoch.
* */
epochBoundaryWindow?: number;
/** The maximum number of transactions that can be execute in parallel, this also determines the maximum number of gas coins that will be created */
maxPoolSize?: number;
/** An initial list of coins used to fund the gas pool, uses all owned SUI coins by default */
sourceCoins?: string[];
}

Expand All @@ -41,13 +55,22 @@ export class ParallelTransactionExecutor {
#coinBatchSize: number;
#initialCoinBalance: bigint;
#minimumCoinBalance: bigint;
#epochBoundaryWindow: number;
#defaultGasBudget: bigint;
#maxPoolSize: number;
#sourceCoins: Map<string, SuiObjectRef | null> | null;
#coinPool: CoinWithBalance[] = [];
#cache: CachingTransactionExecutor;
#objectIdQueues = new Map<string, (() => void)[]>();
#buildQueue = new SerialQueue();
#executeQueue: ParallelQueue;
#lastDigest: string | null = null;
#cacheLock: Promise<void> | null = null;
#pendingTransactions = 0;
#gasPrice: null | {
price: bigint;
expiration: number;
} = null;

constructor(options: ParallelTransactionExecutorOptions) {
this.#signer = options.signer;
Expand All @@ -57,6 +80,9 @@ export class ParallelTransactionExecutor {
options.initialCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.initialCoinBalance;
this.#minimumCoinBalance =
options.minimumCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.minimumCoinBalance;
this.#defaultGasBudget = options.defaultGasBudget ?? this.#minimumCoinBalance;
this.#epochBoundaryWindow =
options.epochBoundaryWindow ?? PARALLEL_EXECUTOR_DEFAULTS.epochBoundaryWindow;
this.#maxPoolSize = options.maxPoolSize ?? PARALLEL_EXECUTOR_DEFAULTS.maxPoolSize;
this.#cache = new CachingTransactionExecutor({
client: options.client,
Expand All @@ -69,7 +95,8 @@ export class ParallelTransactionExecutor {
}

resetCache() {
return this.#cache.reset();
this.#gasPrice = null;
return this.#updateCache(() => this.#cache.reset());
}

async executeTransaction(transaction: Transaction) {
Expand Down Expand Up @@ -145,20 +172,36 @@ export class ParallelTransactionExecutor {
async #execute(transaction: Transaction, usedObjects: Set<string>) {
let gasCoin!: CoinWithBalance;
try {
const bytes = await this.#buildQueue.runTask(async () => {
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

await this.#buildQueue.runTask(async () => {
const data = transaction.getData();

if (!data.gasData.price) {
transaction.setGasPrice(await this.#getGasPrice());
}

if (!data.gasData.budget) {
transaction.setGasBudget(this.#defaultGasBudget);
}

await this.#updateCache();
gasCoin = await this.#getGasCoin();
this.#pendingTransactions++;
transaction.setGasPayment([
{
objectId: gasCoin.id,
version: gasCoin.version,
digest: gasCoin.digest,
},
]);
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

return this.#cache.buildTransaction({ transaction: transaction });
// Resolve cached references
await this.#cache.buildTransaction({ transaction, onlyTransactionKind: true });
});

const bytes = await transaction.build({ client: this.#client });

const { signature } = await this.#signer.signTransaction(bytes);

const results = await this.#cache.executeTransaction({
Expand Down Expand Up @@ -197,6 +240,8 @@ export class ParallelTransactionExecutor {
}
}

this.#lastDigest = results.digest;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The executor tracks a list of gas coins to use to refill the pool, this contains the coins from failed executions and coins who's balance was too low. we don't track the versions for these coins, because in error cases, they might become invalid. By tracking the digest of the last executed transaction, we can wait for that digest to be indexed before executing the refill transaction, which should ensure the index is up-to-date for all coins used to refill the pool.


return {
digest: results.digest,
effects: toB64(effectsBytes),
Expand All @@ -210,7 +255,13 @@ export class ParallelTransactionExecutor {
this.#sourceCoins.set(gasCoin.id, null);
}

await this.#cache.cache.deleteObjects([...usedObjects]);
await this.#updateCache(async () => {
await Promise.all([
this.#cache.cache.deleteObjects([...usedObjects]),
this.#waitForLastDigest(),
]);
});

throw error;
} finally {
usedObjects.forEach((objectId) => {
Expand All @@ -221,11 +272,35 @@ export class ParallelTransactionExecutor {
this.#objectIdQueues.delete(objectId);
}
});
this.#pendingTransactions--;
}
}

/** Helper for synchronizing cache updates, by ensuring only one update happens at a time. This can also be used to wait for any pending cache updates */
async #updateCache(fn?: () => Promise<void>) {
if (this.#cacheLock) {
await this.#cacheLock;
}

this.#cacheLock =
fn?.().then(
() => {
this.#cacheLock = null;
},
() => {},
) ?? null;
}

async #waitForLastDigest() {
const digest = this.#lastDigest;
if (digest) {
this.#lastDigest = null;
await this.#client.waitForTransaction({ digest });
}
}

async #getGasCoin() {
if (this.#coinPool.length === 0 && this.#executeQueue.activeTasks <= this.#maxPoolSize) {
if (this.#coinPool.length === 0 && this.#pendingTransactions <= this.#maxPoolSize) {
await this.#refillCoinPool();
}

Expand All @@ -237,10 +312,40 @@ export class ParallelTransactionExecutor {
return coin;
}

async #getGasPrice(): Promise<bigint> {
const remaining = this.#gasPrice
? this.#gasPrice.expiration - this.#epochBoundaryWindow - Date.now()
: 0;

if (remaining > 0) {
return this.#gasPrice!.price;
}

if (this.#gasPrice) {
const timeToNextEpoch = Math.max(
this.#gasPrice.expiration + this.#epochBoundaryWindow - Date.now(),
1_000,
);

await new Promise((resolve) => setTimeout(resolve, timeToNextEpoch));
}

const state = await this.#client.getLatestSuiSystemState();

this.#gasPrice = {
price: BigInt(state.referenceGasPrice),
expiration:
Number.parseInt(state.epochStartTimestampMs, 10) +
Number.parseInt(state.epochDurationMs, 10),
};

return this.#getGasPrice();
}

async #refillCoinPool() {
const batchSize = Math.min(
this.#coinBatchSize,
this.#maxPoolSize - (this.#coinPool.length + this.#executeQueue.activeTasks) + 1,
this.#maxPoolSize - (this.#coinPool.length + this.#pendingTransactions) + 1,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the number of transaction in the executeQueue was slightly wrong, because they may not have borrowed a coin yet. Instead we should have been counting the coins in the pool + the transactions that are actively borrowing a gas coin

);

if (batchSize === 0) {
Expand Down Expand Up @@ -289,6 +394,8 @@ export class ParallelTransactionExecutor {
}
txb.transferObjects(coinResults, address);

await this.#updateCache(() => this.#waitForLastDigest());

const result = await this.#client.signAndExecuteTransaction({
transaction: txb,
signer: this.#signer,
Expand Down
13 changes: 12 additions & 1 deletion sdk/typescript/src/transactions/executor/serial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class SerialTransactionExecutor {
#queue = new SerialQueue();
#signer: Signer;
#cache: CachingTransactionExecutor;
#client: SuiClient;
#lastDigest: string | null = null;

constructor({
signer,
Expand All @@ -24,6 +26,7 @@ export class SerialTransactionExecutor {
signer: Signer;
}) {
this.#signer = signer;
this.#client = options.client;
this.#cache = new CachingTransactionExecutor({
client: options.client,
cache: options.cache,
Expand Down Expand Up @@ -69,7 +72,14 @@ export class SerialTransactionExecutor {
};

resetCache() {
return this.#cache.reset();
return Promise.all([this.#cache.reset(), this.#waitForLastTransaction()]);
}

async #waitForLastTransaction() {
if (this.#lastDigest) {
await this.#client.waitForTransaction({ digest: this.#lastDigest });
this.#lastDigest = null;
}
}

executeTransaction(transaction: Transaction | Uint8Array) {
Expand All @@ -92,6 +102,7 @@ export class SerialTransactionExecutor {
const effectsBytes = Uint8Array.from(results.rawEffects!);
const effects = bcs.TransactionEffects.parse(effectsBytes);
await this.applyEffects(effects);
this.#lastDigest = results.digest;

return {
digest: results.digest,
Expand Down
Loading
Loading