From b98d73cd6eed0b2e7520dfcf0424e4487eebcca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Wed, 19 Jan 2022 18:55:33 +0000 Subject: [PATCH] fix: separate poller and middleware postgres listeners --- lib/machine-loader.js | 6 ++-- lib/middlewares/populateSettings.js | 54 +++++++++++++++++++++++++++++ lib/new-settings-loader.js | 7 ++-- lib/poller.js | 33 ++---------------- 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/lib/machine-loader.js b/lib/machine-loader.js index 6818676f..51cf7430 100644 --- a/lib/machine-loader.js +++ b/lib/machine-loader.js @@ -151,7 +151,7 @@ function unpair (rec) { } function reboot (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'reboot', @@ -161,7 +161,7 @@ function reboot (rec) { } function shutdown (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'shutdown', @@ -171,7 +171,7 @@ function shutdown (rec) { } function restartServices (rec) { - return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( + return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify( { type: 'machineAction', action: 'restartServices', diff --git a/lib/middlewares/populateSettings.js b/lib/middlewares/populateSettings.js index fd3d4146..5e696e80 100644 --- a/lib/middlewares/populateSettings.js +++ b/lib/middlewares/populateSettings.js @@ -1,10 +1,64 @@ const _ = require('lodash/fp') +const db = require('../db') const state = require('./state') const newSettingsLoader = require('../new-settings-loader') const helpers = require('../route-helpers') const logger = require('../logger') +db.connect({ direct: true }).then(sco => { + sco.client.on('notification', data => { + const parsedData = JSON.parse(data.payload) + switch (parsedData.type) { + case 'reload': + return reload(parsedData.operatorId) + default: + break + } + }) + return sco.none('LISTEN $1:name', 'reload') +}).catch(console.error) + +db.connect({ direct: true }).then(sco => { + sco.client.on('notification', data => { + const parsedData = JSON.parse(data.payload) + switch (parsedData.type) { + case 'machineAction': + return machineAction(parsedData.action, parsedData.value) + default: + break + } + }) + return sco.none('LISTEN $1:name', 'machineAction') +}).catch(console.error) + +function machineAction (type, value) { + const deviceId = value.deviceId + const operatorId = value.operatorId + const pid = state.pids?.[operatorId]?.[deviceId]?.pid + + switch (type) { + case 'reboot': + logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`) + state.reboots[operatorId] = { [deviceId]: pid } + break + case 'shutdown': + logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`) + state.shutdowns[operatorId] = { [deviceId]: pid } + break + case 'restartServices': + logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`) + state.restartServicesMap[operatorId] = { [deviceId]: pid } + break + default: + break + } +} + +function reload (operatorId) { + state.needsSettingsReload[operatorId.operatorId] = true +} + const populateSettings = function (req, res, next) { const { needsSettingsReload, settingsCache } = state const operatorId = res.locals.operatorId diff --git a/lib/new-settings-loader.js b/lib/new-settings-loader.js index 18da5c61..744f0408 100644 --- a/lib/new-settings-loader.js +++ b/lib/new-settings-loader.js @@ -2,6 +2,7 @@ const _ = require('lodash/fp') const db = require('./db') const migration = require('./config-migration') const { asyncLocalStorage } = require('./async-storage') +const { getOperatorId } = require('./operator') const OLD_SETTINGS_LOADER_SCHEMA_VERSION = 1 const NEW_SETTINGS_LOADER_SCHEMA_VERSION = 2 @@ -71,12 +72,12 @@ function showAccounts (schemaVersion) { const configSql = 'insert into user_config (type, data, valid, schema_version) values ($1, $2, $3, $4)' function saveConfig (config) { - return loadLatestConfigOrNone() - .then(currentConfig => { + return Promise.all([loadLatestConfigOrNone(), getOperatorId('middleware')]) + .then(([currentConfig, operatorId]) => { const newConfig = _.assign(currentConfig, config) return db.tx(t => { return t.none(configSql, ['config', { config: newConfig }, true, NEW_SETTINGS_LOADER_SCHEMA_VERSION]) - .then(() => t.none('NOTIFY $1:name, $2', ['poller', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema') })])) + .then(() => t.none('NOTIFY $1:name, $2', ['reload', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema'), operatorId })])) }).catch(console.error) }) } diff --git a/lib/poller.js b/lib/poller.js index 914554dd..a36c4fb9 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -18,7 +18,6 @@ const util = require('util') const db = require('./db') const state = require('./middlewares/state') const processBatches = require('./tx-batching-processing') -const { getOperatorId } = require('./operator') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds @@ -85,13 +84,11 @@ db.connect({ direct: true }).then(sco => { switch (parsedData.type) { case 'reload': return reload(parsedData.schema) - case 'machineAction': - return machineAction(parsedData.action, parsedData.value) default: break } }) - return sco.none('LISTEN $1:name', 'poller') + return sco.none('LISTEN $1:name', 'reload') }).catch(console.error) function reload (schema) { @@ -99,40 +96,16 @@ function reload (schema) { store.set('schema', schema) // set asyncLocalStorage so settingsLoader loads settings for the right schema return asyncLocalStorage.run(store, () => { - return Promise.all([settingsLoader.loadLatest(), getOperatorId('middleware')]) - .then(([settings, operatorId]) => { + return settingsLoader.loadLatest() + .then(settings => { const pi = plugins(settings) cachedVariables.set(schema, { settings, pi, isReloading: false }) - state.needsSettingsReload[operatorId.operatorId] = true logger.debug(`Settings for schema '${schema}' reloaded in poller`) return updateAndLoadSanctions() }) }) } -function machineAction (type, value) { - const deviceId = value.deviceId - const operatorId = value.operatorId - const pid = state.pids?.[operatorId]?.[deviceId]?.pid - - switch (type) { - case 'reboot': - logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`) - state.reboots[operatorId] = { [deviceId]: pid } - break - case 'shutdown': - logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`) - state.shutdowns[operatorId] = { [deviceId]: pid } - break - case 'restartServices': - logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`) - state.restartServicesMap[operatorId] = { [deviceId]: pid } - break - default: - break - } -} - function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings }