Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: clean up old sync method code #57

Merged
merged 4 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 2 additions & 169 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
@@ -1,178 +1,14 @@
import _ from 'lodash';

import { decodeHex, decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Organization, Staging, ModelKeys, Simulator } from '../models';
import { decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Simulator } from '../models';
import { CONFIG } from '../user-config';
import { logger } from '../logger.js';

import * as dataLayer from './persistance';
import * as simulator from './simulator';

const POLLING_INTERVAL = 5000;
const frames = ['-', '\\', '|', '/'];

const startDataLayerUpdatePolling = async () => {
logger.info('Start Datalayer Update Polling');
const updateStoreInfo = await dataLayerWasUpdated();
if (updateStoreInfo.length) {
await Promise.all(
updateStoreInfo.map(async (store) => {
logger.info(
`Updates found syncing storeId: ${store.storeId} ${
frames[Math.floor(Math.random() * 3)]
}`,
);
await syncDataLayerStoreToClimateWarehouse(
store.storeId,
store.rootHash,
);
}),
);
}
};

const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => {
let storeData;

if (CONFIG().CADT.USE_SIMULATOR) {
storeData = await simulator.getStoreData(storeId, rootHash);
} else {
storeData = await dataLayer.getStoreData(storeId, rootHash);
}

if (!_.get(storeData, 'keys_values', []).length) {
return;
}

await Organization.update(
{ registryHash: rootHash },
{ where: { registryId: storeId } },
);

const organizationToTruncate = await Organization.findOne({
attributes: ['orgUid'],
where: { registryId: storeId },
raw: true,
});

try {
if (_.get(organizationToTruncate, 'orgUid')) {
const truncateOrganizationPromises = Object.keys(ModelKeys).map((key) =>
ModelKeys[key].destroy({
where: { orgUid: organizationToTruncate.orgUid },
}),
);

await Promise.all(truncateOrganizationPromises);

await Promise.all(
storeData.keys_values.map(async (kv) => {
const key = decodeHex(kv.key.replace(`${storeId}_`, ''));
const modelKey = key.split('|')[0];
let value;

try {
value = JSON.parse(decodeHex(kv.value));
} catch (err) {
console.trace(err);
logger.error(`Cant parse json value: ${decodeHex(kv.value)}`);
}

if (ModelKeys[modelKey]) {
await ModelKeys[modelKey].upsert(value);

const stagingUuid =
modelKey === 'unit'
? value.warehouseUnitId
: modelKey === 'project'
? value.warehouseProjectId
: undefined;

if (stagingUuid) {
await Staging.destroy({
where: { uuid: stagingUuid },
});
}
}
}),
);

// clean up any staging records than involved delete commands,
// since we cant track that they came in through the uuid,
// we can infer this because diff.original is null instead of empty object.
await Staging.cleanUpCommitedAndInvalidRecords();
}
} catch (error) {
console.trace('ERROR DURING SYNC TRANSACTION', error);
}
};

const dataLayerWasUpdated = async () => {
const organizations = await Organization.findAll({
attributes: ['registryId', 'registryHash'],
where: { subscribed: true },
raw: true,
});

// exit early if there are no subscribed organizations
if (!organizations.length) {
return [];
}

const subscribedOrgIds = organizations.map((org) => org.registryId);

if (!subscribedOrgIds.length) {
return [];
}

let rootResponse;
if (CONFIG().CADT.USE_SIMULATOR) {
rootResponse = await simulator.getRoots(subscribedOrgIds);
} else {
rootResponse = await dataLayer.getRoots(subscribedOrgIds);
}

if (!rootResponse.success) {
return [];
}

const updatedStores = rootResponse.root_hashes.filter((rootHash) => {
const org = organizations.find(
(org) => org.registryId == rootHash.id.replace('0x', ''),
);

if (org) {
// When a transfer is made, the climate warehouse is locked from making updates
// while waiting for the transfer to either be completed or rejected.
// This means that we know the transfer completed when the root hash changed
// and we can remove it from the pending staging table.
if (org.isHome == 1 && org.registryHash != rootHash.hash) {
Staging.destroy({ where: { isTransfer: true } });
}

// store has been updated if its confirmed and the hash has changed
return rootHash.confirmed && org.registryHash != rootHash.hash;
}

return false;
});

if (!updatedStores.length) {
return [];
}

const updateStoreInfo = await Promise.all(
updatedStores.map(async (rootHash) => {
const storeId = rootHash.id.replace('0x', '');
return {
storeId,
rootHash: rootHash.hash,
};
}),
);

return updateStoreInfo;
};

const unsubscribeFromDataLayerStore = async (storeId) => {
if (!CONFIG().CADT.USE_SIMULATOR) {
Expand Down Expand Up @@ -398,9 +234,6 @@ export const waitForAllTransactionsToConfirm = async () => {
};

export default {
startDataLayerUpdatePolling,
syncDataLayerStoreToClimateWarehouse,
dataLayerWasUpdated,
subscribeToStoreOnDataLayer,
getSubscribedStoreData,
getRootHistory,
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ToadScheduler } from 'toad-scheduler';

import syncDefaultOrganizations from './sync-default-organizations';
import syncPickLists from './sync-picklists';
import syncAudit from './sync-audit-table';
import syncAudit from './sync-datalayer';
import syncOrganizationMeta from './sync-organization-meta';
import syncGovernanceBody from './sync-governance-body';

Expand Down
Loading
Loading