Merge pull request #1286 from chaotixkilla/fix-improve-infura-request-handling-8.1
[Release 8.1] Improve Infura network usage
This commit is contained in:
commit
88760af853
5 changed files with 94 additions and 44 deletions
|
|
@ -25,7 +25,6 @@ module.exports = {
|
||||||
|
|
||||||
const STALE_INCOMING_TX_AGE = T.day
|
const STALE_INCOMING_TX_AGE = T.day
|
||||||
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
|
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
|
||||||
const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes
|
|
||||||
const MAX_NOTIFY_AGE = T.day
|
const MAX_NOTIFY_AGE = T.day
|
||||||
const MIN_NOTIFY_AGE = 5 * T.minutes
|
const MIN_NOTIFY_AGE = 5 * T.minutes
|
||||||
const INSUFFICIENT_FUNDS_CODE = 570
|
const INSUFFICIENT_FUNDS_CODE = 570
|
||||||
|
|
@ -91,21 +90,17 @@ function postProcess (txVector, justAuthorized, pi) {
|
||||||
return Promise.resolve({})
|
return Promise.resolve({})
|
||||||
}
|
}
|
||||||
|
|
||||||
function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) {
|
function fetchOpenTxs (statuses, fromAge, toAge) {
|
||||||
const notClause = applyFilter ? '' : 'not'
|
|
||||||
const sql = `select *
|
const sql = `select *
|
||||||
from cash_out_txs
|
from cash_out_txs
|
||||||
where ((extract(epoch from (now() - created))) * 1000)>$1
|
where ((extract(epoch from (now() - created))) * 1000)>$1
|
||||||
and ((extract(epoch from (now() - created))) * 1000)<$2
|
and ((extract(epoch from (now() - created))) * 1000)<$2
|
||||||
${_.isEmpty(coinFilter)
|
and status in ($3^)
|
||||||
? ``
|
and error is distinct from 'Operator cancel'`
|
||||||
: `and crypto_code ${notClause} in ($3^)`}
|
|
||||||
and status in ($4^)`
|
|
||||||
|
|
||||||
const coinClause = _.map(pgp.as.text, coinFilter).join(',')
|
|
||||||
const statusClause = _.map(pgp.as.text, statuses).join(',')
|
const statusClause = _.map(pgp.as.text, statuses).join(',')
|
||||||
|
|
||||||
return db.any(sql, [fromAge, toAge, coinClause, statusClause])
|
return db.any(sql, [fromAge, toAge, statusClause])
|
||||||
.then(rows => rows.map(toObj))
|
.then(rows => rows.map(toObj))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -164,22 +159,18 @@ function getWalletScore (tx, pi) {
|
||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorLiveIncoming (settings, applyFilter, coinFilter) {
|
function monitorLiveIncoming (settings) {
|
||||||
const statuses = ['notSeen', 'published', 'insufficientFunds']
|
const statuses = ['notSeen', 'published', 'insufficientFunds']
|
||||||
const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE
|
return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE)
|
||||||
|
|
||||||
return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorStaleIncoming (settings, applyFilter, coinFilter) {
|
function monitorStaleIncoming (settings) {
|
||||||
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
|
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
|
||||||
const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE
|
return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE)
|
||||||
|
|
||||||
return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) {
|
function monitorIncoming (settings, statuses, fromAge, toAge) {
|
||||||
return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter)
|
return fetchOpenTxs(statuses, fromAge, toAge)
|
||||||
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
|
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {
|
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,11 @@ const RECEIPT = 'sms_receipt'
|
||||||
|
|
||||||
const WALLET_SCORE_THRESHOLD = 9
|
const WALLET_SCORE_THRESHOLD = 9
|
||||||
|
|
||||||
|
const BALANCE_FETCH_SPEED_MULTIPLIER = {
|
||||||
|
NORMAL: 1,
|
||||||
|
SLOW: 3
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
anonymousCustomer,
|
anonymousCustomer,
|
||||||
CASSETTE_MAX_CAPACITY,
|
CASSETTE_MAX_CAPACITY,
|
||||||
|
|
@ -48,5 +53,6 @@ module.exports = {
|
||||||
CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES,
|
CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES,
|
||||||
WALLET_SCORE_THRESHOLD,
|
WALLET_SCORE_THRESHOLD,
|
||||||
RECEIPT,
|
RECEIPT,
|
||||||
PSQL_URL
|
PSQL_URL,
|
||||||
|
BALANCE_FETCH_SPEED_MULTIPLIER
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,10 @@
|
||||||
const _ = require('lodash/fp')
|
const _ = require('lodash/fp')
|
||||||
|
const NodeCache = require('node-cache')
|
||||||
const base = require('../geth/base')
|
const base = require('../geth/base')
|
||||||
|
const T = require('../../../time')
|
||||||
|
const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('../../../constants')
|
||||||
|
|
||||||
|
const REGULAR_TX_POLLING = 5 * T.seconds
|
||||||
|
|
||||||
const NAME = 'infura'
|
const NAME = 'infura'
|
||||||
|
|
||||||
|
|
@ -12,4 +17,54 @@ function run (account) {
|
||||||
base.connect(endpoint)
|
base.connect(endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = _.merge(base, { NAME, run })
|
const txsCache = new NodeCache({
|
||||||
|
stdTTL: T.hour / 1000,
|
||||||
|
checkperiod: T.minute / 1000,
|
||||||
|
deleteOnExpire: true
|
||||||
|
})
|
||||||
|
|
||||||
|
function shouldGetStatus (tx) {
|
||||||
|
const timePassedSinceTx = Date.now() - new Date(tx.created)
|
||||||
|
const timePassedSinceReq = Date.now() - new Date(txsCache.get(tx.id).lastReqTime)
|
||||||
|
|
||||||
|
// Allow for infura to gradually lower the amount of requests based on the time passed since the transaction
|
||||||
|
// Until first 5 minutes - 1/2 regular polling speed
|
||||||
|
// Until first 10 minutes - 1/4 regular polling speed
|
||||||
|
// Until first hour - 1/8 polling speed
|
||||||
|
// Until first two hours - 1/12 polling speed
|
||||||
|
// Until first four hours - 1/16 polling speed
|
||||||
|
// Until first day - 1/24 polling speed
|
||||||
|
// After first day - 1/32 polling speed
|
||||||
|
if (timePassedSinceTx < 5 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 2 * REGULAR_TX_POLLING
|
||||||
|
if (timePassedSinceTx < 10 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 4 * REGULAR_TX_POLLING
|
||||||
|
if (timePassedSinceTx < 1 * T.hour) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 8 * REGULAR_TX_POLLING
|
||||||
|
if (timePassedSinceTx < 2 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 12 * REGULAR_TX_POLLING
|
||||||
|
if (timePassedSinceTx < 4 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 16 * REGULAR_TX_POLLING
|
||||||
|
if (timePassedSinceTx < 1 * T.day) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 24 * REGULAR_TX_POLLING
|
||||||
|
return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 32 * REGULAR_TX_POLLING
|
||||||
|
}
|
||||||
|
|
||||||
|
// Override geth's getStatus function to allow for different polling timing
|
||||||
|
function getStatus (account, tx, requested, settings, operatorId) {
|
||||||
|
if (_.isNil(txsCache.get(tx.id))) {
|
||||||
|
txsCache.set(tx.id, { lastReqTime: Date.now() })
|
||||||
|
}
|
||||||
|
|
||||||
|
// return last available response
|
||||||
|
if (!shouldGetStatus(tx)) {
|
||||||
|
return Promise.resolve(txsCache.get(tx.id).res)
|
||||||
|
}
|
||||||
|
|
||||||
|
return base.getStatus(account, tx, requested, settings, operatorId)
|
||||||
|
.then(res => {
|
||||||
|
if (res.status === 'confirmed') {
|
||||||
|
txsCache.del(tx.id) // Transaction reached final status, can trim it from the caching obj
|
||||||
|
} else {
|
||||||
|
txsCache.set(tx.id, { lastReqTime: Date.now(), res })
|
||||||
|
txsCache.ttl(tx.id, T.hour / 1000)
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = _.merge(base, { NAME, run, getStatus, fetchSpeed: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW })
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,6 @@ const processBatches = require('./tx-batching-processing')
|
||||||
|
|
||||||
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
||||||
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
||||||
const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute
|
|
||||||
const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds
|
|
||||||
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
||||||
const SWEEP_HD_INTERVAL = 5 * T.minute
|
const SWEEP_HD_INTERVAL = 5 * T.minute
|
||||||
const TRADE_INTERVAL = 60 * T.seconds
|
const TRADE_INTERVAL = 60 * T.seconds
|
||||||
|
|
@ -61,7 +59,6 @@ const QUEUE = {
|
||||||
SLOW: SLOW_QUEUE
|
SLOW: SLOW_QUEUE
|
||||||
}
|
}
|
||||||
|
|
||||||
const coinFilter = ['ETH']
|
|
||||||
const schemaCallbacks = new Map()
|
const schemaCallbacks = new Map()
|
||||||
|
|
||||||
const cachedVariables = new NodeCache({
|
const cachedVariables = new NodeCache({
|
||||||
|
|
@ -168,12 +165,8 @@ function doPolling (schema) {
|
||||||
pi().executeTrades()
|
pi().executeTrades()
|
||||||
pi().pong()
|
pi().pong()
|
||||||
pi().clearOldLogs()
|
pi().clearOldLogs()
|
||||||
cashOutTx.monitorLiveIncoming(settings(), false, coinFilter)
|
cashOutTx.monitorLiveIncoming(settings())
|
||||||
cashOutTx.monitorStaleIncoming(settings(), false, coinFilter)
|
cashOutTx.monitorStaleIncoming(settings())
|
||||||
if (!_.isEmpty(coinFilter)) {
|
|
||||||
cashOutTx.monitorLiveIncoming(settings(), true, coinFilter)
|
|
||||||
cashOutTx.monitorStaleIncoming(settings(), true, coinFilter)
|
|
||||||
}
|
|
||||||
cashOutTx.monitorUnnotified(settings())
|
cashOutTx.monitorUnnotified(settings())
|
||||||
pi().sweepHd()
|
pi().sweepHd()
|
||||||
notifier.checkNotification(pi())
|
notifier.checkNotification(pi())
|
||||||
|
|
@ -181,12 +174,8 @@ function doPolling (schema) {
|
||||||
|
|
||||||
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST)
|
addToQueue(pi().getRawRates, TICKER_RATES_INTERVAL, schema, QUEUE.FAST)
|
||||||
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
|
addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST)
|
||||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter)
|
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
|
||||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter)
|
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings)
|
||||||
if (!_.isEmpty(coinFilter)) {
|
|
||||||
addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
|
|
||||||
addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter)
|
|
||||||
}
|
|
||||||
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
|
addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings)
|
||||||
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
|
addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings)
|
||||||
addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
|
addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE)
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ const httpError = require('./route-helpers').httpError
|
||||||
const logger = require('./logger')
|
const logger = require('./logger')
|
||||||
const { getOpenBatchCryptoValue } = require('./tx-batching')
|
const { getOpenBatchCryptoValue } = require('./tx-batching')
|
||||||
const BN = require('./bn')
|
const BN = require('./bn')
|
||||||
|
const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('./constants')
|
||||||
|
|
||||||
const FETCH_INTERVAL = 5000
|
const FETCH_INTERVAL = 5000
|
||||||
const INSUFFICIENT_FUNDS_CODE = 570
|
const INSUFFICIENT_FUNDS_CODE = 570
|
||||||
|
|
@ -255,20 +256,28 @@ function checkBlockchainStatus (settings, cryptoCode) {
|
||||||
.then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode))
|
.then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode))
|
||||||
}
|
}
|
||||||
|
|
||||||
const coinFilter = ['ETH']
|
|
||||||
|
|
||||||
const balance = (settings, cryptoCode) => {
|
const balance = (settings, cryptoCode) => {
|
||||||
if (_.includes(coinFilter, cryptoCode)) return balanceFiltered(settings, cryptoCode)
|
return fetchWallet(settings, cryptoCode)
|
||||||
return balanceUnfiltered(settings, cryptoCode)
|
.then(r => r.wallet.fetchSpeed ?? BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL)
|
||||||
|
.then(multiplier => {
|
||||||
|
switch (multiplier) {
|
||||||
|
case BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL:
|
||||||
|
return balanceNormal(settings, cryptoCode)
|
||||||
|
case BALANCE_FETCH_SPEED_MULTIPLIER.SLOW:
|
||||||
|
return balanceSlow(settings, cryptoCode)
|
||||||
|
default:
|
||||||
|
throw new Error()
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const balanceUnfiltered = mem(_balance, {
|
const balanceNormal = mem(_balance, {
|
||||||
maxAge: FETCH_INTERVAL,
|
maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL * FETCH_INTERVAL,
|
||||||
cacheKey: (settings, cryptoCode) => cryptoCode
|
cacheKey: (settings, cryptoCode) => cryptoCode
|
||||||
})
|
})
|
||||||
|
|
||||||
const balanceFiltered = mem(_balance, {
|
const balanceSlow = mem(_balance, {
|
||||||
maxAge: 3 * FETCH_INTERVAL,
|
maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW * FETCH_INTERVAL,
|
||||||
cacheKey: (settings, cryptoCode) => cryptoCode
|
cacheKey: (settings, cryptoCode) => cryptoCode
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue