Feat: allow polling of multiple db schemas
This commit is contained in:
parent
5a44216547
commit
4ce4a32502
4 changed files with 129 additions and 32 deletions
152
lib/poller.js
152
lib/poller.js
|
|
@ -1,5 +1,5 @@
|
|||
const _ = require('lodash/fp')
|
||||
|
||||
const Queue = require('queue-promise')
|
||||
const plugins = require('./plugins')
|
||||
const notifier = require('./notifier')
|
||||
const T = require('./time')
|
||||
|
|
@ -11,6 +11,10 @@ const sanctions = require('./ofac/index')
|
|||
const coinAtmRadar = require('./coinatmradar/coinatmradar')
|
||||
const configManager = require('./new-config-manager')
|
||||
const complianceTriggers = require('./compliance-triggers')
|
||||
const { asyncLocalStorage, defaultStore } = require('./async-storage')
|
||||
const settingsLoader = require('./new-settings-loader')
|
||||
const NodeCache = require('node-cache')
|
||||
const util = require('util')
|
||||
|
||||
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
||||
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
||||
|
|
@ -24,25 +28,68 @@ const LOGS_CLEAR_INTERVAL = 1 * T.day
|
|||
const SANCTIONS_INITIAL_DOWNLOAD_INTERVAL = 5 * T.minutes
|
||||
const SANCTIONS_UPDATE_INTERVAL = 1 * T.week
|
||||
const RADAR_UPDATE_INTERVAL = 5 * T.minutes
|
||||
const PRUNE_MACHINES_HEARBEAT = 1 * T.day
|
||||
const PRUNE_MACHINES_HEARTBEAT = 1 * T.day
|
||||
|
||||
const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds
|
||||
|
||||
const PENDING_INTERVAL = 10 * T.seconds
|
||||
const CACHE_ENTRY_TTL = 3600 // seconds
|
||||
|
||||
const coinFilter = ['ETH']
|
||||
const FAST_QUEUE_WAIT = 1 * T.seconds
|
||||
const SLOW_QUEUE_WAIT = 10 * T.seconds
|
||||
|
||||
let _pi, _settings
|
||||
const FAST_QUEUE = new Queue({
|
||||
concurrent: 600,
|
||||
interval: FAST_QUEUE_WAIT
|
||||
})
|
||||
|
||||
function reload (__settings) {
|
||||
_settings = __settings
|
||||
_pi = plugins(_settings)
|
||||
logger.debug('settings reloaded in poller')
|
||||
updateAndLoadSanctions()
|
||||
const SLOW_QUEUE = new Queue({
|
||||
concurrent: 10,
|
||||
interval: SLOW_QUEUE_WAIT
|
||||
})
|
||||
|
||||
// Fix for asyncLocalStorage store being lost due to callback-based queue
|
||||
FAST_QUEUE.enqueue = util.promisify(FAST_QUEUE.enqueue)
|
||||
SLOW_QUEUE.enqueue = util.promisify(SLOW_QUEUE.enqueue)
|
||||
|
||||
const QUEUE = {
|
||||
FAST: FAST_QUEUE,
|
||||
SLOW: SLOW_QUEUE
|
||||
}
|
||||
|
||||
function pi () { return _pi }
|
||||
function settings () { return _settings }
|
||||
const coinFilter = ['ETH']
|
||||
const schemaCallbacks = new Map()
|
||||
|
||||
const cachedVariables = new NodeCache({
|
||||
stdTTL: CACHE_ENTRY_TTL,
|
||||
checkperiod: CACHE_ENTRY_TTL,
|
||||
deleteOnExpire: false,
|
||||
useClones: false // pass values by reference instead of cloning
|
||||
})
|
||||
|
||||
cachedVariables.on('expired', (key, val) => {
|
||||
if (!val.isReloading) {
|
||||
// since val is passed by reference we don't need to do cachedVariables.set()
|
||||
val.isReloading = true
|
||||
return reload(key)
|
||||
}
|
||||
})
|
||||
|
||||
function reload (schema) {
|
||||
const store = defaultStore()
|
||||
store.set('schema', schema)
|
||||
// set asyncLocalStorage so settingsLoader loads settings for the right schema
|
||||
asyncLocalStorage.run(store, () => {
|
||||
return settingsLoader.loadLatest().then(settings => {
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set(schema, { settings, pi, isReloading: false })
|
||||
logger.debug(`settings for schema "${schema}" reloaded in poller`)
|
||||
return updateAndLoadSanctions()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi }
|
||||
function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings }
|
||||
|
||||
function initialSanctionsDownload () {
|
||||
const structs = sanctions.getStructs()
|
||||
|
|
@ -70,9 +117,40 @@ function updateCoinAtmRadar () {
|
|||
.then(rates => coinAtmRadar.update(rates, settings()))
|
||||
}
|
||||
|
||||
function start (__settings) {
|
||||
reload(__settings)
|
||||
function initializeEachSchema (schemas = ['public']) {
|
||||
// for each schema set "thread variables" and do polling
|
||||
return _.forEach(schema => {
|
||||
const store = defaultStore()
|
||||
store.set('schema', schema)
|
||||
return asyncLocalStorage.run(store, () => {
|
||||
return settingsLoader.loadLatest().then(settings => {
|
||||
// prevent inadvertedly clearing the array without clearing timeouts
|
||||
if (schemaCallbacks.has(schema)) throw new Error(`The schema "${schema}" cannot be initialized twice on poller`)
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set(schema, { settings, pi, isReloading: false })
|
||||
schemaCallbacks.set(schema, [])
|
||||
return doPolling(schema)
|
||||
})
|
||||
}).catch(console.error)
|
||||
}, schemas)
|
||||
}
|
||||
|
||||
function addToQueue (func, interval, schema, queue, ...vars) {
|
||||
return schemaCallbacks.get(schema).push(setInterval(() => {
|
||||
return queue.enqueue().then(() => {
|
||||
// get plugins or settings from the cache every time func is run
|
||||
const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
|
||||
if (loadVariables) {
|
||||
const funcVars = [...vars]
|
||||
funcVars[0] = vars[0]()
|
||||
return func(...funcVars)
|
||||
}
|
||||
return func(...vars)
|
||||
}).catch(console.error)
|
||||
}, interval))
|
||||
}
|
||||
|
||||
function doPolling (schema) {
|
||||
pi().executeTrades()
|
||||
pi().pong()
|
||||
pi().clearOldLogs()
|
||||
|
|
@ -87,23 +165,37 @@ function start (__settings) {
|
|||
notifier.checkNotification(pi())
|
||||
updateCoinAtmRadar()
|
||||
|
||||
setInterval(() => pi().executeTrades(), TRADE_INTERVAL)
|
||||
setInterval(() => cashOutTx.monitorLiveIncoming(settings(), false, coinFilter), LIVE_INCOMING_TX_INTERVAL)
|
||||
setInterval(() => cashOutTx.monitorStaleIncoming(settings(), false, coinFilter), INCOMING_TX_INTERVAL)
|
||||
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
|
||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter)
|
||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter)
|
||||
if (!_.isEmpty(coinFilter)) {
|
||||
setInterval(() => cashOutTx.monitorLiveIncoming(settings(), true, coinFilter), LIVE_INCOMING_TX_INTERVAL_FILTER)
|
||||
setInterval(() => cashOutTx.monitorStaleIncoming(settings(), true, coinFilter), INCOMING_TX_INTERVAL_FILTER)
|
||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
|
||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
|
||||
}
|
||||
setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL)
|
||||
setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL)
|
||||
setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL)
|
||||
setInterval(() => pi().pong(), PONG_INTERVAL)
|
||||
setInterval(() => pi().clearOldLogs(), LOGS_CLEAR_INTERVAL)
|
||||
setInterval(() => notifier.checkNotification(pi()), CHECK_NOTIFICATION_INTERVAL)
|
||||
setInterval(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL)
|
||||
setInterval(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL)
|
||||
setInterval(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL)
|
||||
setInterval(() => pi().pruneMachinesHeartbeat(), PRUNE_MACHINES_HEARBEAT)
|
||||
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(pi().pong, PONG_INTERVAL, schema, QUEUE.FAST)
|
||||
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, schema, QUEUE.FAST, pi)
|
||||
addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(pi().pruneMachinesHeartbeat(), PRUNE_MACHINES_HEARTBEAT, schema, QUEUE.SLOW, settings)
|
||||
}
|
||||
|
||||
module.exports = { start, reload }
|
||||
function setup (schemasToAdd = [], schemasToRemove = []) {
|
||||
// clear callback array for each schema in schemasToRemove and clear cached variables
|
||||
_.forEach(schema => {
|
||||
const callbacks = schemaCallbacks.get(schema)
|
||||
_.forEach(clearInterval, callbacks)
|
||||
schemaCallbacks.delete(schema)
|
||||
cachedVariables.del(schema)
|
||||
}, schemasToRemove)
|
||||
|
||||
return initializeEachSchema(schemasToAdd)
|
||||
}
|
||||
|
||||
const getActiveSchemas = () => Array.from(schemaCallbacks.keys())
|
||||
|
||||
module.exports = { setup, reload, getActiveSchemas }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue