Skip to content

Commit

Permalink
fix: rename sync audit to sync datalayer
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Nov 28, 2023
1 parent 061a33a commit 0d703cd
Show file tree
Hide file tree
Showing 3 changed files with 428 additions and 643 deletions.
171 changes: 2 additions & 169 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
@@ -1,178 +1,14 @@
import _ from 'lodash';

import { decodeHex, decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Organization, Staging, ModelKeys, Simulator } from '../models';
import { decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Simulator } from '../models';
import { CONFIG } from '../user-config';
import { logger } from '../logger.js';

import * as dataLayer from './persistance';
import * as simulator from './simulator';

const POLLING_INTERVAL = 5000;
const frames = ['-', '\\', '|', '/'];

const startDataLayerUpdatePolling = async () => {
logger.info('Start Datalayer Update Polling');
const updateStoreInfo = await dataLayerWasUpdated();
if (updateStoreInfo.length) {
await Promise.all(
updateStoreInfo.map(async (store) => {
logger.info(
`Updates found syncing storeId: ${store.storeId} ${
frames[Math.floor(Math.random() * 3)]
}`,
);
await syncDataLayerStoreToClimateWarehouse(
store.storeId,
store.rootHash,
);
}),
);
}
};

const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => {
let storeData;

if (CONFIG().CADT.USE_SIMULATOR) {
storeData = await simulator.getStoreData(storeId, rootHash);
} else {
storeData = await dataLayer.getStoreData(storeId, rootHash);
}

if (!_.get(storeData, 'keys_values', []).length) {
return;
}

await Organization.update(
{ registryHash: rootHash },
{ where: { registryId: storeId } },
);

const organizationToTruncate = await Organization.findOne({
attributes: ['orgUid'],
where: { registryId: storeId },
raw: true,
});

try {
if (_.get(organizationToTruncate, 'orgUid')) {
const truncateOrganizationPromises = Object.keys(ModelKeys).map((key) =>
ModelKeys[key].destroy({
where: { orgUid: organizationToTruncate.orgUid },
}),
);

await Promise.all(truncateOrganizationPromises);

await Promise.all(
storeData.keys_values.map(async (kv) => {
const key = decodeHex(kv.key.replace(`${storeId}_`, ''));
const modelKey = key.split('|')[0];
let value;

try {
value = JSON.parse(decodeHex(kv.value));
} catch (err) {
console.trace(err);
logger.error(`Cant parse json value: ${decodeHex(kv.value)}`);
}

if (ModelKeys[modelKey]) {
await ModelKeys[modelKey].upsert(value);

const stagingUuid =
modelKey === 'unit'
? value.warehouseUnitId
: modelKey === 'project'
? value.warehouseProjectId
: undefined;

if (stagingUuid) {
await Staging.destroy({
where: { uuid: stagingUuid },
});
}
}
}),
);

// clean up any staging records than involved delete commands,
// since we cant track that they came in through the uuid,
// we can infer this because diff.original is null instead of empty object.
await Staging.cleanUpCommitedAndInvalidRecords();
}
} catch (error) {
console.trace('ERROR DURING SYNC TRANSACTION', error);
}
};

const dataLayerWasUpdated = async () => {
const organizations = await Organization.findAll({
attributes: ['registryId', 'registryHash'],
where: { subscribed: true },
raw: true,
});

// exit early if there are no subscribed organizations
if (!organizations.length) {
return [];
}

const subscribedOrgIds = organizations.map((org) => org.registryId);

if (!subscribedOrgIds.length) {
return [];
}

let rootResponse;
if (CONFIG().CADT.USE_SIMULATOR) {
rootResponse = await simulator.getRoots(subscribedOrgIds);
} else {
rootResponse = await dataLayer.getRoots(subscribedOrgIds);
}

if (!rootResponse.success) {
return [];
}

const updatedStores = rootResponse.root_hashes.filter((rootHash) => {
const org = organizations.find(
(org) => org.registryId == rootHash.id.replace('0x', ''),
);

if (org) {
// When a transfer is made, the climate warehouse is locked from making updates
// while waiting for the transfer to either be completed or rejected.
// This means that we know the transfer completed when the root hash changed
// and we can remove it from the pending staging table.
if (org.isHome == 1 && org.registryHash != rootHash.hash) {
Staging.destroy({ where: { isTransfer: true } });
}

// store has been updated if its confirmed and the hash has changed
return rootHash.confirmed && org.registryHash != rootHash.hash;
}

return false;
});

if (!updatedStores.length) {
return [];
}

const updateStoreInfo = await Promise.all(
updatedStores.map(async (rootHash) => {
const storeId = rootHash.id.replace('0x', '');
return {
storeId,
rootHash: rootHash.hash,
};
}),
);

return updateStoreInfo;
};

const unsubscribeFromDataLayerStore = async (storeId) => {
if (!CONFIG().CADT.USE_SIMULATOR) {
Expand Down Expand Up @@ -398,9 +234,6 @@ export const waitForAllTransactionsToConfirm = async () => {
};

export default {
startDataLayerUpdatePolling,
syncDataLayerStoreToClimateWarehouse,
dataLayerWasUpdated,
subscribeToStoreOnDataLayer,
getSubscribedStoreData,
getRootHistory,
Expand Down
Loading

0 comments on commit 0d703cd

Please sign in to comment.