Skip to content

Commit

Permalink
[IND-481] Update handlers to use SQL function only removing the Knex …
Browse files Browse the repository at this point in the history
…option (#811)

* [IND-481] Update handlers to use SQL function only removing the Knex option

This is towards updating the block processor to be executed as a SQL function.

* Address code rabbit suggestions.
  • Loading branch information
lcwik authored Nov 28, 2023
1 parent cb7d7a1 commit cf6c8e9
Show file tree
Hide file tree
Showing 38 changed files with 1,050 additions and 4,052 deletions.
130 changes: 38 additions & 92 deletions indexer/services/ender/__tests__/handlers/asset-handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { stats, STATS_FUNCTION_NAME } from '@dydxprotocol-indexer/base';
import {
AssetCreateEventV1,
IndexerTendermintBlock,
Expand Down Expand Up @@ -36,15 +35,11 @@ import {
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('assetHandler', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
jest.spyOn(stats, 'increment');
jest.spyOn(stats, 'timing');
jest.spyOn(stats, 'gauge');
});

beforeEach(async () => {
Expand Down Expand Up @@ -100,99 +95,50 @@ describe('assetHandler', () => {
});
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when market doesnt exist for asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
'Unable to find market with id: 0',
);
it('fails when market doesnt exist for asset', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
'Unable to find market with id: 0',
);
});

const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

await onMessage(kafkaMessage);

const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
if (!useSqlFunction) {
expectTimingStats();
}
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
it('creates new asset', async () => {
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;

const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
});
// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

function expectTimingStats() {
expectTimingStat('create_asset');
}
await onMessage(kafkaMessage);

function expectTimingStat(fnName: string) {
expect(stats.timing).toHaveBeenCalledWith(
`ender.${STATS_FUNCTION_NAME}.timing`,
expect.any(Number),
{
className: 'AssetCreationHandler',
eventType: 'AssetCreateEvent',
fnName,
},
);
}
const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
});
});

function expectAssetMatchesEvent(
event: AssetCreateEventV1,
Expand Down
Loading

0 comments on commit cf6c8e9

Please sign in to comment.