Skip to content

Commit

Permalink
e2e test improvements, and fixes for executor classes (MystenLabs#18312)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
hayes-mysten authored and tx-tomcat committed Jul 29, 2024
1 parent b32a1c9 commit 86390a7
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 66 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-cooks-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@mysten/sui': minor
---

Update parallel executor class to handle gasPrice and budgeting to remove extra rpc calls during execution"
3 changes: 3 additions & 0 deletions .github/actions/turbo-diffs/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ runs:
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
- name: Install dependencies
run: pnpm install --frozen-lockfile
shell: bash
- id: changes
name: Detect changes
shell: bash
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ jobs:
- 5432:5432
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # Pin v4.1.1
# Disabled for now as it makes test runs take longer
# - uses: bmwill/rust-cache@v1 # Fork of 'Swatinem/rust-cache' which allows caching additional paths
- uses: Swatinem/rust-cache@v2
with:
cache-all-crates: true
cache-on-failure: true
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
Expand Down
7 changes: 6 additions & 1 deletion sdk/docs/pages/typescript/executors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ way that avoids conflicts between transactions using the same object ids.
`200_000_000n`),
- `minimumCoinBalance`: After executing a transaction, the the gasCoin will be reused unless it's
balance is below this value (default `50_000_000n`),
- `defaultBudget`: The default budget for transactions, which will be used if the transaction does
not specify a budget (default `minimumCoinBalance`),
- `maxPoolSize`: The maximum number of gas coins to keep in the gas pool, which also limits the
maximum number of concurrent transactions (default 50),
- sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
- `sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
by the signer.
- `epochBoundaryWindow` Time to wait before/after the expected epoch boundary before re-fetching the
gas pool (in milliseconds). Building transactions will be paused for up to 2x this duration around
each epoch boundary to ensure the gas price is up-to-date for the next epoch. (default `1000`)

```ts
import { getFullnodeUrl, SuiClient } from '@mysten/sui/client';
Expand Down
3 changes: 3 additions & 0 deletions sdk/docs/pages/typescript/transaction-building/intents.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import { coinWithBalance, Transaction } from '@mysten/sui/transactions';

const tx = new Transaction();

// Setting the sender is required for the CoinWithBalance intent to resolve coins when not using the gas coin
tx.setSender(keypair.toSuiAddress());

tx.transferObjects(
[
// Create a SUI coin (balance is in MIST)
Expand Down
14 changes: 7 additions & 7 deletions sdk/typescript/src/transactions/ObjectCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ export class ObjectCache {

if (cached.initialSharedVersion && !input.UnresolvedObject.initialSharedVersion) {
input.UnresolvedObject.initialSharedVersion = cached.initialSharedVersion;
}

if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}
} else {
if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}

if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion sdk/typescript/src/transactions/executor/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { bcs } from '../../bcs/index.js';
import type { ExecuteTransactionBlockParams, SuiClient } from '../../client/index.js';
import type { Signer } from '../../cryptography/keypair.js';
import type { BuildTransactionOptions } from '../json-rpc-resolver.js';
import type { ObjectCacheOptions } from '../ObjectCache.js';
import { ObjectCache } from '../ObjectCache.js';
import type { Transaction } from '../Transaction.js';
Expand Down Expand Up @@ -32,10 +33,14 @@ export class CachingTransactionExecutor {
await this.cache.clearCustom();
}

async buildTransaction({ transaction }: { transaction: Transaction }) {
async buildTransaction({
transaction,
...options
}: { transaction: Transaction } & BuildTransactionOptions) {
transaction.addBuildPlugin(this.cache.asPlugin());
return transaction.build({
client: this.#client,
...options,
});
}

Expand Down
121 changes: 114 additions & 7 deletions sdk/typescript/src/transactions/executor/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,28 @@ const PARALLEL_EXECUTOR_DEFAULTS = {
initialCoinBalance: 200_000_000n,
minimumCoinBalance: 50_000_000n,
maxPoolSize: 50,
epochBoundaryWindow: 1_000,
} satisfies Omit<ParallelTransactionExecutorOptions, 'signer' | 'client'>;
export interface ParallelTransactionExecutorOptions extends Omit<ObjectCacheOptions, 'address'> {
client: SuiClient;
signer: Signer;
/** The number of coins to create in a batch when refilling the gas pool */
coinBatchSize?: number;
/** The initial balance of each coin created for the gas pool */
initialCoinBalance?: bigint;
/** The minimum balance of a coin that can be reused for future transactions. If the gasCoin is below this value, it will be used when refilling the gasPool */
minimumCoinBalance?: bigint;
/** The gasBudget to use if the transaction has not defined it's own gasBudget, defaults to `minimumCoinBalance` */
defaultGasBudget?: bigint;
/**
* Time to wait before/after the expected epoch boundary before re-fetching the gas pool (in milliseconds).
* Building transactions will be paused for up to 2x this duration around each epoch boundary to ensure the
* gas price is up-to-date for the next epoch.
* */
epochBoundaryWindow?: number;
/** The maximum number of transactions that can be execute in parallel, this also determines the maximum number of gas coins that will be created */
maxPoolSize?: number;
/** An initial list of coins used to fund the gas pool, uses all owned SUI coins by default */
sourceCoins?: string[];
}

Expand All @@ -41,13 +55,22 @@ export class ParallelTransactionExecutor {
#coinBatchSize: number;
#initialCoinBalance: bigint;
#minimumCoinBalance: bigint;
#epochBoundaryWindow: number;
#defaultGasBudget: bigint;
#maxPoolSize: number;
#sourceCoins: Map<string, SuiObjectRef | null> | null;
#coinPool: CoinWithBalance[] = [];
#cache: CachingTransactionExecutor;
#objectIdQueues = new Map<string, (() => void)[]>();
#buildQueue = new SerialQueue();
#executeQueue: ParallelQueue;
#lastDigest: string | null = null;
#cacheLock: Promise<void> | null = null;
#pendingTransactions = 0;
#gasPrice: null | {
price: bigint;
expiration: number;
} = null;

constructor(options: ParallelTransactionExecutorOptions) {
this.#signer = options.signer;
Expand All @@ -57,6 +80,9 @@ export class ParallelTransactionExecutor {
options.initialCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.initialCoinBalance;
this.#minimumCoinBalance =
options.minimumCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.minimumCoinBalance;
this.#defaultGasBudget = options.defaultGasBudget ?? this.#minimumCoinBalance;
this.#epochBoundaryWindow =
options.epochBoundaryWindow ?? PARALLEL_EXECUTOR_DEFAULTS.epochBoundaryWindow;
this.#maxPoolSize = options.maxPoolSize ?? PARALLEL_EXECUTOR_DEFAULTS.maxPoolSize;
this.#cache = new CachingTransactionExecutor({
client: options.client,
Expand All @@ -69,7 +95,8 @@ export class ParallelTransactionExecutor {
}

resetCache() {
return this.#cache.reset();
this.#gasPrice = null;
return this.#updateCache(() => this.#cache.reset());
}

async executeTransaction(transaction: Transaction) {
Expand Down Expand Up @@ -145,20 +172,36 @@ export class ParallelTransactionExecutor {
async #execute(transaction: Transaction, usedObjects: Set<string>) {
let gasCoin!: CoinWithBalance;
try {
const bytes = await this.#buildQueue.runTask(async () => {
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

await this.#buildQueue.runTask(async () => {
const data = transaction.getData();

if (!data.gasData.price) {
transaction.setGasPrice(await this.#getGasPrice());
}

if (!data.gasData.budget) {
transaction.setGasBudget(this.#defaultGasBudget);
}

await this.#updateCache();
gasCoin = await this.#getGasCoin();
this.#pendingTransactions++;
transaction.setGasPayment([
{
objectId: gasCoin.id,
version: gasCoin.version,
digest: gasCoin.digest,
},
]);
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

return this.#cache.buildTransaction({ transaction: transaction });
// Resolve cached references
await this.#cache.buildTransaction({ transaction, onlyTransactionKind: true });
});

const bytes = await transaction.build({ client: this.#client });

const { signature } = await this.#signer.signTransaction(bytes);

const results = await this.#cache.executeTransaction({
Expand Down Expand Up @@ -197,6 +240,8 @@ export class ParallelTransactionExecutor {
}
}

this.#lastDigest = results.digest;

return {
digest: results.digest,
effects: toB64(effectsBytes),
Expand All @@ -210,7 +255,13 @@ export class ParallelTransactionExecutor {
this.#sourceCoins.set(gasCoin.id, null);
}

await this.#cache.cache.deleteObjects([...usedObjects]);
await this.#updateCache(async () => {
await Promise.all([
this.#cache.cache.deleteObjects([...usedObjects]),
this.#waitForLastDigest(),
]);
});

throw error;
} finally {
usedObjects.forEach((objectId) => {
Expand All @@ -221,11 +272,35 @@ export class ParallelTransactionExecutor {
this.#objectIdQueues.delete(objectId);
}
});
this.#pendingTransactions--;
}
}

/** Helper for synchronizing cache updates, by ensuring only one update happens at a time. This can also be used to wait for any pending cache updates */
async #updateCache(fn?: () => Promise<void>) {
if (this.#cacheLock) {
await this.#cacheLock;
}

this.#cacheLock =
fn?.().then(
() => {
this.#cacheLock = null;
},
() => {},
) ?? null;
}

async #waitForLastDigest() {
const digest = this.#lastDigest;
if (digest) {
this.#lastDigest = null;
await this.#client.waitForTransaction({ digest });
}
}

async #getGasCoin() {
if (this.#coinPool.length === 0 && this.#executeQueue.activeTasks <= this.#maxPoolSize) {
if (this.#coinPool.length === 0 && this.#pendingTransactions <= this.#maxPoolSize) {
await this.#refillCoinPool();
}

Expand All @@ -237,10 +312,40 @@ export class ParallelTransactionExecutor {
return coin;
}

async #getGasPrice(): Promise<bigint> {
const remaining = this.#gasPrice
? this.#gasPrice.expiration - this.#epochBoundaryWindow - Date.now()
: 0;

if (remaining > 0) {
return this.#gasPrice!.price;
}

if (this.#gasPrice) {
const timeToNextEpoch = Math.max(
this.#gasPrice.expiration + this.#epochBoundaryWindow - Date.now(),
1_000,
);

await new Promise((resolve) => setTimeout(resolve, timeToNextEpoch));
}

const state = await this.#client.getLatestSuiSystemState();

this.#gasPrice = {
price: BigInt(state.referenceGasPrice),
expiration:
Number.parseInt(state.epochStartTimestampMs, 10) +
Number.parseInt(state.epochDurationMs, 10),
};

return this.#getGasPrice();
}

async #refillCoinPool() {
const batchSize = Math.min(
this.#coinBatchSize,
this.#maxPoolSize - (this.#coinPool.length + this.#executeQueue.activeTasks) + 1,
this.#maxPoolSize - (this.#coinPool.length + this.#pendingTransactions) + 1,
);

if (batchSize === 0) {
Expand Down Expand Up @@ -289,6 +394,8 @@ export class ParallelTransactionExecutor {
}
txb.transferObjects(coinResults, address);

await this.#updateCache(() => this.#waitForLastDigest());

const result = await this.#client.signAndExecuteTransaction({
transaction: txb,
signer: this.#signer,
Expand Down
13 changes: 12 additions & 1 deletion sdk/typescript/src/transactions/executor/serial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class SerialTransactionExecutor {
#queue = new SerialQueue();
#signer: Signer;
#cache: CachingTransactionExecutor;
#client: SuiClient;
#lastDigest: string | null = null;

constructor({
signer,
Expand All @@ -24,6 +26,7 @@ export class SerialTransactionExecutor {
signer: Signer;
}) {
this.#signer = signer;
this.#client = options.client;
this.#cache = new CachingTransactionExecutor({
client: options.client,
cache: options.cache,
Expand Down Expand Up @@ -69,7 +72,14 @@ export class SerialTransactionExecutor {
};

resetCache() {
return this.#cache.reset();
return Promise.all([this.#cache.reset(), this.#waitForLastTransaction()]);
}

async #waitForLastTransaction() {
if (this.#lastDigest) {
await this.#client.waitForTransaction({ digest: this.#lastDigest });
this.#lastDigest = null;
}
}

executeTransaction(transaction: Transaction | Uint8Array) {
Expand All @@ -92,6 +102,7 @@ export class SerialTransactionExecutor {
const effectsBytes = Uint8Array.from(results.rawEffects!);
const effects = bcs.TransactionEffects.parse(effectsBytes);
await this.applyEffects(effects);
this.#lastDigest = results.digest;

return {
digest: results.digest,
Expand Down
Loading

0 comments on commit 86390a7

Please sign in to comment.