From 9ea8af28f8fd53265fc46245d913e5d6e70f17e5 Mon Sep 17 00:00:00 2001 From: Damian Mee Date: Wed, 1 Oct 2014 01:54:11 +0200 Subject: [PATCH] feat(resend): Re-send process improvements --- lib/plugins.js | 64 ++++++++++++++++++++------------- lib/postgresql_interface.js | 71 ++++++++++++++++++------------------- lib/routes.js | 9 +++-- 3 files changed, 79 insertions(+), 65 deletions(-) diff --git a/lib/plugins.js b/lib/plugins.js index b8c8a165..73c0081e 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -203,12 +203,12 @@ function _sendBitcoins(tx, cb) { ); } -function executeTx(deviceFingerprint, txId, autoTriggered, cb) { +function executeTx(deviceFingerprint, txId, triggeredByUser, cb) { cb = typeof cb === 'function' ? cb : function() {}; clearSession(deviceFingerprint); - // get remaining amount to be sent + // 1. get remaining amount to be sent db.getPendingAmount(txId, function(err, tx) { if (err) { logger.error(err); @@ -217,33 +217,47 @@ function executeTx(deviceFingerprint, txId, autoTriggered, cb) { if (!tx) { logger.info('Nothing to send (%s)', txId); - return cb(null, {statusCode: 304}); // Not Modified + + // all bills were already sent by a timeout trigger; + // now only mark that user's `/send` arrived + if (triggeredByUser) + db.changeTxStatus(txId, 'completed', {is_completed: true}); + + // indicate ACK to machine + return cb(null, { + statusCode: 204, // No Content + txId: txId + }); } - db.summonTx(deviceFingerprint, tx, function(err, txInfo) { - if (err) return cb(err); + // indicate whether this call was initiated by user or timeout + if (triggeredByUser) + tx.is_completed = true; - // actual sending - if (!txInfo) { - return _sendBitcoins(tx, function(err, txHash) { - cb(null, { - statusCode: 201, // Created - txHash: txHash - }); - }); - } - - // Out of bitcoins: special case - var txErr = null; - if (txInfo.err) { - txErr = new Error(txInfo.err); - if (txInfo.status === 'insufficientFunds') { - txErr.name = 'InsufficientFunds'; + // 2. BEFORE sending insert tx to a db + db.insertTx(deviceFingerprint, tx, function(err) { + if (err) { + // `getPendingAmount` generated new `partial_id`, so this can occur + // only when 2nd executeTx gets called before 1st executes it's insert + if (err.name === 'UniqueViolation') { + // this will calculate again and then send only "pending" coins + return executeTx(deviceFingerprint, txId, triggeredByUser, cb); } + + return cb(err); } - pollBalance(); - cb(txErr, txInfo.txHash); + // 3. actual sending (of the same amount, that was just inserted to the db) + return _sendBitcoins(tx, function(err, txHash) { + pollBalance(); + // TODO: should we indicate error to the machine here? + + // indicate ACK to machine + cb(null, { + statusCode: 201, // Created + txId: txId + }); + }); }); }); } @@ -255,7 +269,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) { sessions[deviceFingerprint] = { timestamp: Date.now(), reaper: setTimeout(function() { - executeTx(deviceFingerprint, rawTrade.txId, true); + executeTx(deviceFingerprint, rawTrade.txId, false); }, SESSION_TIMEOUT) }; } @@ -273,7 +287,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) { }; exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) { - executeTx(deviceFingerprint, rawTx.txId, false, cb); + executeTx(deviceFingerprint, rawTx.txId, true, cb); }; diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 8ef61639..6624df1a 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -77,9 +77,9 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) { client.query(getInsertQuery('bills', fields), values, function(err, billInfo) { if (err && PG_ERRORS[err.code] === 'uniqueViolation') - return cb(null, {code: 304}); // 304 => Not Modified (vel. already noted) + return cb(null, {code: 204}); - cb(); // 201 => Accepted (vel. saved) + cb(); // 201 => Accepted / saved }); }; @@ -90,30 +90,6 @@ exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb); }; -function _getTxs(txId, onlyPending, cb) { - var query = 'SELECT * FROM transactions WHERE id=$1'; - var values = [txId]; - - if (onlyPending) { - query += ' AND status=$2 AND tx_hash IS NULL'; - values.push('pending'); - } - - client.query(query, values, function(err, results) { - if (err) return cb(err); - - if (results.rows.length === 0) - return cb(new Error('Couldn\'t find transaction')); - - cb(null, results.rows); - }); -} - -// returns complete [txs] -exports.getTxs = function getTxs(txId, cb) { - _getTxs(txId, false, cb); -}; - exports.getPendingAmount = function getPendingAmount(txId, cb) { async.parallel({ // NOTE: `async.apply()` would strip context here @@ -126,7 +102,7 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) { }, bills: function(_cb) { client.query( - 'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY created DESC', + 'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY device_time DESC', [txId], _cb ); @@ -156,14 +132,19 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) { newTx.fiat = lastBill.total_fiat; results.txs.rows.forEach(function(tx) { - newTx.satoshis -= tx.satoshis; - newTx.fiat -= tx.fiat; + // try sending again only in case of a fail due to insufficientFunds + if (tx.status !== 'insufficientFunds') { + newTx.satoshis -= tx.satoshis; + newTx.fiat -= tx.fiat; + } }); } // Nothing to send == nothing to do if (newTx.satoshis <= 0) { - logger.error('Negative tx amount (%d) for txId: %s', newTx.satoshis, txId); + if (newTx.satoshis < 0) + logger.error('Negative tx amount (%d) for txId: %s', newTx.satoshis, txId); + return cb(); } @@ -171,7 +152,7 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) { }); }; -exports.summonTx = function summonTx(deviceFingerprint, tx, cb) { +exports.insertTx = function insertTx(deviceFingerprint, tx, cb) { var fields = [ 'id', 'status', @@ -197,14 +178,22 @@ exports.summonTx = function summonTx(deviceFingerprint, tx, cb) { values.push(tx.partial_id); } + if (typeof tx.is_completed !== 'undefined') { + fields.push('is_completed'); + values.push(tx.is_completed); + } + // First attampt an INSERT // If it worked, go ahead with transaction client.query(getInsertQuery('transactions', fields), values, function(err) { if (err) { - if (PG_ERRORS[err.code] === 'uniqueViolation') - return _getTxs(tx.txId, false, cb); + if (PG_ERRORS[err.code] === 'uniqueViolation') { + var _err = new Error(err); + _err.name = 'UniqueViolation'; + return cb(_err); + } return cb(err); } @@ -213,7 +202,7 @@ exports.summonTx = function summonTx(deviceFingerprint, tx, cb) { }); }; -// `@data` can contain `partial_id`, `hash`, or `error` +// `@data` can contain `partial_id`, `is_completed`, `hash`, or `error` exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) { data = data || {}; cb = typeof cb === 'function' ? cb : function() {}; @@ -231,8 +220,17 @@ exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) { } if (newStatus === 'completed') { - query += ', tx_hash=$' + n++; - values.push(data.hash); + // set tx_hash (if available) + if (typeof data.hash !== 'undefined') { + query += ', tx_hash=$' + n++; + values.push(data.hash); + } + + // indicates if tx was finished by a `/send` call (and not timeout) + if (typeof data.is_completed !== 'undefined') { + query += ', is_completed=$' + n++; + values.push(data.is_completed); + } } query += ' WHERE id=$' + n++; @@ -243,6 +241,5 @@ exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) { query += ' AND partial_id=$' + n++; values.push(partial_id); } - client.query(query, values, cb); }; diff --git a/lib/routes.js b/lib/routes.js index c3072b21..824485b9 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -103,11 +103,14 @@ function verifyTx(req, res) { function send(req, res) { plugins.sendBitcoins(getFingerprint(req), req.body, function(err, status) { // TODO: use status.statusCode here after confirming machine compatibility - res.json({ + var j = { errType: err && err.name, err: err && err.message, - txHash: status && status.txHash - }); + txHash: status && status.txHash, + txId: status && status.txId + }; + logger.debug('send: %j', j); + res.json(j); }); }