Skip to content

Commit

Permalink
fix: index based sync bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Taylor authored and Michael Taylor committed Feb 20, 2024
1 parent b270a28 commit a86d1c5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const server = http.createServer(rootRouter);
server.on('error', onError);
server.on('listening', onListening);

server.listen(port, bindAddress, () => {
server.listen(port, bindAddress, async () => {
console.log(`Server listening at http://${bindAddress}:${port}`);
console.log(`Mirror URL: ${getMirrorUrl()}`);
console.log(`Mirror URL: ${await getMirrorUrl()}`);
});

const io = new Server(server);
Expand Down
35 changes: 15 additions & 20 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,8 @@ const task = new Task('sync-registries', async () => {
where: { metaKey: 'migratedToIndexBasedSync' },
});

console.log(
'############',
hasMigratedToNewSyncMethod?.metaValue,
hasMigratedToGenerationIndexSync?.metaValue,
);

if (
!hasMigratedToNewSyncMethod?.metaValue ||
!CONFIG().CADT.USE_SIMULATOR
) {
if (!hasMigratedToGenerationIndexSync?.metaValue) {
if (hasMigratedToNewSyncMethod || CONFIG().CADT.USE_SIMULATOR) {
if (hasMigratedToGenerationIndexSync) {
await processJob();
} else {
await generateGenerationIndex();
Expand Down Expand Up @@ -217,16 +208,16 @@ const syncOrganizationAudit = async (organization) => {
currentGeneration = lastRootSaved;
}

const historyIndex = currentGeneration.generation;
const lastProcessedIndex = currentGeneration.generation;

if (historyIndex > rootHistory.length) {
if (lastProcessedIndex > rootHistory.length) {
logger.error(
`Could not find root history for ${organization.name} with timestamp ${currentGeneration.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
);
}

const rootHistoryCount = rootHistory.length - 1;
const syncRemaining = rootHistoryCount - historyIndex;
const rootHistoryZeroBasedCount = rootHistory.length - 1;
const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex;
const isSynced = syncRemaining === 0;

await Organization.update(
Expand All @@ -241,9 +232,13 @@ const syncOrganizationAudit = async (organization) => {
return;
}

const toBeProcessedIndex = lastProcessedIndex + 1;

// Organization not synced, sync it
logger.info(' ');
logger.info(`Syncing ${organization.name} generation ${historyIndex}`);
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedIndex}`,
);
logger.info(
`${organization.name} is ${syncRemaining} DataLayer generations away from being fully synced.`,
);
Expand All @@ -252,8 +247,8 @@ const syncOrganizationAudit = async (organization) => {
await new Promise((resolve) => setTimeout(resolve, 30000));
}

const root1 = _.get(rootHistory, `[${historyIndex}]`);
const root2 = _.get(rootHistory, `[${historyIndex + 1}]`);
const root1 = _.get(rootHistory, `[${lastProcessedIndex}]`);
const root2 = _.get(rootHistory, `[${toBeProcessedIndex}]`);

logger.info(`ROOT 1 ${JSON.stringify(root1)}`);
logger.info(`ROOT 2', ${JSON.stringify(root2)}`);
Expand Down Expand Up @@ -305,7 +300,7 @@ const syncOrganizationAudit = async (organization) => {

const updateTransaction = async (transaction, mirrorTransaction) => {
logger.info(
`Syncing ${organization.name} generation ${historyIndex + 1}`,
`Syncing ${organization.name} generation ${toBeProcessedIndex}`,
);
for (const diff of optimizedKvDiff) {
const key = decodeHex(diff.key);
Expand All @@ -319,7 +314,7 @@ const syncOrganizationAudit = async (organization) => {
table: modelKey,
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: root2.timestamp,
generation: historyIndex + 1,
generation: toBeProcessedIndex,
comment: _.get(
tryParseJSON(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
Expand Down

0 comments on commit a86d1c5

Please sign in to comment.