From 5945f9d31b8ed443331ecda3d66e7b51657ea914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Wed, 19 Jan 2022 02:13:02 +0000 Subject: [PATCH 1/3] fix: add config reload flags to the state middleware --- lib/middlewares/populateSettings.js | 18 ++++++++++-------- lib/middlewares/state.js | 1 + lib/poller.js | 15 +++++++++------ 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/middlewares/populateSettings.js b/lib/middlewares/populateSettings.js index 146aafeb..fd3d4146 100644 --- a/lib/middlewares/populateSettings.js +++ b/lib/middlewares/populateSettings.js @@ -1,11 +1,12 @@ +const _ = require('lodash/fp') + const state = require('./state') const newSettingsLoader = require('../new-settings-loader') const helpers = require('../route-helpers') const logger = require('../logger') -const { settingsCache } = state - const populateSettings = function (req, res, next) { + const { needsSettingsReload, settingsCache } = state const operatorId = res.locals.operatorId const versionId = req.headers['config-version'] if (versionId !== state.oldVersionId) { @@ -14,20 +15,21 @@ const populateSettings = function (req, res, next) { try { const operatorSettings = settingsCache.get(operatorId) - if (!versionId && operatorSettings) { - req.settings = operatorSettings - return next() - } - - if (!versionId && !operatorSettings) { + if (!versionId && (!operatorSettings || !!needsSettingsReload[operatorId])) { return newSettingsLoader.loadLatest() .then(settings => { settingsCache.set(operatorId, settings) + delete needsSettingsReload[operatorId] req.settings = settings }) .then(() => next()) .catch(next) } + + if (!versionId && operatorSettings) { + req.settings = operatorSettings + return next() + } } catch (e) { logger.error(e) } diff --git a/lib/middlewares/state.js b/lib/middlewares/state.js index 10267d88..2599e259 100644 --- a/lib/middlewares/state.js +++ b/lib/middlewares/state.js @@ -4,6 +4,7 @@ const SETTINGS_CACHE_REFRESH = 3600 module.exports = (function () { return { oldVersionId: 'unset', + needsSettingsReload: {}, settingsCache: new NodeCache({ stdTTL: SETTINGS_CACHE_REFRESH, checkperiod: SETTINGS_CACHE_REFRESH // Clear cache every hour diff --git a/lib/poller.js b/lib/poller.js index 25011b17..914554dd 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -18,6 +18,7 @@ const util = require('util') const db = require('./db') const state = require('./middlewares/state') const processBatches = require('./tx-batching-processing') +const { getOperatorId } = require('./operator') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds @@ -98,12 +99,14 @@ function reload (schema) { store.set('schema', schema) // set asyncLocalStorage so settingsLoader loads settings for the right schema return asyncLocalStorage.run(store, () => { - return settingsLoader.loadLatest().then(settings => { - const pi = plugins(settings) - cachedVariables.set(schema, { settings, pi, isReloading: false }) - logger.debug(`Settings for schema '${schema}' reloaded in poller`) - return updateAndLoadSanctions() - }) + return Promise.all([settingsLoader.loadLatest(), getOperatorId('middleware')]) + .then(([settings, operatorId]) => { + const pi = plugins(settings) + cachedVariables.set(schema, { settings, pi, isReloading: false }) + state.needsSettingsReload[operatorId.operatorId] = true + logger.debug(`Settings for schema '${schema}' reloaded in poller`) + return updateAndLoadSanctions() + }) }) } From b98d73cd6eed0b2e7520dfcf0424e4487eebcca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Wed, 19 Jan 2022 18:55:33 +0000 Subject: [PATCH 2/3] fix: separate poller and middleware postgres listeners --- lib/machine-loader.js | 6 ++-- lib/middlewares/populateSettings.js | 54 +++++++++++++++++++++++++++++ lib/new-settings-loader.js | 7 ++-- lib/poller.js | 33 ++---------------- 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/lib/machine-loader.js b/lib/machine-loader.js index 6818676f..51cf7430 100644 --- a/lib/machine-loader.js +++ b/lib/machine-loader.js @@ -151,7 +151,7 @@ function unpair (rec) { } function reboot (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'reboot', @@ -161,7 +161,7 @@ function reboot (rec) { } function shutdown (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'shutdown', @@ -171,7 +171,7 @@ function shutdown (rec) { } function restartServices (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'restartServices', diff --git a/lib/middlewares/populateSettings.js b/lib/middlewares/populateSettings.js index fd3d4146..5e696e80 100644 --- a/lib/middlewares/populateSettings.js +++ b/lib/middlewares/populateSettings.js @@ -1,10 +1,64 @@ const _ = require('lodash/fp') +const db = require('../db') const state = require('./state') const newSettingsLoader = require('../new-settings-loader') const helpers = require('../route-helpers') const logger = require('../logger') +db.connect({ direct: true }).then(sco => { + sco.client.on('notification', data => { + const parsedData = JSON.parse(data.payload) + switch (parsedData.type) { + case 'reload': + return reload(parsedData.operatorId) + default: + break + } + }) + return sco.none('LISTEN $1:name', 'reload') +}).catch(console.error) + +db.connect({ direct: true }).then(sco => { + sco.client.on('notification', data => { + const parsedData = JSON.parse(data.payload) + switch (parsedData.type) { + case 'machineAction': + return machineAction(parsedData.action, parsedData.value) + default: + break + } + }) + return sco.none('LISTEN $1:name', 'machineAction') +}).catch(console.error) + +function machineAction (type, value) { + const deviceId = value.deviceId + const operatorId = value.operatorId + const pid = state.pids?.[operatorId]?.[deviceId]?.pid + + switch (type) { + case 'reboot': + logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`) + state.reboots[operatorId] = { [deviceId]: pid } + break + case 'shutdown': + logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`) + state.shutdowns[operatorId] = { [deviceId]: pid } + break + case 'restartServices': + logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`) + state.restartServicesMap[operatorId] = { [deviceId]: pid } + break + default: + break + } +} + +function reload (operatorId) { + state.needsSettingsReload[operatorId.operatorId] = true +} + const populateSettings = function (req, res, next) { const { needsSettingsReload, settingsCache } = state const operatorId = res.locals.operatorId diff --git a/lib/new-settings-loader.js b/lib/new-settings-loader.js index 18da5c61..744f0408 100644 --- a/lib/new-settings-loader.js +++ b/lib/new-settings-loader.js @@ -2,6 +2,7 @@ const _ = require('lodash/fp') const db = require('./db') const migration = require('./config-migration') const { asyncLocalStorage } = require('./async-storage') +const { getOperatorId } = require('./operator') const OLD_SETTINGS_LOADER_SCHEMA_VERSION = 1 const NEW_SETTINGS_LOADER_SCHEMA_VERSION = 2 @@ -71,12 +72,12 @@ function showAccounts (schemaVersion) { const configSql = 'insert into user_config (type, data, valid, schema_version) values ($1, $2, $3, $4)' function saveConfig (config) { - return loadLatestConfigOrNone() - .then(currentConfig => { + return Promise.all([loadLatestConfigOrNone(), getOperatorId('middleware')]) + .then(([currentConfig, operatorId]) => { const newConfig = _.assign(currentConfig, config) return db.tx(t => { return t.none(configSql, ['config', { config: newConfig }, true, NEW_SETTINGS_LOADER_SCHEMA_VERSION]) - .then(() => t.none('NOTIFY $1:name, $2', ['poller', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema') })])) + .then(() => t.none('NOTIFY $1:name, $2', ['reload', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema'), operatorId })])) }).catch(console.error) }) } diff --git a/lib/poller.js b/lib/poller.js index 914554dd..a36c4fb9 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -18,7 +18,6 @@ const util = require('util') const db = require('./db') const state = require('./middlewares/state') const processBatches = require('./tx-batching-processing') -const { getOperatorId } = require('./operator') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds @@ -85,13 +84,11 @@ db.connect({ direct: true }).then(sco => { switch (parsedData.type) { case 'reload': return reload(parsedData.schema) - case 'machineAction': - return machineAction(parsedData.action, parsedData.value) default: break } }) - return sco.none('LISTEN $1:name', 'poller') + return sco.none('LISTEN $1:name', 'reload') }).catch(console.error) function reload (schema) { @@ -99,40 +96,16 @@ function reload (schema) { store.set('schema', schema) // set asyncLocalStorage so settingsLoader loads settings for the right schema return asyncLocalStorage.run(store, () => { - return Promise.all([settingsLoader.loadLatest(), getOperatorId('middleware')]) - .then(([settings, operatorId]) => { + return settingsLoader.loadLatest() + .then(settings => { const pi = plugins(settings) cachedVariables.set(schema, { settings, pi, isReloading: false }) - state.needsSettingsReload[operatorId.operatorId] = true logger.debug(`Settings for schema '${schema}' reloaded in poller`) return updateAndLoadSanctions() }) }) } -function machineAction (type, value) { - const deviceId = value.deviceId - const operatorId = value.operatorId - const pid = state.pids?.[operatorId]?.[deviceId]?.pid - - switch (type) { - case 'reboot': - logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`) - state.reboots[operatorId] = { [deviceId]: pid } - break - case 'shutdown': - logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`) - state.shutdowns[operatorId] = { [deviceId]: pid } - break - case 'restartServices': - logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`) - state.restartServicesMap[operatorId] = { [deviceId]: pid } - break - default: - break - } -} - function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings } From df4dd5ccb8b52a4f06e0d3c76025425e41ede41d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Thu, 20 Jan 2022 15:26:26 +0000 Subject: [PATCH 3/3] fix: remove event type --- lib/machine-loader.js | 3 --- lib/middlewares/populateSettings.js | 14 ++------------ lib/new-settings-loader.js | 2 +- lib/poller.js | 7 +------ 4 files changed, 4 insertions(+), 22 deletions(-) diff --git a/lib/machine-loader.js b/lib/machine-loader.js index 51cf7430..ce62e05c 100644 --- a/lib/machine-loader.js +++ b/lib/machine-loader.js @@ -153,7 +153,6 @@ function unpair (rec) { function reboot (rec) { return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { - type: 'machineAction', action: 'reboot', value: _.pick(['deviceId', 'operatorId', 'action'], rec) } @@ -163,7 +162,6 @@ function reboot (rec) { function shutdown (rec) { return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { - type: 'machineAction', action: 'shutdown', value: _.pick(['deviceId', 'operatorId', 'action'], rec) } @@ -173,7 +171,6 @@ function shutdown (rec) { function restartServices (rec) { return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { - type: 'machineAction', action: 'restartServices', value: _.pick(['deviceId', 'operatorId', 'action'], rec) } diff --git a/lib/middlewares/populateSettings.js b/lib/middlewares/populateSettings.js index 5e696e80..80ccc9a6 100644 --- a/lib/middlewares/populateSettings.js +++ b/lib/middlewares/populateSettings.js @@ -9,12 +9,7 @@ const logger = require('../logger') db.connect({ direct: true }).then(sco => { sco.client.on('notification', data => { const parsedData = JSON.parse(data.payload) - switch (parsedData.type) { - case 'reload': - return reload(parsedData.operatorId) - default: - break - } + return reload(parsedData.operatorId) }) return sco.none('LISTEN $1:name', 'reload') }).catch(console.error) @@ -22,12 +17,7 @@ db.connect({ direct: true }).then(sco => { db.connect({ direct: true }).then(sco => { sco.client.on('notification', data => { const parsedData = JSON.parse(data.payload) - switch (parsedData.type) { - case 'machineAction': - return machineAction(parsedData.action, parsedData.value) - default: - break - } + return machineAction(parsedData.action, parsedData.value) }) return sco.none('LISTEN $1:name', 'machineAction') }).catch(console.error) diff --git a/lib/new-settings-loader.js b/lib/new-settings-loader.js index 744f0408..d9e838ef 100644 --- a/lib/new-settings-loader.js +++ b/lib/new-settings-loader.js @@ -77,7 +77,7 @@ function saveConfig (config) { const newConfig = _.assign(currentConfig, config) return db.tx(t => { return t.none(configSql, ['config', { config: newConfig }, true, NEW_SETTINGS_LOADER_SCHEMA_VERSION]) - .then(() => t.none('NOTIFY $1:name, $2', ['reload', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema'), operatorId })])) + .then(() => t.none('NOTIFY $1:name, $2', ['reload', JSON.stringify({ schema: asyncLocalStorage.getStore().get('schema'), operatorId })])) }).catch(console.error) }) } diff --git a/lib/poller.js b/lib/poller.js index a36c4fb9..99ea5543 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -81,12 +81,7 @@ cachedVariables.on('expired', (key, val) => { db.connect({ direct: true }).then(sco => { sco.client.on('notification', data => { const parsedData = JSON.parse(data.payload) - switch (parsedData.type) { - case 'reload': - return reload(parsedData.schema) - default: - break - } + return reload(parsedData.schema) }) return sco.none('LISTEN $1:name', 'reload') }).catch(console.error)