Skip to content

Commit

Permalink
[ts sdk] e2e test improvements, and fixes for executor classes
Browse files Browse the repository at this point in the history
  • Loading branch information
hayes-mysten committed Jun 21, 2024
1 parent aa1ad98 commit 7ed1c76
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 66 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-cooks-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@mysten/sui': minor
---

Update parallel executor class to handle gasPrice and budgeting to remove extra rpc calls during execution"
3 changes: 3 additions & 0 deletions .github/actions/turbo-diffs/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ runs:
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
- name: Install dependencies
run: pnpm install --frozen-lockfile
shell: bash
- id: changes
name: Detect changes
shell: bash
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ jobs:
- 5432:5432
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # Pin v4.1.1
# Disabled for now as it makes test runs take longer
# - uses: bmwill/rust-cache@v1 # Fork of 'Swatinem/rust-cache' which allows caching additional paths
- uses: Swatinem/rust-cache@v2
with:
cache-all-crates: true
cache-on-failure: true
- uses: pnpm/action-setup@a3252b78c470c02df07e9d59298aecedc3ccdd6d # [email protected]
with:
version: 9.1.1
Expand Down
7 changes: 6 additions & 1 deletion sdk/docs/pages/typescript/executors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ way that avoids conflicts between transactions using the same object ids.
`200_000_000n`),
- `minimumCoinBalance`: After executing a transaction, the the gasCoin will be reused unless it's
balance is below this value (default `50_000_000n`),
- `defaultBudget`: The default budget for transactions, which will be used if the transaction does
not specify a budget (default `minimumCoinBalance`),
- `maxPoolSize`: The maximum number of gas coins to keep in the gas pool, which also limits the
maximum number of concurrent transactions (default 50),
- sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
- `sourceCoins`: An array of coins to use to create the gas pool, defaults to using all coins owned
by the signer.
- `epochBoundaryWindow` Time to wait before/after the expected epoch boundary before re-fetching the
gas pool (in milliseconds). Building transactions will be paused for up to 2x this duration around
each epoch boundary to ensure the gas price is up-to-date for the next epoch. (default `1000`)

```ts
import { getFullnodeUrl, SuiClient } from '@mysten/sui/client';
Expand Down
3 changes: 3 additions & 0 deletions sdk/docs/pages/typescript/transaction-building/intents.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import { coinWithBalance, Transaction } from '@mysten/sui/transactions';

const tx = new Transaction();

// Setting the sender is required for the CoinWithBalance intent to resolve coins when not using the gas coin
tx.setSender(keypair.toSuiAddress());

tx.transferObjects(
[
// Create a SUI coin (balance is in MIST)
Expand Down
14 changes: 7 additions & 7 deletions sdk/typescript/src/transactions/ObjectCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ export class ObjectCache {

if (cached.initialSharedVersion && !input.UnresolvedObject.initialSharedVersion) {
input.UnresolvedObject.initialSharedVersion = cached.initialSharedVersion;
}

if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}
} else {
if (cached.version && !input.UnresolvedObject.version) {
input.UnresolvedObject.version = cached.version;
}

if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
if (cached.digest && !input.UnresolvedObject.digest) {
input.UnresolvedObject.digest = cached.digest;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion sdk/typescript/src/transactions/executor/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { bcs } from '../../bcs/index.js';
import type { ExecuteTransactionBlockParams, SuiClient } from '../../client/index.js';
import type { Signer } from '../../cryptography/keypair.js';
import type { BuildTransactionOptions } from '../json-rpc-resolver.js';
import type { ObjectCacheOptions } from '../ObjectCache.js';
import { ObjectCache } from '../ObjectCache.js';
import type { Transaction } from '../Transaction.js';
Expand Down Expand Up @@ -32,10 +33,14 @@ export class CachingTransactionExecutor {
await this.cache.clearCustom();
}

async buildTransaction({ transaction }: { transaction: Transaction }) {
async buildTransaction({
transaction,
...options
}: { transaction: Transaction } & BuildTransactionOptions) {
transaction.addBuildPlugin(this.cache.asPlugin());
return transaction.build({
client: this.#client,
...options,
});
}

Expand Down
121 changes: 114 additions & 7 deletions sdk/typescript/src/transactions/executor/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,28 @@ const PARALLEL_EXECUTOR_DEFAULTS = {
initialCoinBalance: 200_000_000n,
minimumCoinBalance: 50_000_000n,
maxPoolSize: 50,
epochBoundaryWindow: 1_000,
} satisfies Omit<ParallelTransactionExecutorOptions, 'signer' | 'client'>;
export interface ParallelTransactionExecutorOptions extends Omit<ObjectCacheOptions, 'address'> {
client: SuiClient;
signer: Signer;
/** The number of coins to create in a batch when refilling the gas pool */
coinBatchSize?: number;
/** The initial balance of each coin created for the gas pool */
initialCoinBalance?: bigint;
/** The minimum balance of a coin that can be reused for future transactions. If the gasCoin is below this value, it will be used when refilling the gasPool */
minimumCoinBalance?: bigint;
/** The gasBudget to use if the transaction has not defined it's own gasBudget, defaults to `minimumCoinBalance` */
defaultGasBudget?: bigint;
/**
* Time to wait before/after the expected epoch boundary before re-fetching the gas pool (in milliseconds).
* Building transactions will be paused for up to 2x this duration around each epoch boundary to ensure the
* gas price is up-to-date for the next epoch.
* */
epochBoundaryWindow?: number;
/** The maximum number of transactions that can be execute in parallel, this also determines the maximum number of gas coins that will be created */
maxPoolSize?: number;
/** An initial list of coins used to fund the gas pool, uses all owned SUI coins by default */
sourceCoins?: string[];
}

Expand All @@ -41,13 +55,22 @@ export class ParallelTransactionExecutor {
#coinBatchSize: number;
#initialCoinBalance: bigint;
#minimumCoinBalance: bigint;
#epochBoundaryWindow: number;
#defaultGasBudget: bigint;
#maxPoolSize: number;
#sourceCoins: Map<string, SuiObjectRef | null> | null;
#coinPool: CoinWithBalance[] = [];
#cache: CachingTransactionExecutor;
#objectIdQueues = new Map<string, (() => void)[]>();
#buildQueue = new SerialQueue();
#executeQueue: ParallelQueue;
#lastDigest: string | null = null;
#cacheLock: Promise<void> | null = null;
#pendingTransactions = 0;
#gasPrice: null | {
price: bigint;
expiration: number;
} = null;

constructor(options: ParallelTransactionExecutorOptions) {
this.#signer = options.signer;
Expand All @@ -57,6 +80,9 @@ export class ParallelTransactionExecutor {
options.initialCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.initialCoinBalance;
this.#minimumCoinBalance =
options.minimumCoinBalance ?? PARALLEL_EXECUTOR_DEFAULTS.minimumCoinBalance;
this.#defaultGasBudget = options.defaultGasBudget ?? this.#minimumCoinBalance;
this.#epochBoundaryWindow =
options.epochBoundaryWindow ?? PARALLEL_EXECUTOR_DEFAULTS.epochBoundaryWindow;
this.#maxPoolSize = options.maxPoolSize ?? PARALLEL_EXECUTOR_DEFAULTS.maxPoolSize;
this.#cache = new CachingTransactionExecutor({
client: options.client,
Expand All @@ -69,7 +95,8 @@ export class ParallelTransactionExecutor {
}

resetCache() {
return this.#cache.reset();
this.#gasPrice = null;
return this.#updateCache(() => this.#cache.reset());
}

async executeTransaction(transaction: Transaction) {
Expand Down Expand Up @@ -145,20 +172,36 @@ export class ParallelTransactionExecutor {
async #execute(transaction: Transaction, usedObjects: Set<string>) {
let gasCoin!: CoinWithBalance;
try {
const bytes = await this.#buildQueue.runTask(async () => {
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

await this.#buildQueue.runTask(async () => {
const data = transaction.getData();

if (!data.gasData.price) {
transaction.setGasPrice(await this.#getGasPrice());
}

if (!data.gasData.budget) {
transaction.setGasBudget(this.#defaultGasBudget);
}

await this.#updateCache();
gasCoin = await this.#getGasCoin();
this.#pendingTransactions++;
transaction.setGasPayment([
{
objectId: gasCoin.id,
version: gasCoin.version,
digest: gasCoin.digest,
},
]);
transaction.setSenderIfNotSet(this.#signer.toSuiAddress());

return this.#cache.buildTransaction({ transaction: transaction });
// Resolve cached references
await this.#cache.buildTransaction({ transaction, onlyTransactionKind: true });
});

const bytes = await transaction.build({ client: this.#client });

const { signature } = await this.#signer.signTransaction(bytes);

const results = await this.#cache.executeTransaction({
Expand Down Expand Up @@ -197,6 +240,8 @@ export class ParallelTransactionExecutor {
}
}

this.#lastDigest = results.digest;

return {
digest: results.digest,
effects: toB64(effectsBytes),
Expand All @@ -210,7 +255,13 @@ export class ParallelTransactionExecutor {
this.#sourceCoins.set(gasCoin.id, null);
}

await this.#cache.cache.deleteObjects([...usedObjects]);
await this.#updateCache(async () => {
await Promise.all([
this.#cache.cache.deleteObjects([...usedObjects]),
this.#waitForLastDigest(),
]);
});

throw error;
} finally {
usedObjects.forEach((objectId) => {
Expand All @@ -221,11 +272,35 @@ export class ParallelTransactionExecutor {
this.#objectIdQueues.delete(objectId);
}
});
this.#pendingTransactions--;
}
}

/** Helper for synchronizing cache updates, by ensuring only one update happens at a time. This can also be used to wait for any pending cache updates */
async #updateCache(fn?: () => Promise<void>) {
if (this.#cacheLock) {
await this.#cacheLock;
}

this.#cacheLock =
fn?.().then(
() => {
this.#cacheLock = null;
},
() => {},
) ?? null;
}

async #waitForLastDigest() {
const digest = this.#lastDigest;
if (digest) {
this.#lastDigest = null;
await this.#client.waitForTransaction({ digest });
}
}

async #getGasCoin() {
if (this.#coinPool.length === 0 && this.#executeQueue.activeTasks <= this.#maxPoolSize) {
if (this.#coinPool.length === 0 && this.#pendingTransactions <= this.#maxPoolSize) {
await this.#refillCoinPool();
}

Expand All @@ -237,10 +312,40 @@ export class ParallelTransactionExecutor {
return coin;
}

async #getGasPrice(): Promise<bigint> {
const remaining = this.#gasPrice
? this.#gasPrice.expiration - this.#epochBoundaryWindow - Date.now()
: 0;

if (remaining > 0) {
return this.#gasPrice!.price;
}

if (this.#gasPrice) {
const timeToNextEpoch = Math.max(
this.#gasPrice.expiration + this.#epochBoundaryWindow - Date.now(),
1_000,
);

await new Promise((resolve) => setTimeout(resolve, timeToNextEpoch));
}

const state = await this.#client.getLatestSuiSystemState();

this.#gasPrice = {
price: BigInt(state.referenceGasPrice),
expiration:
Number.parseInt(state.epochStartTimestampMs, 10) +
Number.parseInt(state.epochDurationMs, 10),
};

return this.#getGasPrice();
}

async #refillCoinPool() {
const batchSize = Math.min(
this.#coinBatchSize,
this.#maxPoolSize - (this.#coinPool.length + this.#executeQueue.activeTasks) + 1,
this.#maxPoolSize - (this.#coinPool.length + this.#pendingTransactions) + 1,
);

if (batchSize === 0) {
Expand Down Expand Up @@ -289,6 +394,8 @@ export class ParallelTransactionExecutor {
}
txb.transferObjects(coinResults, address);

await this.#updateCache(() => this.#waitForLastDigest());

const result = await this.#client.signAndExecuteTransaction({
transaction: txb,
signer: this.#signer,
Expand Down
13 changes: 12 additions & 1 deletion sdk/typescript/src/transactions/executor/serial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class SerialTransactionExecutor {
#queue = new SerialQueue();
#signer: Signer;
#cache: CachingTransactionExecutor;
#client: SuiClient;
#lastDigest: string | null = null;

constructor({
signer,
Expand All @@ -24,6 +26,7 @@ export class SerialTransactionExecutor {
signer: Signer;
}) {
this.#signer = signer;
this.#client = options.client;
this.#cache = new CachingTransactionExecutor({
client: options.client,
cache: options.cache,
Expand Down Expand Up @@ -69,7 +72,14 @@ export class SerialTransactionExecutor {
};

resetCache() {
return this.#cache.reset();
return Promise.all([this.#cache.reset(), this.#waitForLastTransaction()]);
}

async #waitForLastTransaction() {
if (this.#lastDigest) {
await this.#client.waitForTransaction({ digest: this.#lastDigest });
this.#lastDigest = null;
}
}

executeTransaction(transaction: Transaction | Uint8Array) {
Expand All @@ -92,6 +102,7 @@ export class SerialTransactionExecutor {
const effectsBytes = Uint8Array.from(results.rawEffects!);
const effects = bcs.TransactionEffects.parse(effectsBytes);
await this.applyEffects(effects);
this.#lastDigest = results.digest;

return {
digest: results.digest,
Expand Down
Loading

0 comments on commit 7ed1c76

Please sign in to comment.