diff --git a/lib/cash-out/cash-out-tx.js b/lib/cash-out/cash-out-tx.js index 4abf18ea..99f9604b 100644 --- a/lib/cash-out/cash-out-tx.js +++ b/lib/cash-out/cash-out-tx.js @@ -25,7 +25,6 @@ module.exports = { const STALE_INCOMING_TX_AGE = T.day const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes -const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes const MAX_NOTIFY_AGE = T.day const MIN_NOTIFY_AGE = 5 * T.minutes const INSUFFICIENT_FUNDS_CODE = 570 @@ -91,21 +90,17 @@ function postProcess (txVector, justAuthorized, pi) { return Promise.resolve({}) } -function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) { - const notClause = applyFilter ? '' : 'not' +function fetchOpenTxs (statuses, fromAge, toAge) { const sql = `select * from cash_out_txs where ((extract(epoch from (now() - created))) * 1000)>$1 and ((extract(epoch from (now() - created))) * 1000)<$2 - ${_.isEmpty(coinFilter) - ? `` - : `and crypto_code ${notClause} in ($3^)`} - and status in ($4^)` + and status in ($3^) + and error is distinct from 'Operator cancel'` - const coinClause = _.map(pgp.as.text, coinFilter).join(',') const statusClause = _.map(pgp.as.text, statuses).join(',') - return db.any(sql, [fromAge, toAge, coinClause, statusClause]) + return db.any(sql, [fromAge, toAge, statusClause]) .then(rows => rows.map(toObj)) } @@ -164,22 +159,18 @@ function getWalletScore (tx, pi) { return tx } -function monitorLiveIncoming (settings, applyFilter, coinFilter) { +function monitorLiveIncoming (settings) { const statuses = ['notSeen', 'published', 'insufficientFunds'] - const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - - return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter) + return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE) } -function monitorStaleIncoming (settings, applyFilter, coinFilter) { +function monitorStaleIncoming (settings) { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - - return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter) + return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE) } -function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) { - return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter) +function monitorIncoming (settings, statuses, fromAge, toAge) { + return fetchOpenTxs(statuses, fromAge, toAge) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) .catch(err => { if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) { diff --git a/lib/constants.js b/lib/constants.js index ff1bc8f2..280c300a 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -32,6 +32,11 @@ const RECEIPT = 'sms_receipt' const WALLET_SCORE_THRESHOLD = 9 +const BALANCE_FETCH_SPEED_MULTIPLIER = { + NORMAL: 1, + SLOW: 3 +} + module.exports = { anonymousCustomer, CASSETTE_MAX_CAPACITY, @@ -48,5 +53,6 @@ module.exports = { CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES, WALLET_SCORE_THRESHOLD, RECEIPT, - PSQL_URL + PSQL_URL, + BALANCE_FETCH_SPEED_MULTIPLIER } diff --git a/lib/plugins/wallet/infura/infura.js b/lib/plugins/wallet/infura/infura.js index 47d4e35c..0b5d702b 100644 --- a/lib/plugins/wallet/infura/infura.js +++ b/lib/plugins/wallet/infura/infura.js @@ -1,5 +1,10 @@ const _ = require('lodash/fp') +const NodeCache = require('node-cache') const base = require('../geth/base') +const T = require('../../../time') +const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('../../../constants') + +const REGULAR_TX_POLLING = 5 * T.seconds const NAME = 'infura' @@ -12,4 +17,54 @@ function run (account) { base.connect(endpoint) } -module.exports = _.merge(base, { NAME, run }) +const txsCache = new NodeCache({ + stdTTL: T.hour / 1000, + checkperiod: T.minute / 1000, + deleteOnExpire: true +}) + +function shouldGetStatus (tx) { + const timePassedSinceTx = Date.now() - new Date(tx.created) + const timePassedSinceReq = Date.now() - new Date(txsCache.get(tx.id).lastReqTime) + + // Allow for infura to gradually lower the amount of requests based on the time passed since the transaction + // Until first 5 minutes - 1/2 regular polling speed + // Until first 10 minutes - 1/4 regular polling speed + // Until first hour - 1/8 polling speed + // Until first two hours - 1/12 polling speed + // Until first four hours - 1/16 polling speed + // Until first day - 1/24 polling speed + // After first day - 1/32 polling speed + if (timePassedSinceTx < 5 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 2 * REGULAR_TX_POLLING + if (timePassedSinceTx < 10 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 4 * REGULAR_TX_POLLING + if (timePassedSinceTx < 1 * T.hour) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 8 * REGULAR_TX_POLLING + if (timePassedSinceTx < 2 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 12 * REGULAR_TX_POLLING + if (timePassedSinceTx < 4 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 16 * REGULAR_TX_POLLING + if (timePassedSinceTx < 1 * T.day) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 24 * REGULAR_TX_POLLING + return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 32 * REGULAR_TX_POLLING +} + +// Override geth's getStatus function to allow for different polling timing +function getStatus (account, tx, requested, settings, operatorId) { + if (_.isNil(txsCache.get(tx.id))) { + txsCache.set(tx.id, { lastReqTime: Date.now() }) + } + + // return last available response + if (!shouldGetStatus(tx)) { + return Promise.resolve(txsCache.get(tx.id).res) + } + + return base.getStatus(account, tx, requested, settings, operatorId) + .then(res => { + if (res.status === 'confirmed') { + txsCache.del(tx.id) // Transaction reached final status, can trim it from the caching obj + } else { + txsCache.set(tx.id, { lastReqTime: Date.now(), res }) + txsCache.ttl(tx.id, T.hour / 1000) + } + return res + }) +} + +module.exports = _.merge(base, { NAME, run, getStatus, fetchSpeed: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW }) diff --git a/lib/poller.js b/lib/poller.js index c6293a48..10e8343c 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -21,8 +21,6 @@ const processBatches = require('./tx-batching-processing') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds -const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute -const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds const UNNOTIFIED_INTERVAL = 10 * T.seconds const SWEEP_HD_INTERVAL = 5 * T.minute const TRADE_INTERVAL = 60 * T.seconds @@ -61,7 +59,6 @@ const QUEUE = { SLOW: SLOW_QUEUE } -const coinFilter = ['ETH'] const schemaCallbacks = new Map() const cachedVariables = new NodeCache({ @@ -168,12 +165,8 @@ function doPolling (schema) { pi().executeTrades() pi().pong() pi().clearOldLogs() - cashOutTx.monitorLiveIncoming(settings(), false, coinFilter) - cashOutTx.monitorStaleIncoming(settings(), false, coinFilter) - if (!_.isEmpty(coinFilter)) { - cashOutTx.monitorLiveIncoming(settings(), true, coinFilter) - cashOutTx.monitorStaleIncoming(settings(), true, coinFilter) - } + cashOutTx.monitorLiveIncoming(settings()) + cashOutTx.monitorStaleIncoming(settings()) cashOutTx.monitorUnnotified(settings()) pi().sweepHd() notifier.checkNotification(pi()) @@ -181,12 +174,8 @@ function doPolling (schema) { addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST) addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST) - addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) - addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) - if (!_.isEmpty(coinFilter)) { - addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) - addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) - } + addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings) + addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE) diff --git a/lib/wallet.js b/lib/wallet.js index 068ffc5b..9c6023d7 100644 --- a/lib/wallet.js +++ b/lib/wallet.js @@ -13,6 +13,7 @@ const httpError = require('./route-helpers').httpError const logger = require('./logger') const { getOpenBatchCryptoValue } = require('./tx-batching') const BN = require('./bn') +const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('./constants') const FETCH_INTERVAL = 5000 const INSUFFICIENT_FUNDS_CODE = 570 @@ -255,20 +256,28 @@ function checkBlockchainStatus (settings, cryptoCode) { .then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode)) } -const coinFilter = ['ETH'] - const balance = (settings, cryptoCode) => { - if (_.includes(coinFilter, cryptoCode)) return balanceFiltered(settings, cryptoCode) - return balanceUnfiltered(settings, cryptoCode) + return fetchWallet(settings, cryptoCode) + .then(r => r.wallet.fetchSpeed ?? BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL) + .then(multiplier => { + switch (multiplier) { + case BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL: + return balanceNormal(settings, cryptoCode) + case BALANCE_FETCH_SPEED_MULTIPLIER.SLOW: + return balanceSlow(settings, cryptoCode) + default: + throw new Error() + } + }) } -const balanceUnfiltered = mem(_balance, { - maxAge: FETCH_INTERVAL, +const balanceNormal = mem(_balance, { + maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL * FETCH_INTERVAL, cacheKey: (settings, cryptoCode) => cryptoCode }) -const balanceFiltered = mem(_balance, { - maxAge: 3 * FETCH_INTERVAL, +const balanceSlow = mem(_balance, { + maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW * FETCH_INTERVAL, cacheKey: (settings, cryptoCode) => cryptoCode })