Merge pull request #1816 from RafaelTaranto/chore/database-optimization
LAM-1092 database optimization
This commit is contained in:
commit
2dddd24d3f
18 changed files with 141 additions and 307 deletions
|
|
@ -4,34 +4,22 @@ const _ = require('lodash/fp')
|
|||
require('../lib/environment-helper')
|
||||
const db = require('../lib/db')
|
||||
const migrate = require('../lib/migrate')
|
||||
const { asyncLocalStorage, defaultStore } = require('../lib/async-storage')
|
||||
|
||||
const createMigration = `CREATE TABLE IF NOT EXISTS migrations (
|
||||
id serial PRIMARY KEY,
|
||||
data json NOT NULL
|
||||
)`
|
||||
|
||||
const select = 'select * from migrations limit 1'
|
||||
// no need to log the migration process
|
||||
process.env.SKIP_SERVER_LOGS = true
|
||||
|
||||
const getMigrateFile = () => Promise.resolve()
|
||||
|
||||
const store = defaultStore()
|
||||
asyncLocalStorage.run(store, () => {
|
||||
db.none(createMigration)
|
||||
.then(() => Promise.all([db.oneOrNone(select), getMigrateFile()]))
|
||||
.then(([qResult, migrateFile]) => {
|
||||
process.env.SKIP_SERVER_LOGS = !(qResult && _.find(({ title }) => title === '1572524820075-server-support-logs.js', qResult.data.migrations ?? []))
|
||||
if (!qResult && migrateFile) {
|
||||
return db.none('insert into migrations (id, data) values (1, $1)', [migrateFile])
|
||||
}
|
||||
})
|
||||
.then(() => migrate.run())
|
||||
.then(() => {
|
||||
console.log('DB Migration succeeded.')
|
||||
process.exit(0)
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('DB Migration failed: %s', err)
|
||||
process.exit(1)
|
||||
})
|
||||
})
|
||||
db.none(createMigration)
|
||||
.then(() => migrate.run())
|
||||
.then(() => {
|
||||
console.log('DB Migration succeeded.')
|
||||
process.exit(0)
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('DB Migration failed: %s', err)
|
||||
process.exit(1)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
#!/usr/bin/env node
|
||||
|
||||
require('../lib/environment-helper')
|
||||
const { asyncLocalStorage, defaultStore } = require('../lib/async-storage')
|
||||
const userManagement = require('../lib/new-admin/graphql/modules/userManagement')
|
||||
const authErrors = require('../lib/new-admin/graphql/errors')
|
||||
|
||||
|
|
@ -32,23 +31,21 @@ if (role !== 'user' && role !== 'superuser') {
|
|||
process.exit(2)
|
||||
}
|
||||
|
||||
asyncLocalStorage.run(defaultStore(), () => {
|
||||
userManagement.createRegisterToken(name, role).then(token => {
|
||||
if (domain === 'localhost' && process.env.NODE_ENV !== 'production') {
|
||||
console.log(`https://${domain}:3001/register?t=${token.token}`)
|
||||
} else {
|
||||
console.log(`https://${domain}/register?t=${token.token}`)
|
||||
}
|
||||
userManagement.createRegisterToken(name, role).then(token => {
|
||||
if (domain === 'localhost' && process.env.NODE_ENV !== 'production') {
|
||||
console.log(`https://${domain}:3001/register?t=${token.token}`)
|
||||
} else {
|
||||
console.log(`https://${domain}/register?t=${token.token}`)
|
||||
}
|
||||
|
||||
process.exit(0)
|
||||
}).catch(err => {
|
||||
process.exit(0)
|
||||
}).catch(err => {
|
||||
|
||||
if (err instanceof authErrors.UserAlreadyExistsError){
|
||||
console.log(`A user with email ${name} already exists!`)
|
||||
process.exit(2)
|
||||
}
|
||||
if (err instanceof authErrors.UserAlreadyExistsError){
|
||||
console.log(`A user with email ${name} already exists!`)
|
||||
process.exit(2)
|
||||
}
|
||||
|
||||
console.log('Error: %s', err)
|
||||
process.exit(3)
|
||||
})
|
||||
console.log('Error: %s', err)
|
||||
process.exit(3)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
FROM node:22-alpine AS build
|
||||
RUN apk add --no-cache npm git curl build-base net-tools python3 postgresql-dev
|
||||
|
||||
WORKDIR lamassu-server
|
||||
WORKDIR /lamassu-server
|
||||
|
||||
COPY ["package.json", "package-lock.json", "./"]
|
||||
RUN npm version --allow-same-version --git-tag-version false --commit-hooks false 1.0.0
|
||||
|
|
@ -28,6 +28,8 @@ ENTRYPOINT [ "/lamassu-server/bin/lamassu-server-entrypoint.sh" ]
|
|||
FROM node:22-alpine AS build-ui
|
||||
RUN apk add --no-cache npm git curl build-base python3
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY ["new-lamassu-admin/package.json", "new-lamassu-admin/package-lock.json", "./"]
|
||||
|
||||
RUN npm version --allow-same-version --git-tag-version false --commit-hooks false 1.0.0
|
||||
|
|
@ -38,7 +40,7 @@ RUN npm run build
|
|||
|
||||
|
||||
FROM l-s-base AS l-a-s
|
||||
COPY --from=build-ui /build /lamassu-server/public
|
||||
COPY --from=build-ui /app/build /lamassu-server/public
|
||||
|
||||
RUN chmod +x /lamassu-server/bin/lamassu-admin-server-entrypoint.sh
|
||||
|
||||
|
|
|
|||
46
lib/app.js
46
lib/app.js
|
|
@ -3,7 +3,6 @@ const https = require('https')
|
|||
const argv = require('minimist')(process.argv.slice(2))
|
||||
|
||||
require('./environment-helper')
|
||||
const { asyncLocalStorage, defaultStore } = require('./async-storage')
|
||||
const { loadRoutes } = require('./routes')
|
||||
const logger = require('./logger')
|
||||
const poller = require('./poller')
|
||||
|
|
@ -21,32 +20,29 @@ const version = require('../package.json').version
|
|||
logger.info('Version: %s', version)
|
||||
|
||||
function run () {
|
||||
const store = defaultStore()
|
||||
return asyncLocalStorage.run(store, () => {
|
||||
return new Promise((resolve, reject) => {
|
||||
let count = 0
|
||||
let handler
|
||||
return new Promise((resolve, reject) => {
|
||||
let count = 0
|
||||
let handler
|
||||
|
||||
const errorHandler = err => {
|
||||
count += 1
|
||||
logger.error(err)
|
||||
logger.error('[%d] Retrying in 10s...', count)
|
||||
}
|
||||
const errorHandler = err => {
|
||||
count += 1
|
||||
logger.error(err)
|
||||
logger.error('[%d] Retrying in 10s...', count)
|
||||
}
|
||||
|
||||
const runner = () => {
|
||||
settingsLoader.loadLatest()
|
||||
.then(settings => {
|
||||
clearInterval(handler)
|
||||
return loadSanctions(settings)
|
||||
.then(startServer)
|
||||
.then(resolve)
|
||||
})
|
||||
.catch(errorHandler)
|
||||
}
|
||||
const runner = () => {
|
||||
settingsLoader.loadLatest()
|
||||
.then(settings => {
|
||||
clearInterval(handler)
|
||||
return loadSanctions(settings)
|
||||
.then(startServer)
|
||||
.then(resolve)
|
||||
})
|
||||
.catch(errorHandler)
|
||||
}
|
||||
|
||||
handler = setInterval(runner, 10000)
|
||||
runner()
|
||||
})
|
||||
handler = setInterval(runner, 10000)
|
||||
runner()
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -69,7 +65,7 @@ function loadSanctions (settings) {
|
|||
async function startServer () {
|
||||
const app = await loadRoutes()
|
||||
|
||||
poller.setup(['public'])
|
||||
poller.setup()
|
||||
|
||||
const httpsServerOptions = {
|
||||
key: fs.readFileSync(KEY_PATH),
|
||||
|
|
|
|||
|
|
@ -1,11 +0,0 @@
|
|||
const { AsyncLocalStorage } = require('async_hooks')
|
||||
const asyncLocalStorage = new AsyncLocalStorage()
|
||||
|
||||
const defaultStore = () => {
|
||||
const store = new Map()
|
||||
store.set('schema', 'public')
|
||||
store.set('defaultSchema', 'ERROR_SCHEMA')
|
||||
return store
|
||||
}
|
||||
|
||||
module.exports = { asyncLocalStorage, defaultStore }
|
||||
|
|
@ -50,7 +50,7 @@ const BINARIES = {
|
|||
defaultUrlHash: 'd89c2afd78183f3ee815adcccdff02098be0c982633889e7b1e9c9656fbef219',
|
||||
defaultDir: 'dashcore-18.1.0/bin',
|
||||
url: 'https://github.com/dashpay/dash/releases/download/v21.1.1/dashcore-21.1.1-x86_64-linux-gnu.tar.gz',
|
||||
dir: 'dashcore-21.1.1/bin'
|
||||
dir: 'dashcore-21.1.1/bin',
|
||||
urlHash: 'c3157d4a82a3cb7c904a68e827bd1e629854fefcc0dcaf1de4343a810a190bf5',
|
||||
},
|
||||
LTC: {
|
||||
|
|
|
|||
|
|
@ -1,8 +0,0 @@
|
|||
const { asyncLocalStorage, defaultStore } = require('./async-storage')
|
||||
|
||||
const computeSchema = (req, res, next) => {
|
||||
const store = defaultStore()
|
||||
return asyncLocalStorage.run(store, () => next())
|
||||
}
|
||||
|
||||
module.exports = computeSchema
|
||||
85
lib/db.js
85
lib/db.js
|
|
@ -5,81 +5,12 @@ const _ = require('lodash/fp')
|
|||
const { PSQL_URL } = require('./constants')
|
||||
const logger = require('./logger')
|
||||
const eventBus = require('./event-bus')
|
||||
const { asyncLocalStorage, defaultStore } = require('./async-storage')
|
||||
|
||||
const DATABASE_NOT_REACHABLE = 'Database not reachable.'
|
||||
|
||||
const stripDefaultDbFuncs = dbCtx => {
|
||||
return {
|
||||
ctx: dbCtx.ctx,
|
||||
query: dbCtx.$query,
|
||||
result: dbCtx.$result,
|
||||
many: dbCtx.$many,
|
||||
oneOrNone: dbCtx.$oneOrNone,
|
||||
one: dbCtx.$one,
|
||||
none: dbCtx.$none,
|
||||
any: dbCtx.$any,
|
||||
manyOrNone: dbCtx.$manyOrNone,
|
||||
tx: dbCtx.$tx,
|
||||
task: dbCtx.$task,
|
||||
batch: dbCtx.batch,
|
||||
multi: dbCtx.$multi,
|
||||
connect: dbCtx.connect
|
||||
}
|
||||
}
|
||||
|
||||
const _tx = (obj, opts, cb) => {
|
||||
return obj.tx(opts, t => {
|
||||
return cb(stripDefaultDbFuncs(t))
|
||||
})
|
||||
}
|
||||
|
||||
const _task = (obj, opts, cb) => {
|
||||
return obj.task(opts, t => {
|
||||
return cb(stripDefaultDbFuncs(t))
|
||||
})
|
||||
}
|
||||
|
||||
const getSchema = () => {
|
||||
const store = asyncLocalStorage.getStore() ?? defaultStore()
|
||||
return asyncLocalStorage.run(store, () => store.get('schema'))
|
||||
}
|
||||
const getDefaultSchema = () => 'ERROR_SCHEMA'
|
||||
|
||||
const searchPathWrapper = (t, cb) => {
|
||||
return t.none('SET search_path TO $1:name', [getSchema()])
|
||||
.then(cb.bind(t, t))
|
||||
.catch(logger.error)
|
||||
.finally(() => t.none('SET search_path TO $1:name', [getDefaultSchema()]))
|
||||
}
|
||||
|
||||
const pgp = Pgp({
|
||||
pgNative: true,
|
||||
schema: 'ERROR_SCHEMA',
|
||||
extend (obj, dbContext) {
|
||||
obj.__taskEx = function (cb, throwOnError = true) {
|
||||
const args = pgp.utils.taskArgs(arguments)
|
||||
const schema = getSchema()
|
||||
if (!schema && throwOnError) {
|
||||
return Promise.reject(new Error('No schema selected, cannot complete query'))
|
||||
} else if (!schema) {
|
||||
return Promise.resolve('No schema selected, cannot complete query')
|
||||
}
|
||||
return obj.task.call(this, args.options, t => searchPathWrapper(t, cb))
|
||||
}
|
||||
obj.$query = (query, values, qrm, throwOnError) => obj.__taskEx(t => t.query(query, values, qrm), throwOnError)
|
||||
obj.$result = (query, variables, cb, thisArg) => obj.__taskEx(t => t.result(query, variables, cb, thisArg))
|
||||
obj.$many = (query, variables) => obj.__taskEx(t => t.many(query, variables))
|
||||
obj.$manyOrNone = (query, variables) => obj.__taskEx(t => t.manyOrNone(query, variables))
|
||||
obj.$oneOrNone = (query, variables) => obj.__taskEx(t => t.oneOrNone(query, variables))
|
||||
obj.$one = (query, variables) => obj.__taskEx(t => t.one(query, variables))
|
||||
obj.$none = (query, variables) => obj.__taskEx(t => t.none(query, variables))
|
||||
obj.$any = (query, variables) => obj.__taskEx(t => t.any(query, variables))
|
||||
obj.$multi = (query, variables) => obj.__taskEx(t => t.multi(query, variables))
|
||||
// when opts is not defined "cb" occupies the "opts" spot of the arguments
|
||||
obj.$tx = (opts, cb) => typeof opts === 'function' ? _tx(obj, {}, opts) : _tx(obj, opts, cb)
|
||||
obj.$task = (opts, cb) => typeof opts === 'function' ? _task(obj, {}, opts) : _task(obj, opts, cb)
|
||||
},
|
||||
schema: 'public',
|
||||
error: (err, e) => {
|
||||
if (e.cn) logger.error(DATABASE_NOT_REACHABLE)
|
||||
else if (e.query) {
|
||||
|
|
@ -90,7 +21,7 @@ const pgp = Pgp({
|
|||
}
|
||||
})
|
||||
|
||||
const db = stripDefaultDbFuncs(pgp(PSQL_URL))
|
||||
const db = pgp(PSQL_URL)
|
||||
|
||||
eventBus.subscribe('log', args => {
|
||||
if (process.env.SKIP_SERVER_LOGS) return
|
||||
|
|
@ -104,14 +35,10 @@ eventBus.subscribe('log', args => {
|
|||
|
||||
const sql = `insert into server_logs
|
||||
(id, device_id, message, log_level, meta) values ($1, $2, $3, $4, $5) returning *`
|
||||
// need to set AsyncLocalStorage (ALS) for this function as well
|
||||
// because this module is imported before ALS is set up on app.js
|
||||
const store = defaultStore()
|
||||
asyncLocalStorage.run(store, () => {
|
||||
db.one(sql, [uuid.v4(), '', msgToSave, level, meta])
|
||||
.then(_.mapKeys(_.camelCase))
|
||||
.catch(_.noop)
|
||||
})
|
||||
|
||||
db.one(sql, [uuid.v4(), '', msgToSave, level, meta])
|
||||
.then(_.mapKeys(_.camelCase))
|
||||
.catch(_.noop)
|
||||
})
|
||||
|
||||
module.exports = db
|
||||
|
|
|
|||
|
|
@ -1,10 +0,0 @@
|
|||
const { asyncLocalStorage, defaultStore } = require('../async-storage')
|
||||
|
||||
const computeSchema = (req, res, next) => {
|
||||
const store = defaultStore()
|
||||
asyncLocalStorage.run(store, () => {
|
||||
next()
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = computeSchema
|
||||
|
|
@ -16,14 +16,12 @@ const { mergeResolvers } = require('@graphql-tools/merge')
|
|||
const { makeExecutableSchema } = require('@graphql-tools/schema')
|
||||
|
||||
require('../environment-helper')
|
||||
const { asyncLocalStorage, defaultStore } = require('../async-storage')
|
||||
const logger = require('../logger')
|
||||
const exchange = require('../exchange')
|
||||
|
||||
const { authDirectiveTransformer } = require('./graphql/directives')
|
||||
const { typeDefs, resolvers } = require('./graphql/schema')
|
||||
const findOperatorId = require('../middlewares/operatorId')
|
||||
const computeSchema = require('../compute-schema')
|
||||
const { USER_SESSIONS_CLEAR_INTERVAL } = require('../constants')
|
||||
const { session, cleanUserSessions, buildApolloContext } = require('./middlewares')
|
||||
|
||||
|
|
@ -53,7 +51,6 @@ const loadRoutes = async () => {
|
|||
app.use(express.urlencoded({ extended: true })) // support encoded bodies
|
||||
app.use(express.static(path.resolve(__dirname, '..', '..', 'public')))
|
||||
app.use(cleanUserSessions(USER_SESSIONS_CLEAR_INTERVAL))
|
||||
app.use(computeSchema)
|
||||
app.use(findOperatorId)
|
||||
app.use(session)
|
||||
|
||||
|
|
@ -111,20 +108,17 @@ const certOptions = {
|
|||
ca: fs.readFileSync(CA_PATH)
|
||||
}
|
||||
|
||||
function run () {
|
||||
const store = defaultStore()
|
||||
asyncLocalStorage.run(store, async () => {
|
||||
const app = await loadRoutes()
|
||||
const serverPort = devMode ? 8070 : 443
|
||||
async function run () {
|
||||
const app = await loadRoutes()
|
||||
const serverPort = devMode ? 8070 : 443
|
||||
|
||||
const serverLog = `lamassu-admin-server listening on port ${serverPort}`
|
||||
const serverLog = `lamassu-admin-server listening on port ${serverPort}`
|
||||
|
||||
// cache markets on startup
|
||||
exchange.getMarkets().catch(console.error)
|
||||
// cache markets on startup
|
||||
exchange.getMarkets().catch(console.error)
|
||||
|
||||
const webServer = https.createServer(certOptions, app)
|
||||
webServer.listen(serverPort, () => logger.info(serverLog))
|
||||
})
|
||||
const webServer = https.createServer(certOptions, app)
|
||||
webServer.listen(serverPort, () => logger.info(serverLog))
|
||||
}
|
||||
|
||||
module.exports = { run }
|
||||
|
|
|
|||
|
|
@ -1,21 +1,18 @@
|
|||
const { asyncLocalStorage } = require('../../async-storage')
|
||||
const db = require('../../db')
|
||||
const { USER_SESSIONS_TABLE_NAME } = require('../../constants')
|
||||
const logger = require('../../logger')
|
||||
|
||||
const schemaCache = {}
|
||||
let schemaCache = Date.now()
|
||||
|
||||
const cleanUserSessions = (cleanInterval) => (req, res, next) => {
|
||||
const schema = asyncLocalStorage.getStore() ? asyncLocalStorage.getStore().get('schema') : null
|
||||
const now = Date.now()
|
||||
|
||||
if (!schema) return next()
|
||||
if (schema && schemaCache.schema + cleanInterval > now) return next()
|
||||
if (schemaCache + cleanInterval > now) return next()
|
||||
|
||||
logger.debug(`Clearing expired sessions for schema ${schema}`)
|
||||
logger.debug(`Clearing expired sessions for schema 'public'`)
|
||||
return db.none('DELETE FROM $1^ WHERE expire < to_timestamp($2 / 1000.0)', [USER_SESSIONS_TABLE_NAME, now])
|
||||
.then(() => {
|
||||
schemaCache.schema = now
|
||||
schemaCache = now
|
||||
return next()
|
||||
})
|
||||
.catch(next)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ const crypto = require('crypto')
|
|||
|
||||
const _ = require('lodash/fp')
|
||||
const db = require('./db')
|
||||
const { asyncLocalStorage } = require('./async-storage')
|
||||
const { getOperatorId } = require('./operator')
|
||||
const { getTermsConditions, setTermsConditions } = require('./new-config-manager')
|
||||
|
||||
|
|
@ -57,7 +56,7 @@ const addTermsHash = configs => {
|
|||
const notifyReload = (dbOrTx, operatorId) =>
|
||||
dbOrTx.none(
|
||||
'NOTIFY $1:name, $2',
|
||||
['reload', JSON.stringify({ schema: asyncLocalStorage.getStore().get('schema'), operatorId })]
|
||||
['reload', JSON.stringify({ operatorId })]
|
||||
)
|
||||
|
||||
function saveAccounts (accounts) {
|
||||
|
|
|
|||
|
|
@ -456,13 +456,6 @@ function plugins (settings, deviceId) {
|
|||
.catch(logger.error)
|
||||
}
|
||||
|
||||
function pong () {
|
||||
return db.none(`UPDATE server_events SET created=now() WHERE event_type=$1;
|
||||
INSERT INTO server_events (event_type) SELECT $1
|
||||
WHERE NOT EXISTS (SELECT 1 FROM server_events WHERE event_type=$1);`, ['ping'])
|
||||
.catch(logger.error)
|
||||
}
|
||||
|
||||
/*
|
||||
* Trader functions
|
||||
*/
|
||||
|
|
@ -935,7 +928,6 @@ function plugins (settings, deviceId) {
|
|||
getPhoneCode,
|
||||
getEmailCode,
|
||||
executeTrades,
|
||||
pong,
|
||||
clearOldLogs,
|
||||
notifyConfirmation,
|
||||
sweepHd,
|
||||
|
|
|
|||
132
lib/poller.js
132
lib/poller.js
|
|
@ -8,16 +8,13 @@ const T = require('./time')
|
|||
const logger = require('./logger')
|
||||
const cashOutTx = require('./cash-out/cash-out-tx')
|
||||
const cashInTx = require('./cash-in/cash-in-tx')
|
||||
const customers = require('./customers')
|
||||
const sanctionsUpdater = require('./ofac/update')
|
||||
const sanctions = require('./ofac/index')
|
||||
const coinAtmRadar = require('./coinatmradar/coinatmradar')
|
||||
const configManager = require('./new-config-manager')
|
||||
const complianceTriggers = require('./compliance-triggers')
|
||||
const { asyncLocalStorage, defaultStore } = require('./async-storage')
|
||||
const settingsLoader = require('./new-settings-loader')
|
||||
const NodeCache = require('node-cache')
|
||||
const util = require('util')
|
||||
const db = require('./db')
|
||||
const processBatches = require('./tx-batching-processing')
|
||||
|
||||
|
|
@ -26,7 +23,6 @@ const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
|||
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
||||
const SWEEP_HD_INTERVAL = 5 * T.minute
|
||||
const TRADE_INTERVAL = 60 * T.seconds
|
||||
const PONG_INTERVAL = 10 * T.seconds
|
||||
const LOGS_CLEAR_INTERVAL = 1 * T.day
|
||||
const SANCTIONS_INITIAL_DOWNLOAD_INTERVAL = 5 * T.minutes
|
||||
const SANCTIONS_UPDATE_INTERVAL = 1 * T.day
|
||||
|
|
@ -56,17 +52,11 @@ const SLOW_QUEUE = new Queue({
|
|||
interval: SLOW_QUEUE_WAIT
|
||||
})
|
||||
|
||||
// Fix for asyncLocalStorage store being lost due to callback-based queue
|
||||
FAST_QUEUE.enqueue = util.promisify(FAST_QUEUE.enqueue)
|
||||
SLOW_QUEUE.enqueue = util.promisify(SLOW_QUEUE.enqueue)
|
||||
|
||||
const QUEUE = {
|
||||
FAST: FAST_QUEUE,
|
||||
SLOW: SLOW_QUEUE
|
||||
}
|
||||
|
||||
const schemaCallbacks = new Map()
|
||||
|
||||
const cachedVariables = new NodeCache({
|
||||
stdTTL: CACHE_ENTRY_TTL,
|
||||
checkperiod: CACHE_ENTRY_TTL,
|
||||
|
|
@ -78,31 +68,25 @@ cachedVariables.on('expired', (key, val) => {
|
|||
if (!val.isReloading) {
|
||||
// since val is passed by reference we don't need to do cachedVariables.set()
|
||||
val.isReloading = true
|
||||
return reload(key)
|
||||
return reload()
|
||||
}
|
||||
})
|
||||
|
||||
db.connect({ direct: true }).then(sco => {
|
||||
sco.client.on('notification', data => {
|
||||
const parsedData = JSON.parse(data.payload)
|
||||
return reload(parsedData.schema)
|
||||
sco.client.on('notification', () => {
|
||||
return reload()
|
||||
})
|
||||
return sco.none('LISTEN $1:name', 'reload')
|
||||
}).catch(console.error)
|
||||
|
||||
function reload (schema) {
|
||||
const store = defaultStore()
|
||||
store.set('schema', schema)
|
||||
// set asyncLocalStorage so settingsLoader loads settings for the right schema
|
||||
return asyncLocalStorage.run(store, () => {
|
||||
return settingsLoader.loadLatest()
|
||||
.then(settings => {
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set(schema, { settings, pi, isReloading: false })
|
||||
logger.debug(`Settings for schema '${schema}' reloaded in poller`)
|
||||
return updateAndLoadSanctions()
|
||||
})
|
||||
})
|
||||
function reload () {
|
||||
return settingsLoader.loadLatest()
|
||||
.then(settings => {
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set('public', { settings, pi, isReloading: false })
|
||||
logger.debug(`Settings for schema 'public' reloaded in poller`)
|
||||
return updateAndLoadSanctions()
|
||||
})
|
||||
}
|
||||
|
||||
function pi () { return cachedVariables.get('public').pi }
|
||||
|
|
@ -205,26 +189,12 @@ const cleanOldFailedQRScans = () => {
|
|||
})
|
||||
}
|
||||
|
||||
// function checkExternalCompliance (settings) {
|
||||
// return customers.checkExternalCompliance(settings)
|
||||
// }
|
||||
|
||||
function initializeEachSchema (schemas = ['public']) {
|
||||
// for each schema set "thread variables" and do polling
|
||||
return _.forEach(schema => {
|
||||
const store = defaultStore()
|
||||
store.set('schema', schema)
|
||||
return asyncLocalStorage.run(store, () => {
|
||||
return settingsLoader.loadLatest().then(settings => {
|
||||
// prevent inadvertedly clearing the array without clearing timeouts
|
||||
if (schemaCallbacks.has(schema)) throw new Error(`The schema "${schema}" cannot be initialized twice on poller`)
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set(schema, { settings, pi, isReloading: false })
|
||||
schemaCallbacks.set(schema, [])
|
||||
return doPolling(schema)
|
||||
})
|
||||
}).catch(console.error)
|
||||
}, schemas)
|
||||
function setup () {
|
||||
return settingsLoader.loadLatest().then(settings => {
|
||||
const pi = plugins(settings)
|
||||
cachedVariables.set('public', { settings, pi, isReloading: false })
|
||||
return doPolling()
|
||||
}).catch(console.error)
|
||||
}
|
||||
|
||||
function recursiveTimeout (func, timeout, ...vars) {
|
||||
|
|
@ -246,25 +216,12 @@ function recursiveTimeout (func, timeout, ...vars) {
|
|||
}, timeout)
|
||||
}
|
||||
|
||||
function addToQueue (func, interval, schema, queue, ...vars) {
|
||||
function addToQueue (func, interval, queue, ...vars) {
|
||||
recursiveTimeout(func, interval, ...vars)
|
||||
// return schemaCallbacks.get(schema).push(setInterval(() => {
|
||||
// return queue.enqueue().then(() => {
|
||||
// // get plugins or settings from the cache every time func is run
|
||||
// const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
|
||||
// if (loadVariables) {
|
||||
// const funcVars = [...vars]
|
||||
// funcVars[0] = vars[0]()
|
||||
// return func(...funcVars)
|
||||
// }
|
||||
// return func(...vars)
|
||||
// }).catch(console.error)
|
||||
// }, interval))
|
||||
}
|
||||
|
||||
function doPolling (schema) {
|
||||
function doPolling () {
|
||||
pi().executeTrades()
|
||||
pi().pong()
|
||||
pi().clearOldLogs()
|
||||
cashOutTx.monitorLiveIncoming(settings())
|
||||
cashOutTx.monitorStaleIncoming(settings())
|
||||
|
|
@ -272,40 +229,23 @@ function doPolling (schema) {
|
|||
pi().sweepHd()
|
||||
notifier.checkNotification(pi())
|
||||
updateCoinAtmRadar()
|
||||
// checkExternalCompliance(settings())
|
||||
|
||||
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST)
|
||||
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
|
||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
|
||||
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, schema, QUEUE.FAST, settings)
|
||||
addToQueue(pi().pong, PONG_INTERVAL, schema, QUEUE.FAST)
|
||||
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, schema, QUEUE.FAST, pi)
|
||||
addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, schema, QUEUE.SLOW)
|
||||
addToQueue(pi().pruneMachinesHeartbeat, PRUNE_MACHINES_HEARTBEAT, schema, QUEUE.SLOW, settings)
|
||||
addToQueue(cleanOldFailedQRScans, FAILED_SCANS_INTERVAL, schema, QUEUE.SLOW, settings)
|
||||
addToQueue(cleanOldFailedPDF417Scans, FAILED_SCANS_INTERVAL, schema, QUEUE.SLOW, settings)
|
||||
// addToQueue(checkExternalCompliance, EXTERNAL_COMPLIANCE_INTERVAL, schema, QUEUE.SLOW, settings)
|
||||
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, QUEUE.FAST)
|
||||
addToQueue(pi().executeTrades, TRADE_INTERVAL, QUEUE.FAST)
|
||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, QUEUE.FAST, settings)
|
||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, QUEUE.FAST, settings)
|
||||
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, QUEUE.FAST, settings)
|
||||
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, QUEUE.FAST, settings)
|
||||
addToQueue(processBatches, UNNOTIFIED_INTERVAL, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
|
||||
addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, QUEUE.FAST, settings)
|
||||
addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, QUEUE.SLOW)
|
||||
addToQueue(notifier.checkNotification, CHECK_NOTIFICATION_INTERVAL, QUEUE.FAST, pi)
|
||||
addToQueue(initialSanctionsDownload, SANCTIONS_INITIAL_DOWNLOAD_INTERVAL, QUEUE.SLOW)
|
||||
addToQueue(updateAndLoadSanctions, SANCTIONS_UPDATE_INTERVAL, QUEUE.SLOW)
|
||||
addToQueue(updateCoinAtmRadar, RADAR_UPDATE_INTERVAL, QUEUE.SLOW)
|
||||
addToQueue(pi().pruneMachinesHeartbeat, PRUNE_MACHINES_HEARTBEAT, QUEUE.SLOW, settings)
|
||||
addToQueue(cleanOldFailedQRScans, FAILED_SCANS_INTERVAL, QUEUE.SLOW, settings)
|
||||
addToQueue(cleanOldFailedPDF417Scans, FAILED_SCANS_INTERVAL, QUEUE.SLOW, settings)
|
||||
}
|
||||
|
||||
function setup (schemasToAdd = [], schemasToRemove = []) {
|
||||
// clear callback array for each schema in schemasToRemove and clear cached variables
|
||||
_.forEach(schema => {
|
||||
const callbacks = schemaCallbacks.get(schema)
|
||||
_.forEach(clearInterval, callbacks)
|
||||
schemaCallbacks.delete(schema)
|
||||
cachedVariables.del(schema)
|
||||
}, schemasToRemove)
|
||||
|
||||
return initializeEachSchema(schemasToAdd)
|
||||
}
|
||||
|
||||
const getActiveSchemas = () => Array.from(schemaCallbacks.keys())
|
||||
|
||||
module.exports = { setup, reload, getActiveSchemas }
|
||||
module.exports = { setup, reload }
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ const logger = require('./logger')
|
|||
|
||||
const addRWBytes = require('./middlewares/addRWBytes')
|
||||
const authorize = require('./middlewares/authorize')
|
||||
const computeSchema = require('./middlewares/compute-schema')
|
||||
const errorHandler = require('./middlewares/errorHandler')
|
||||
const filterOldRequests = require('./middlewares/filterOldRequests')
|
||||
const findOperatorId = require('./middlewares/operatorId')
|
||||
|
|
@ -77,7 +76,6 @@ const loadRoutes = async () => {
|
|||
|
||||
app.use(findOperatorId)
|
||||
app.use(populateDeviceId)
|
||||
app.use(computeSchema)
|
||||
app.use(authorize)
|
||||
app.use(configRequiredRoutes, populateSettings)
|
||||
app.use(filterOldRequests)
|
||||
|
|
|
|||
12
migrations/1743526540370-deprecate-tables.js
Normal file
12
migrations/1743526540370-deprecate-tables.js
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
const db = require('./db')
|
||||
|
||||
exports.up = next => db.multi([
|
||||
'DROP TABLE aggregated_machine_pings;',
|
||||
'DROP TABLE cash_in_refills;',
|
||||
'DROP TABLE cash_out_refills;',
|
||||
'DROP TABLE customer_compliance_persistence;',
|
||||
'DROP TABLE compliance_overrides_persistence;',
|
||||
'DROP TABLE server_events;',
|
||||
], next)
|
||||
|
||||
exports.down = next => next()
|
||||
10
migrations/1744294267662-bills-performance.js
Normal file
10
migrations/1744294267662-bills-performance.js
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
const db = require('./db')
|
||||
|
||||
exports.up = next => db.multi([
|
||||
'ALTER TABLE bills ADD CONSTRAINT cash_in_txs_id FOREIGN KEY (cash_in_txs_id) REFERENCES cash_in_txs(id);',
|
||||
'CREATE INDEX bills_cash_in_txs_id_idx ON bills USING btree (cash_in_txs_id);',
|
||||
`CREATE INDEX bills_null_cashbox_batch_id_idx ON bills (cash_in_txs_id) WHERE cashbox_batch_id IS NULL AND destination_unit = 'cashbox';`,
|
||||
'CREATE INDEX cash_in_txs_device_id_idx ON cash_in_txs USING btree (device_id);'
|
||||
], next)
|
||||
|
||||
exports.down = next => next()
|
||||
11
migrations/1744294267663-blacklist-normalization.js
Normal file
11
migrations/1744294267663-blacklist-normalization.js
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
const db = require('./db')
|
||||
|
||||
exports.up = next => db.multi([
|
||||
'ALTER TABLE public.blacklist DROP CONSTRAINT IF EXISTS blacklist_pkey;',
|
||||
'ALTER TABLE public.blacklist ADD PRIMARY KEY (address);',
|
||||
'DROP INDEX IF EXISTS blacklist_temp_address_key;',
|
||||
'CREATE UNIQUE INDEX blacklist_address_idx ON public.blacklist USING btree (address);',
|
||||
|
||||
], next)
|
||||
|
||||
exports.down = next => next()
|
||||
Loading…
Add table
Add a link
Reference in a new issue