From 008dd3e9f61d4dfcb7b03e7ebb15038c7e624395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Wed, 24 Nov 2021 19:09:15 +0000 Subject: [PATCH] feat: add crypto reservation for batched transaction --- lib/poller.js | 4 ++-- lib/tx-batching-processing.js | 29 +++++++++++++++++++++++++++++ lib/tx-batching.js | 34 +++++++++++++--------------------- lib/wallet.js | 6 +++++- 4 files changed, 49 insertions(+), 24 deletions(-) create mode 100644 lib/tx-batching-processing.js diff --git a/lib/poller.js b/lib/poller.js index 1c05bef1..25011b17 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -17,7 +17,7 @@ const NodeCache = require('node-cache') const util = require('util') const db = require('./db') const state = require('./middlewares/state') -const batching = require('./tx-batching') +const processBatches = require('./tx-batching-processing') const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds @@ -216,7 +216,7 @@ function doPolling (schema) { } addToQueue(cashOutTx.monitorUnnotified, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(cashInTx.monitorPending, PENDING_INTERVAL, schema, QUEUE.FAST, settings) - addToQueue(batching.processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE) + addToQueue(processBatches, UNNOTIFIED_INTERVAL, schema, QUEUE.FAST, settings, TRANSACTION_BATCH_LIFECYCLE) addToQueue(pi().sweepHd, SWEEP_HD_INTERVAL, schema, QUEUE.FAST, settings) addToQueue(pi().pong, PONG_INTERVAL, schema, QUEUE.FAST) addToQueue(pi().clearOldLogs, LOGS_CLEAR_INTERVAL, schema, QUEUE.SLOW) diff --git a/lib/tx-batching-processing.js b/lib/tx-batching-processing.js new file mode 100644 index 00000000..1e7045d1 --- /dev/null +++ b/lib/tx-batching-processing.js @@ -0,0 +1,29 @@ +const _ = require('lodash/fp') + +const txBatching = require('./tx-batching') +const wallet = require('./wallet') + +function submitBatch (settings, batch) { + txBatching.getBatchTransactions(batch) + .then(txs => { + wallet.sendCoinsBatch(settings, txs, batch.crypto_code) + .then(res => txBatching.confirmSentBatch(batch, res)) + .catch(err => txBatching.setErroredBatch(batch, err.message)) + }) +} + +function processBatches (settings, lifecycle) { + return txBatching.getBatchesByStatus(['open']) + .then(batches => { + _.each(batch => { + const elapsedMS = batch.time_elapsed * 1000 + + if (elapsedMS >= lifecycle) { + return txBatching.closeTransactionBatch(batch) + .then(() => submitBatch(settings, batch)) + } + }, batches) + }) +} + +module.exports = processBatches diff --git a/lib/tx-batching.js b/lib/tx-batching.js index a2cc3b42..e36c48d7 100644 --- a/lib/tx-batching.js +++ b/lib/tx-batching.js @@ -4,7 +4,6 @@ const uuid = require('uuid') const BN = require('./bn') const db = require('./db') -const wallet = require('./wallet') function addTransactionToBatch (tx) { const sql = `SELECT * FROM transaction_batches WHERE crypto_code=$1 AND status='open' ORDER BY created_at DESC LIMIT 1` @@ -57,30 +56,23 @@ function getBatchesByStatus (statuses) { return db.manyOrNone(sql, [_.map(pgp.as.text, statuses).join(',')]) } -function submitBatch (settings, batch) { - getBatchTransactions(batch) - .then(txs => { - wallet.sendCoinsBatch(settings, txs, batch.crypto_code) - .then(res => confirmSentBatch(batch, res)) - .catch(err => setErroredBatch(batch, err.message)) - }) -} +function getOpenBatchCryptoValue (cryptoCode) { + const sql = `SELECT * FROM transaction_batches WHERE crypto_code=$1 AND status='open' ORDER BY created_at DESC LIMIT 1` -function processBatches (settings, lifecycle) { - return getBatchesByStatus(['open']) - .then(batches => { - _.each(batch => { - const elapsedMS = batch.time_elapsed * 1000 - - if (elapsedMS >= lifecycle) { - return closeTransactionBatch(batch) - .then(() => submitBatch(settings, batch)) - } - }, batches) + return db.oneOrNone(sql, [cryptoCode]) + .then(batch => { + if (_.isNil(batch)) return Promise.resolve([]) + return db.any(`SELECT * FROM cash_in_txs WHERE batch_id=$1`, [batch.id]) }) + .then(txs => _.reduce((acc, tx) => acc.plus(tx.cash_in_fee_crypto).plus(tx.crypto_atoms), BN(0), txs)) } module.exports = { addTransactionToBatch, - processBatches + closeTransactionBatch, + confirmSentBatch, + setErroredBatch, + getBatchTransactions, + getBatchesByStatus, + getOpenBatchCryptoValue } diff --git a/lib/wallet.js b/lib/wallet.js index 6ed54f63..92d1a7b4 100644 --- a/lib/wallet.js +++ b/lib/wallet.js @@ -13,6 +13,8 @@ const ph = require('./plugin-helper') const layer2 = require('./layer2') const httpError = require('./route-helpers').httpError const logger = require('./logger') +const { getOpenBatchCryptoValue } = require('./tx-batching') +const BN = require('./bn') const FETCH_INTERVAL = 5000 const INSUFFICIENT_FUNDS_CODE = 570 @@ -46,7 +48,9 @@ const lastBalance = {} function _balance (settings, cryptoCode) { return fetchWallet(settings, cryptoCode) .then(r => r.wallet.balance(r.account, cryptoCode, settings, r.operatorId)) - .then(balance => ({ balance, timestamp: Date.now() })) + .then(balance => Promise.all([balance, supportsBatching(settings, cryptoCode)])) + .then(([balance, supportsBatching]) => Promise.all([balance, supportsBatching ? getOpenBatchCryptoValue(cryptoCode) : Promise.resolve(BN(0))])) + .then(([balance, reservedBalance]) => ({ balance: balance.minus(reservedBalance), reservedBalance, timestamp: Date.now() })) .then(r => { lastBalance[cryptoCode] = r return r