From 6b7d14de2bfef325cdd26f0214a62651aa2eefc9 Mon Sep 17 00:00:00 2001 From: Taranto Date: Thu, 6 Jul 2023 16:56:17 +0100 Subject: [PATCH] fix: concurrency issues caused by poller --- lib/cash-out/cash-out-atomic.js | 6 ++++ lib/cash-out/cash-out-low.js | 5 +++ lib/middlewares/populateSettings.js | 2 +- lib/plugins.js | 8 ----- lib/poller.js | 49 ++++++++++++++++++++--------- package-lock.json | 2 +- package.json | 2 +- 7 files changed, 48 insertions(+), 26 deletions(-) diff --git a/lib/cash-out/cash-out-atomic.js b/lib/cash-out/cash-out-atomic.js index c82f6f65..8fbaa0f3 100644 --- a/lib/cash-out/cash-out-atomic.js +++ b/lib/cash-out/cash-out-atomic.js @@ -27,6 +27,12 @@ function atomic (tx, pi, fromClient) { const isStale = fromClient && oldTx && (oldTx.txVersion >= tx.txVersion) if (isStale) throw new E.StaleTxError({ txId: tx.id }) + const isStaleFromServer = !fromClient && oldTx && (oldTx.txVersion > tx.txVersion) + if (isStaleFromServer) { + logger.info('trying to update older version of tx', JSON.stringify(tx)) + return Promise.resolve() + } + return preProcess(t, oldTx, tx, pi) .then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx)) }) diff --git a/lib/cash-out/cash-out-low.js b/lib/cash-out/cash-out-low.js index 7ac5421c..7431334e 100644 --- a/lib/cash-out/cash-out-low.js +++ b/lib/cash-out/cash-out-low.js @@ -59,6 +59,11 @@ function diff (oldTx, newTx) { return updatedTx['customerId'] = newTx['customerId'] } return + case 'dispense': + if (!oldTx.dispense) { + return updatedTx[fieldKey] = newTx[fieldKey] + } + return default: return updatedTx[fieldKey] = newTx[fieldKey] } diff --git a/lib/middlewares/populateSettings.js b/lib/middlewares/populateSettings.js index 5f36fa69..07feb0bf 100644 --- a/lib/middlewares/populateSettings.js +++ b/lib/middlewares/populateSettings.js @@ -75,7 +75,7 @@ const populateSettings = function (req, res, next) { .catch(next) } - logger.debug('Fetching and caching a specific config version') + logger.debug('Fetching a cached specific config version') req.settings = cachedVersionedSettings return next() } diff --git a/lib/plugins.js b/lib/plugins.js index fd471836..8fd79dbe 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -342,13 +342,6 @@ function plugins (settings, deviceId) { return wallet.newAddress(settings, info, tx) } - function dispenseAck (tx) { - const cashOutConfig = configManager.getCashOut(deviceId, settings.config) - const cassettes = [cashOutConfig.cassette1, cashOutConfig.cassette2, cashOutConfig.cassette3, cashOutConfig.cassette4] - - return dbm.addDispense(deviceId, tx, cassettes) - } - function fiatBalance (fiatCode, cryptoCode) { const commissions = configManager.getCommissions(cryptoCode, deviceId, settings.config) return Promise.all([ @@ -864,7 +857,6 @@ function plugins (settings, deviceId) { isHd, isZeroConf, getStatus, - dispenseAck, getPhoneCode, executeTrades, pong, diff --git a/lib/poller.js b/lib/poller.js index 10e8343c..ca002612 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -16,7 +16,6 @@ const settingsLoader = require('./new-settings-loader') const NodeCache = require('node-cache') const util = require('util') const db = require('./db') -const state = require('./middlewares/state') const processBatches = require('./tx-batching-processing') const INCOMING_TX_INTERVAL = 30 * T.seconds @@ -99,8 +98,8 @@ function reload (schema) { }) } -function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } -function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings } +function pi () { return cachedVariables.get('public').pi } +function settings () { return cachedVariables.get('public').settings } function initialSanctionsDownload () { const structs = sanctions.getStructs() @@ -146,19 +145,39 @@ function initializeEachSchema (schemas = ['public']) { }, schemas) } +function recursiveTimeout (func, timeout, ...vars) { + setTimeout(() => { + let promise = null + + const loadVariables = vars.length > 0 && typeof vars[0] === 'function' + if (loadVariables) { + const funcVars = [...vars] + funcVars[0] = vars[0]() + promise = func(...funcVars) + } else { + promise = func(...vars) + } + + promise.finally(() => { + recursiveTimeout(func, timeout, ...vars) + }) + }, timeout) +} + function addToQueue (func, interval, schema, queue, ...vars) { - return schemaCallbacks.get(schema).push(setInterval(() => { - return queue.enqueue().then(() => { - // get plugins or settings from the cache every time func is run - const loadVariables = vars.length > 0 && typeof vars[0] === 'function' - if (loadVariables) { - const funcVars = [...vars] - funcVars[0] = vars[0]() - return func(...funcVars) - } - return func(...vars) - }).catch(console.error) - }, interval)) + recursiveTimeout(func, interval, ...vars) + // return schemaCallbacks.get(schema).push(setInterval(() => { + // return queue.enqueue().then(() => { + // // get plugins or settings from the cache every time func is run + // const loadVariables = vars.length > 0 && typeof vars[0] === 'function' + // if (loadVariables) { + // const funcVars = [...vars] + // funcVars[0] = vars[0]() + // return func(...funcVars) + // } + // return func(...vars) + // }).catch(console.error) + // }, interval)) } function doPolling (schema) { diff --git a/package-lock.json b/package-lock.json index 0bc65cfc..2cc1656f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "lamassu-server", - "version": "8.1.2", + "version": "8.1.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index e32c6c55..6ddb1eac 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "lamassu-server", "description": "bitcoin atm client server protocol module", "keywords": [], - "version": "8.1.2", + "version": "8.1.3", "license": "Unlicense", "author": "Lamassu (https://lamassu.is)", "dependencies": {