Skip to content

Commit

Permalink
Cache message backlog and adjust flush logic
Browse files Browse the repository at this point in the history
  • Loading branch information
yorhodes committed Nov 7, 2024
1 parent bb76c09 commit 175d477
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 25 deletions.
25 changes: 18 additions & 7 deletions typescript/cli/src/commands/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,49 @@ import { log } from '../logger.js';
import { tryReadJson, writeJson } from '../utils/files.js';
import { selectRegistryWarpRoute } from '../utils/tokens.js';

import { symbolCommandOption } from './options.js';
import {
agentTargetsCommandOption,
overrideRegistryUriCommandOption,
symbolCommandOption,
} from './options.js';
import { MessageOptionsArgTypes } from './send.js';

const DEFAULT_RELAYER_CACHE = 'relayer-cache.json';
const DEFAULT_RELAYER_CACHE = `${overrideRegistryUriCommandOption.default}/relayer-cache.json`;

export const relayerCommand: CommandModuleWithContext<
MessageOptionsArgTypes & { cache: string; symbol?: string }
MessageOptionsArgTypes & { chains?: string; cache: string; symbol?: string }
> = {
command: 'relayer',
describe: 'Run a Hyperlane message relayer',
builder: {
chains: agentTargetsCommandOption,
cache: {
describe: 'Path to relayer cache file',
type: 'string',
default: DEFAULT_RELAYER_CACHE,
},
symbol: symbolCommandOption,
},
handler: async ({ context, cache, symbol }) => {
handler: async ({ context, cache, chains, symbol }) => {
const chainAddresses = await context.registry.getAddresses();
const core = HyperlaneCore.fromAddressesMap(
chainAddresses,
context.multiProvider,
);

let whitelist: ChainMap<string[]> | undefined;
const chainsArray =
chains?.split(',').map((_) => _.trim()) ?? Object.keys(chainAddresses);

const whitelist: ChainMap<string[]> = Object.fromEntries(
chainsArray.map((chain) => [chain, []]),
);

// add warp route addresses to whitelist
if (symbol) {
const warpRoute = await selectRegistryWarpRoute(context.registry, symbol);
whitelist = {};
warpRoute.tokens.forEach(
({ chainName, addressOrDenom }) =>
(whitelist![chainName] = [addressOrDenom!]),
(whitelist[chainName] = [addressOrDenom!]),
);
}

Expand Down
1 change: 1 addition & 0 deletions typescript/cli/src/utils/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ export function stubMerkleTreeConfig(
},
},
ism: {},
backlog: [],
});
}
6 changes: 5 additions & 1 deletion typescript/sdk/src/core/HyperlaneCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ export class HyperlaneCore extends HyperlaneApp<CoreFactories> {
mailbox.filters.Dispatch(),
(_sender, _destination, _recipient, message, event) => {
const parsed = HyperlaneCore.parseDispatchedMessage(message);
this.logger.info(`Observed message ${parsed.id} on ${originChain}`);
const destChain = this.getDestination(parsed);

this.logger.info(
`Observed message ${parsed.id} on ${originChain} to ${destChain}`,
);
return handler(parsed, event);
},
);
Expand Down
39 changes: 22 additions & 17 deletions typescript/sdk/src/core/HyperlaneRelayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const DerivedIsmConfigWithAddressSchema =

const BacklogMessageSchema = z.object({
attempts: z.number(),
lastAttemptTimestamp: z.number(),
lastAttempt: z.number(),
message: z.string(),
dispatchTx: z.string(),
});
Expand All @@ -48,13 +48,15 @@ const MessageBacklogSchema = z.array(BacklogMessageSchema);
export const RelayerCacheSchema = z.object({
hook: z.record(z.record(DerivedHookConfigWithAddressSchema)),
ism: z.record(z.record(DerivedIsmConfigWithAddressSchema)),
// backlog: MessageBacklogSchema.optional(),
backlog: MessageBacklogSchema,
});

type RelayerCache = z.infer<typeof RelayerCacheSchema>;

type MessageWhitelist = ChainMap<Set<Address>>;

// message must have origin and destination chains in the whitelist
// if whitelist has non-empty address set for chain, message must have sender and recipient in the set
export function messageMatchesWhitelist(
whitelist: MessageWhitelist,
message: ParsedMessage,
Expand All @@ -65,7 +67,7 @@ export function messageMatchesWhitelist(
}

const sender = bytes32ToAddress(message.sender);
if (!originAddresses.has(sender)) {
if (originAddresses.size !== 0 && !originAddresses.has(sender)) {
return false;
}

Expand All @@ -76,7 +78,7 @@ export function messageMatchesWhitelist(
}

const recipient = bytes32ToAddress(message.recipient);
if (!destinationAddresses.has(recipient)) {
if (destinationAddresses.size !== 0 && !destinationAddresses.has(recipient)) {
return false;
}

Expand All @@ -91,7 +93,7 @@ export class HyperlaneRelayer {

protected readonly whitelist: ChainMap<Set<Address>> | undefined;

public backlog: z.TypeOf<typeof MessageBacklogSchema> = [];
public backlog: RelayerCache['backlog'];
public cache: RelayerCache | undefined;

protected stopRelayingHandler: ((chains?: ChainName[]) => void) | undefined;
Expand All @@ -101,7 +103,7 @@ export class HyperlaneRelayer {
constructor({
core,
caching = true,
retryTimeout = 5 * 1000,
retryTimeout = 1000,
whitelist = undefined,
}: {
core: HyperlaneCore;
Expand Down Expand Up @@ -130,6 +132,7 @@ export class HyperlaneRelayer {
this.cache = {
hook: {},
ism: {},
backlog: [],
};
}
}
Expand Down Expand Up @@ -246,7 +249,7 @@ export class HyperlaneRelayer {
dispatchTx,
});

this.logger.info({ message, metadata }, `Relaying message ${message.id}`);
this.logger.info(`Relaying message ${message.id}`);
return this.core.deliver(message, metadata);
}

Expand Down Expand Up @@ -275,17 +278,16 @@ export class HyperlaneRelayer {
while (this.stopRelayingHandler) {
const backlogMsg = this.backlog.shift();

// if backlog is empty, wait 1s and try again
if (!backlogMsg) {
this.logger.trace('Backlog empty, waiting 1s');
await sleep(1000);
continue;
}

// linear backoff (attempts * retryTimeout)
if (
Date.now() <
backlogMsg.lastAttemptTimestamp +
backlogMsg.attempts * this.retryTimeout
backlogMsg.lastAttempt + backlogMsg.attempts * this.retryTimeout
) {
this.backlog.push(backlogMsg);
continue;
Expand All @@ -305,15 +307,12 @@ export class HyperlaneRelayer {
await this.relayMessage(dispatchReceipt, undefined, dispatchMsg);
} catch (error) {
this.logger.error(
`Failed to relay message ${id} from backlog (attempt #${
attempts + 1
})`,
`Failed to relay message ${id} (attempt #${attempts + 1})`,
);
this.backlog.push({
...backlogMsg,
attempts: attempts + 1,
lastAttemptTimestamp: Date.now(),
message,
dispatchTx,
lastAttempt: Date.now(),
});
}
}
Expand All @@ -330,6 +329,8 @@ export class HyperlaneRelayer {
start(): void {
assert(!this.stopRelayingHandler, 'Relayer already started');

this.backlog = this.cache?.backlog ?? [];

const { removeHandler } = this.core.onDispatch(async (message, event) => {
if (
this.whitelist &&
Expand All @@ -344,7 +345,7 @@ export class HyperlaneRelayer {

this.backlog.push({
attempts: 0,
lastAttemptTimestamp: 0,
lastAttempt: Date.now(),
message: message.message,
dispatchTx: event.transactionHash,
});
Expand All @@ -360,5 +361,9 @@ export class HyperlaneRelayer {
assert(this.stopRelayingHandler, 'Relayer not started');
this.stopRelayingHandler(this.whitelistChains());
this.stopRelayingHandler = undefined;

if (this.cache) {
this.cache.backlog = this.backlog;
}
}
}

0 comments on commit 175d477

Please sign in to comment.