diff --git a/lib/plugins.js b/lib/plugins.js index 0486fbc2..5e10f014 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -6,7 +6,6 @@ var logger = require('./logger'); var SATOSHI_FACTOR = 1e8; -var SESSION_TIMEOUT = 60 * 60 * 1000; var POLLING_RATE = 60 * 1000; // poll each minute var RECOMMENDED_FEE = 10000; @@ -34,9 +33,9 @@ var lastRates = {}; var balanceInterval = null; var rateInterval = null; var tradeInterval = null; +var reapTxInterval = null; var tradesQueue = []; -var sessions = {}; var dispenseStatuses = {}; @@ -188,106 +187,58 @@ exports.logEvent = function event(rawEvent, deviceFingerprint) { db.recordDeviceEvent(deviceFingerprint, rawEvent); }; - -// Just prompts plugin to send BTC -function _sendBitcoins(tx, cb) { - logger.debug('executing tx: %j', tx); - db.changeTxStatus(tx.txId, 'executing'); - walletPlugin.sendBitcoins( - tx.toAddress, - tx.satoshis, - cachedConfig.exchanges.settings.transactionFee, - - function(err, txHash) { - if (err) { - var status = err.name === 'InsufficientFunds' ? - 'insufficientFunds' : - 'failed'; - - // report insufficient funds error - db.changeTxStatus(tx.txId, status, {error: err.message}); - return cb(err); - } - - if (txHash) db.changeTxStatus(tx.txId, 'completed', {hash: txHash}); - else db.changeTxStatus(tx.txId, 'failed', {error: 'No txHash received'}); - - pollBalance(); - cb(null, txHash); - } - ); +function _sendBitcoins(toAddress, satoshis, cb) { + var transactionFee = cachedConfig.exchanges.settings.transactionFee; + walletPlugin.sendBitcoins(toAddress, satoshis, transactionFee, cb); } -function executeTx(deviceFingerprint, txId, triggeredByUser, cb) { - cb = typeof cb === 'function' ? cb : function() {}; +function executeTx(deviceFingerprint, tx, cb) { + db.addTx(deviceFingerprint, tx, function(err, result) { + if (err) return cb(err); + if (!result) return cb(null, {statusCode: 204}); + var satoshisToSend = result.satoshisToSend; + var dbTxId = result.id; - clearSession(deviceFingerprint); + return _sendBitcoins(tx.toAddress, satoshisToSend, function(err, txHash) { + var fee = null; // Need to fill this out in plugins + db.addDigitalTx(dbTxId, err, txHash, fee); - // 1. get remaining amount to be sent - db.getPendingAmount(txId, function(err, tx) { - if (err) { - logger.error(err); - return cb(err); - } + if (err) return cb(err); - if (!tx) { - logger.info('Nothing to send (%s)', txId); - - // all bills were already sent by a timeout trigger; - // now only mark that user's `/send` arrived - if (triggeredByUser) - db.changeTxStatus(txId, 'completed', {is_completed: true}); - - // indicate ACK to machine - return cb(null, { - statusCode: 204, // No Content - txId: txId - }); - } - - // indicate whether this call was initiated by user or timeout - if (triggeredByUser) - tx.is_completed = true; - - // 2. BEFORE sending insert tx to a db - db.insertTx(deviceFingerprint, tx, function(err) { - if (err) { - // `getPendingAmount` generated new `partial_id`, so this can occur - // only when 2nd executeTx gets called before 1st executes it's insert - if (err.name === 'UniqueViolation') { - // this will calculate again and then send only "pending" coins - return executeTx(deviceFingerprint, txId, triggeredByUser, cb); - } - - return cb(err); - } - - // 3. actual sending (of the same amount, that was just inserted to the db) - return _sendBitcoins(tx, function(err, txHash) { - pollBalance(); - // TODO: should we indicate error to the machine here? - - // indicate ACK to machine - cb(null, { - statusCode: 201, // Created - txId: txHash - }); + pollBalance(); + cb(null, { + statusCode: 201, // Created + txHash: txHash }); }); }); } -// This is where we record starting trade balance at the beginning -// of the user session +function reapTx(row) { + var deviceFingerprint = row.device_fingerprint; + var tx = { + txId: row.txid, + toAddress: row.to_address, + currencyCode: row.currency_code + }; + executeTx(deviceFingerprint, tx, function(err) { + if (err) logger.error(err); + }); +} + +function reapTxs() { + db.pendingTxs(function(err, results) { + var rows = results.rows; + var rowCount = rows.length; + for (var i = 0; i < rowCount; i++) { + var row = rows[i]; + reapTx(row); + } + }); +} + exports.trade = function trade(rawTrade, deviceFingerprint, cb) { - if (!sessions[deviceFingerprint]) { - sessions[deviceFingerprint] = { - timestamp: Date.now(), - reaper: setTimeout(function() { - executeTx(deviceFingerprint, rawTrade.txId, false); - }, SESSION_TIMEOUT) - }; - } + db.addPendingTx(deviceFingerprint, rawTrade); // add bill to trader queue (if trader is enabled) if (traderPlugin) { @@ -302,7 +253,7 @@ exports.trade = function trade(rawTrade, deviceFingerprint, cb) { }; exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) { - executeTx(deviceFingerprint, rawTx.txId, true, cb); + executeTx(deviceFingerprint, rawTx, cb); }; @@ -482,13 +433,14 @@ exports.fiatBalance = function fiatBalance() { exports.startPolling = function startPolling() { executeTrades(); - if (!balanceInterval) { + if (!balanceInterval) balanceInterval = setInterval(pollBalance, POLLING_RATE); - } - if (!rateInterval) { + if (!rateInterval) rateInterval = setInterval(pollRate, POLLING_RATE); - } + + if (!reapTxInterval) + reapTxInterval = setInterval(reapTxs, POLLING_RATE); startTrader(); }; @@ -580,15 +532,6 @@ exports.getBalance = function getBalance() { return lastBalances.transferBalance; }; -function clearSession(deviceFingerprint) { - var session = sessions[deviceFingerprint]; - if (session) { - clearTimeout(session.reaper); - delete sessions[deviceFingerprint]; - } -} - - /* * Trader functions */ diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 5e35e51f..50e384b2 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -10,46 +10,50 @@ var PG_ERRORS = { 23505: 'uniqueViolation' }; -var client = null; +var conString = null; -function rollback(client) { +function rollback(client, done) { //terminating a client connection will //automatically rollback any uncommitted transactions //so while it's not technically mandatory to call //ROLLBACK it is cleaner and more correct logger.warn('Rolling back transaction.'); - client.query('ROLLBACK', function() { - client.end(); + client.query('ROLLBACK', function(err) { + return done(err); }); } -function getInsertQuery(tableName, fields) { +function getInsertQuery(tableName, fields, hasId) { // outputs string like: '$1, $2, $3...' with proper No of items var placeholders = fields.map(function(_, i) { return '$' + (i + 1); }).join(', '); - return 'INSERT INTO ' + tableName + + var query = 'INSERT INTO ' + tableName + ' (' + fields.join(', ') + ')' + ' VALUES' + ' (' + placeholders + ')'; + + if (hasId) query += ' RETURNING id'; + + return query; } -exports.init = function init(conString) { - if (client !== null) return; - +exports.init = function init(_conString) { + conString = _conString; if (!conString) { throw new Error('Postgres connection string is required'); } - - client = new pg.Client(conString); - client.on('error', function (err) { logger.error(err); }); - - client.connect(); }; +function connect(cb) { + pg.connect(conString, function(err, client, done) { + if (err) logger.error(err); + cb(err, client, done); + }); +} // logs inputted bill and overall tx status (if available) exports.recordBill = function recordBill(deviceFingerprint, rec, cb) { @@ -86,32 +90,39 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) { fields.push('uuid'); } - client.query(getInsertQuery('bills', fields), values, function(err) { - if (err && PG_ERRORS[err.code] === 'uniqueViolation') - return cb(null, {code: 204}); + connect(function(err, client, done) { + if (err) return cb(err); + client.query(client, getInsertQuery('bills', fields), values, function(err) { + done(); + if (err && PG_ERRORS[err.code] === 'uniqueViolation') + return cb(null, {code: 204}); - cb(); // 201 => Accepted / saved + cb(); // 201 => Accepted / saved + }); }); }; -exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb) { - client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' + - 'VALUES ($1, $2, $3, $4)', - [deviceFingerprint, event.eventType, event.note, event.deviceTime], - cb); +exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event) { + connect(function(err, client, done) { + if (err) return; + client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' + + 'VALUES ($1, $2, $3, $4)', + [deviceFingerprint, event.eventType, event.note, event.deviceTime], + done); + }); }; -function query(queryStr, values, cb) { +function query(client, queryStr, values, cb) { client.query(queryStr, values, cb); } -function silentQuery(queryStr, values, cb) { +function silentQuery(client, queryStr, values, cb) { client.query(queryStr, values, function(err) { cb(err); }); } -function billsAndTxs(txid, currencyCode, deviceFingerprint, cb) { +function billsAndTxs(client, txid, currencyCode, deviceFingerprint, cb) { var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'FROM bills ' + @@ -124,8 +135,8 @@ function billsAndTxs(txid, currencyCode, deviceFingerprint, cb) { var txValues = billsValues; // They happen to be the same async.parallel([ - async.apply(query, billsQuery, billsValues), - async.apply(query, txQuery, txValues) + async.apply(query, client, billsQuery, billsValues), + async.apply(query, client, txQuery, txValues) ], function(err, results) { if (err) return cb(err); @@ -154,10 +165,26 @@ function computeSendAmount(tx, totals) { return result; } -function insertTx(deviceFingerprint, tx, totals, cb) { +function removePendingTx(client, tx, cb) { + silentQuery(client, 'DELETE FROM TRANSACTIONS WHERE txid=$1 AND status=$2', + [tx.txid, 'pending'], cb); +} + +function maybeInsertTx(client, deviceFingerprint, tx, totals, cb) { var sendAmount = computeSendAmount(tx, totals); if (sendAmount.satoshis === 0) return cb(); + var status = _.isNumber(tx.fiat) ? 'machineSend' : 'timeout'; + var satoshis = sendAmount.satoshis; + var fiat = sendAmount.fiat; + insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, function(err, results) { + // unique violation shouldn't happen, since then sendAmount would be 0 + if (err) return cb(err); + cb(null, {id: results.rows[0].id, satoshisToSend: sendAmount.satoshis}); + }); +} + +function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) { var fields = [ 'txid', 'status', @@ -171,90 +198,55 @@ function insertTx(deviceFingerprint, tx, totals, cb) { var values = [ tx.txId, - _.isNumber(tx.fiat) ? 'machineSend' : 'timeout', + status, tx.tx_type || 'buy', deviceFingerprint, tx.toAddress, - sendAmount.satoshis, + satoshis, tx.currencyCode, - sendAmount.fiat + fiat ]; - query(getInsertQuery('transactions', fields), values, function(err, result) { - // unique violation shouldn't happen, since then sendAmount would be 0 - if (err) return cb(err); - cb(null, sendAmount.satoshis); - }); + query(client, getInsertQuery('transactions', fields, true), values, cb); } -function processTx(deviceFingerprint, tx, cb) { - async.waterfall([ - async.apply(silentQuery, 'BEGIN'), - async.apply(billsAndTxs, tx.currencyCode, deviceFingerprint), - async.apply(insertTx, deviceFingerprint, tx) - ], function(err, satoshisToSend) { - // if (err) DO some rollback - silentQuery('COMMIT', function() { - client.end(); - cb(null, satoshisToSend); +exports.addPendingTx = function addPendingTx(deviceFingerprint, tx) { + connect(function(err, client, done) { + if (err) return; + insertTx(client, deviceFingerprint, tx, 0, 0, 'pending', + function(err) { + done(); + + // If pending tx already exists, do nothing + if (err && PG_ERRORS[err.code] !== 'uniqueViolation') + logger.error(err); + }); + }); +}; + +// Calling function should only send bitcoins if result !== null +exports.addTx = function addTx(deviceFingerprint, tx, cb) { + connect(function(err, client, done) { + if (err) return cb(err); + async.waterfall([ + async.apply(silentQuery, client, 'BEGIN'), + async.apply(removePendingTx, client, tx), + async.apply(billsAndTxs, client, tx.currencyCode, deviceFingerprint), + async.apply(maybeInsertTx, client, deviceFingerprint, tx) + ], function(err, result) { + if (err) { + rollback(client, done); + return cb(err); + } + silentQuery(client, 'COMMIT', function() { + done(); + cb(null, result); + }); }); }); -} +}; /* -exports.insertTx = function insertTx(deviceFingerprint, tx, cb) { - var fields = [ - 'id', - 'status', - 'tx_type', - 'device_fingerprint', - 'to_address', - 'satoshis', - 'currency_code', - 'fiat' - ]; - - var values = [ - tx.txId, - tx.status || 'pending', - tx.tx_type || 'buy', - deviceFingerprint, - tx.toAddress, - tx.satoshis, - tx.currencyCode, - tx.fiat - ]; - - if (tx.partial_id && tx.partial_id > 1) { - fields.push('partial_id'); - values.push(tx.partial_id); - } - - if (typeof tx.is_completed !== 'undefined') { - fields.push('is_completed'); - values.push(tx.is_completed); - } - - // ---------------- - - async.waterfall([ - async.apply(query, 'BEGIN'), - async.apply(query, 'BEGIN'), - ]) - client.query('BEGIN', function(err, result) { - if(err) return rollback(client); - client.query('INSERT INTO account(money) VALUES(100) WHERE id = $1', [1], function(err, result) { - if(err) return rollback(client); - client.query('INSERT INTO account(money) VALUES(-100) WHERE id = $1', [2], function(err, result) { - if(err) return rollback(client); - //disconnect after successful commit - client.query('COMMIT', client.end.bind(client)); - }); - }); -}); -}; -*/ - exports.decrementCartridges = function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) { var query = 'UPDATE devices SET cartridge_1_bills = cartridge_1_bills - $1, ' + @@ -270,6 +262,7 @@ exports.fillCartridges = 'WHERE fingerprint = $3'; client.query(query, [cartridge1, cartridge2, fingerprint], cb); }; +*/ var tx = {fiat: 100, satoshis: 10090000}; exports.init('psql://lamassu:lamassu@localhost/lamassu');