refactor: batch record pings

This commit is contained in:
siiky 2025-06-03 09:44:42 +01:00
parent be06ea5097
commit 1b26150499
4 changed files with 61 additions and 46 deletions

View file

@ -702,6 +702,52 @@ function updatePhotos(dir, photoPairs) {
) )
} }
let pendingRecordPings = new Map()
const enqueueRecordPing = ping => {
pendingRecordPings.set(ping.deviceId, ping)
}
// from @sindresorhus/is-empty-iterable
const isEmptyIterable = iter => {
for (const _ of iter) return false
return true
}
const batchRecordPendingPings = () => {
let pings = pendingRecordPings.values()
pendingRecordPings = new Map()
if (isEmptyIterable(pings)) return Promise.resolve()
const prepareChunk = (t, chunk) =>
chunk
.flatMap(({ deviceId, last_online, version, model }) => [
t.none(
`INSERT INTO machine_pings (device_id, device_time)
VALUES ($1, $2)
ON CONFLICT (device_id) DO
UPDATE SET device_time = $2,
updated = now()`,
[deviceId, last_online],
),
t.none(
pgp.helpers.update({ last_online, version, model }, null, 'devices') +
'WHERE device_id = ${deviceId}',
{ deviceId },
),
])
.toArray()
const MaxBatchSize = 500
return db
.task(async t => {
while (!isEmptyIterable(pings)) {
const chunk = pings.take(MaxBatchSize)
await t.batch(prepareChunk(t, chunk)).catch(err => logger.error(err))
}
})
.catch(err => logger.error(err))
}
module.exports = { module.exports = {
getMachineName, getMachineName,
getMachines, getMachines,
@ -719,4 +765,6 @@ module.exports = {
updateDiagnostics, updateDiagnostics,
updateFailedQRScans, updateFailedQRScans,
batchDiagnostics, batchDiagnostics,
enqueueRecordPing,
batchRecordPendingPings,
} }

View file

@ -1,27 +1,13 @@
const mem = require('mem') const { enqueueRecordPing } = require('../machine-loader')
const logger = require('../logger') const record = (req, res, next) => {
const plugins = require('../plugins') enqueueRecordPing({
const T = require('../time')
const record = mem(
({ deviceId, deviceTime, model, version, settings }) =>
plugins(settings, deviceId)
.recordPing(deviceTime, version, model)
.catch(logger.error),
{
cacheKey: ({ deviceId }) => deviceId,
maxAge: (3 / 2) * T.minute, // lib/notifier/codes.js
},
)
module.exports = (req, res, next) => {
record({
deviceId: req.deviceId, deviceId: req.deviceId,
deviceTime: req.deviceTime, last_online: req.deviceTime,
model: req.query.model, model: req.query.model,
version: req.query.version, version: req.query.version,
settings: req.settings,
}) })
next() next()
} }
module.exports = record

View file

@ -409,31 +409,6 @@ function plugins(settings, deviceId) {
}) })
} }
function recordPing(deviceTime, version, model) {
const devices = {
version,
model,
last_online: deviceTime,
}
return Promise.all([
db.none(
`insert into machine_pings(device_id, device_time)
values ($1, $2)
ON CONFLICT (device_id) DO UPDATE SET device_time = $2,
updated = now()`,
[deviceId, deviceTime],
),
db.none(
pgp.helpers.update(devices, null, 'devices') +
'WHERE device_id = ${deviceId}',
{
deviceId,
},
),
])
}
function pruneMachinesHeartbeat() { function pruneMachinesHeartbeat() {
const sql = `DELETE const sql = `DELETE
FROM machine_network_heartbeat h FROM machine_network_heartbeat h
@ -1062,7 +1037,6 @@ function plugins(settings, deviceId) {
return { return {
getRates, getRates,
recordPing,
buildRates, buildRates,
getRawRates, getRawRates,
buildRatesNoCommission, buildRatesNoCommission,

View file

@ -14,6 +14,7 @@ const coinAtmRadar = require('./coinatmradar/coinatmradar')
const configManager = require('./new-config-manager') const configManager = require('./new-config-manager')
const complianceTriggers = require('./compliance-triggers') const complianceTriggers = require('./compliance-triggers')
const settingsLoader = require('./new-settings-loader') const settingsLoader = require('./new-settings-loader')
const machineLoader = require('./machine-loader')
const NodeCache = require('node-cache') const NodeCache = require('node-cache')
const db = require('./db') const db = require('./db')
const processBatches = require('./tx-batching-processing') const processBatches = require('./tx-batching-processing')
@ -31,6 +32,7 @@ const PRUNE_MACHINES_HEARTBEAT = 1 * T.day
const TRANSACTION_BATCH_LIFECYCLE = 20 * T.minutes const TRANSACTION_BATCH_LIFECYCLE = 20 * T.minutes
const TICKER_RATES_INTERVAL = 59 * T.seconds const TICKER_RATES_INTERVAL = 59 * T.seconds
const FAILED_SCANS_INTERVAL = 1 * T.day const FAILED_SCANS_INTERVAL = 1 * T.day
const PENDING_PINGS_INTERVAL = 90 * T.seconds // lib/notifier/codes.js
const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds
const PENDING_INTERVAL = 10 * T.seconds const PENDING_INTERVAL = 10 * T.seconds
@ -309,6 +311,11 @@ function doPolling() {
QUEUE.SLOW, QUEUE.SLOW,
settings, settings,
) )
addToQueue(
machineLoader.batchRecordPendingPings,
PENDING_PINGS_INTERVAL,
QUEUE.SLOW,
)
} }
module.exports = { setup, reload } module.exports = { setup, reload }