feat: add coin filtering to cashout polling
This commit is contained in:
parent
e843020215
commit
9808a67945
2 changed files with 33 additions and 13 deletions
|
|
@ -24,6 +24,7 @@ module.exports = {
|
||||||
|
|
||||||
const STALE_INCOMING_TX_AGE = T.day
|
const STALE_INCOMING_TX_AGE = T.day
|
||||||
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
|
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
|
||||||
|
const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes
|
||||||
const MAX_NOTIFY_AGE = T.day
|
const MAX_NOTIFY_AGE = T.day
|
||||||
const MIN_NOTIFY_AGE = 5 * T.minutes
|
const MIN_NOTIFY_AGE = 5 * T.minutes
|
||||||
const INSUFFICIENT_FUNDS_CODE = 570
|
const INSUFFICIENT_FUNDS_CODE = 570
|
||||||
|
|
@ -95,16 +96,21 @@ function postProcess (txVector, justAuthorized, pi) {
|
||||||
return Promise.resolve({})
|
return Promise.resolve({})
|
||||||
}
|
}
|
||||||
|
|
||||||
function fetchOpenTxs (statuses, fromAge, toAge) {
|
function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) {
|
||||||
|
const notClause = applyFilter ? '' : 'not'
|
||||||
const sql = `select *
|
const sql = `select *
|
||||||
from cash_out_txs
|
from cash_out_txs
|
||||||
where ((extract(epoch from (now() - created))) * 1000)>$1
|
where ((extract(epoch from (now() - created))) * 1000)>$1
|
||||||
and ((extract(epoch from (now() - created))) * 1000)<$2
|
and ((extract(epoch from (now() - created))) * 1000)<$2
|
||||||
and status in ($3^)`
|
${_.isEmpty(coinFilter)
|
||||||
|
? ``
|
||||||
|
: `and crypto_code ${notClause} in ($3^)`}
|
||||||
|
and status in ($4^)`
|
||||||
|
|
||||||
|
const coinClause = _.map(pgp.as.text, coinFilter).join(',')
|
||||||
const statusClause = _.map(pgp.as.text, statuses).join(',')
|
const statusClause = _.map(pgp.as.text, statuses).join(',')
|
||||||
|
|
||||||
return db.any(sql, [fromAge, toAge, statusClause])
|
return db.any(sql, [fromAge, toAge, coinClause, statusClause])
|
||||||
.then(rows => rows.map(toObj))
|
.then(rows => rows.map(toObj))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -116,20 +122,22 @@ function processTxStatus (tx, settings) {
|
||||||
.then(_tx => selfPost(_tx, pi))
|
.then(_tx => selfPost(_tx, pi))
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorLiveIncoming (settings) {
|
function monitorLiveIncoming (settings, applyFilter, coinFilter) {
|
||||||
const statuses = ['notSeen', 'published', 'insufficientFunds']
|
const statuses = ['notSeen', 'published', 'insufficientFunds']
|
||||||
|
const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE
|
||||||
|
|
||||||
return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE)
|
return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorStaleIncoming (settings) {
|
function monitorStaleIncoming (settings, applyFilter, coinFilter) {
|
||||||
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
|
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
|
||||||
|
const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE
|
||||||
|
|
||||||
return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE)
|
return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
function monitorIncoming (settings, statuses, fromAge, toAge) {
|
function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) {
|
||||||
return fetchOpenTxs(statuses, fromAge, toAge)
|
return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter)
|
||||||
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
|
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {
|
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@ const complianceTriggers = require('./compliance-triggers')
|
||||||
|
|
||||||
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
const INCOMING_TX_INTERVAL = 30 * T.seconds
|
||||||
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
|
||||||
|
const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute
|
||||||
|
const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds
|
||||||
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
const UNNOTIFIED_INTERVAL = 10 * T.seconds
|
||||||
const SWEEP_HD_INTERVAL = T.minute
|
const SWEEP_HD_INTERVAL = T.minute
|
||||||
const TRADE_INTERVAL = 60 * T.seconds
|
const TRADE_INTERVAL = 60 * T.seconds
|
||||||
|
|
@ -27,6 +29,8 @@ const CHECK_NOTIFICATION_INTERVAL = 20 * T.seconds
|
||||||
|
|
||||||
const PENDING_INTERVAL = 10 * T.seconds
|
const PENDING_INTERVAL = 10 * T.seconds
|
||||||
|
|
||||||
|
const coinFilter = []
|
||||||
|
|
||||||
let _pi, _settings
|
let _pi, _settings
|
||||||
|
|
||||||
function reload (__settings) {
|
function reload (__settings) {
|
||||||
|
|
@ -71,16 +75,24 @@ function start (__settings) {
|
||||||
pi().executeTrades()
|
pi().executeTrades()
|
||||||
pi().pong()
|
pi().pong()
|
||||||
pi().clearOldLogs()
|
pi().clearOldLogs()
|
||||||
cashOutTx.monitorLiveIncoming(settings())
|
cashOutTx.monitorLiveIncoming(settings(), false, coinFilter)
|
||||||
cashOutTx.monitorStaleIncoming(settings())
|
cashOutTx.monitorStaleIncoming(settings(), false, coinFilter)
|
||||||
|
if (!_.isEmpty(coinFilter)) {
|
||||||
|
cashOutTx.monitorLiveIncoming(settings(), true, coinFilter)
|
||||||
|
cashOutTx.monitorStaleIncoming(settings(), true, coinFilter)
|
||||||
|
}
|
||||||
cashOutTx.monitorUnnotified(settings())
|
cashOutTx.monitorUnnotified(settings())
|
||||||
pi().sweepHd()
|
pi().sweepHd()
|
||||||
notifier.checkNotification(pi())
|
notifier.checkNotification(pi())
|
||||||
updateCoinAtmRadar()
|
updateCoinAtmRadar()
|
||||||
|
|
||||||
setInterval(() => pi().executeTrades(), TRADE_INTERVAL)
|
setInterval(() => pi().executeTrades(), TRADE_INTERVAL)
|
||||||
setInterval(() => cashOutTx.monitorLiveIncoming(settings()), LIVE_INCOMING_TX_INTERVAL)
|
setInterval(() => cashOutTx.monitorLiveIncoming(settings(), false, coinFilter), LIVE_INCOMING_TX_INTERVAL)
|
||||||
setInterval(() => cashOutTx.monitorStaleIncoming(settings()), INCOMING_TX_INTERVAL)
|
setInterval(() => cashOutTx.monitorStaleIncoming(settings(), false, coinFilter), INCOMING_TX_INTERVAL)
|
||||||
|
if (!_.isEmpty(coinFilter)) {
|
||||||
|
setInterval(() => cashOutTx.monitorLiveIncoming(settings(), true, coinFilter), LIVE_INCOMING_TX_INTERVAL_FILTER)
|
||||||
|
setInterval(() => cashOutTx.monitorStaleIncoming(settings(), true, coinFilter), INCOMING_TX_INTERVAL_FILTER)
|
||||||
|
}
|
||||||
setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL)
|
setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL)
|
||||||
setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL)
|
setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL)
|
||||||
setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL)
|
setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue