diff --git a/server/api/controllers/device.controller.js b/server/api/controllers/device.controller.js index d469864acb..b04c096e11 100644 --- a/server/api/controllers/device.controller.js +++ b/server/api/controllers/device.controller.js @@ -112,7 +112,7 @@ module.exports = function DeviceController(gladys) { * @apiGroup Device */ async function purgeAllSqliteStates(req, res) { - await gladys.device.purgeAllSqliteStates(); + gladys.event.emit(EVENTS.DEVICE.PURGE_ALL_SQLITE_STATES); res.json({ success: true }); } diff --git a/server/lib/device/device.purgeAllSqliteStates.js b/server/lib/device/device.purgeAllSqliteStates.js index eb441214c6..72b05bef14 100644 --- a/server/lib/device/device.purgeAllSqliteStates.js +++ b/server/lib/device/device.purgeAllSqliteStates.js @@ -11,81 +11,92 @@ const logger = require('../../utils/logger'); * device.purgeAllSqliteStates('d47b481b-a7be-4224-9850-313cdb8a4065'); */ async function purgeAllSqliteStates(jobId) { + if (this.purgeAllSQliteStatesInProgress) { + logger.info(`Not purging all SQlite states, a purge is already in progress`); + return null; + } logger.info(`Purging all SQlite states`); + this.purgeAllSQliteStatesInProgress = true; - const numberOfDeviceFeatureStateToDelete = await db.DeviceFeatureState.count(); - const numberOfDeviceFeatureStateAggregateToDelete = await db.DeviceFeatureStateAggregate.count(); + try { + const numberOfDeviceFeatureStateToDelete = await db.DeviceFeatureState.count(); + const numberOfDeviceFeatureStateAggregateToDelete = await db.DeviceFeatureStateAggregate.count(); - logger.info( - `Purging All SQLite states: ${numberOfDeviceFeatureStateToDelete} states & ${numberOfDeviceFeatureStateAggregateToDelete} aggregates to delete.`, - ); + logger.info( + `Purging All SQLite states: ${numberOfDeviceFeatureStateToDelete} states & ${numberOfDeviceFeatureStateAggregateToDelete} aggregates to delete.`, + ); - const numberOfIterationsStates = Math.ceil( - numberOfDeviceFeatureStateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, - ); - const iterator = [...Array(numberOfIterationsStates)]; + const numberOfIterationsStates = Math.ceil( + numberOfDeviceFeatureStateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + ); + const iterator = [...Array(numberOfIterationsStates)]; - const numberOfIterationsStatesAggregates = Math.ceil( - numberOfDeviceFeatureStateAggregateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, - ); - const iteratorAggregates = [...Array(numberOfIterationsStatesAggregates)]; + const numberOfIterationsStatesAggregates = Math.ceil( + numberOfDeviceFeatureStateAggregateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + ); + const iteratorAggregates = [...Array(numberOfIterationsStatesAggregates)]; - const total = numberOfIterationsStates + numberOfIterationsStatesAggregates; - let currentBatch = 0; - let currentProgressPercent = 0; + const total = numberOfIterationsStates + numberOfIterationsStatesAggregates; + let currentBatch = 0; + let currentProgressPercent = 0; - // We only save progress to DB if it changed - // Because saving progress is expensive (DB write + Websocket call) - const updateProgressIfNeeded = async () => { - currentBatch += 1; - const newProgressPercent = Math.round((currentBatch * 100) / total); - if (currentProgressPercent !== newProgressPercent) { - currentProgressPercent = newProgressPercent; - await this.job.updateProgress(jobId, currentProgressPercent); - } - }; + // We only save progress to DB if it changed + // Because saving progress is expensive (DB write + Websocket call) + const updateProgressIfNeeded = async () => { + currentBatch += 1; + const newProgressPercent = Math.round((currentBatch * 100) / total); + if (currentProgressPercent !== newProgressPercent) { + currentProgressPercent = newProgressPercent; + await this.job.updateProgress(jobId, currentProgressPercent); + } + }; - await Promise.each(iterator, async () => { - await db.sequelize.query( - ` + await Promise.each(iterator, async () => { + await db.sequelize.query( + ` DELETE FROM t_device_feature_state WHERE id IN ( SELECT id FROM t_device_feature_state LIMIT :limit ); `, - { - replacements: { - limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + { + replacements: { + limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + }, + type: QueryTypes.SELECT, }, - type: QueryTypes.SELECT, - }, - ); - await updateProgressIfNeeded(); - await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH); - }); + ); + await updateProgressIfNeeded(); + await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH); + }); - await Promise.each(iteratorAggregates, async () => { - await db.sequelize.query( - ` + await Promise.each(iteratorAggregates, async () => { + await db.sequelize.query( + ` DELETE FROM t_device_feature_state_aggregate WHERE id IN ( SELECT id FROM t_device_feature_state_aggregate LIMIT :limit ); `, - { - replacements: { - limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + { + replacements: { + limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH, + }, + type: QueryTypes.SELECT, }, - type: QueryTypes.SELECT, - }, - ); - await updateProgressIfNeeded(); - await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH); - }); - return { - numberOfDeviceFeatureStateToDelete, - numberOfDeviceFeatureStateAggregateToDelete, - }; + ); + await updateProgressIfNeeded(); + await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH); + }); + this.purgeAllSQliteStatesInProgress = false; + return { + numberOfDeviceFeatureStateToDelete, + numberOfDeviceFeatureStateAggregateToDelete, + }; + } catch (e) { + this.purgeAllSQliteStatesInProgress = false; + throw e; + } } module.exports = { diff --git a/server/lib/device/index.js b/server/lib/device/index.js index e77ec4db51..f69c3aca2f 100644 --- a/server/lib/device/index.js +++ b/server/lib/device/index.js @@ -99,6 +99,10 @@ const DeviceManager = function DeviceManager( EVENTS.DEVICE.MIGRATE_FROM_SQLITE_TO_DUCKDB, eventFunctionWrapper(this.migrateFromSQLiteToDuckDb.bind(this)), ); + this.eventManager.on( + EVENTS.DEVICE.PURGE_ALL_SQLITE_STATES, + eventFunctionWrapper(this.purgeAllSqliteStates.bind(this)), + ); }; DeviceManager.prototype.add = add; diff --git a/server/test/lib/device/device.purgeAllSqliteStates.test.js b/server/test/lib/device/device.purgeAllSqliteStates.test.js index 7666dfc942..d9cac1ce47 100644 --- a/server/test/lib/device/device.purgeAllSqliteStates.test.js +++ b/server/test/lib/device/device.purgeAllSqliteStates.test.js @@ -52,10 +52,14 @@ describe('Device', () => { wrapper: (type, func) => func, }; const device = new Device(event, {}, stateManager, service, {}, variable, job); - const devicePurged = await device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101'); + const devicePurgedPromise = device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101'); + const emptyRes = await device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101'); + const devicePurged = await devicePurgedPromise; expect(devicePurged).to.deep.equal({ numberOfDeviceFeatureStateAggregateToDelete: 3, numberOfDeviceFeatureStateToDelete: 110, }); + // should not start a new purge when a purge is running + expect(emptyRes).to.equal(null); }); }); diff --git a/server/utils/constants.js b/server/utils/constants.js index 76fb8e575b..95eae8e42a 100644 --- a/server/utils/constants.js +++ b/server/utils/constants.js @@ -160,6 +160,7 @@ const EVENTS = { PURGE_STATES_SINGLE_FEATURE: 'device.purge-states-single-feature', CHECK_BATTERIES: 'device.check-batteries', MIGRATE_FROM_SQLITE_TO_DUCKDB: 'device.migrate-from-sqlite-to-duckdb', + PURGE_ALL_SQLITE_STATES: 'device.purge-all-sqlite-states', }, GATEWAY: { CREATE_BACKUP: 'gateway.create-backup',