Improve cash out monitor transaction handling (#159)

Made the queries from 'monitorLiveIncoming' and 'monitorStaleIncoming'
return mutually exclusive records from the db.

Both this methods are polled and perform the same actions on the
records. As a result, transaction erros were prone to happen.
This commit is contained in:
Rafael Taranto 2018-09-05 12:21:35 -03:00 committed by Josh Harvey
parent 347a313f40
commit 4e287d1ecb

View file

@ -26,6 +26,8 @@ const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
const MAX_NOTIFY_AGE = T.day const MAX_NOTIFY_AGE = T.day
const MIN_NOTIFY_AGE = 5 * T.minutes const MIN_NOTIFY_AGE = 5 * T.minutes
const INSUFFICIENT_FUNDS_CODE = 570 const INSUFFICIENT_FUNDS_CODE = 570
const SERIALIZATION_FAILURE_CODE = '40001'
const HARMLESS_DB_CONFLICT_ERROR = 'Harmless DB conflict, the query will be retried.'
const toObj = helper.toObj const toObj = helper.toObj
@ -86,15 +88,16 @@ function postProcess (txVector, pi) {
return Promise.resolve({}) return Promise.resolve({})
} }
function fetchOpenTxs (statuses, age) { function fetchOpenTxs (statuses, fromAge, toAge) {
const sql = `select * const sql = `select *
from cash_out_txs from cash_out_txs
where ((extract(epoch from (now() - created))) * 1000)<$1 where ((extract(epoch from (now() - created))) * 1000)>$1
and status in ($2^)` and ((extract(epoch from (now() - created))) * 1000)<$2
and status in ($3^)`
const statusClause = _.map(pgp.as.text, statuses).join(',') const statusClause = _.map(pgp.as.text, statuses).join(',')
return db.any(sql, [age, statusClause]) return db.any(sql, [fromAge, toAge, statusClause])
.then(rows => rows.map(toObj)) .then(rows => rows.map(toObj))
} }
@ -109,17 +112,25 @@ function processTxStatus (tx, settings) {
function monitorLiveIncoming (settings) { function monitorLiveIncoming (settings) {
const statuses = ['notSeen', 'published', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'insufficientFunds']
return fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE)
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
.catch(logger.error)
} }
function monitorStaleIncoming (settings) { function monitorStaleIncoming (settings) {
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
return fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE)
}
function monitorIncoming (settings, statuses, fromAge, toAge) {
return fetchOpenTxs(statuses, fromAge, toAge)
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
.catch(logger.error) .catch(err => {
if (err.code === SERIALIZATION_FAILURE_CODE) {
logger.warn(HARMLESS_DB_CONFLICT_ERROR)
} else {
logger.error(err)
}
})
} }
function monitorUnnotified (settings) { function monitorUnnotified (settings) {