refactor: replace batch() with async/await

This commit is contained in:
siiky 2025-06-16 17:34:28 +01:00
parent af8a8d395c
commit 8cf68e480a

View file

@ -721,38 +721,31 @@ const isEmptyIterable = iter => {
} }
const batchRecordPendingPings = () => { const batchRecordPendingPings = () => {
let pings = pendingRecordPings.values() const pings = pendingRecordPings.values()
pendingRecordPings = new Map() pendingRecordPings = new Map()
if (isEmptyIterable(pings)) return Promise.resolve() if (isEmptyIterable(pings)) return Promise.resolve()
const prepareChunk = (t, chunk) => return db.task(async t => {
chunk for (const { deviceId, last_online, version, model } of pings) {
.flatMap(({ deviceId, last_online, version, model }) => [ await t
t.none( .none(
`INSERT INTO machine_pings (device_id, device_time) `INSERT INTO machine_pings (device_id, device_time)
VALUES ($1, $2) VALUES ($1, $2)
ON CONFLICT (device_id) DO ON CONFLICT (device_id) DO
UPDATE SET device_time = $2, UPDATE SET device_time = $2,
updated = now()`, updated = now()`,
[deviceId, last_online], [deviceId, last_online],
), )
t.none( .catch(err => logger.error(err))
await t
.none(
pgp.helpers.update({ last_online, version, model }, null, 'devices') + pgp.helpers.update({ last_online, version, model }, null, 'devices') +
'WHERE device_id = ${deviceId}', 'WHERE device_id = ${deviceId}',
{ deviceId }, { deviceId },
), )
]) .catch(err => logger.error(err))
.toArray()
const MaxBatchSize = 500
return db
.task(async t => {
while (!isEmptyIterable(pings)) {
const chunk = pings.take(MaxBatchSize)
await t.batch(prepareChunk(t, chunk)).catch(err => logger.error(err))
} }
}) })
.catch(err => logger.error(err))
} }
module.exports = { module.exports = {