From 1b261504994c9fb0657678085abbd59e2b990226 Mon Sep 17 00:00:00 2001 From: siiky Date: Tue, 3 Jun 2025 09:44:42 +0100 Subject: [PATCH] refactor: batch record pings --- packages/server/lib/machine-loader.js | 48 +++++++++++++++++++ packages/server/lib/middlewares/recordPing.js | 26 +++------- packages/server/lib/plugins.js | 26 ---------- packages/server/lib/poller.js | 7 +++ 4 files changed, 61 insertions(+), 46 deletions(-) diff --git a/packages/server/lib/machine-loader.js b/packages/server/lib/machine-loader.js index f5fb29dc..b3d3c43a 100644 --- a/packages/server/lib/machine-loader.js +++ b/packages/server/lib/machine-loader.js @@ -702,6 +702,52 @@ function updatePhotos(dir, photoPairs) { ) } +let pendingRecordPings = new Map() +const enqueueRecordPing = ping => { + pendingRecordPings.set(ping.deviceId, ping) +} + +// from @sindresorhus/is-empty-iterable +const isEmptyIterable = iter => { + for (const _ of iter) return false + return true +} + +const batchRecordPendingPings = () => { + let pings = pendingRecordPings.values() + pendingRecordPings = new Map() + if (isEmptyIterable(pings)) return Promise.resolve() + + const prepareChunk = (t, chunk) => + chunk + .flatMap(({ deviceId, last_online, version, model }) => [ + t.none( + `INSERT INTO machine_pings (device_id, device_time) + VALUES ($1, $2) + ON CONFLICT (device_id) DO + UPDATE SET device_time = $2, + updated = now()`, + [deviceId, last_online], + ), + t.none( + pgp.helpers.update({ last_online, version, model }, null, 'devices') + + 'WHERE device_id = ${deviceId}', + { deviceId }, + ), + ]) + .toArray() + + const MaxBatchSize = 500 + return db + .task(async t => { + while (!isEmptyIterable(pings)) { + const chunk = pings.take(MaxBatchSize) + await t.batch(prepareChunk(t, chunk)).catch(err => logger.error(err)) + } + }) + .catch(err => logger.error(err)) +} + module.exports = { getMachineName, getMachines, @@ -719,4 +765,6 @@ module.exports = { updateDiagnostics, updateFailedQRScans, batchDiagnostics, + enqueueRecordPing, + batchRecordPendingPings, } diff --git a/packages/server/lib/middlewares/recordPing.js b/packages/server/lib/middlewares/recordPing.js index b81358bc..14d21b57 100644 --- a/packages/server/lib/middlewares/recordPing.js +++ b/packages/server/lib/middlewares/recordPing.js @@ -1,27 +1,13 @@ -const mem = require('mem') +const { enqueueRecordPing } = require('../machine-loader') -const logger = require('../logger') -const plugins = require('../plugins') -const T = require('../time') - -const record = mem( - ({ deviceId, deviceTime, model, version, settings }) => - plugins(settings, deviceId) - .recordPing(deviceTime, version, model) - .catch(logger.error), - { - cacheKey: ({ deviceId }) => deviceId, - maxAge: (3 / 2) * T.minute, // lib/notifier/codes.js - }, -) - -module.exports = (req, res, next) => { - record({ +const record = (req, res, next) => { + enqueueRecordPing({ deviceId: req.deviceId, - deviceTime: req.deviceTime, + last_online: req.deviceTime, model: req.query.model, version: req.query.version, - settings: req.settings, }) next() } + +module.exports = record diff --git a/packages/server/lib/plugins.js b/packages/server/lib/plugins.js index f92fce24..29af2440 100644 --- a/packages/server/lib/plugins.js +++ b/packages/server/lib/plugins.js @@ -409,31 +409,6 @@ function plugins(settings, deviceId) { }) } - function recordPing(deviceTime, version, model) { - const devices = { - version, - model, - last_online: deviceTime, - } - - return Promise.all([ - db.none( - `insert into machine_pings(device_id, device_time) - values ($1, $2) - ON CONFLICT (device_id) DO UPDATE SET device_time = $2, - updated = now()`, - [deviceId, deviceTime], - ), - db.none( - pgp.helpers.update(devices, null, 'devices') + - 'WHERE device_id = ${deviceId}', - { - deviceId, - }, - ), - ]) - } - function pruneMachinesHeartbeat() { const sql = `DELETE FROM machine_network_heartbeat h @@ -1062,7 +1037,6 @@ function plugins(settings, deviceId) { return { getRates, - recordPing, buildRates, getRawRates, buildRatesNoCommission, diff --git a/packages/server/lib/poller.js b/packages/server/lib/poller.js index 227707a2..4c947c59 100644 --- a/packages/server/lib/poller.js +++ b/packages/server/lib/poller.js @@ -14,6 +14,7 @@ const coinAtmRadar = require('./coinatmradar/coinatmradar') const configManager = require('./new-config-manager') const complianceTriggers = require('./compliance-triggers') const settingsLoader = require('./new-settings-loader') +const machineLoader = require('./machine-loader') const NodeCache = require('node-cache') const db = require('./db') const processBatches = require('./tx-batching-processing') @@ -31,6 +32,7 @@ const PRUNE_MACHINES_HEARTBEAT = 1 * T.day const TRANSACTION_BATCH_LIFECYCLE = 20 * T.minutes const TICKER_RATES_INTERVAL = 59 * T.seconds const FAILED_SCANS_INTERVAL = 1 * T.day +const PENDING_PINGS_INTERVAL = 90 * T.seconds // lib/notifier/codes.js const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds const PENDING_INTERVAL = 10 * T.seconds @@ -309,6 +311,11 @@ function doPolling() { QUEUE.SLOW, settings, ) + addToQueue( + machineLoader.batchRecordPendingPings, + PENDING_PINGS_INTERVAL, + QUEUE.SLOW, + ) } module.exports = { setup, reload }