fix: concurrency issues caused by poller

This commit is contained in:
Taranto 2023-07-06 16:56:17 +01:00
parent 7307466df4
commit 6b7d14de2b
7 changed files with 48 additions and 26 deletions

View file

@ -27,6 +27,12 @@ function atomic (tx, pi, fromClient) {
const isStale = fromClient && oldTx && (oldTx.txVersion >= tx.txVersion) const isStale = fromClient && oldTx && (oldTx.txVersion >= tx.txVersion)
if (isStale) throw new E.StaleTxError({ txId: tx.id }) if (isStale) throw new E.StaleTxError({ txId: tx.id })
const isStaleFromServer = !fromClient && oldTx && (oldTx.txVersion > tx.txVersion)
if (isStaleFromServer) {
logger.info('trying to update older version of tx', JSON.stringify(tx))
return Promise.resolve()
}
return preProcess(t, oldTx, tx, pi) return preProcess(t, oldTx, tx, pi)
.then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx)) .then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx))
}) })

View file

@ -59,6 +59,11 @@ function diff (oldTx, newTx) {
return updatedTx['customerId'] = newTx['customerId'] return updatedTx['customerId'] = newTx['customerId']
} }
return return
case 'dispense':
if (!oldTx.dispense) {
return updatedTx[fieldKey] = newTx[fieldKey]
}
return
default: default:
return updatedTx[fieldKey] = newTx[fieldKey] return updatedTx[fieldKey] = newTx[fieldKey]
} }

View file

@ -75,7 +75,7 @@ const populateSettings = function (req, res, next) {
.catch(next) .catch(next)
} }
logger.debug('Fetching and caching a specific config version') logger.debug('Fetching a cached specific config version')
req.settings = cachedVersionedSettings req.settings = cachedVersionedSettings
return next() return next()
} }

View file

@ -342,13 +342,6 @@ function plugins (settings, deviceId) {
return wallet.newAddress(settings, info, tx) return wallet.newAddress(settings, info, tx)
} }
function dispenseAck (tx) {
const cashOutConfig = configManager.getCashOut(deviceId, settings.config)
const cassettes = [cashOutConfig.cassette1, cashOutConfig.cassette2, cashOutConfig.cassette3, cashOutConfig.cassette4]
return dbm.addDispense(deviceId, tx, cassettes)
}
function fiatBalance (fiatCode, cryptoCode) { function fiatBalance (fiatCode, cryptoCode) {
const commissions = configManager.getCommissions(cryptoCode, deviceId, settings.config) const commissions = configManager.getCommissions(cryptoCode, deviceId, settings.config)
return Promise.all([ return Promise.all([
@ -864,7 +857,6 @@ function plugins (settings, deviceId) {
isHd, isHd,
isZeroConf, isZeroConf,
getStatus, getStatus,
dispenseAck,
getPhoneCode, getPhoneCode,
executeTrades, executeTrades,
pong, pong,

View file

@ -16,7 +16,6 @@ const settingsLoader = require('./new-settings-loader')
const NodeCache = require('node-cache') const NodeCache = require('node-cache')
const util = require('util') const util = require('util')
const db = require('./db') const db = require('./db')
const state = require('./middlewares/state')
const processBatches = require('./tx-batching-processing') const processBatches = require('./tx-batching-processing')
const INCOMING_TX_INTERVAL = 30 * T.seconds const INCOMING_TX_INTERVAL = 30 * T.seconds
@ -99,8 +98,8 @@ function reload (schema) {
}) })
} }
function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi } function pi () { return cachedVariables.get('public').pi }
function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings } function settings () { return cachedVariables.get('public').settings }
function initialSanctionsDownload () { function initialSanctionsDownload () {
const structs = sanctions.getStructs() const structs = sanctions.getStructs()
@ -146,19 +145,39 @@ function initializeEachSchema (schemas = ['public']) {
}, schemas) }, schemas)
} }
function addToQueue (func, interval, schema, queue, ...vars) { function recursiveTimeout (func, timeout, ...vars) {
return schemaCallbacks.get(schema).push(setInterval(() => { setTimeout(() => {
return queue.enqueue().then(() => { let promise = null
// get plugins or settings from the cache every time func is run
const loadVariables = vars.length > 0 && typeof vars[0] === 'function' const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
if (loadVariables) { if (loadVariables) {
const funcVars = [...vars] const funcVars = [...vars]
funcVars[0] = vars[0]() funcVars[0] = vars[0]()
return func(...funcVars) promise = func(...funcVars)
} else {
promise = func(...vars)
} }
return func(...vars)
}).catch(console.error) promise.finally(() => {
}, interval)) recursiveTimeout(func, timeout, ...vars)
})
}, timeout)
}
function addToQueue (func, interval, schema, queue, ...vars) {
recursiveTimeout(func, interval, ...vars)
// return schemaCallbacks.get(schema).push(setInterval(() => {
// return queue.enqueue().then(() => {
// // get plugins or settings from the cache every time func is run
// const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
// if (loadVariables) {
// const funcVars = [...vars]
// funcVars[0] = vars[0]()
// return func(...funcVars)
// }
// return func(...vars)
// }).catch(console.error)
// }, interval))
} }
function doPolling (schema) { function doPolling (schema) {

2
package-lock.json generated
View file

@ -1,6 +1,6 @@
{ {
"name": "lamassu-server", "name": "lamassu-server",
"version": "8.1.2", "version": "8.1.3",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {

View file

@ -2,7 +2,7 @@
"name": "lamassu-server", "name": "lamassu-server",
"description": "bitcoin atm client server protocol module", "description": "bitcoin atm client server protocol module",
"keywords": [], "keywords": [],
"version": "8.1.2", "version": "8.1.3",
"license": "Unlicense", "license": "Unlicense",
"author": "Lamassu (https://lamassu.is)", "author": "Lamassu (https://lamassu.is)",
"dependencies": { "dependencies": {