This commit is contained in:
Josh Harvey 2014-11-18 01:16:39 -05:00
parent 47dd9c89b6
commit 234dd9ef94
2 changed files with 142 additions and 206 deletions

View file

@ -6,7 +6,6 @@ var logger = require('./logger');
var SATOSHI_FACTOR = 1e8; var SATOSHI_FACTOR = 1e8;
var SESSION_TIMEOUT = 60 * 60 * 1000;
var POLLING_RATE = 60 * 1000; // poll each minute var POLLING_RATE = 60 * 1000; // poll each minute
var RECOMMENDED_FEE = 10000; var RECOMMENDED_FEE = 10000;
@ -34,9 +33,9 @@ var lastRates = {};
var balanceInterval = null; var balanceInterval = null;
var rateInterval = null; var rateInterval = null;
var tradeInterval = null; var tradeInterval = null;
var reapTxInterval = null;
var tradesQueue = []; var tradesQueue = [];
var sessions = {};
var dispenseStatuses = {}; var dispenseStatuses = {};
@ -188,106 +187,58 @@ exports.logEvent = function event(rawEvent, deviceFingerprint) {
db.recordDeviceEvent(deviceFingerprint, rawEvent); db.recordDeviceEvent(deviceFingerprint, rawEvent);
}; };
function _sendBitcoins(toAddress, satoshis, cb) {
// Just prompts plugin to send BTC var transactionFee = cachedConfig.exchanges.settings.transactionFee;
function _sendBitcoins(tx, cb) { walletPlugin.sendBitcoins(toAddress, satoshis, transactionFee, cb);
logger.debug('executing tx: %j', tx);
db.changeTxStatus(tx.txId, 'executing');
walletPlugin.sendBitcoins(
tx.toAddress,
tx.satoshis,
cachedConfig.exchanges.settings.transactionFee,
function(err, txHash) {
if (err) {
var status = err.name === 'InsufficientFunds' ?
'insufficientFunds' :
'failed';
// report insufficient funds error
db.changeTxStatus(tx.txId, status, {error: err.message});
return cb(err);
}
if (txHash) db.changeTxStatus(tx.txId, 'completed', {hash: txHash});
else db.changeTxStatus(tx.txId, 'failed', {error: 'No txHash received'});
pollBalance();
cb(null, txHash);
}
);
} }
function executeTx(deviceFingerprint, txId, triggeredByUser, cb) { function executeTx(deviceFingerprint, tx, cb) {
cb = typeof cb === 'function' ? cb : function() {}; db.addTx(deviceFingerprint, tx, function(err, result) {
if (err) return cb(err);
if (!result) return cb(null, {statusCode: 204});
var satoshisToSend = result.satoshisToSend;
var dbTxId = result.id;
clearSession(deviceFingerprint); return _sendBitcoins(tx.toAddress, satoshisToSend, function(err, txHash) {
var fee = null; // Need to fill this out in plugins
db.addDigitalTx(dbTxId, err, txHash, fee);
// 1. get remaining amount to be sent if (err) return cb(err);
db.getPendingAmount(txId, function(err, tx) {
if (err) {
logger.error(err);
return cb(err);
}
if (!tx) {
logger.info('Nothing to send (%s)', txId);
// all bills were already sent by a timeout trigger;
// now only mark that user's `/send` arrived
if (triggeredByUser)
db.changeTxStatus(txId, 'completed', {is_completed: true});
// indicate ACK to machine
return cb(null, {
statusCode: 204, // No Content
txId: txId
});
}
// indicate whether this call was initiated by user or timeout
if (triggeredByUser)
tx.is_completed = true;
// 2. BEFORE sending insert tx to a db
db.insertTx(deviceFingerprint, tx, function(err) {
if (err) {
// `getPendingAmount` generated new `partial_id`, so this can occur
// only when 2nd executeTx gets called before 1st executes it's insert
if (err.name === 'UniqueViolation') {
// this will calculate again and then send only "pending" coins
return executeTx(deviceFingerprint, txId, triggeredByUser, cb);
}
return cb(err);
}
// 3. actual sending (of the same amount, that was just inserted to the db)
return _sendBitcoins(tx, function(err, txHash) {
pollBalance(); pollBalance();
// TODO: should we indicate error to the machine here?
// indicate ACK to machine
cb(null, { cb(null, {
statusCode: 201, // Created statusCode: 201, // Created
txId: txHash txHash: txHash
});
}); });
}); });
}); });
} }
// This is where we record starting trade balance at the beginning function reapTx(row) {
// of the user session var deviceFingerprint = row.device_fingerprint;
exports.trade = function trade(rawTrade, deviceFingerprint, cb) { var tx = {
if (!sessions[deviceFingerprint]) { txId: row.txid,
sessions[deviceFingerprint] = { toAddress: row.to_address,
timestamp: Date.now(), currencyCode: row.currency_code
reaper: setTimeout(function() {
executeTx(deviceFingerprint, rawTrade.txId, false);
}, SESSION_TIMEOUT)
}; };
executeTx(deviceFingerprint, tx, function(err) {
if (err) logger.error(err);
});
}
function reapTxs() {
db.pendingTxs(function(err, results) {
var rows = results.rows;
var rowCount = rows.length;
for (var i = 0; i < rowCount; i++) {
var row = rows[i];
reapTx(row);
} }
});
}
exports.trade = function trade(rawTrade, deviceFingerprint, cb) {
db.addPendingTx(deviceFingerprint, rawTrade);
// add bill to trader queue (if trader is enabled) // add bill to trader queue (if trader is enabled)
if (traderPlugin) { if (traderPlugin) {
@ -302,7 +253,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) {
}; };
exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) { exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) {
executeTx(deviceFingerprint, rawTx.txId, true, cb); executeTx(deviceFingerprint, rawTx, cb);
}; };
@ -482,13 +433,14 @@ exports.fiatBalance = function fiatBalance() {
exports.startPolling = function startPolling() { exports.startPolling = function startPolling() {
executeTrades(); executeTrades();
if (!balanceInterval) { if (!balanceInterval)
balanceInterval = setInterval(pollBalance, POLLING_RATE); balanceInterval = setInterval(pollBalance, POLLING_RATE);
}
if (!rateInterval) { if (!rateInterval)
rateInterval = setInterval(pollRate, POLLING_RATE); rateInterval = setInterval(pollRate, POLLING_RATE);
}
if (!reapTxInterval)
reapTxInterval = setInterval(reapTxs, POLLING_RATE);
startTrader(); startTrader();
}; };
@ -580,15 +532,6 @@ exports.getBalance = function getBalance() {
return lastBalances.transferBalance; return lastBalances.transferBalance;
}; };
function clearSession(deviceFingerprint) {
var session = sessions[deviceFingerprint];
if (session) {
clearTimeout(session.reaper);
delete sessions[deviceFingerprint];
}
}
/* /*
* Trader functions * Trader functions
*/ */

View file

@ -10,46 +10,50 @@ var PG_ERRORS = {
23505: 'uniqueViolation' 23505: 'uniqueViolation'
}; };
var client = null; var conString = null;
function rollback(client) { function rollback(client, done) {
//terminating a client connection will //terminating a client connection will
//automatically rollback any uncommitted transactions //automatically rollback any uncommitted transactions
//so while it's not technically mandatory to call //so while it's not technically mandatory to call
//ROLLBACK it is cleaner and more correct //ROLLBACK it is cleaner and more correct
logger.warn('Rolling back transaction.'); logger.warn('Rolling back transaction.');
client.query('ROLLBACK', function() { client.query('ROLLBACK', function(err) {
client.end(); return done(err);
}); });
} }
function getInsertQuery(tableName, fields) { function getInsertQuery(tableName, fields, hasId) {
// outputs string like: '$1, $2, $3...' with proper No of items // outputs string like: '$1, $2, $3...' with proper No of items
var placeholders = fields.map(function(_, i) { var placeholders = fields.map(function(_, i) {
return '$' + (i + 1); return '$' + (i + 1);
}).join(', '); }).join(', ');
return 'INSERT INTO ' + tableName + var query = 'INSERT INTO ' + tableName +
' (' + fields.join(', ') + ')' + ' (' + fields.join(', ') + ')' +
' VALUES' + ' VALUES' +
' (' + placeholders + ')'; ' (' + placeholders + ')';
if (hasId) query += ' RETURNING id';
return query;
} }
exports.init = function init(conString) { exports.init = function init(_conString) {
if (client !== null) return; conString = _conString;
if (!conString) { if (!conString) {
throw new Error('Postgres connection string is required'); throw new Error('Postgres connection string is required');
} }
client = new pg.Client(conString);
client.on('error', function (err) { logger.error(err); });
client.connect();
}; };
function connect(cb) {
pg.connect(conString, function(err, client, done) {
if (err) logger.error(err);
cb(err, client, done);
});
}
// logs inputted bill and overall tx status (if available) // logs inputted bill and overall tx status (if available)
exports.recordBill = function recordBill(deviceFingerprint, rec, cb) { exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
@ -86,32 +90,39 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
fields.push('uuid'); fields.push('uuid');
} }
client.query(getInsertQuery('bills', fields), values, function(err) { connect(function(err, client, done) {
if (err) return cb(err);
client.query(client, getInsertQuery('bills', fields), values, function(err) {
done();
if (err && PG_ERRORS[err.code] === 'uniqueViolation') if (err && PG_ERRORS[err.code] === 'uniqueViolation')
return cb(null, {code: 204}); return cb(null, {code: 204});
cb(); // 201 => Accepted / saved cb(); // 201 => Accepted / saved
}); });
});
}; };
exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb) { exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event) {
connect(function(err, client, done) {
if (err) return;
client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' + client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' +
'VALUES ($1, $2, $3, $4)', 'VALUES ($1, $2, $3, $4)',
[deviceFingerprint, event.eventType, event.note, event.deviceTime], [deviceFingerprint, event.eventType, event.note, event.deviceTime],
cb); done);
});
}; };
function query(queryStr, values, cb) { function query(client, queryStr, values, cb) {
client.query(queryStr, values, cb); client.query(queryStr, values, cb);
} }
function silentQuery(queryStr, values, cb) { function silentQuery(client, queryStr, values, cb) {
client.query(queryStr, values, function(err) { client.query(queryStr, values, function(err) {
cb(err); cb(err);
}); });
} }
function billsAndTxs(txid, currencyCode, deviceFingerprint, cb) { function billsAndTxs(client, txid, currencyCode, deviceFingerprint, cb) {
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'FROM bills ' + 'FROM bills ' +
@ -124,8 +135,8 @@ function billsAndTxs(txid, currencyCode, deviceFingerprint, cb) {
var txValues = billsValues; // They happen to be the same var txValues = billsValues; // They happen to be the same
async.parallel([ async.parallel([
async.apply(query, billsQuery, billsValues), async.apply(query, client, billsQuery, billsValues),
async.apply(query, txQuery, txValues) async.apply(query, client, txQuery, txValues)
], function(err, results) { ], function(err, results) {
if (err) return cb(err); if (err) return cb(err);
@ -154,10 +165,26 @@ function computeSendAmount(tx, totals) {
return result; return result;
} }
function insertTx(deviceFingerprint, tx, totals, cb) { function removePendingTx(client, tx, cb) {
silentQuery(client, 'DELETE FROM TRANSACTIONS WHERE txid=$1 AND status=$2',
[tx.txid, 'pending'], cb);
}
function maybeInsertTx(client, deviceFingerprint, tx, totals, cb) {
var sendAmount = computeSendAmount(tx, totals); var sendAmount = computeSendAmount(tx, totals);
if (sendAmount.satoshis === 0) return cb(); if (sendAmount.satoshis === 0) return cb();
var status = _.isNumber(tx.fiat) ? 'machineSend' : 'timeout';
var satoshis = sendAmount.satoshis;
var fiat = sendAmount.fiat;
insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, function(err, results) {
// unique violation shouldn't happen, since then sendAmount would be 0
if (err) return cb(err);
cb(null, {id: results.rows[0].id, satoshisToSend: sendAmount.satoshis});
});
}
function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
var fields = [ var fields = [
'txid', 'txid',
'status', 'status',
@ -171,90 +198,55 @@ function insertTx(deviceFingerprint, tx, totals, cb) {
var values = [ var values = [
tx.txId, tx.txId,
_.isNumber(tx.fiat) ? 'machineSend' : 'timeout', status,
tx.tx_type || 'buy', tx.tx_type || 'buy',
deviceFingerprint, deviceFingerprint,
tx.toAddress, tx.toAddress,
sendAmount.satoshis, satoshis,
tx.currencyCode, tx.currencyCode,
sendAmount.fiat fiat
]; ];
query(getInsertQuery('transactions', fields), values, function(err, result) { query(client, getInsertQuery('transactions', fields, true), values, cb);
// unique violation shouldn't happen, since then sendAmount would be 0
if (err) return cb(err);
cb(null, sendAmount.satoshis);
});
} }
function processTx(deviceFingerprint, tx, cb) { exports.addPendingTx = function addPendingTx(deviceFingerprint, tx) {
connect(function(err, client, done) {
if (err) return;
insertTx(client, deviceFingerprint, tx, 0, 0, 'pending',
function(err) {
done();
// If pending tx already exists, do nothing
if (err && PG_ERRORS[err.code] !== 'uniqueViolation')
logger.error(err);
});
});
};
// Calling function should only send bitcoins if result !== null
exports.addTx = function addTx(deviceFingerprint, tx, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([ async.waterfall([
async.apply(silentQuery, 'BEGIN'), async.apply(silentQuery, client, 'BEGIN'),
async.apply(billsAndTxs, tx.currencyCode, deviceFingerprint), async.apply(removePendingTx, client, tx),
async.apply(insertTx, deviceFingerprint, tx) async.apply(billsAndTxs, client, tx.currencyCode, deviceFingerprint),
], function(err, satoshisToSend) { async.apply(maybeInsertTx, client, deviceFingerprint, tx)
// if (err) DO some rollback ], function(err, result) {
silentQuery('COMMIT', function() { if (err) {
client.end(); rollback(client, done);
cb(null, satoshisToSend); return cb(err);
}
silentQuery(client, 'COMMIT', function() {
done();
cb(null, result);
}); });
}); });
} });
};
/* /*
exports.insertTx = function insertTx(deviceFingerprint, tx, cb) {
var fields = [
'id',
'status',
'tx_type',
'device_fingerprint',
'to_address',
'satoshis',
'currency_code',
'fiat'
];
var values = [
tx.txId,
tx.status || 'pending',
tx.tx_type || 'buy',
deviceFingerprint,
tx.toAddress,
tx.satoshis,
tx.currencyCode,
tx.fiat
];
if (tx.partial_id && tx.partial_id > 1) {
fields.push('partial_id');
values.push(tx.partial_id);
}
if (typeof tx.is_completed !== 'undefined') {
fields.push('is_completed');
values.push(tx.is_completed);
}
// ----------------
async.waterfall([
async.apply(query, 'BEGIN'),
async.apply(query, 'BEGIN'),
])
client.query('BEGIN', function(err, result) {
if(err) return rollback(client);
client.query('INSERT INTO account(money) VALUES(100) WHERE id = $1', [1], function(err, result) {
if(err) return rollback(client);
client.query('INSERT INTO account(money) VALUES(-100) WHERE id = $1', [2], function(err, result) {
if(err) return rollback(client);
//disconnect after successful commit
client.query('COMMIT', client.end.bind(client));
});
});
});
};
*/
exports.decrementCartridges = exports.decrementCartridges =
function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) { function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) {
var query = 'UPDATE devices SET cartridge_1_bills = cartridge_1_bills - $1, ' + var query = 'UPDATE devices SET cartridge_1_bills = cartridge_1_bills - $1, ' +
@ -270,6 +262,7 @@ exports.fillCartridges =
'WHERE fingerprint = $3'; 'WHERE fingerprint = $3';
client.query(query, [cartridge1, cartridge2, fingerprint], cb); client.query(query, [cartridge1, cartridge2, fingerprint], cb);
}; };
*/
var tx = {fiat: 100, satoshis: 10090000}; var tx = {fiat: 100, satoshis: 10090000};
exports.init('psql://lamassu:lamassu@localhost/lamassu'); exports.init('psql://lamassu:lamassu@localhost/lamassu');