chore: remove dependency on async local storage

This commit is contained in:
Rafael Taranto 2025-04-01 11:21:27 +01:00
parent d452aac0f9
commit ec30811de1
14 changed files with 109 additions and 296 deletions

View file

@ -8,13 +8,11 @@ const T = require('./time')
const logger = require('./logger')
const cashOutTx = require('./cash-out/cash-out-tx')
const cashInTx = require('./cash-in/cash-in-tx')
const customers = require('./customers')
const sanctionsUpdater = require('./ofac/update')
const sanctions = require('./ofac/index')
const coinAtmRadar = require('./coinatmradar/coinatmradar')
const configManager = require('./new-config-manager')
const complianceTriggers = require('./compliance-triggers')
const { asyncLocalStorage, defaultStore } = require('./async-storage')
const settingsLoader = require('./new-settings-loader')
const NodeCache = require('node-cache')
const util = require('util')
@ -56,17 +54,11 @@ const SLOW_QUEUE = new Queue({
interval: SLOW_QUEUE_WAIT
})
// Fix for asyncLocalStorage store being lost due to callback-based queue
FAST_QUEUE.enqueue = util.promisify(FAST_QUEUE.enqueue)
SLOW_QUEUE.enqueue = util.promisify(SLOW_QUEUE.enqueue)
const QUEUE = {
FAST: FAST_QUEUE,
SLOW: SLOW_QUEUE
}
const schemaCallbacks = new Map()
const cachedVariables = new NodeCache({
stdTTL: CACHE_ENTRY_TTL,
checkperiod: CACHE_ENTRY_TTL,
@ -78,31 +70,25 @@ cachedVariables.on('expired', (key, val) => {
if (!val.isReloading) {
// since val is passed by reference we don't need to do cachedVariables.set()
val.isReloading = true
return reload(key)
return reload()
}
})
db.connect({ direct: true }).then(sco => {
sco.client.on('notification', data => {
const parsedData = JSON.parse(data.payload)
return reload(parsedData.schema)
sco.client.on('notification', () => {
return reload()
})
return sco.none('LISTEN $1:name', 'reload')
}).catch(console.error)
function reload (schema) {
const store = defaultStore()
store.set('schema', schema)
// set asyncLocalStorage so settingsLoader loads settings for the right schema
return asyncLocalStorage.run(store, () => {
return settingsLoader.loadLatest()
.then(settings => {
const pi = plugins(settings)
cachedVariables.set(schema, { settings, pi, isReloading: false })
logger.debug(`Settings for schema '${schema}' reloaded in poller`)
return updateAndLoadSanctions()
})
})
function reload () {
return settingsLoader.loadLatest()
.then(settings => {
const pi = plugins(settings)
cachedVariables.set('public', { settings, pi, isReloading: false })
logger.debug(`Settings for schema 'public' reloaded in poller`)
return updateAndLoadSanctions()
})
}
function pi () { return cachedVariables.get('public').pi }
@ -205,26 +191,12 @@ const cleanOldFailedQRScans = () => {
})
}
// function checkExternalCompliance (settings) {
// return customers.checkExternalCompliance(settings)
// }
function initializeEachSchema (schemas = ['public']) {
// for each schema set "thread variables" and do polling
return _.forEach(schema => {
const store = defaultStore()
store.set('schema', schema)
return asyncLocalStorage.run(store, () => {
return settingsLoader.loadLatest().then(settings => {
// prevent inadvertedly clearing the array without clearing timeouts
if (schemaCallbacks.has(schema)) throw new Error(`The schema "${schema}" cannot be initialized twice on poller`)
const pi = plugins(settings)
cachedVariables.set(schema, { settings, pi, isReloading: false })
schemaCallbacks.set(schema, [])
return doPolling(schema)
})
}).catch(console.error)
}, schemas)
function setup () {
return settingsLoader.loadLatest().then(settings => {
const pi = plugins(settings)
cachedVariables.set('public', { settings, pi, isReloading: false })
return doPolling()
}).catch(console.error)
}
function recursiveTimeout (func, timeout, ...vars) {
@ -246,23 +218,11 @@ function recursiveTimeout (func, timeout, ...vars) {
}, timeout)
}
function addToQueue (func, interval, schema, queue, ...vars) {
function addToQueue (func, interval, queue, ...vars) {
recursiveTimeout(func, interval, ...vars)
// return schemaCallbacks.get(schema).push(setInterval(() => {
// return queue.enqueue().then(() => {
// // get plugins or settings from the cache every time func is run
// const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
// if (loadVariables) {
// const funcVars = [...vars]
// funcVars[0] = vars[0]()
// return func(...funcVars)
// }
// return func(...vars)
// }).catch(console.error)
// }, interval))
}
function doPolling (schema) {
function doPolling () {
pi().executeTrades()
pi().pong()
pi().clearOldLogs()
@ -272,40 +232,24 @@ function doPolling (schema) {
pi().sweepHd()
notifier.checkNotification(pi())
updateCoinAtmRadar()
// checkExternalCompliance(settings())
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST)
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(pi().pong, PONG_INTERVAL, schema, QUEUE.FAST)
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, schema, QUEUE.SLOW)
addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, schema, QUEUE.FAST, pi)
addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, schema, QUEUE.SLOW)
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, schema, QUEUE.SLOW)
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, schema, QUEUE.SLOW)
addToQueue(pi().pruneMachinesHeartbeat, PRUNE_MACHINES_HEARTBEAT, schema, QUEUE.SLOW, settings)
addToQueue(cleanOldFailedQRScans, FAILED_SCANS_INTERVAL, schema, QUEUE.SLOW, settings)
addToQueue(cleanOldFailedPDF417Scans, FAILED_SCANS_INTERVAL, schema, QUEUE.SLOW, settings)
// addToQueue(checkExternalCompliance, EXTERNAL_COMPLIANCE_INTERVAL, schema, QUEUE.SLOW, settings)
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, QUEUE.FAST)
addToQueue(pi().executeTrades, TRADE_INTERVAL, QUEUE.FAST)
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, QUEUE.FAST, settings)
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, QUEUE.FAST, settings)
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, QUEUE.FAST, settings)
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, QUEUE.FAST, settings)
addToQueue(processBatches, UNNOTIFIED_INTERVAL, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, QUEUE.FAST, settings)
addToQueue(pi().pong, PONG_INTERVAL, QUEUE.FAST)
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, QUEUE.SLOW)
addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, QUEUE.FAST, pi)
addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, QUEUE.SLOW)
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, QUEUE.SLOW)
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, QUEUE.SLOW)
addToQueue(pi().pruneMachinesHeartbeat, PRUNE_MACHINES_HEARTBEAT, QUEUE.SLOW, settings)
addToQueue(cleanOldFailedQRScans, FAILED_SCANS_INTERVAL, QUEUE.SLOW, settings)
addToQueue(cleanOldFailedPDF417Scans, FAILED_SCANS_INTERVAL, QUEUE.SLOW, settings)
}
function setup (schemasToAdd = [], schemasToRemove = []) {
// clear callback array for each schema in schemasToRemove and clear cached variables
_.forEach(schema => {
const callbacks = schemaCallbacks.get(schema)
_.forEach(clearInterval, callbacks)
schemaCallbacks.delete(schema)
cachedVariables.del(schema)
}, schemasToRemove)
return initializeEachSchema(schemasToAdd)
}
const getActiveSchemas = () => Array.from(schemaCallbacks.keys())
module.exports = { setup, reload, getActiveSchemas }
module.exports = { setup, reload }