Merge pull request #1877 from siiky/feat/lam-1291/stress-testing

LAM-1291 feat: stress testing scripts and some performance improvements
This commit is contained in:
Rafael Taranto 2025-06-06 09:09:50 +01:00 committed by GitHub
commit 22938ab594
166 changed files with 1207 additions and 95813 deletions

View file

@ -47,9 +47,13 @@ function updateCore(coinRec, isCurrentlyRunning) {
`grep "i-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025=" /mnt/blockchains/zcash/zcash.conf || true`,
)
) {
common.logger.info(`i-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025 already defined, skipping...`)
common.logger.info(
`i-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025 already defined, skipping...`,
)
} else {
common.logger.info(`Setting 'i-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025=1' in config file...`)
common.logger.info(
`Setting 'i-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025=1' in config file...`,
)
common.es(
`echo "\ni-am-aware-zcashd-will-be-replaced-by-zebrad-and-zallet-in-2025=1" >> /mnt/blockchains/zcash/zcash.conf`,
)

View file

@ -173,6 +173,13 @@ function getMachineName(machineId) {
return db.oneOrNone(sql, [machineId]).then(it => it?.name)
}
const getPairedMachineName = deviceId =>
db.oneOrNone(
'SELECT name FROM devices WHERE device_id = $1 AND paired = TRUE',
[deviceId],
machine => machine?.name,
)
function getMachine(machineId, config) {
const sql = `${MACHINE_WITH_CALCULATED_FIELD_SQL} WHERE d.device_id = $1`
@ -702,8 +709,55 @@ function updatePhotos(dir, photoPairs) {
)
}
let pendingRecordPings = new Map()
const enqueueRecordPing = ping => {
pendingRecordPings.set(ping.deviceId, ping)
}
// from @sindresorhus/is-empty-iterable
const isEmptyIterable = iter => {
for (const _ of iter) return false
return true
}
const batchRecordPendingPings = () => {
let pings = pendingRecordPings.values()
pendingRecordPings = new Map()
if (isEmptyIterable(pings)) return Promise.resolve()
const prepareChunk = (t, chunk) =>
chunk
.flatMap(({ deviceId, last_online, version, model }) => [
t.none(
`INSERT INTO machine_pings (device_id, device_time)
VALUES ($1, $2)
ON CONFLICT (device_id) DO
UPDATE SET device_time = $2,
updated = now()`,
[deviceId, last_online],
),
t.none(
pgp.helpers.update({ last_online, version, model }, null, 'devices') +
'WHERE device_id = ${deviceId}',
{ deviceId },
),
])
.toArray()
const MaxBatchSize = 500
return db
.task(async t => {
while (!isEmptyIterable(pings)) {
const chunk = pings.take(MaxBatchSize)
await t.batch(prepareChunk(t, chunk)).catch(err => logger.error(err))
}
})
.catch(err => logger.error(err))
}
module.exports = {
getMachineName,
getPairedMachineName,
getMachines,
getUnpairedMachines,
getMachine,
@ -719,4 +773,6 @@ module.exports = {
updateDiagnostics,
updateFailedQRScans,
batchDiagnostics,
enqueueRecordPing,
batchRecordPendingPings,
}

View file

@ -1,9 +1,8 @@
const pairing = require('../pairing')
const { getPairedMachineName } = require('../machine-loader')
const logger = require('../logger')
const authorize = function (req, res, next) {
return pairing
.isPaired(req.deviceId)
return getPairedMachineName(req.deviceId)
.then(deviceName => {
if (deviceName) {
req.deviceName = deviceName

View file

@ -1,5 +1,7 @@
const crypto = require('crypto')
//const IS_STRESS_TESTING = process.env.LAMASSU_STRESS_TESTING === 'YES'
function sha256(buf) {
if (!buf) return null
const hash = crypto.createHash('sha256')
@ -12,7 +14,9 @@ const populateDeviceId = function (req, res, next) {
const peerCert = req.socket.getPeerCertificate
? req.socket.getPeerCertificate()
: null
const deviceId = peerCert?.raw ? sha256(peerCert.raw) : null
let deviceId = peerCert?.raw ? sha256(peerCert.raw) : null
//if (!deviceId && IS_STRESS_TESTING) deviceId = req.headers.device_id
if (!deviceId)
return res.status(500).json({ error: 'Unable to find certificate' })

View file

@ -78,9 +78,6 @@ const populateSettings = function (req, res, next) {
const { needsSettingsReload, settingsCache } = state
const operatorId = res.locals.operatorId
const versionId = req.headers['config-version']
if (versionId !== state.oldVersionId) {
state.oldVersionId = versionId
}
try {
// Priority of configs to retrieve
@ -113,7 +110,7 @@ const populateSettings = function (req, res, next) {
const operatorSettings = settingsCache.get(`${operatorId}-latest`)
if (!!needsSettingsReload[operatorId] || !operatorSettings) {
if (needsSettingsReload[operatorId] || !operatorSettings) {
needsSettingsReload[operatorId]
? logger.debug(
'Fetching and caching a new latest config value, as a reload was requested',
@ -128,8 +125,7 @@ const populateSettings = function (req, res, next) {
const versionId = settings.version
settingsCache.set(`${operatorId}-latest`, settings)
settingsCache.set(`${operatorId}-v${versionId}`, settings)
if (needsSettingsReload[operatorId])
delete needsSettingsReload[operatorId]
delete needsSettingsReload[operatorId]
req.settings = settings
})
.then(() => next())

View file

@ -1,7 +1,13 @@
const plugins = require('../plugins')
const { enqueueRecordPing } = require('../machine-loader')
module.exports = (req, res, next) =>
plugins(req.settings, req.deviceId)
.recordPing(req.deviceTime, req.query.version, req.query.model)
.then(() => next())
.catch(() => next())
const record = (req, res, next) => {
enqueueRecordPing({
deviceId: req.deviceId,
last_online: req.deviceTime,
model: req.query.model,
version: req.query.version,
})
next()
}
module.exports = record

View file

@ -3,7 +3,6 @@ const SETTINGS_CACHE_REFRESH = 3600
module.exports = (function () {
return {
oldVersionId: 'unset',
needsSettingsReload: {},
settingsCache: new NodeCache({
stdTTL: SETTINGS_CACHE_REFRESH,

View file

@ -91,7 +91,7 @@ function saveAccounts(accounts) {
)
}
function loadAccounts(schemaVersion) {
function _loadAccounts(db, schemaVersion) {
const sql = `SELECT data
FROM user_config
WHERE type = $1
@ -100,14 +100,15 @@ function loadAccounts(schemaVersion) {
ORDER BY id DESC
LIMIT 1`
return db
.oneOrNone(sql, [
'accounts',
schemaVersion || NEW_SETTINGS_LOADER_SCHEMA_VERSION,
])
.then(_.compose(_.defaultTo({}), _.get('data.accounts')))
return db.oneOrNone(
sql,
['accounts', schemaVersion || NEW_SETTINGS_LOADER_SCHEMA_VERSION],
row => row?.data?.accounts ?? {},
)
}
const loadAccounts = schemaVersion => _loadAccounts(db, schemaVersion)
function hideSecretFields(accounts) {
return _.flow(
_.filter(path => !_.isEmpty(_.get(path, accounts))),
@ -167,16 +168,19 @@ function migrationSaveConfig(config) {
})
}
function loadLatest(schemaVersion) {
return Promise.all([
loadLatestConfigOrNoneReturningVersion(schemaVersion),
loadAccounts(schemaVersion),
]).then(([configObj, accounts]) => ({
config: configObj.config,
accounts,
version: configObj.version,
}))
}
const loadLatest = schemaVersion =>
db
.task(t =>
t.batch([
loadLatestConfigOrNoneReturningVersion(t, schemaVersion),
_loadAccounts(t, schemaVersion),
]),
)
.then(([configObj, accounts]) => ({
config: configObj.config,
accounts,
version: configObj.version,
}))
function loadLatestConfig() {
const sql = `SELECT data
@ -195,7 +199,7 @@ function loadLatestConfig() {
})
}
function loadLatestConfigOrNoneReturningVersion(schemaVersion) {
function loadLatestConfigOrNoneReturningVersion(db, schemaVersion) {
const sql = `SELECT data, id
FROM user_config
WHERE type = 'config'
@ -222,7 +226,7 @@ function loadLatestConfigOrNone(schemaVersion) {
.then(row => (row ? row.data.config : {}))
}
function loadConfig(versionId) {
function loadConfig(db, versionId) {
const sql = `SELECT data
FROM user_config
WHERE id = $1
@ -231,8 +235,11 @@ function loadConfig(versionId) {
AND valid`
return db
.one(sql, [versionId, NEW_SETTINGS_LOADER_SCHEMA_VERSION])
.then(row => row.data.config)
.one(
sql,
[versionId, NEW_SETTINGS_LOADER_SCHEMA_VERSION],
({ data: { config } }) => config,
)
.catch(err => {
if (err.name === 'QueryResultError') {
throw new Error('No such config version: ' + versionId)
@ -245,12 +252,14 @@ function loadConfig(versionId) {
function load(versionId) {
if (!versionId) Promise.reject('versionId is required')
return Promise.all([loadConfig(versionId), loadAccounts()]).then(
([config, accounts]) => ({
config,
accounts,
}),
)
return db.task(t => {
t.batch([loadConfig(t, versionId), _loadAccounts(t)]).then(
([config, accounts]) => ({
config,
accounts,
}),
)
})
}
const fetchCurrentConfigVersion = () => {

View file

@ -1,9 +1,8 @@
const db = require('./db')
const _ = require('lodash/fp')
function getOperatorId(service) {
const sql = `SELECT operator_id FROM operator_ids WHERE service = '${service}'`
return db.oneOrNone(sql).then(_.get('operator_id'))
const sql = 'SELECT operator_id FROM operator_ids WHERE service = ${service}'
return db.oneOrNone(sql, { service }, ({ operator_id }) => operator_id)
}
module.exports = { getOperatorId }

View file

@ -81,13 +81,4 @@ function authorizeCaDownload(caToken) {
})
}
function isPaired(deviceId) {
const sql =
'select device_id, name from devices where device_id=$1 and paired=TRUE'
return db
.oneOrNone(sql, [deviceId])
.then(row => (row && row.device_id === deviceId ? row.name : false))
}
module.exports = { pair, unpair, authorizeCaDownload, isPaired }
module.exports = { pair, unpair, authorizeCaDownload }

View file

@ -409,31 +409,6 @@ function plugins(settings, deviceId) {
})
}
function recordPing(deviceTime, version, model) {
const devices = {
version,
model,
last_online: deviceTime,
}
return Promise.all([
db.none(
`insert into machine_pings(device_id, device_time)
values ($1, $2)
ON CONFLICT (device_id) DO UPDATE SET device_time = $2,
updated = now()`,
[deviceId, deviceTime],
),
db.none(
pgp.helpers.update(devices, null, 'devices') +
'WHERE device_id = ${deviceId}',
{
deviceId,
},
),
])
}
function pruneMachinesHeartbeat() {
const sql = `DELETE
FROM machine_network_heartbeat h
@ -1062,7 +1037,6 @@ function plugins(settings, deviceId) {
return {
getRates,
recordPing,
buildRates,
getRawRates,
buildRatesNoCommission,

View file

@ -14,6 +14,7 @@ const coinAtmRadar = require('./coinatmradar/coinatmradar')
const configManager = require('./new-config-manager')
const complianceTriggers = require('./compliance-triggers')
const settingsLoader = require('./new-settings-loader')
const machineLoader = require('./machine-loader')
const NodeCache = require('node-cache')
const db = require('./db')
const processBatches = require('./tx-batching-processing')
@ -31,6 +32,7 @@ const PRUNE_MACHINES_HEARTBEAT = 1 * T.day
const TRANSACTION_BATCH_LIFECYCLE = 20 * T.minutes
const TICKER_RATES_INTERVAL = 59 * T.seconds
const FAILED_SCANS_INTERVAL = 1 * T.day
const PENDING_PINGS_INTERVAL = 90 * T.seconds // lib/notifier/codes.js
const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds
const PENDING_INTERVAL = 10 * T.seconds
@ -309,6 +311,11 @@ function doPolling() {
QUEUE.SLOW,
settings,
)
addToQueue(
machineLoader.batchRecordPendingPings,
PENDING_PINGS_INTERVAL,
QUEUE.SLOW,
)
}
module.exports = { setup, reload }

View file

@ -78,11 +78,11 @@ const loadRoutes = async () => {
// app /pair and /ca routes
app.use('/', pairingRoutes)
app.use(findOperatorId)
app.use(populateDeviceId)
app.use(authorize)
app.use(configRequiredRoutes, populateSettings)
app.use(filterOldRequests)
app.use(findOperatorId)
app.use(configRequiredRoutes, populateSettings)
// other app routes
app.use('/graphql', recordPing)

View file

@ -27,7 +27,7 @@ function getLastSeen(req, res, next) {
function updateLogs(req, res, next) {
return logs
.update(req.deviceId, req.body.logs)
.then(status => res.json({ success: status }))
.then(success => res.json({ success }))
.catch(next)
}