321 lines
8.7 KiB
JavaScript
321 lines
8.7 KiB
JavaScript
const fs = require('fs/promises')
|
|
const path = require('path')
|
|
const _ = require('lodash/fp')
|
|
const Queue = require('queue-promise')
|
|
const plugins = require('./plugins')
|
|
const notifier = require('./notifier')
|
|
const T = require('./time')
|
|
const logger = require('./logger')
|
|
const cashOutTx = require('./cash-out/cash-out-tx')
|
|
const cashInTx = require('./cash-in/cash-in-tx')
|
|
const sanctionsUpdater = require('./ofac/update')
|
|
const sanctions = require('./ofac/index')
|
|
const coinAtmRadar = require('./coinatmradar/coinatmradar')
|
|
const configManager = require('./new-config-manager')
|
|
const complianceTriggers = require('./compliance-triggers')
|
|
const settingsLoader = require('./new-settings-loader')
|
|
const machineLoader = require('./machine-loader')
|
|
const NodeCache = require('node-cache')
|
|
const db = require('./db')
|
|
const processBatches = require('./tx-batching-processing')
|
|
|
|
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
|
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
|
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
|
const SWEEP_HD_INTERVAL = 5 * T.minute
|
|
const TRADE_INTERVAL = 60 * T.seconds
|
|
const LOGS_CLEAR_INTERVAL = 1 * T.day
|
|
const SANCTIONS_INITIAL_DOWNLOAD_INTERVAL = 5 * T.minutes
|
|
const SANCTIONS_UPDATE_INTERVAL = 1 * T.day
|
|
const RADAR_UPDATE_INTERVAL = 5 * T.minutes
|
|
const PRUNE_MACHINES_HEARTBEAT = 1 * T.day
|
|
const TRANSACTION_BATCH_LIFECYCLE = 20 * T.minutes
|
|
const TICKER_RATES_INTERVAL = 59 * T.seconds
|
|
const FAILED_SCANS_INTERVAL = 1 * T.day
|
|
const PENDING_PINGS_INTERVAL = 90 * T.seconds // lib/notifier/codes.js
|
|
|
|
const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds
|
|
const PENDING_INTERVAL = 10 * T.seconds
|
|
const CACHE_ENTRY_TTL = 3600 // seconds
|
|
|
|
const FAST_QUEUE_WAIT = 1 * T.seconds
|
|
const SLOW_QUEUE_WAIT = 10 * T.seconds
|
|
|
|
const OPERATOR_DATA_DIR = process.env.OPERATOR_DATA_DIR
|
|
|
|
const FAST_QUEUE = new Queue({
|
|
concurrent: 600,
|
|
interval: FAST_QUEUE_WAIT,
|
|
})
|
|
|
|
const SLOW_QUEUE = new Queue({
|
|
concurrent: 10,
|
|
interval: SLOW_QUEUE_WAIT,
|
|
})
|
|
|
|
const QUEUE = {
|
|
FAST: FAST_QUEUE,
|
|
SLOW: SLOW_QUEUE,
|
|
}
|
|
|
|
const cachedVariables = new NodeCache({
|
|
stdTTL: CACHE_ENTRY_TTL,
|
|
checkperiod: CACHE_ENTRY_TTL,
|
|
deleteOnExpire: false,
|
|
useClones: false, // pass values by reference instead of cloning
|
|
})
|
|
|
|
cachedVariables.on('expired', (key, val) => {
|
|
if (!val.isReloading) {
|
|
// since val is passed by reference we don't need to do cachedVariables.set()
|
|
val.isReloading = true
|
|
return reload()
|
|
}
|
|
})
|
|
|
|
db.connect({ direct: true })
|
|
.then(sco => {
|
|
sco.client.on('notification', () => {
|
|
return reload()
|
|
})
|
|
return sco.none('LISTEN $1:name', 'reload')
|
|
})
|
|
.catch(console.error)
|
|
|
|
function reload() {
|
|
return settingsLoader.loadLatest().then(settings => {
|
|
const pi = plugins(settings)
|
|
cachedVariables.set('public', { settings, pi, isReloading: false })
|
|
logger.debug(`Settings for schema 'public' reloaded in poller`)
|
|
return updateAndLoadSanctions()
|
|
})
|
|
}
|
|
|
|
function pi() {
|
|
return cachedVariables.get('public').pi
|
|
}
|
|
function settings() {
|
|
return cachedVariables.get('public').settings
|
|
}
|
|
|
|
function initialSanctionsDownload() {
|
|
const structs = sanctions.getStructs()
|
|
const isEmptyStructs =
|
|
_.isNil(structs) || _.flow(_.values, _.all(_.isEmpty))(structs)
|
|
|
|
if (!isEmptyStructs) return Promise.resolve()
|
|
|
|
return updateAndLoadSanctions()
|
|
}
|
|
|
|
function updateAndLoadSanctions() {
|
|
const triggers = configManager.getTriggers(settings().config)
|
|
const hasSanctions = complianceTriggers.hasSanctions(triggers)
|
|
|
|
if (!hasSanctions) return Promise.resolve()
|
|
|
|
logger.info('Updating sanctions database...')
|
|
return sanctionsUpdater
|
|
.update()
|
|
.then(sanctions.load)
|
|
.then(() => logger.info('Sanctions database updated.'))
|
|
}
|
|
|
|
function updateCoinAtmRadar() {
|
|
return pi()
|
|
.getRawRates()
|
|
.then(rates => coinAtmRadar.update(rates, settings()))
|
|
}
|
|
|
|
const readdir = dirpath =>
|
|
fs
|
|
.readdir(dirpath, { withFileTypes: true })
|
|
.then(_.map(entry => _.set('path', path.join(dirpath, entry.name), entry)))
|
|
|
|
const readdirRec = rootPath =>
|
|
readdir(rootPath)
|
|
.then(entries =>
|
|
Promise.all(
|
|
entries.map(entry =>
|
|
entry.isDirectory() ? readdirRec(entry.path) : [entry],
|
|
),
|
|
),
|
|
)
|
|
.then(_.flatten)
|
|
|
|
const stat = path => fs.stat(path).then(_.set('path', path))
|
|
const pathComponents = p => path.normalize(p).split(path.sep)
|
|
|
|
// @see lib/customers.js:updateIdCardData()
|
|
const cleanOldFailedPDF417Scans = () => {
|
|
const matcher = (c, pat) => (typeof pat === 'function' ? pat(c) : c === pat)
|
|
const PDF417ScanPathPattern = _.concat(pathComponents(OPERATOR_DATA_DIR), [
|
|
'id-operator',
|
|
() => /* customerid*/ true,
|
|
'idcarddata',
|
|
fname => path.extname(fname) === 'jpg',
|
|
])
|
|
const isPDF417Scan = entry => {
|
|
entry = pathComponents(entry.path)
|
|
return (
|
|
entry.length === PDF417ScanPathPattern.length &&
|
|
_.isMatchWith(matcher, PDF417ScanPathPattern, pathComponents(entry.path))
|
|
)
|
|
}
|
|
|
|
let old = new Date()
|
|
old.setDate(old.getDate() - 2) // 2 days ago
|
|
old = old.getTime()
|
|
|
|
/* NOTE: Small caveat to mtime: last time the file was written to. */
|
|
const isOld = filestat => filestat.mtimeMs < old
|
|
|
|
return readdirRec(path.join(OPERATOR_DATA_DIR, 'id-operator'))
|
|
.then(entries =>
|
|
Promise.all(
|
|
entries
|
|
.filter(entry => entry.isFile() && isPDF417Scan(entry))
|
|
.map(entry => stat(entry.path)),
|
|
),
|
|
)
|
|
.then(filestats =>
|
|
Promise.all(
|
|
filestats.filter(isOld).map(_.flow(_.get(['path']), fs.unlink)),
|
|
),
|
|
)
|
|
.catch(err => {
|
|
console.log('Error cleaning up failed PDF417 scans:', err)
|
|
})
|
|
}
|
|
|
|
// @see lib/machine-loader.js:updateFailedQRScans()
|
|
const cleanOldFailedQRScans = () => {
|
|
const old = new Date()
|
|
old.setDate(old.getDate() - 2) // 2 days ago
|
|
|
|
const isOld = filepath => {
|
|
const then = new Date(path.basename(filepath).replace(/-[0-9]+\.jpg$/, ''))
|
|
return then < old
|
|
}
|
|
|
|
return readdirRec(path.join(OPERATOR_DATA_DIR, 'failedQRScans'))
|
|
.then(entries =>
|
|
Promise.all(
|
|
entries
|
|
.filter(entry => entry.isFile() && isOld(entry.path))
|
|
.map(entry => fs.unlink(entry.path)),
|
|
),
|
|
)
|
|
.catch(err => {
|
|
console.log('Error cleaning up failed QR scans:', err)
|
|
})
|
|
}
|
|
|
|
function setup() {
|
|
return settingsLoader
|
|
.loadLatest()
|
|
.then(settings => {
|
|
const pi = plugins(settings)
|
|
cachedVariables.set('public', { settings, pi, isReloading: false })
|
|
return doPolling()
|
|
})
|
|
.catch(console.error)
|
|
}
|
|
|
|
function recursiveTimeout(func, timeout, ...vars) {
|
|
setTimeout(() => {
|
|
let promise = null
|
|
|
|
const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
|
|
if (loadVariables) {
|
|
const funcVars = [...vars]
|
|
funcVars[0] = vars[0]()
|
|
promise = func(...funcVars)
|
|
} else {
|
|
promise = func(...vars)
|
|
}
|
|
|
|
promise.finally(() => {
|
|
recursiveTimeout(func, timeout, ...vars)
|
|
})
|
|
}, timeout)
|
|
}
|
|
|
|
function addToQueue(func, interval, queue, ...vars) {
|
|
recursiveTimeout(func, interval, ...vars)
|
|
}
|
|
|
|
function doPolling() {
|
|
pi().executeTrades()
|
|
pi().clearOldLogs()
|
|
cashOutTx.monitorLiveIncoming(settings())
|
|
cashOutTx.monitorStaleIncoming(settings())
|
|
cashOutTx.monitorUnnotified(settings())
|
|
pi().sweepHd()
|
|
notifier.checkNotification(pi())
|
|
updateCoinAtmRadar()
|
|
|
|
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, QUEUE.FAST)
|
|
addToQueue(pi().executeTrades, TRADE_INTERVAL, QUEUE.FAST)
|
|
addToQueue(
|
|
cashOutTx.monitorLiveIncoming,
|
|
LIVE_INCOMING_TX_INTERVAL,
|
|
QUEUE.FAST,
|
|
settings,
|
|
)
|
|
addToQueue(
|
|
cashOutTx.monitorStaleIncoming,
|
|
INCOMING_TX_INTERVAL,
|
|
QUEUE.FAST,
|
|
settings,
|
|
)
|
|
addToQueue(
|
|
cashOutTx.monitorUnnotified,
|
|
UNNOTIFIED_INTERVAL,
|
|
QUEUE.FAST,
|
|
settings,
|
|
)
|
|
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, QUEUE.FAST, settings)
|
|
addToQueue(
|
|
processBatches,
|
|
UNNOTIFIED_INTERVAL,
|
|
QUEUE.FAST,
|
|
settings,
|
|
TRANSACTION_BATCH_LIFECYCLE,
|
|
)
|
|
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, QUEUE.FAST, settings)
|
|
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, QUEUE.SLOW)
|
|
addToQueue(
|
|
notifier.checkNotification,
|
|
CHECK_NOTIFICATION_INTERVAL,
|
|
QUEUE.FAST,
|
|
pi,
|
|
)
|
|
addToQueue(
|
|
initialSanctionsDownload,
|
|
SANCTIONS_INITIAL_DOWNLOAD_INTERVAL,
|
|
QUEUE.SLOW,
|
|
)
|
|
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, QUEUE.SLOW)
|
|
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, QUEUE.SLOW)
|
|
addToQueue(
|
|
pi().pruneMachinesHeartbeat,
|
|
PRUNE_MACHINES_HEARTBEAT,
|
|
QUEUE.SLOW,
|
|
settings,
|
|
)
|
|
addToQueue(cleanOldFailedQRScans, FAILED_SCANS_INTERVAL, QUEUE.SLOW, settings)
|
|
addToQueue(
|
|
cleanOldFailedPDF417Scans,
|
|
FAILED_SCANS_INTERVAL,
|
|
QUEUE.SLOW,
|
|
settings,
|
|
)
|
|
addToQueue(
|
|
machineLoader.batchRecordPendingPings,
|
|
PENDING_PINGS_INTERVAL,
|
|
QUEUE.SLOW,
|
|
)
|
|
}
|
|
|
|
module.exports = { setup, reload }
|