chore: merge release-8.1 into telnyx (#1572)

* fix: concurrency issues caused by poller

* chore: version 8.1.4 (#1561)

* Revert "chore: version 8.1.4 (#1561)" (#1562)

This reverts commit eaa3dc5720.

---------

Co-authored-by: Taranto <rafael.taranto@protonmail.com>
This commit is contained in:
CrypticaScriptura 2023-07-31 17:49:27 -04:00 committed by GitHub
parent e8a3d2ed11
commit becd3e7a82
7 changed files with 48 additions and 26 deletions

View file

@ -27,6 +27,12 @@ function atomic (tx, pi, fromClient) {
const isStale = fromClient && oldTx && (oldTx.txVersion >= tx.txVersion)
if (isStale) throw new E.StaleTxError({ txId: tx.id })
const isStaleFromServer = !fromClient && oldTx && (oldTx.txVersion > tx.txVersion)
if (isStaleFromServer) {
logger.info('trying to update older version of tx', JSON.stringify(tx))
return Promise.resolve()
}
return preProcess(t, oldTx, tx, pi)
.then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx))
})

View file

@ -59,6 +59,11 @@ function diff (oldTx, newTx) {
return updatedTx['customerId'] = newTx['customerId']
}
return
case 'dispense':
if (!oldTx.dispense) {
return updatedTx[fieldKey] = newTx[fieldKey]
}
return
default:
return updatedTx[fieldKey] = newTx[fieldKey]
}

View file

@ -75,7 +75,7 @@ const populateSettings = function (req, res, next) {
.catch(next)
}
logger.debug('Fetching and caching a specific config version')
logger.debug('Fetching a cached specific config version')
req.settings = cachedVersionedSettings
return next()
}

View file

@ -342,13 +342,6 @@ function plugins (settings, deviceId) {
return wallet.newAddress(settings, info, tx)
}
function dispenseAck (tx) {
const cashOutConfig = configManager.getCashOut(deviceId, settings.config)
const cassettes = [cashOutConfig.cassette1, cashOutConfig.cassette2, cashOutConfig.cassette3, cashOutConfig.cassette4]
return dbm.addDispense(deviceId, tx, cassettes)
}
function fiatBalance (fiatCode, cryptoCode) {
const commissions = configManager.getCommissions(cryptoCode, deviceId, settings.config)
return Promise.all([
@ -864,7 +857,6 @@ function plugins (settings, deviceId) {
isHd,
isZeroConf,
getStatus,
dispenseAck,
getPhoneCode,
executeTrades,
pong,

View file

@ -16,7 +16,6 @@ const settingsLoader = require('./new-settings-loader')
const NodeCache = require('node-cache')
const util = require('util')
const db = require('./db')
const state = require('./middlewares/state')
const processBatches = require('./tx-batching-processing')
const INCOMING_TX_INTERVAL = 30 * T.seconds
@ -99,8 +98,8 @@ function reload (schema) {
})
}
function pi () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).pi }
function settings () { return cachedVariables.get(asyncLocalStorage.getStore().get('schema')).settings }
function pi () { return cachedVariables.get('public').pi }
function settings () { return cachedVariables.get('public').settings }
function initialSanctionsDownload () {
const structs = sanctions.getStructs()
@ -146,19 +145,39 @@ function initializeEachSchema (schemas = ['public']) {
}, schemas)
}
function recursiveTimeout (func, timeout, ...vars) {
setTimeout(() => {
let promise = null
const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
if (loadVariables) {
const funcVars = [...vars]
funcVars[0] = vars[0]()
promise = func(...funcVars)
} else {
promise = func(...vars)
}
promise.finally(() => {
recursiveTimeout(func, timeout, ...vars)
})
}, timeout)
}
function addToQueue (func, interval, schema, queue, ...vars) {
return schemaCallbacks.get(schema).push(setInterval(() => {
return queue.enqueue().then(() => {
// get plugins or settings from the cache every time func is run
const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
if (loadVariables) {
const funcVars = [...vars]
funcVars[0] = vars[0]()
return func(...funcVars)
}
return func(...vars)
}).catch(console.error)
}, interval))
recursiveTimeout(func, interval, ...vars)
// return schemaCallbacks.get(schema).push(setInterval(() => {
// return queue.enqueue().then(() => {
// // get plugins or settings from the cache every time func is run
// const loadVariables = vars.length > 0 && typeof vars[0] === 'function'
// if (loadVariables) {
// const funcVars = [...vars]
// funcVars[0] = vars[0]()
// return func(...funcVars)
// }
// return func(...vars)
// }).catch(console.error)
// }, interval))
}
function doPolling (schema) {

2
package-lock.json generated
View file

@ -1,6 +1,6 @@
{
"name": "lamassu-server",
"version": "8.1.2",
"version": "8.1.3",
"lockfileVersion": 1,
"requires": true,
"dependencies": {

View file

@ -2,7 +2,7 @@
"name": "lamassu-server",
"description": "bitcoin atm client server protocol module",
"keywords": [],
"version": "8.1.2",
"version": "8.1.3",
"license": "Unlicense",
"author": "Lamassu (https://lamassu.is)",
"dependencies": {