fix: separate poller and middleware postgres listeners

This commit is contained in:
Sérgio Salgado 2022-01-19 18:55:33 +00:00
parent 5945f9d31b
commit b98d73cd6e
4 changed files with 64 additions and 36 deletions

View file

@ -151,7 +151,7 @@ function unpair (rec) {
} }
function reboot (rec) { function reboot (rec) {
return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify(
{ {
type: 'machineAction', type: 'machineAction',
action: 'reboot', action: 'reboot',
@ -161,7 +161,7 @@ function reboot (rec) {
} }
function shutdown (rec) { function shutdown (rec) {
return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify(
{ {
type: 'machineAction', type: 'machineAction',
action: 'shutdown', action: 'shutdown',
@ -171,7 +171,7 @@ function shutdown (rec) {
} }
function restartServices (rec) { function restartServices (rec) {
return db.none('NOTIFY $1:name, $2', ['poller', JSON.stringify( return db.none('NOTIFY $1:name, $2', ['machineAction', JSON.stringify(
{ {
type: 'machineAction', type: 'machineAction',
action: 'restartServices', action: 'restartServices',

View file

@ -1,10 +1,64 @@
const _ = require('lodash/fp') const _ = require('lodash/fp')
const db = require('../db')
const state = require('./state') const state = require('./state')
const newSettingsLoader = require('../new-settings-loader') const newSettingsLoader = require('../new-settings-loader')
const helpers = require('../route-helpers') const helpers = require('../route-helpers')
const logger = require('../logger') const logger = require('../logger')
db.connect({ direct: true }).then(sco => {
sco.client.on('notification', data => {
const parsedData = JSON.parse(data.payload)
switch (parsedData.type) {
case 'reload':
return reload(parsedData.operatorId)
default:
break
}
})
return sco.none('LISTEN $1:name', 'reload')
}).catch(console.error)
db.connect({ direct: true }).then(sco => {
sco.client.on('notification', data => {
const parsedData = JSON.parse(data.payload)
switch (parsedData.type) {
case 'machineAction':
return machineAction(parsedData.action, parsedData.value)
default:
break
}
})
return sco.none('LISTEN $1:name', 'machineAction')
}).catch(console.error)
function machineAction (type, value) {
const deviceId = value.deviceId
const operatorId = value.operatorId
const pid = state.pids?.[operatorId]?.[deviceId]?.pid
switch (type) {
case 'reboot':
logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`)
state.reboots[operatorId] = { [deviceId]: pid }
break
case 'shutdown':
logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`)
state.shutdowns[operatorId] = { [deviceId]: pid }
break
case 'restartServices':
logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`)
state.restartServicesMap[operatorId] = { [deviceId]: pid }
break
default:
break
}
}
function reload (operatorId) {
state.needsSettingsReload[operatorId.operatorId] = true
}
const populateSettings = function (req, res, next) { const populateSettings = function (req, res, next) {
const { needsSettingsReload, settingsCache } = state const { needsSettingsReload, settingsCache } = state
const operatorId = res.locals.operatorId const operatorId = res.locals.operatorId

View file

@ -2,6 +2,7 @@ const _ = require('lodash/fp')
const db = require('./db') const db = require('./db')
const migration = require('./config-migration') const migration = require('./config-migration')
const { asyncLocalStorage } = require('./async-storage') const { asyncLocalStorage } = require('./async-storage')
const { getOperatorId } = require('./operator')
const OLD_SETTINGS_LOADER_SCHEMA_VERSION = 1 const OLD_SETTINGS_LOADER_SCHEMA_VERSION = 1
const NEW_SETTINGS_LOADER_SCHEMA_VERSION = 2 const NEW_SETTINGS_LOADER_SCHEMA_VERSION = 2
@ -71,12 +72,12 @@ function showAccounts (schemaVersion) {
const configSql = 'insert into user_config (type, data, valid, schema_version) values ($1, $2, $3, $4)' const configSql = 'insert into user_config (type, data, valid, schema_version) values ($1, $2, $3, $4)'
function saveConfig (config) { function saveConfig (config) {
return loadLatestConfigOrNone() return Promise.all([loadLatestConfigOrNone(), getOperatorId('middleware')])
.then(currentConfig => { .then(([currentConfig, operatorId]) => {
const newConfig = _.assign(currentConfig, config) const newConfig = _.assign(currentConfig, config)
return db.tx(t => { return db.tx(t => {
return t.none(configSql, ['config', { config: newConfig }, true, NEW_SETTINGS_LOADER_SCHEMA_VERSION]) return t.none(configSql, ['config', { config: newConfig }, true, NEW_SETTINGS_LOADER_SCHEMA_VERSION])
.then(() => t.none('NOTIFY $1:name, $2', ['poller', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema') })])) .then(() => t.none('NOTIFY $1:name, $2', ['reload', JSON.stringify({ type: 'reload', schema: asyncLocalStorage.getStore().get('schema'), operatorId })]))
}).catch(console.error) }).catch(console.error)
}) })
} }

View file

@ -18,7 +18,6 @@ const util = require('util')
const db = require('./db') const db = require('./db')
const state = require('./middlewares/state') const state = require('./middlewares/state')
const processBatches = require('./tx-batching-processing') const processBatches = require('./tx-batching-processing')
const { getOperatorId } = require('./operator')
const INCOMING_TX_INTERVAL = 30 * T.seconds const INCOMING_TX_INTERVAL = 30 * T.seconds
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
@ -85,13 +84,11 @@ db.connect({ direct: true }).then(sco => {
switch (parsedData.type) { switch (parsedData.type) {
case 'reload': case 'reload':
return reload(parsedData.schema) return reload(parsedData.schema)
case 'machineAction':
return machineAction(parsedData.action, parsedData.value)
default: default:
break break
} }
}) })
return sco.none('LISTEN $1:name', 'poller') return sco.none('LISTEN $1:name', 'reload')
}).catch(console.error) }).catch(console.error)
function reload (schema) { function reload (schema) {
@ -99,40 +96,16 @@ function reload (schema) {
store.set('schema', schema) store.set('schema', schema)
// set asyncLocalStorage so settingsLoader loads settings for the right schema // set asyncLocalStorage so settingsLoader loads settings for the right schema
return asyncLocalStorage.run(store, () => { return asyncLocalStorage.run(store, () => {
return Promise.all([settingsLoader.loadLatest(), getOperatorId('middleware')]) return settingsLoader.loadLatest()
.then(([settings, operatorId]) => { .then(settings => {
const pi = plugins(settings) const pi = plugins(settings)
cachedVariables.set(schema, { settings, pi, isReloading: false }) cachedVariables.set(schema, { settings, pi, isReloading: false })
state.needsSettingsReload[operatorId.operatorId] = true
logger.debug(`Settings for schema '${schema}' reloaded in poller`) logger.debug(`Settings for schema '${schema}' reloaded in poller`)
return updateAndLoadSanctions() return updateAndLoadSanctions()
}) })
}) })
} }
function machineAction (type, value) {
const deviceId = value.deviceId
const operatorId = value.operatorId
const pid = state.pids?.[operatorId]?.[deviceId]?.pid
switch (type) {
case 'reboot':
logger.debug(`Rebooting machine '${deviceId}' from operator ${operatorId}`)
state.reboots[operatorId] = { [deviceId]: pid }
break
case 'shutdown':
logger.debug(`Shutting down machine '${deviceId}' from operator ${operatorId}`)
state.shutdowns[operatorId] = { [deviceId]: pid }
break
case 'restartServices':
logger.debug(`Restarting services of machine '${deviceId}' from operator ${operatorId}`)
state.restartServicesMap[operatorId] = { [deviceId]: pid }
break
default:
break
}
}
function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi }
function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings } function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings }