diff --git a/lib/plugins.js b/lib/plugins.js index ca32265e..9e32d3b5 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -9,16 +9,12 @@ BigNumber.config({CRYPTO: true}) var logger = require('./logger') var notifier = require('./notifier') -var argv = require('minimist')(process.argv.slice(2)) - var uuid = require('node-uuid') var tradeIntervals = {} var CHECK_NOTIFICATION_INTERVAL = 60 * 1000 var ALERT_SEND_INTERVAL = 60 * 60 * 1000 var POLLING_RATE = 60 * 1000 // poll each minute -var REAP_RATE = 2 * 1000 -var PENDING_TIMEOUT = 70 * 1000 var INCOMING_TX_INTERVAL = 5 * 1000 var LIVE_INCOMING_TX_INTERVAL = 30 * 1000 var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000 @@ -27,11 +23,6 @@ var MAX_NOTIFY_AGE = 48 * 60 * 60 * 1000 var MIN_NOTIFY_AGE = 5 * 60 * 1000 var TRANSACTION_EXPIRATION = 48 * 60 * 60 * 1000 -if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000 - -// TODO: might have to update this if user is allowed to extend monitoring time -var DEPOSIT_TIMEOUT = 130 * 1000 - var db = null var cryptoCodes = null @@ -319,40 +310,6 @@ function executeTx (session, tx, authority, cb) { }) } -function reapOutgoingTx (session, tx) { - executeTx(session, tx, 'timeout', function (err) { - if (err) logger.error(err) - }) -} - -function reapTx (row) { - var session = {fingerprint: row.device_fingerprint, id: row.session_id} - var tx = { - fiat: 0, - cryptoAtoms: new BigNumber(row.cryptoAtoms), - toAddress: row.toAddress, - currencyCode: row.currencyCode, - cryptoCode: row.cryptoCode, - incoming: row.incoming - } - if (!row.incoming) reapOutgoingTx(session, tx) -} - -function reapTxs () { - db.removeOldPending(DEPOSIT_TIMEOUT) - - // NOTE: No harm in processing old pending tx, we don't need to wait for - // removeOldPending to complete. - db.pendingTxs(PENDING_TIMEOUT, function (err, rows) { - if (err) return logger.warn(err) - var rowCount = rows.length - for (var i = 0; i < rowCount; i++) { - var row = rows[i] - reapTx(row) - } - }) -} - // TODO: Run these in parallel and return success exports.trade = function trade (session, rawTrade, cb) { // TODO: move this to DB, too @@ -369,10 +326,7 @@ exports.trade = function trade (session, rawTrade, cb) { }) } - async.parallel([ - async.apply(db.addOutgoingPending, session, rawTrade.currency, rawTrade.cryptoCode, rawTrade.toAddress), - async.apply(db.recordBill, session, rawTrade) - ], cb) + db.recordBill(session, rawTrade, cb) } exports.stateChange = function stateChange (session, rec, cb) { @@ -519,7 +473,6 @@ exports.startPolling = function startPolling () { startTrader(cryptoCode) }) - setInterval(reapTxs, REAP_RATE) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorIncoming, INCOMING_TX_INTERVAL) setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 47aa11ba..d115e4fe 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -153,110 +153,6 @@ function silentQuery (client, queryStr, values, cb) { }) } -// OPTIMIZE: No need to query bills if tx.fiat and tx.cryptoAtoms are set -function billsAndTxs (client, session, cb) { - var sessionId = session.id - var fingerprint = session.fingerprint - var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + - 'COALESCE(SUM(satoshis), 0) AS satoshis ' + - 'FROM bills ' + - 'WHERE device_fingerprint=$1 AND session_id=$2' - var billsValues = [fingerprint, sessionId] - var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' + - 'COALESCE(SUM(satoshis), 0) AS satoshis ' + - 'FROM transactions ' + - 'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3' - var txValues = [fingerprint, sessionId, 'partial_request'] - - async.parallel([ - async.apply(query, client, billsQuery, billsValues), - async.apply(query, client, txQuery, txValues) - ], function (err, results) { - if (err) return cb(err) - - // Note: PG SUM function returns int8, which is returned as a string, so - // we need to parse, since we know these won't be huge numbers. - cb(null, { - billsFiat: parseInt(results[0].rows[0].fiat, 10), - billsCryptoAtoms: new BigNumber(results[0].rows[0].satoshis), - txFiat: parseInt(results[1].rows[0].fiat, 10), - txCryptoAtoms: new BigNumber(results[1].rows[0].satoshis) - }) - }) -} - -function computeSendAmount (tx, totals) { - var fiatRemaining = (tx.fiat || totals.billsFiat) - totals.txFiat - - var cryptoAtomsRemaining = tx.cryptoAtoms.eq(0) - ? totals.billsCryptoAtoms.minus(totals.txCryptoAtoms) - : tx.cryptoAtoms.minus(totals.txCryptoAtoms) - - var result = { - fiat: fiatRemaining, - cryptoAtoms: cryptoAtomsRemaining - } - if (result.fiat < 0 || result.cryptoAtoms.lt(0)) { - logger.warn({tx: tx, totals: totals, result: result}, - "computeSendAmount result < 0, this shouldn't happen. " + - 'Maybe timeout arrived after machineSend.') - result.fiat = 0 - result.cryptoAtoms = new BigNumber(0) - } - return result -} - -exports.removeOldPending = function removeOldPending (timeoutMS) { - connect(function (cerr, client, done) { - if (cerr) return - var sql = 'DELETE FROM pending_transactions ' + - 'WHERE incoming AND extract(EPOCH FROM now() - updated) > $1' - var timeoutS = timeoutMS / 1000 - var values = [timeoutS] - query(client, sql, values, function (err) { - done() - if (err) logger.error(err) - }) - }) -} - -exports.pendingTxs = function pendingTxs (timeoutMS, cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var sql = 'SELECT * ' + - 'FROM pending_transactions ' + - 'WHERE (incoming OR extract(EPOCH FROM now() - updated) > $1) ' + - 'ORDER BY updated ASC' - var timeoutS = timeoutMS / 1000 - var values = [timeoutS] - query(client, sql, values, function (err, results) { - done() - cb(err, normalizeTxs(results.rows)) - }) - }) -} - -function removePendingTx (client, session, cb) { - var sql = 'DELETE FROM pending_transactions ' + - 'WHERE device_fingerprint=$1 AND session_id=$2' - silentQuery(client, sql, [session.fingerprint, session.id], cb) -} - -function insertOutgoingTx (client, session, tx, totals, cb) { - var sendAmount = computeSendAmount(tx, totals) - var stage = 'partial_request' - var authority = tx.fiat ? 'machine' : 'timeout' - var cryptoAtoms = sendAmount.cryptoAtoms - var fiat = sendAmount.fiat - if (cryptoAtoms.eq(0)) return cb(null, {cryptoAtoms: new BigNumber(0), fiat: 0}) - - insertOutgoing(client, session, tx, cryptoAtoms, fiat, stage, authority, - function (err) { - if (err) return cb(err) - cb(null, {cryptoAtoms: cryptoAtoms, fiat: fiat}) - }) -} - function insertOutgoingCompleteTx (client, session, tx, cb) { // Only relevant for machine source transactions, not timeouts if (!tx.fiat) return cb() @@ -319,18 +215,6 @@ function insertTx (client, session, incoming, tx, cryptoAtoms, fiat, stage, }) } -function refreshPendingTx (client, session, cb) { - var sql = 'UPDATE pending_transactions SET updated=now() ' + - 'WHERE device_fingerprint=$1 AND session_id=$2' - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - query(client, sql, [session.fingerprint, session.id], function (err) { - done(err) - cb(err) - }) - }) -} - function addPendingTx (client, session, incoming, currencyCode, cryptoCode, toAddress, cryptoAtoms, cb) { var fields = ['device_fingerprint', 'session_id', 'incoming', @@ -343,34 +227,11 @@ function addPendingTx (client, session, incoming, currencyCode, cryptoCode, toAd }) } -function buildOutgoingTx (client, session, tx, cb) { - async.waterfall([ - async.apply(billsAndTxs, client, session), - async.apply(insertOutgoingTx, client, session, tx) - ], cb) -} - // Calling function should only send bitcoins if result.cryptoAtomsToSend > 0 exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) { connect(function (cerr, client, done) { if (cerr) return cb(cerr) - async.series([ - async.apply(silentQuery, client, 'BEGIN'), - async.apply(silentQuery, client, 'LOCK TABLE transactions'), - async.apply(insertOutgoingCompleteTx, client, session, tx), - async.apply(removePendingTx, client, session), - async.apply(buildOutgoingTx, client, session, tx) - ], function (err, results) { - if (err) { - rollback(client, done) - return cb(err) - } - silentQuery(client, 'COMMIT', [], function () { - done() - var toSend = results[4] - cb(null, toSend) - }) - }) + insertOutgoingCompleteTx(client, session, tx, cb) }) } @@ -390,61 +251,6 @@ exports.sentCoins = function sentCoins (session, tx, authority, toSend, fee, }) } -function ensureNotFinal (client, session, cb) { - var sql = 'SELECT id FROM transactions ' + - 'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' + - 'AND stage=$4' + - 'LIMIT 1' - var values = [session.fingerprint, session.id, false, 'final_request'] - - client.query(sql, values, function (err, results) { - var error - if (err) return cb(err) - if (results.rows.length > 0) { - error = new Error('Final request already exists') - error.name = 'staleBill' - error.severity = 'low' - return cb(error) - } - cb() - }) -} - -exports.addOutgoingPending = function addOutgoingPending (session, currencyCode, - cryptoCode, toAddress, cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - - async.series([ - async.apply(silentQuery, client, 'BEGIN', null), - async.apply(ensureNotFinal, client, session), - async.apply(addPendingTx, client, session, false, currencyCode, cryptoCode, toAddress, - 0) - ], function (err) { - if (err) { - return rollback(client, function (rberr) { - done(rberr) - - if (isUniqueViolation(err)) { - // Pending tx exists, refresh it. - return refreshPendingTx(client, session, cb) - } - if (err.name === 'staleBill') { - logger.info('Received a bill insert after send coins request') - return cb() - } - logger.error(err) - return cb(err) - }) - } - silentQuery(client, 'COMMIT', null, function () { - done() - cb() - }) - }) - }) -} - exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) { connect(function (cerr, client, done) { if (cerr) return cb(cerr)