diff --git a/lib/app.js b/lib/app.js index 14ace5aa..72394605 100644 --- a/lib/app.js +++ b/lib/app.js @@ -64,8 +64,7 @@ function loadSanctions (settings) { function startServer (settings) { return Promise.resolve() .then(() => { - poller.start(settings) - + poller.setup(['public']) const httpsServerOptions = { key: fs.readFileSync(options.keyPath), cert: fs.readFileSync(options.certPath), diff --git a/lib/poller.js b/lib/poller.js index 52d43f76..8a44df96 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -1,5 +1,5 @@ const _ = require('lodash/fp') - +const Queue = require('queue-promise') const plugins = require('./plugins') const notifier = require('./notifier') const T = require('./time') @@ -11,6 +11,10 @@ const sanctions = require('./ofac/index') const coinAtmRadar = require('./coinatmradar/coinatmradar') const configManager = require('./new-config-manager') const complianceTriggers = require('./compliance-triggers') +const { asyncLocalStorage, defaultStore } = require('./async-storage') +const settingsLoader = require('./new-settings-loader') +const NodeCache = require('node-cache') +const util = require('util') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds @@ -24,25 +28,68 @@ const LOGS_CLEAR_INTERVAL = 1 * T.day const SANCTIONS_INITIAL_DOWNLOAD_INTERVAL = 5 * T.minutes const SANCTIONS_UPDATE_INTERVAL = 1 * T.week const RADAR_UPDATE_INTERVAL = 5 * T.minutes -const PRUNE_MACHINES_HEARBEAT = 1 * T.day +const PRUNE_MACHINES_HEARTBEAT = 1 * T.day const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds - const PENDING_INTERVAL = 10 * T.seconds +const CACHE_ENTRY_TTL = 3600 // seconds -const coinFilter = ['ETH'] +const FAST_QUEUE_WAIT = 1 * T.seconds +const SLOW_QUEUE_WAIT = 10 * T.seconds -let _pi, _settings +const FAST_QUEUE = new Queue({ + concurrent: 600, + interval: FAST_QUEUE_WAIT +}) -function reload (__settings) { - _settings = __settings - _pi = plugins(_settings) - logger.debug('settings reloaded in poller') - updateAndLoadSanctions() +const SLOW_QUEUE = new Queue({ + concurrent: 10, + interval: SLOW_QUEUE_WAIT +}) + +// Fix for asyncLocalStorage store being lost due to callback-based queue +FAST_QUEUE.enqueue = util.promisify(FAST_QUEUE.enqueue) +SLOW_QUEUE.enqueue = util.promisify(SLOW_QUEUE.enqueue) + +const QUEUE = { + FAST: FAST_QUEUE, + SLOW: SLOW_QUEUE } -function pi () { return _pi } -function settings () { return _settings } +const coinFilter = ['ETH'] +const schemaCallbacks = new Map() + +const cachedVariables = new NodeCache({ + stdTTL: CACHE_ENTRY_TTL, + checkperiod: CACHE_ENTRY_TTL, + deleteOnExpire: false, + useClones: false // pass values by reference instead of cloning +}) + +cachedVariables.on('expired', (key, val) => { + if (!val.isReloading) { + // since val is passed by reference we don't need to do cachedVariables.set() + val.isReloading = true + return reload(key) + } +}) + +function reload (schema) { + const store = defaultStore() + store.set('schema', schema) + // set asyncLocalStorage so settingsLoader loads settings for the right schema + asyncLocalStorage.run(store, () => { + return settingsLoader.loadLatest().then(settings => { + const pi = plugins(settings) + cachedVariables.set(schema, { settings, pi, isReloading: false }) + logger.debug(`settings for schema "${schema}" reloaded in poller`) + return updateAndLoadSanctions() + }) + }) +} + +function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } +function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings } function initialSanctionsDownload () { const structs = sanctions.getStructs() @@ -70,9 +117,40 @@ function updateCoinAtmRadar () { .then(rates => coinAtmRadar.update(rates, settings())) } -function start (__settings) { - reload(__settings) +function initializeEachSchema (schemas = ['public']) { + // for each schema set "thread variables" and do polling + return _.forEach(schema => { + const store = defaultStore() + store.set('schema', schema) + return asyncLocalStorage.run(store, () => { + return settingsLoader.loadLatest().then(settings => { + // prevent inadvertedly clearing the array without clearing timeouts + if (schemaCallbacks.has(schema)) throw new Error(`The schema "${schema}" cannot be initialized twice on poller`) + const pi = plugins(settings) + cachedVariables.set(schema, { settings, pi, isReloading: false }) + schemaCallbacks.set(schema, []) + return doPolling(schema) + }) + }).catch(console.error) + }, schemas) +} +function addToQueue (func, interval, schema, queue, ...vars) { + return schemaCallbacks.get(schema).push(setInterval(() => { + return queue.enqueue().then(() => { + // get plugins or settings from the cache every time func is run + const loadVariables = vars.length > 0 && typeof vars[0] === 'function' + if (loadVariables) { + const funcVars = [...vars] + funcVars[0] = vars[0]() + return func(...funcVars) + } + return func(...vars) + }).catch(console.error) + }, interval)) +} + +function doPolling (schema) { pi().executeTrades() pi().pong() pi().clearOldLogs() @@ -87,23 +165,37 @@ function start (__settings) { notifier.checkNotification(pi()) updateCoinAtmRadar() - setInterval(() => pi().executeTrades(), TRADE_INTERVAL) - setInterval(() => cashOutTx.monitorLiveIncoming(settings(), false, coinFilter), LIVE_INCOMING_TX_INTERVAL) - setInterval(() => cashOutTx.monitorStaleIncoming(settings(), false, coinFilter), INCOMING_TX_INTERVAL) + addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST) + addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) + addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) if (!_.isEmpty(coinFilter)) { - setInterval(() => cashOutTx.monitorLiveIncoming(settings(), true, coinFilter), LIVE_INCOMING_TX_INTERVAL_FILTER) - setInterval(() => cashOutTx.monitorStaleIncoming(settings(), true, coinFilter), INCOMING_TX_INTERVAL_FILTER) + addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) + addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) } - setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL) - setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL) - setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL) - setInterval(() => pi().pong(), PONG_INTERVAL) - setInterval(() => pi().clearOldLogs(), LOGS_CLEAR_INTERVAL) - setInterval(() => notifier.checkNotification(pi()), CHECK_NOTIFICATION_INTERVAL) - setInterval(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL) - setInterval(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL) - setInterval(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL) - setInterval(() => pi().pruneMachinesHeartbeat(), PRUNE_MACHINES_HEARBEAT) + addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings) + addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings) + addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, schema, QUEUE.FAST, settings) + addToQueue(pi().pong, PONG_INTERVAL, schema, QUEUE.FAST) + addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, schema, QUEUE.SLOW) + addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, schema, QUEUE.FAST, pi) + addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, schema, QUEUE.SLOW) + addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, schema, QUEUE.SLOW) + addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, schema, QUEUE.SLOW) + addToQueue(pi().pruneMachinesHeartbeat(), PRUNE_MACHINES_HEARTBEAT, schema, QUEUE.SLOW, settings) } -module.exports = { start, reload } +function setup (schemasToAdd = [], schemasToRemove = []) { + // clear callback array for each schema in schemasToRemove and clear cached variables + _.forEach(schema => { + const callbacks = schemaCallbacks.get(schema) + _.forEach(clearInterval, callbacks) + schemaCallbacks.delete(schema) + cachedVariables.del(schema) + }, schemasToRemove) + + return initializeEachSchema(schemasToAdd) +} + +const getActiveSchemas = () => Array.from(schemaCallbacks.keys()) + +module.exports = { setup, reload, getActiveSchemas } diff --git a/package-lock.json b/package-lock.json index 983fa99c..1a48e424 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17669,6 +17669,11 @@ "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" }, + "queue-promise": { + "version": "2.2.1", + "resolved": "https://registry.npmjs.org/queue-promise/-/queue-promise-2.2.1.tgz", + "integrity": "sha512-C3eyRwLF9m6dPV4MtqMVFX+Xmc7keZ9Ievm3jJ/wWM5t3uVbFnGsJXwpYzZ4LaIEcX9bss/mdaKzyrO6xheRuA==" + }, "random-bytes": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/random-bytes/-/random-bytes-1.0.0.tgz", diff --git a/package.json b/package.json index dafdfd7a..ae4647ff 100644 --- a/package.json +++ b/package.json @@ -65,6 +65,7 @@ "pretty-ms": "^2.1.0", "promise-sequential": "^1.1.1", "request-promise": "^4.2.6", + "queue-promise": "^2.2.1", "semver": "^7.1.3", "serve-static": "^1.12.4", "socket.io": "^2.0.3",