diff --git a/lib/cash-out/cash-out-tx.js b/lib/cash-out/cash-out-tx.js index 4abf18ea..99f9604b 100644 --- a/lib/cash-out/cash-out-tx.js +++ b/lib/cash-out/cash-out-tx.js @@ -25,7 +25,6 @@ module.exports = { const STALE_INCOMING_TX_AGE = T.day const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes -const STALE_LIVE_INCOMING_TX_AGE_FILTER = 5 * T.minutes const MAX_NOTIFY_AGE = T.day const MIN_NOTIFY_AGE = 5 * T.minutes const INSUFFICIENT_FUNDS_CODE = 570 @@ -91,21 +90,17 @@ function postProcess (txVector, justAuthorized, pi) { return Promise.resolve({}) } -function fetchOpenTxs (statuses, fromAge, toAge, applyFilter, coinFilter) { - const notClause = applyFilter ? '' : 'not' +function fetchOpenTxs (statuses, fromAge, toAge) { const sql = `select * from cash_out_txs where ((extract(epoch from (now() - created))) * 1000)>$1 and ((extract(epoch from (now() - created))) * 1000)<$2 - ${_.isEmpty(coinFilter) - ? `` - : `and crypto_code ${notClause} in ($3^)`} - and status in ($4^)` + and status in ($3^) + and error is distinct from 'Operator cancel'` - const coinClause = _.map(pgp.as.text, coinFilter).join(',') const statusClause = _.map(pgp.as.text, statuses).join(',') - return db.any(sql, [fromAge, toAge, coinClause, statusClause]) + return db.any(sql, [fromAge, toAge, statusClause]) .then(rows => rows.map(toObj)) } @@ -164,22 +159,18 @@ function getWalletScore (tx, pi) { return tx } -function monitorLiveIncoming (settings, applyFilter, coinFilter) { +function monitorLiveIncoming (settings) { const statuses = ['notSeen', 'published', 'insufficientFunds'] - const toAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - - return monitorIncoming(settings, statuses, 0, toAge, applyFilter, coinFilter) + return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE) } -function monitorStaleIncoming (settings, applyFilter, coinFilter) { +function monitorStaleIncoming (settings) { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - const fromAge = applyFilter ? STALE_LIVE_INCOMING_TX_AGE_FILTER : STALE_LIVE_INCOMING_TX_AGE - - return monitorIncoming(settings, statuses, fromAge, STALE_INCOMING_TX_AGE, applyFilter, coinFilter) + return monitorIncoming(settings, statuses, STALE_LIVE_INCOMING_TX_AGE, STALE_INCOMING_TX_AGE) } -function monitorIncoming (settings, statuses, fromAge, toAge, applyFilter, coinFilter) { - return fetchOpenTxs(statuses, fromAge, toAge, applyFilter, coinFilter) +function monitorIncoming (settings, statuses, fromAge, toAge) { + return fetchOpenTxs(statuses, fromAge, toAge) .then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings))) .catch(err => { if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) { diff --git a/lib/constants.js b/lib/constants.js index c4bc2fbc..c6ed2021 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -24,6 +24,11 @@ const RECEIPT = 'sms_receipt' const WALLET_SCORE_THRESHOLD = 9 +const BALANCE_FETCH_SPEED_MULTIPLIER = { + NORMAL: 1, + SLOW: 3 +} + module.exports = { anonymousCustomer, CASSETTE_MAX_CAPACITY, @@ -39,5 +44,6 @@ module.exports = { CASH_OUT_MINIMUM_AMOUNT_OF_CASSETTES, CASH_OUT_MAXIMUM_AMOUNT_OF_CASSETTES, WALLET_SCORE_THRESHOLD, - RECEIPT + RECEIPT, + BALANCE_FETCH_SPEED_MULTIPLIER } diff --git a/lib/new-admin/filters.js b/lib/new-admin/filters.js index d6a37922..d9b6c32d 100644 --- a/lib/new-admin/filters.js +++ b/lib/new-admin/filters.js @@ -21,7 +21,8 @@ function transaction () { SELECT 'address' AS type, to_address AS value FROM cash_in_txs UNION SELECT 'address' AS type, to_address AS value FROM cash_out_txs UNION SELECT 'status' AS type, ${cashInTx.TRANSACTION_STATES} AS value FROM cash_in_txs UNION - SELECT 'status' AS type, ${CASH_OUT_TRANSACTION_STATES} AS value FROM cash_out_txs + SELECT 'status' AS type, ${CASH_OUT_TRANSACTION_STATES} AS value FROM cash_out_txs UNION + SELECT 'sweep status' AS type, CASE WHEN swept THEN 'Swept' WHEN NOT swept THEN 'Unswept' END AS value FROM cash_out_txs ) f` return db.any(sql) diff --git a/lib/new-admin/graphql/resolvers/transaction.resolver.js b/lib/new-admin/graphql/resolvers/transaction.resolver.js index e41c17f9..f92a8b69 100644 --- a/lib/new-admin/graphql/resolvers/transaction.resolver.js +++ b/lib/new-admin/graphql/resolvers/transaction.resolver.js @@ -19,11 +19,11 @@ const resolvers = { isAnonymous: parent => (parent.customerId === anonymous.uuid) }, Query: { - transactions: (...[, { from, until, limit, offset, deviceId, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, excludeTestingCustomers }]) => - transactions.batch(from, until, limit, offset, deviceId, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, excludeTestingCustomers), - transactionsCsv: (...[, { from, until, limit, offset, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, timezone, excludeTestingCustomers, simplified }]) => - transactions.batch(from, until, limit, offset, null, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, excludeTestingCustomers, simplified) - .then(data => parseAsync(logDateFormat(timezone, data, ['created', 'sendTime']))), + transactions: (...[, { from, until, limit, offset, deviceId, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept, excludeTestingCustomers }]) => + transactions.batch(from, until, limit, offset, deviceId, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept, excludeTestingCustomers), + transactionsCsv: (...[, { from, until, limit, offset, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept, timezone, excludeTestingCustomers, simplified }]) => + transactions.batch(from, until, limit, offset, null, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept, excludeTestingCustomers, simplified) + .then(data => parseAsync(logDateFormat(timezone, data, ['created', 'sendTime', 'publishedAt']))), transactionCsv: (...[, { id, txClass, timezone }]) => transactions.getTx(id, txClass).then(data => parseAsync(logDateFormat(timezone, [data], ['created', 'sendTime'])) diff --git a/lib/new-admin/graphql/types/transaction.type.js b/lib/new-admin/graphql/types/transaction.type.js index a7385eda..2d95d751 100644 --- a/lib/new-admin/graphql/types/transaction.type.js +++ b/lib/new-admin/graphql/types/transaction.type.js @@ -50,6 +50,7 @@ const typeDef = gql` batchError: String walletScore: Int profit: String + swept: Boolean } type Filter { @@ -58,8 +59,8 @@ const typeDef = gql` } type Query { - transactions(from: Date, until: Date, limit: Int, offset: Int, deviceId: ID, txClass: String, machineName: String, customerName: String, fiatCode: String, cryptoCode: String, toAddress: String, status: String, excludeTestingCustomers: Boolean): [Transaction] @auth - transactionsCsv(from: Date, until: Date, limit: Int, offset: Int, txClass: String, machineName: String, customerName: String, fiatCode: String, cryptoCode: String, toAddress: String, status: String, timezone: String, excludeTestingCustomers: Boolean, simplified: Boolean): String @auth + transactions(from: Date, until: Date, limit: Int, offset: Int, deviceId: ID, txClass: String, machineName: String, customerName: String, fiatCode: String, cryptoCode: String, toAddress: String, status: String, swept: Boolean, excludeTestingCustomers: Boolean): [Transaction] @auth + transactionsCsv(from: Date, until: Date, limit: Int, offset: Int, txClass: String, machineName: String, customerName: String, fiatCode: String, cryptoCode: String, toAddress: String, status: String, swept: Boolean, timezone: String, excludeTestingCustomers: Boolean, simplified: Boolean): String @auth transactionCsv(id: ID, txClass: String, timezone: String): String @auth txAssociatedDataCsv(id: ID, txClass: String, timezone: String): String @auth transactionFilters: [Filter] @auth diff --git a/lib/new-admin/services/transactions.js b/lib/new-admin/services/transactions.js index 0f111937..3dc2679a 100644 --- a/lib/new-admin/services/transactions.js +++ b/lib/new-admin/services/transactions.js @@ -46,6 +46,7 @@ function batch ( cryptoCode = null, toAddress = null, status = null, + swept = null, excludeTestingCustomers = false, simplified ) { @@ -109,14 +110,33 @@ function batch ( AND ($11 is null or txs.crypto_code = $11) AND ($12 is null or txs.to_address = $12) AND ($13 is null or txs.txStatus = $13) + AND ($14 is null or txs.swept = $14) ${excludeTestingCustomers ? `AND c.is_test_customer is false` : ``} AND (fiat > 0) ORDER BY created DESC limit $4 offset $5` - return Promise.all([ - db.any(cashInSql, [cashInTx.PENDING_INTERVAL, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status]), - db.any(cashOutSql, [REDEEMABLE_AGE, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status]) - ]) + // The swept filter is cash-out only, so omit the cash-in query entirely + const hasCashInOnlyFilters = false + const hasCashOutOnlyFilters = !_.isNil(swept) + + let promises + + if (hasCashInOnlyFilters && hasCashOutOnlyFilters) { + throw new Error('Trying to filter transactions with mutually exclusive filters') + } + + if (hasCashInOnlyFilters) { + promises = [db.any(cashInSql, [cashInTx.PENDING_INTERVAL, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status])] + } else if (hasCashOutOnlyFilters) { + promises = [db.any(cashOutSql, [REDEEMABLE_AGE, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept])] + } else { + promises = [ + db.any(cashInSql, [cashInTx.PENDING_INTERVAL, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status]), + db.any(cashOutSql, [REDEEMABLE_AGE, from, until, limit, offset, id, txClass, machineName, customerName, fiatCode, cryptoCode, toAddress, status, swept]) + ] + } + + return Promise.all(promises) .then(packager) .then(res => { if (simplified) return simplifiedBatch(res) diff --git a/lib/plugins.js b/lib/plugins.js index 9f6688cd..dc3e324b 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -806,8 +806,8 @@ function plugins (settings, deviceId) { } function sweepHd () { - const sql = `select id, crypto_code, hd_index from cash_out_txs - where hd_index is not null and not swept and status in ('confirmed', 'instant')` + const sql = `SELECT id, crypto_code, hd_index FROM cash_out_txs + WHERE hd_index IS NOT NULL AND NOT swept AND status IN ('confirmed', 'instant') AND created > now() - interval '1 week'` return db.any(sql) .then(rows => Promise.all(rows.map(sweepHdRow))) diff --git a/lib/plugins/wallet/geth/base.js b/lib/plugins/wallet/geth/base.js index bdd7ac55..e10ca6a1 100644 --- a/lib/plugins/wallet/geth/base.js +++ b/lib/plugins/wallet/geth/base.js @@ -9,6 +9,8 @@ const { default: Common, Chain, Hardfork } = require('@ethereumjs/common') const Tx = require('ethereumjs-tx') const util = require('ethereumjs-util') const coins = require('@lamassu/coins') +const { default: PQueue } = require('p-queue') + const _pify = require('pify') const BN = require('../../../bn') const ABI = require('../../tokens') @@ -48,6 +50,11 @@ const logInfuraCall = call => { logger.info(`Calling web3 method ${call} via Infura. Current count for this session: ${JSON.stringify(infuraCalls)}`) } +const SWEEP_QUEUE = new PQueue({ + concurrency: 3, + interval: 250, +}) + function connect (url) { web3.setProvider(new web3.providers.HttpProvider(url)) } @@ -236,13 +243,14 @@ function sweep (account, cryptoCode, hdIndex, settings, operatorId) { const wallet = paymentHdNode(account).deriveChild(hdIndex).getWallet() const fromAddress = wallet.getChecksumAddressString() - return confirmedBalance(fromAddress, cryptoCode) + return SWEEP_QUEUE.add(() => confirmedBalance(fromAddress, cryptoCode) .then(r => { if (r.eq(0)) return return generateTx(defaultAddress(account), wallet, r, true, cryptoCode) .then(signedTx => pify(web3.eth.sendSignedTransaction)(signedTx)) }) + ) } function newAddress (account, info, tx, settings, operatorId) { diff --git a/lib/plugins/wallet/infura/infura.js b/lib/plugins/wallet/infura/infura.js index 47d4e35c..0b5d702b 100644 --- a/lib/plugins/wallet/infura/infura.js +++ b/lib/plugins/wallet/infura/infura.js @@ -1,5 +1,10 @@ const _ = require('lodash/fp') +const NodeCache = require('node-cache') const base = require('../geth/base') +const T = require('../../../time') +const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('../../../constants') + +const REGULAR_TX_POLLING = 5 * T.seconds const NAME = 'infura' @@ -12,4 +17,54 @@ function run (account) { base.connect(endpoint) } -module.exports = _.merge(base, { NAME, run }) +const txsCache = new NodeCache({ + stdTTL: T.hour / 1000, + checkperiod: T.minute / 1000, + deleteOnExpire: true +}) + +function shouldGetStatus (tx) { + const timePassedSinceTx = Date.now() - new Date(tx.created) + const timePassedSinceReq = Date.now() - new Date(txsCache.get(tx.id).lastReqTime) + + // Allow for infura to gradually lower the amount of requests based on the time passed since the transaction + // Until first 5 minutes - 1/2 regular polling speed + // Until first 10 minutes - 1/4 regular polling speed + // Until first hour - 1/8 polling speed + // Until first two hours - 1/12 polling speed + // Until first four hours - 1/16 polling speed + // Until first day - 1/24 polling speed + // After first day - 1/32 polling speed + if (timePassedSinceTx < 5 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 2 * REGULAR_TX_POLLING + if (timePassedSinceTx < 10 * T.minutes) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 4 * REGULAR_TX_POLLING + if (timePassedSinceTx < 1 * T.hour) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 8 * REGULAR_TX_POLLING + if (timePassedSinceTx < 2 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 12 * REGULAR_TX_POLLING + if (timePassedSinceTx < 4 * T.hours) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 16 * REGULAR_TX_POLLING + if (timePassedSinceTx < 1 * T.day) return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 24 * REGULAR_TX_POLLING + return _.isNil(txsCache.get(tx.id).res) || timePassedSinceReq > 32 * REGULAR_TX_POLLING +} + +// Override geth's getStatus function to allow for different polling timing +function getStatus (account, tx, requested, settings, operatorId) { + if (_.isNil(txsCache.get(tx.id))) { + txsCache.set(tx.id, { lastReqTime: Date.now() }) + } + + // return last available response + if (!shouldGetStatus(tx)) { + return Promise.resolve(txsCache.get(tx.id).res) + } + + return base.getStatus(account, tx, requested, settings, operatorId) + .then(res => { + if (res.status === 'confirmed') { + txsCache.del(tx.id) // Transaction reached final status, can trim it from the caching obj + } else { + txsCache.set(tx.id, { lastReqTime: Date.now(), res }) + txsCache.ttl(tx.id, T.hour / 1000) + } + return res + }) +} + +module.exports = _.merge(base, { NAME, run, getStatus, fetchSpeed: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW }) diff --git a/lib/poller.js b/lib/poller.js index 99ea5543..c1fbec00 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -21,10 +21,8 @@ const processBatches = require('./tx-batching-processing') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds -const INCOMING_TX_INTERVAL_FILTER = 1 * T.minute -const LIVE_INCOMING_TX_INTERVAL_FILTER = 10 * T.seconds const UNNOTIFIED_INTERVAL = 10 * T.seconds -const SWEEP_HD_INTERVAL = T.minute +const SWEEP_HD_INTERVAL = 5 * T.minute const TRADE_INTERVAL = 60 * T.seconds const PONG_INTERVAL = 10 * T.seconds const LOGS_CLEAR_INTERVAL = 1 * T.day @@ -60,7 +58,6 @@ const QUEUE = { SLOW: SLOW_QUEUE } -const coinFilter = ['ETH'] const schemaCallbacks = new Map() const cachedVariables = new NodeCache({ @@ -167,24 +164,16 @@ function doPolling (schema) { pi().executeTrades() pi().pong() pi().clearOldLogs() - cashOutTx.monitorLiveIncoming(settings(), false, coinFilter) - cashOutTx.monitorStaleIncoming(settings(), false, coinFilter) - if (!_.isEmpty(coinFilter)) { - cashOutTx.monitorLiveIncoming(settings(), true, coinFilter) - cashOutTx.monitorStaleIncoming(settings(), true, coinFilter) - } + cashOutTx.monitorLiveIncoming(settings()) + cashOutTx.monitorStaleIncoming(settings()) cashOutTx.monitorUnnotified(settings()) pi().sweepHd() notifier.checkNotification(pi()) updateCoinAtmRadar() addToQueue(pi().executeTrades, TRADE_INTERVAL, schema, QUEUE.FAST) - addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) - addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings, false, coinFilter) - if (!_.isEmpty(coinFilter)) { - addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) - addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL_FILTER, schema, QUEUE.FAST, settings, true, coinFilter) - } + addToQueue(cashOutTx.monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings) + addToQueue(cashOutTx.monitorStaleIncoming, INCOMING_TX_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE) diff --git a/lib/wallet.js b/lib/wallet.js index 1073a8a0..6186cded 100644 --- a/lib/wallet.js +++ b/lib/wallet.js @@ -14,6 +14,7 @@ const httpError = require('./route-helpers').httpError const logger = require('./logger') const { getOpenBatchCryptoValue } = require('./tx-batching') const BN = require('./bn') +const { BALANCE_FETCH_SPEED_MULTIPLIER } = require('./constants') const FETCH_INTERVAL = 5000 const INSUFFICIENT_FUNDS_CODE = 570 @@ -254,20 +255,28 @@ function checkBlockchainStatus (settings, cryptoCode) { .then(({ checkBlockchainStatus }) => checkBlockchainStatus(cryptoCode)) } -const coinFilter = ['ETH'] - const balance = (settings, cryptoCode) => { - if (_.includes(coinFilter, cryptoCode)) return balanceFiltered(settings, cryptoCode) - return balanceUnfiltered(settings, cryptoCode) + return fetchWallet(settings, cryptoCode) + .then(r => r.wallet.fetchSpeed ?? BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL) + .then(multiplier => { + switch (multiplier) { + case BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL: + return balanceNormal(settings, cryptoCode) + case BALANCE_FETCH_SPEED_MULTIPLIER.SLOW: + return balanceSlow(settings, cryptoCode) + default: + throw new Error() + } + }) } -const balanceUnfiltered = mem(_balance, { - maxAge: FETCH_INTERVAL, +const balanceNormal = mem(_balance, { + maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.NORMAL * FETCH_INTERVAL, cacheKey: (settings, cryptoCode) => cryptoCode }) -const balanceFiltered = mem(_balance, { - maxAge: 3 * FETCH_INTERVAL, +const balanceSlow = mem(_balance, { + maxAge: BALANCE_FETCH_SPEED_MULTIPLIER.SLOW * FETCH_INTERVAL, cacheKey: (settings, cryptoCode) => cryptoCode }) diff --git a/new-lamassu-admin/src/components/SearchFilter.js b/new-lamassu-admin/src/components/SearchFilter.js index 0f5abba1..adb16647 100644 --- a/new-lamassu-admin/src/components/SearchFilter.js +++ b/new-lamassu-admin/src/components/SearchFilter.js @@ -7,7 +7,7 @@ import { P, Label3 } from 'src/components/typography' import { ReactComponent as CloseIcon } from 'src/styling/icons/action/close/zodiac.svg' import { ReactComponent as FilterIcon } from 'src/styling/icons/button/filter/white.svg' import { ReactComponent as ReverseFilterIcon } from 'src/styling/icons/button/filter/zodiac.svg' -import { onlyFirstToUpper } from 'src/utils/string' +import { onlyFirstToUpper, singularOrPlural } from 'src/utils/string' import { chipStyles, styles } from './SearchFilter.styles' @@ -18,7 +18,7 @@ const SearchFilter = ({ filters, onFilterDelete, deleteAllFilters, - entries + entries = 0 }) => { const chipClasses = useChipStyles() const classes = useStyles() @@ -40,8 +40,11 @@ const SearchFilter = ({