feat(resend): Re-send process improvements

This commit is contained in:
Damian Mee 2014-10-01 01:54:11 +02:00
parent f7f98d986e
commit 9ea8af28f8
3 changed files with 79 additions and 65 deletions

View file

@ -203,12 +203,12 @@ function _sendBitcoins(tx, cb) {
); );
} }
function executeTx(deviceFingerprint, txId, autoTriggered, cb) { function executeTx(deviceFingerprint, txId, triggeredByUser, cb) {
cb = typeof cb === 'function' ? cb : function() {}; cb = typeof cb === 'function' ? cb : function() {};
clearSession(deviceFingerprint); clearSession(deviceFingerprint);
// get remaining amount to be sent // 1. get remaining amount to be sent
db.getPendingAmount(txId, function(err, tx) { db.getPendingAmount(txId, function(err, tx) {
if (err) { if (err) {
logger.error(err); logger.error(err);
@ -217,33 +217,47 @@ function executeTx(deviceFingerprint, txId, autoTriggered, cb) {
if (!tx) { if (!tx) {
logger.info('Nothing to send (%s)', txId); logger.info('Nothing to send (%s)', txId);
return cb(null, {statusCode: 304}); // Not Modified
// all bills were already sent by a timeout trigger;
// now only mark that user's `/send` arrived
if (triggeredByUser)
db.changeTxStatus(txId, 'completed', {is_completed: true});
// indicate ACK to machine
return cb(null, {
statusCode: 204, // No Content
txId: txId
});
} }
db.summonTx(deviceFingerprint, tx, function(err, txInfo) { // indicate whether this call was initiated by user or timeout
if (err) return cb(err); if (triggeredByUser)
tx.is_completed = true;
// actual sending // 2. BEFORE sending insert tx to a db
if (!txInfo) { db.insertTx(deviceFingerprint, tx, function(err) {
return _sendBitcoins(tx, function(err, txHash) { if (err) {
cb(null, { // `getPendingAmount` generated new `partial_id`, so this can occur
statusCode: 201, // Created // only when 2nd executeTx gets called before 1st executes it's insert
txHash: txHash if (err.name === 'UniqueViolation') {
}); // this will calculate again and then send only "pending" coins
}); return executeTx(deviceFingerprint, txId, triggeredByUser, cb);
}
// Out of bitcoins: special case
var txErr = null;
if (txInfo.err) {
txErr = new Error(txInfo.err);
if (txInfo.status === 'insufficientFunds') {
txErr.name = 'InsufficientFunds';
} }
return cb(err);
} }
pollBalance(); // 3. actual sending (of the same amount, that was just inserted to the db)
cb(txErr, txInfo.txHash); return _sendBitcoins(tx, function(err, txHash) {
pollBalance();
// TODO: should we indicate error to the machine here?
// indicate ACK to machine
cb(null, {
statusCode: 201, // Created
txId: txId
});
});
}); });
}); });
} }
@ -255,7 +269,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) {
sessions[deviceFingerprint] = { sessions[deviceFingerprint] = {
timestamp: Date.now(), timestamp: Date.now(),
reaper: setTimeout(function() { reaper: setTimeout(function() {
executeTx(deviceFingerprint, rawTrade.txId, true); executeTx(deviceFingerprint, rawTrade.txId, false);
}, SESSION_TIMEOUT) }, SESSION_TIMEOUT)
}; };
} }
@ -273,7 +287,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) {
}; };
exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) { exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) {
executeTx(deviceFingerprint, rawTx.txId, false, cb); executeTx(deviceFingerprint, rawTx.txId, true, cb);
}; };

View file

@ -77,9 +77,9 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
client.query(getInsertQuery('bills', fields), values, function(err, billInfo) { client.query(getInsertQuery('bills', fields), values, function(err, billInfo) {
if (err && PG_ERRORS[err.code] === 'uniqueViolation') if (err && PG_ERRORS[err.code] === 'uniqueViolation')
return cb(null, {code: 304}); // 304 => Not Modified (vel. already noted) return cb(null, {code: 204});
cb(); // 201 => Accepted (vel. saved) cb(); // 201 => Accepted / saved
}); });
}; };
@ -90,30 +90,6 @@ exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event,
cb); cb);
}; };
function _getTxs(txId, onlyPending, cb) {
var query = 'SELECT * FROM transactions WHERE id=$1';
var values = [txId];
if (onlyPending) {
query += ' AND status=$2 AND tx_hash IS NULL';
values.push('pending');
}
client.query(query, values, function(err, results) {
if (err) return cb(err);
if (results.rows.length === 0)
return cb(new Error('Couldn\'t find transaction'));
cb(null, results.rows);
});
}
// returns complete [txs]
exports.getTxs = function getTxs(txId, cb) {
_getTxs(txId, false, cb);
};
exports.getPendingAmount = function getPendingAmount(txId, cb) { exports.getPendingAmount = function getPendingAmount(txId, cb) {
async.parallel({ async.parallel({
// NOTE: `async.apply()` would strip context here // NOTE: `async.apply()` would strip context here
@ -126,7 +102,7 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) {
}, },
bills: function(_cb) { bills: function(_cb) {
client.query( client.query(
'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY created DESC', 'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY device_time DESC',
[txId], [txId],
_cb _cb
); );
@ -156,14 +132,19 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) {
newTx.fiat = lastBill.total_fiat; newTx.fiat = lastBill.total_fiat;
results.txs.rows.forEach(function(tx) { results.txs.rows.forEach(function(tx) {
newTx.satoshis -= tx.satoshis; // try sending again only in case of a fail due to insufficientFunds
newTx.fiat -= tx.fiat; if (tx.status !== 'insufficientFunds') {
newTx.satoshis -= tx.satoshis;
newTx.fiat -= tx.fiat;
}
}); });
} }
// Nothing to send == nothing to do // Nothing to send == nothing to do
if (newTx.satoshis <= 0) { if (newTx.satoshis <= 0) {
logger.error('Negative tx amount (%d) for txId: %s', newTx.satoshis, txId); if (newTx.satoshis < 0)
logger.error('Negative tx amount (%d) for txId: %s', newTx.satoshis, txId);
return cb(); return cb();
} }
@ -171,7 +152,7 @@ exports.getPendingAmount = function getPendingAmount(txId, cb) {
}); });
}; };
exports.summonTx = function summonTx(deviceFingerprint, tx, cb) { exports.insertTx = function insertTx(deviceFingerprint, tx, cb) {
var fields = [ var fields = [
'id', 'id',
'status', 'status',
@ -197,14 +178,22 @@ exports.summonTx = function summonTx(deviceFingerprint, tx, cb) {
values.push(tx.partial_id); values.push(tx.partial_id);
} }
if (typeof tx.is_completed !== 'undefined') {
fields.push('is_completed');
values.push(tx.is_completed);
}
// First attampt an INSERT // First attampt an INSERT
// If it worked, go ahead with transaction // If it worked, go ahead with transaction
client.query(getInsertQuery('transactions', fields), client.query(getInsertQuery('transactions', fields),
values, values,
function(err) { function(err) {
if (err) { if (err) {
if (PG_ERRORS[err.code] === 'uniqueViolation') if (PG_ERRORS[err.code] === 'uniqueViolation') {
return _getTxs(tx.txId, false, cb); var _err = new Error(err);
_err.name = 'UniqueViolation';
return cb(_err);
}
return cb(err); return cb(err);
} }
@ -213,7 +202,7 @@ exports.summonTx = function summonTx(deviceFingerprint, tx, cb) {
}); });
}; };
// `@data` can contain `partial_id`, `hash`, or `error` // `@data` can contain `partial_id`, `is_completed`, `hash`, or `error`
exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) { exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) {
data = data || {}; data = data || {};
cb = typeof cb === 'function' ? cb : function() {}; cb = typeof cb === 'function' ? cb : function() {};
@ -231,8 +220,17 @@ exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) {
} }
if (newStatus === 'completed') { if (newStatus === 'completed') {
query += ', tx_hash=$' + n++; // set tx_hash (if available)
values.push(data.hash); if (typeof data.hash !== 'undefined') {
query += ', tx_hash=$' + n++;
values.push(data.hash);
}
// indicates if tx was finished by a `/send` call (and not timeout)
if (typeof data.is_completed !== 'undefined') {
query += ', is_completed=$' + n++;
values.push(data.is_completed);
}
} }
query += ' WHERE id=$' + n++; query += ' WHERE id=$' + n++;
@ -243,6 +241,5 @@ exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) {
query += ' AND partial_id=$' + n++; query += ' AND partial_id=$' + n++;
values.push(partial_id); values.push(partial_id);
} }
client.query(query, values, cb); client.query(query, values, cb);
}; };

View file

@ -103,11 +103,14 @@ function verifyTx(req, res) {
function send(req, res) { function send(req, res) {
plugins.sendBitcoins(getFingerprint(req), req.body, function(err, status) { plugins.sendBitcoins(getFingerprint(req), req.body, function(err, status) {
// TODO: use status.statusCode here after confirming machine compatibility // TODO: use status.statusCode here after confirming machine compatibility
res.json({ var j = {
errType: err && err.name, errType: err && err.name,
err: err && err.message, err: err && err.message,
txHash: status && status.txHash txHash: status && status.txHash,
}); txId: status && status.txId
};
logger.debug('send: %j', j);
res.json(j);
}); });
} }