feat: standardize balance memoizing speed

feat: add infura-only override for getStatus()
fix: remove coin filtering on poller
feat: add getStatus() response caching on infura
This commit is contained in:
Sérgio Salgado 2022-07-06 14:48:49 +01:00
parent 2d5cbe771a
commit d6bc692af4
5 changed files with 99 additions and 44 deletions

View file

@ -25,7 +25,6 @@ module.exports = {
const STALE_INCOMING_TX_AGE = T.day const STALE_INCOMING_TX_AGE = T.day
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes
const MAX_NOTIFY_AGE = T.day const MAX_NOTIFY_AGE = T.day
const MIN_NOTIFY_AGE = 5 * T.minutes const MIN_NOTIFY_AGE = 5 * T.minutes
const INSUFFICIENT_FUNDS_CODE = 570 const INSUFFICIENT_FUNDS_CODE = 570
@ -91,21 +90,17 @@ function postProcess (txVector, justAuthorized, pi) {
return Promise.resolve({}) return Promise.resolve({})
} }
function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) { function fetchOpenTxs (statuses, fromAge, toAge) {
const notClause = applyFilter ? '' : 'not'
const sql = `select * const sql = `select *
from cash_out_txs from cash_out_txs
where ((extract(epoch from (now() - created))) * 1000)>$1 where ((extract(epoch from (now() - created))) * 1000)>$1
and ((extract(epoch from (now() - created))) * 1000)<$2 and ((extract(epoch from (now() - created))) * 1000)<$2
${_.isEmpty(coinFilter) and status in ($3^)
? `` and error is distinct from 'Operator cancel'`
: `and crypto_code ${notClause} in ($3^)`}
and status in ($4^)`
const coinClause = _.map(pgp.as.text, coinFilter).join(',')
const statusClause = _.map(pgp.as.text, statuses).join(',') const statusClause = _.map(pgp.as.text, statuses).join(',')
return db.any(sql, [fromAge, toAge, coinClause, statusClause]) return db.any(sql, [fromAge, toAge, statusClause])
.then(rows => rows.map(toObj)) .then(rows => rows.map(toObj))
} }
@ -164,22 +159,18 @@ function getWalletScore (tx, pi) {
return tx return tx
} }
function monitorLiveIncoming (settings, applyFilter, coinFilter) { function monitorLiveIncoming (settings) {
const statuses = ['notSeen', 'published', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'insufficientFunds']
const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE)
return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter)
} }
function monitorStaleIncoming (settings, applyFilter, coinFilter) { function monitorStaleIncoming (settings) {
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE)
return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter)
} }
function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) { function monitorIncoming (settings, statuses, fromAge, toAge) {
return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter) return fetchOpenTxs(statuses, fromAge, toAge)
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
.catch(err => { .catch(err => {
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) { if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {

View file

@ -32,6 +32,11 @@ const RECEIPT = 'sms_receipt'
const WALLET_SCORE_THRESHOLD = 9 const WALLET_SCORE_THRESHOLD = 9
const BALANCE_FETCH_SPEED_MULTIPLIER = {
NORMAL: 1,
SLOW: 3
}
module.exports = { module.exports = {
anonymousCustomer, anonymousCustomer,
CASSETTE_MAX_CAPACITY, CASSETTE_MAX_CAPACITY,
@ -48,5 +53,6 @@ module.exports = {
CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES, CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES,
WALLET_SCORE_THRESHOLD, WALLET_SCORE_THRESHOLD,
RECEIPT, RECEIPT,
PSQL_URL PSQL_URL,
BALANCE_FETCH_SPEED_MULTIPLIER
} }

View file

@ -1,5 +1,9 @@
const _ = require('lodash/fp') const _ = require('lodash/fp')
const base = require('../geth/base') const base = require('../geth/base')
const T = require('../../../time')
const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('../../../constants')
const REGULAR_TX_POLLING = 5 * T.seconds
const NAME = 'infura' const NAME = 'infura'
@ -12,4 +16,60 @@ function run (account) {
base.connect(endpoint) base.connect(endpoint)
} }
module.exports = _.merge(base, { NAME, run }) const txsCache = {}
setInterval(() => {
_.forEach(it => {
// A transaction not being updated for over an hour means that it stopped being fetched on poll,
// which means that it probably was cancelled by the operator or something else caused the
// transaction to stop being listened to. So, trim the cache to save memory
const lastReq = _.isNil(txsCache[it].lastReqTime) ? Date.now() : new Date(txsCache[it].lastReqTime)
const timePassedSinceReq = Date.now() - lastReq
if (timePassedSinceReq > T.hour) delete txsCache[it]
}, _.keys(txsCache))
}, T.minute)
function shouldGetStatus (tx) {
const timePassedSinceTx = Date.now() - new Date(tx.created)
const timePassedSinceReq = Date.now() - new Date(txsCache[tx.id].lastReqTime)
// Allow for infura to gradually lower the amount of requests based on the time passed since the transaction
// Until first 5 minutes - 1/2 regular polling speed
// Until first 10 minutes - 1/4 regular polling speed
// Until first hour - 1/8 polling speed
// Until first two hours - 1/12 polling speed
// Until first four hours - 1/16 polling speed
// Until first day - 1/24 polling speed
// After first day - 1/32 polling speed
if (timePassedSinceTx < 5 * T.minutes) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 2 * REGULAR_TX_POLLING
if (timePassedSinceTx < 10 * T.minutes) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 4 * REGULAR_TX_POLLING
if (timePassedSinceTx < 1 * T.hour) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 8 * REGULAR_TX_POLLING
if (timePassedSinceTx < 2 * T.hours) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 12 * REGULAR_TX_POLLING
if (timePassedSinceTx < 4 * T.hours) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 16 * REGULAR_TX_POLLING
if (timePassedSinceTx < 1 * T.day) return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 24 * REGULAR_TX_POLLING
return _.isNil(txsCache[tx.id].res) || timePassedSinceReq > 32 * REGULAR_TX_POLLING
}
// Override geth's getStatus function to allow for different polling timing
function getStatus (account, tx, requested, settings, operatorId) {
if (_.isNil(txsCache[tx.id])) {
txsCache[tx.id] = { lastReqTime: Date.now() }
}
// return last available response
if (!shouldGetStatus(tx)) {
return Promise.resolve(txsCache[tx.id].res)
}
return base.getStatus(account, tx, requested, settings, operatorId)
.then(res => {
if (res.status === 'confirmed') {
delete txsCache[tx.id] // Transaction reached final status, can trim it from the caching obj
} else {
txsCache[tx.id].lastReqTime = Date.now()
txsCache[tx.id].res = res
}
return res
})
}
module.exports = _.merge(base, { NAME, run, getStatus, fetchSpeed: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW })

View file

@ -21,8 +21,6 @@ const processBatches = require('./tx-batching-processing')
const INCOMING_TX_INTERVAL = 30 * T.seconds const INCOMING_TX_INTERVAL = 30 * T.seconds
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute
const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds
const UNNOTIFIED_INTERVAL = 10 * T.seconds const UNNOTIFIED_INTERVAL = 10 * T.seconds
const SWEEP_HD_INTERVAL = T.minute const SWEEP_HD_INTERVAL = T.minute
const TRADE_INTERVAL = 60 * T.seconds const TRADE_INTERVAL = 60 * T.seconds
@ -61,7 +59,6 @@ const QUEUE = {
SLOW: SLOW_QUEUE SLOW: SLOW_QUEUE
} }
const coinFilter = ['ETH']
const schemaCallbacks = new Map() const schemaCallbacks = new Map()
const cachedVariables = new NodeCache({ const cachedVariables = new NodeCache({
@ -168,12 +165,8 @@ function doPolling (schema) {
pi().executeTrades() pi().executeTrades()
pi().pong() pi().pong()
pi().clearOldLogs() pi().clearOldLogs()
cashOutTx.monitorLiveIncoming(settings(), false, coinFilter) cashOutTx.monitorLiveIncoming(settings())
cashOutTx.monitorStaleIncoming(settings(), false, coinFilter) cashOutTx.monitorStaleIncoming(settings())
if (!_.isEmpty(coinFilter)) {
cashOutTx.monitorLiveIncoming(settings(), true, coinFilter)
cashOutTx.monitorStaleIncoming(settings(), true, coinFilter)
}
cashOutTx.monitorUnnotified(settings()) cashOutTx.monitorUnnotified(settings())
pi().sweepHd() pi().sweepHd()
notifier.checkNotification(pi()) notifier.checkNotification(pi())
@ -181,12 +174,8 @@ function doPolling (schema) {
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST) addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST)
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST) addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
if (!_.isEmpty(coinFilter)) {
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
}
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE) addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)

View file

@ -13,6 +13,7 @@ const httpError = require('./route-helpers').httpError
const logger = require('./logger') const logger = require('./logger')
const { getOpenBatchCryptoValue } = require('./tx-batching') const { getOpenBatchCryptoValue } = require('./tx-batching')
const BN = require('./bn') const BN = require('./bn')
const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('./constants')
const FETCH_INTERVAL = 5000 const FETCH_INTERVAL = 5000
const INSUFFICIENT_FUNDS_CODE = 570 const INSUFFICIENT_FUNDS_CODE = 570
@ -255,20 +256,28 @@ function checkBlockchainStatus (settings, cryptoCode) {
.then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode)) .then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode))
} }
const coinFilter = ['ETH']
const balance = (settings, cryptoCode) => { const balance = (settings, cryptoCode) => {
if (_.includes(coinFilter, cryptoCode)) return balanceFiltered(settings, cryptoCode) return fetchWallet(settings, cryptoCode)
return balanceUnfiltered(settings, cryptoCode) .then(r => r.wallet.fetchSpeed ?? BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL)
.then(multiplier => {
switch (multiplier) {
case BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL:
return balanceNormal(settings, cryptoCode)
case BALANCE_FETCH_SPEED_MULTIPLIER.SLOW:
return balanceSlow(settings, cryptoCode)
default:
throw new Error()
}
})
} }
const balanceUnfiltered = mem(_balance, { const balanceNormal = mem(_balance, {
maxAge: FETCH_INTERVAL, maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL * FETCH_INTERVAL,
cacheKey: (settings, cryptoCode) => cryptoCode cacheKey: (settings, cryptoCode) => cryptoCode
}) })
const balanceFiltered = mem(_balance, { const balanceSlow = mem(_balance, {
maxAge: 3 * FETCH_INTERVAL, maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW * FETCH_INTERVAL,
cacheKey: (settings, cryptoCode) => cryptoCode cacheKey: (settings, cryptoCode) => cryptoCode
}) })