From 9808a67945e2fc8fbd8b4f4ae3399b1975796b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Wed, 23 Dec 2020 14:05:53 +0000 Subject: [PATCH] feat: add coin filtering to cashout polling --- lib/cash-out/cash-out-tx.js | 26 +++++++++++++++++--------- lib/poller.js | 20 ++++++++++++++++---- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/lib/cash-out/cash-out-tx.js b/lib/cash-out/cash-out-tx.js index 2a2ebac0..c83c1dc7 100644 --- a/lib/cash-out/cash-out-tx.js +++ b/lib/cash-out/cash-out-tx.js @@ -24,6 +24,7 @@ module.exports = { const STALE_INCOMING_TX_AGE = T.day const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes +const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes const MAX_NOTIFY_AGE = T.day const MIN_NOTIFY_AGE = 5 * T.minutes const INSUFFICIENT_FUNDS_CODE = 570 @@ -95,16 +96,21 @@ function postProcess (txVector, justAuthorized, pi) { return Promise.resolve({}) } -function fetchOpenTxs (statuses, fromAge, toAge) { +function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) { + const notClause = applyFilter ? '' : 'not' const sql = `select * from cash_out_txs where ((extract(epoch from (now() - created))) * 1000)>$1 and ((extract(epoch from (now() - created))) * 1000)<$2 - and status in ($3^)` + ${_.isEmpty(coinFilter) + ? `` + : `and crypto_code ${notClause} in ($3^)`} + and status in ($4^)` + const coinClause = _.map(pgp.as.text, coinFilter).join(',') const statusClause = _.map(pgp.as.text, statuses).join(',') - return db.any(sql, [fromAge, toAge, statusClause]) + return db.any(sql, [fromAge, toAge, coinClause, statusClause]) .then(rows => rows.map(toObj)) } @@ -116,20 +122,22 @@ function processTxStatus (tx, settings) { .then(_tx => selfPost(_tx, pi)) } -function monitorLiveIncoming (settings) { +function monitorLiveIncoming (settings, applyFilter, coinFilter) { const statuses = ['notSeen', 'published', 'insufficientFunds'] + const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE) + return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter) } -function monitorStaleIncoming (settings) { +function monitorStaleIncoming (settings, applyFilter, coinFilter) { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] + const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE) + return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter) } -function monitorIncoming (settings, statuses, fromAge, toAge) { - return fetchOpenTxs(statuses, fromAge, toAge) +function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) { + return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) .catch(err => { if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) { diff --git a/lib/poller.js b/lib/poller.js index a648b4cc..f94fd1c1 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -14,6 +14,8 @@ const complianceTriggers = require('./compliance-triggers') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds +const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute +const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds const UNNOTIFIED_INTERVAL = 10 * T.seconds const SWEEP_HD_INTERVAL = T.minute const TRADE_INTERVAL = 60 * T.seconds @@ -27,6 +29,8 @@ const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds const PENDING_INTERVAL = 10 * T.seconds +const coinFilter = [] + let _pi, _settings function reload (__settings) { @@ -71,16 +75,24 @@ function start (__settings) { pi().executeTrades() pi().pong() pi().clearOldLogs() - cashOutTx.monitorLiveIncoming(settings()) - cashOutTx.monitorStaleIncoming(settings()) + cashOutTx.monitorLiveIncoming(settings(), false, coinFilter) + cashOutTx.monitorStaleIncoming(settings(), false, coinFilter) + if (!_.isEmpty(coinFilter)) { + cashOutTx.monitorLiveIncoming(settings(), true, coinFilter) + cashOutTx.monitorStaleIncoming(settings(), true, coinFilter) + } cashOutTx.monitorUnnotified(settings()) pi().sweepHd() notifier.checkNotification(pi()) updateCoinAtmRadar() setInterval(() => pi().executeTrades(), TRADE_INTERVAL) - setInterval(() => cashOutTx.monitorLiveIncoming(settings()), LIVE_INCOMING_TX_INTERVAL) - setInterval(() => cashOutTx.monitorStaleIncoming(settings()), INCOMING_TX_INTERVAL) + setInterval(() => cashOutTx.monitorLiveIncoming(settings(), false, coinFilter), LIVE_INCOMING_TX_INTERVAL) + setInterval(() => cashOutTx.monitorStaleIncoming(settings(), false, coinFilter), INCOMING_TX_INTERVAL) + if (!_.isEmpty(coinFilter)) { + setInterval(() => cashOutTx.monitorLiveIncoming(settings(), true, coinFilter), LIVE_INCOMING_TX_INTERVAL_FILTER) + setInterval(() => cashOutTx.monitorStaleIncoming(settings(), true, coinFilter), INCOMING_TX_INTERVAL_FILTER) + } setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL) setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL) setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL)