feat(txs): support for partial txs and full bill logging

This commit is contained in:
Damian Mee 2014-09-18 02:37:46 +02:00
parent bb4336b78f
commit 55abaa1bb9
3 changed files with 172 additions and 109 deletions

View file

@ -173,29 +173,93 @@ exports.logEvent = function event(rawEvent, deviceFingerprint) {
db.recordDeviceEvent(deviceFingerprint, rawEvent);
};
// Just prompts plugin to send BTC
function _sendBitcoins(tx, callback) {
logger.debug('executing tx: %j', tx);
db.changeTxStatus(tx.txId, 'executing');
walletPlugin.sendBitcoins(
tx.toAddress,
tx.satoshis,
cachedConfig.exchanges.settings.transactionFee,
function(err, txHash) {
if (err) {
var status = err.name === 'InsufficientFunds' ?
'insufficientFunds' :
'failed';
// report insufficient funds error
db.changeTxStatus(tx.txId, status, {error: err.message});
return callback(err);
}
if (txHash) db.changeTxStatus(tx.txId, 'completed', {hash: txHash});
else db.changeTxStatus(tx.txId, 'failed', {error: 'No txHash received'});
pollBalance();
callback(null, txHash);
}
);
}
function executeTransaction(deviceFingerprint, txId, autoTriggered, cb) {
cb = typeof cb === 'function' ? cb : function() {};
clearSession(deviceFingerprint);
// get remaining amount to be sent
db.getPendingAmount(txId, function(err, tx) {
if (err) {
logger.error(err);
return cb(err);
}
if (!tx) {
logger.info('Nothing to send (%s)', txId);
return cb(null, {statusCode: 304}); // Not Modified
}
db.summonTransaction(deviceFingerprint, tx, function(err, txInfo) {
if (err) return cb(err);
// actual sending
if (!txInfo) {
return _sendBitcoins(tx, function(err, txHash) {
cb(null, {
statusCode: 201, // Created
txHash: txHash
});
});
}
// Out of bitcoins: special case
var txErr = null;
if (txInfo.err) {
txErr = new Error(txInfo.err);
if (txInfo.status === 'insufficientFunds') {
txErr.name = 'InsufficientFunds';
}
}
pollBalance();
cb(txErr, txInfo.txHash);
});
});
}
// This is where we record starting trade balance at the beginning
// of the user session
exports.trade = function trade(rawTrade, deviceFingerprint) {
var sessionInfo = sessions[deviceFingerprint];
if (!sessionInfo) {
exports.trade = function trade(rawTrade, deviceFingerprint, cb) {
if (!sessions[deviceFingerprint]) {
sessions[deviceFingerprint] = {
timestamp: Date.now(),
reaper: setTimeout(function() {
// NOTE: at this point we either have bills ONLY *or a partial tx*
// TODO: deal with #1 from ^
db.getPendingTransactions(rawTrade.txId, function(err, txs) {
// NOTE: returns ARRAY of txs
_sendBitcoins(txs[0], function() { });
delete sessions[deviceFingerprint];
});
executeTransaction(deviceFingerprint, rawTrade.txId, true);
}, SESSION_TIMEOUT)
};
}
// record (vel log) inserted bill
db.recordBill(deviceFingerprint, rawTrade);
// add bill to trader queue (if trader is enabled)
if (traderPlugin) {
tradesQueue.push({
@ -203,8 +267,16 @@ exports.trade = function trade(rawTrade, deviceFingerprint) {
satoshis: rawTrade.satoshis
});
}
// record (vel log) inserted bill
db.recordBill(deviceFingerprint, rawTrade, cb);
};
exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, callback) {
executeTransaction(deviceFingerprint, rawTx.txId, false, callback);
};
exports.fiatBalance = function fiatBalance() {
var rawRate = exports.getDeviceRate().rates.ask;
var commission = cachedConfig.exchanges.settings.commission;
@ -233,84 +305,6 @@ exports.fiatBalance = function fiatBalance() {
return fiatTransferBalance;
};
function _sendBitcoins(tx, callback) {
logger.debug('executing tx: %j', tx);
walletPlugin.sendBitcoins(
tx.toAddress,
tx.satoshis,
cachedConfig.exchanges.settings.transactionFee,
function(err, txHash) {
if (err) {
var status = err.name === 'InsufficientFunds' ?
'insufficientFunds' :
'failed';
// report insufficient funds error
db.changeTxStatus(tx.txId, status, {error: err.message});
return callback(err);
}
if (txHash) db.changeTxStatus(tx.txId, 'completed', {hash: txHash});
else db.changeTxStatus(tx.txId, 'failed', {error: 'No txHash received'});
pollBalance();
callback(null, txHash);
}
);
}
exports.sendBitcoins = function sendBitcoins(deviceFingerprint, tx, callback) {
db.summonTransaction(deviceFingerprint, tx, function(err, txInfo) {
if (err) return callback(err);
if (!txInfo || txInfo.status === 'partial') {
// TODO: make sure session exists, to prevent sending coins twice
clearSession(deviceFingerprint);
return _sendBitcoins(tx, callback);
}
// Out of bitcoins: special case
var txErr = null;
if (txInfo.err) {
txErr = new Error(txInfo.err);
if (txInfo.status === 'insufficientFunds') {
txErr.name = 'InsufficientFunds';
}
}
// transaction exists, but txHash might be null,
// in which case ATM should continue polling
pollBalance();
callback(txErr, txInfo.txHash);
});
};
// NOTE: temporarily here
exports.sendBitcoinians = function(deviceFingerprint, tx, callback) {
db.summonTransaction(deviceFingerprint, tx, function(err, txInfo) {
if (err) return callback(err);
if (txInfo) {
if (txInfo.status === 'insufficientFunds') {
}
if (txInfo.status === 'executing') {
}
// TODO: check `part` and what has already been sent
}
// no error & no tx record exists
// TODO: should it be confirmed with bills?
});
};
/*
* Polling livecycle
@ -472,6 +466,7 @@ function executeTrades() {
});
}
/*
* ID Verifier functions
*/

View file

@ -1,6 +1,8 @@
'use strict';
var pg = require('pg');
var async = require('async');
var logger = require('./logger');
var PG_ERRORS = {
@ -55,7 +57,7 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
deviceFingerprint,
rec.currency,
rec.toAddress,
res.txId,
rec.txId,
rec.deviceTime,
rec.satoshis,
@ -67,7 +69,18 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
fields.push('total_satoshis', 'total_fiat');
}
client.query(getInsertQuery('bills', fields), values, cb);
// NOTE: if is here to maintain compatibility with older machines
if (rec.uuid) {
values.push(rec.uuid);
fields.push('uuid');
}
client.query(getInsertQuery('bills', fields), values, function(err, billInfo) {
if (err && PG_ERRORS[err.code] === 'uniqueViolation')
return cb(null, {code: 304}); // 304 => Not Modified (vel. already noted)
cb(); // 201 => Accepted (vel. saved)
});
};
exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb) {
@ -101,14 +114,59 @@ exports.getTransactions = function getTransactions(txId, cb) {
_getTransactions(txId, false, cb);
};
// TODO: this should probably return bills, associated with failed/inexistent tx
exports.getPendingTransactions = function getPendingTransactions(txId, cb) {
// should return
// is_completed === false
// amount from bils is less than sum of all parts with the same txId
// latest bill with txId not present in transactions
//
_getTransactions(txId, true, cb);
exports.getPendingAmount = function getPendingAmount(txId, cb) {
async.parallel({
// NOTE: `async.apply()` would strip context here
txs: function(callback) {
client.query(
'SELECT * FROM transactions WHERE id=$1',
[txId],
callback
);
},
bills: function(callback) {
client.query(
'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY created DESC',
[txId],
callback
);
}
}, function(err, results) {
if (err) return cb(err);
// No bills == nothing to do
if (results.bills.rows.length === 0)
return cb();
var lastBill = results.bills.rows[0];
var newTx = {
txId: txId,
satoshis: lastBill.total_satoshis,
fiat: lastBill.total_fiat,
deviceDingerprint: lastBill.device_fingerprint,
toAddress: lastBill.to_address,
currencyCode: lastBill.currency_code
};
// if there are txs, substract already sent amount
if (results.txs.rows.length > 0) {
newTx.part = results.txs.rows.length + 1;
newTx.satoshis = lastBill.total_satoshis;
newTx.fiat = lastBill.total_fiat;
results.txs.rows.forEach(function(tx) {
newTx.satoshis -= tx.satoshis;
newTx.fiat -= tx.fiat;
});
}
// Nothing to send == nothing to do
if (newTx.satoshis <= 0)
return cb();
cb(null, newTx);
});
};
exports.summonTransaction = function summonTransaction(deviceFingerprint, tx, cb) {
@ -144,7 +202,7 @@ exports.summonTransaction = function summonTransaction(deviceFingerprint, tx, cb
function(err) {
if (err) {
if (PG_ERRORS[err.code] === 'uniqueViolation')
return exports.getTransactions(tx.txId, cb);
return _getTransactions(tx.txId, false, cb);
return cb(err);
}
@ -155,6 +213,7 @@ exports.summonTransaction = function summonTransaction(deviceFingerprint, tx, cb
// `@more` can contain `part`, `hash`, or `error`
exports.changeTxStatus = function changeTxStatus(txId, newStatus, more, cb) {
more = more || {};
cb = typeof cb === 'function' ? cb : function() {};
var query = 'UPDATE transactions SET status=$1';
@ -174,8 +233,14 @@ exports.changeTxStatus = function changeTxStatus(txId, newStatus, more, cb) {
values.push(more.hash);
}
query += ' WHERE id=$' + n;
query += ' WHERE id=$' + n++;
values.push(txId);
var part = parseInt(more.part);
if (part > 1) {
query += ' AND part=$' + n++;
values.push(part);
}
client.query(query, values, cb);
};

View file

@ -63,8 +63,10 @@ function poll(req, res) {
}
function trade(req, res) {
plugins.trade(req.body, getFingerprint(req));
res.json({err: null});
plugins.trade(req.body, getFingerprint(req), function(err, data) {
var statusCode = data && data.code !== null ? data.code : 201;
res.json(statusCode, {err: null});
});
}
function deviceEvent(req, res) {
@ -99,11 +101,12 @@ function verifyTransaction(req, res) {
}
function send(req, res) {
plugins.sendBitcoins(getFingerprint(req), req.body, function(err, txHash) {
plugins.sendBitcoins(getFingerprint(req), req.body, function(err, status) {
// TODO: use status.statusCode here after confirming machine compatibility
res.json({
errType: err && err.name,
err: err && err.message,
txHash: txHash,
errType: err && err.name
txHash: status && status.txHash
});
});
}