From 4e287d1ecbcc163b96100ec72b682d0f8d2bc867 Mon Sep 17 00:00:00 2001 From: Rafael Taranto Date: Wed, 5 Sep 2018 12:21:35 -0300 Subject: [PATCH] Improve cash out monitor transaction handling (#159) Made the queries from 'monitorLiveIncoming' and 'monitorStaleIncoming' return mutually exclusive records from the db. Both this methods are polled and perform the same actions on the records. As a result, transaction erros were prone to happen. --- lib/cash-out/cash-out-tx.js | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/cash-out/cash-out-tx.js b/lib/cash-out/cash-out-tx.js index e24fbb05..925fdf35 100644 --- a/lib/cash-out/cash-out-tx.js +++ b/lib/cash-out/cash-out-tx.js @@ -26,6 +26,8 @@ const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes const MAX_NOTIFY_AGE = T.day const MIN_NOTIFY_AGE = 5 * T.minutes const INSUFFICIENT_FUNDS_CODE = 570 +const SERIALIZATION_FAILURE_CODE = '40001' +const HARMLESS_DB_CONFLICT_ERROR = 'Harmless DB conflict, the query will be retried.' const toObj = helper.toObj @@ -86,15 +88,16 @@ function postProcess (txVector, pi) { return Promise.resolve({}) } -function fetchOpenTxs (statuses, age) { +function fetchOpenTxs (statuses, fromAge, toAge) { const sql = `select * from cash_out_txs - where ((extract(epoch from (now() - created))) * 1000)<$1 - and status in ($2^)` + where ((extract(epoch from (now() - created))) * 1000)>$1 + and ((extract(epoch from (now() - created))) * 1000)<$2 + and status in ($3^)` const statusClause = _.map(pgp.as.text, statuses).join(',') - return db.any(sql, [age, statusClause]) + return db.any(sql, [fromAge, toAge, statusClause]) .then(rows => rows.map(toObj)) } @@ -109,17 +112,25 @@ function processTxStatus (tx, settings) { function monitorLiveIncoming (settings) { const statuses = ['notSeen', 'published', 'insufficientFunds'] - return fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) - .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) - .catch(logger.error) + return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE) } function monitorStaleIncoming (settings) { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - return fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) + return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE) +} + +function monitorIncoming (settings, statuses, fromAge, toAge) { + return fetchOpenTxs(statuses, fromAge, toAge) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) - .catch(logger.error) + .catch(err => { + if (err.code === SERIALIZATION_FAILURE_CODE) { + logger.warn(HARMLESS_DB_CONFLICT_ERROR) + } else { + logger.error(err) + } + }) } function monitorUnnotified (settings) {