From 47dd9c89b6e2f7c15378e9080c59c0d3e3680b8d Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Mon, 17 Nov 2014 20:02:24 -0500 Subject: [PATCH] WIP --- lib/postgresql_interface.js | 245 +++++++++++++++++++----------------- 1 file changed, 131 insertions(+), 114 deletions(-) diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index dbd2d0c5..5e35e51f 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -2,6 +2,7 @@ var pg = require('pg'); var async = require('async'); +var _ = require('lodash'); var logger = require('./logger'); @@ -11,6 +12,16 @@ var PG_ERRORS = { var client = null; +function rollback(client) { + //terminating a client connection will + //automatically rollback any uncommitted transactions + //so while it's not technically mandatory to call + //ROLLBACK it is cleaner and more correct + logger.warn('Rolling back transaction.'); + client.query('ROLLBACK', function() { + client.end(); + }); +} function getInsertQuery(tableName, fields) { @@ -90,68 +101,107 @@ exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb); }; -exports.getPendingAmount = function getPendingAmount(txId, cb) { - async.parallel({ - // NOTE: `async.apply()` would strip context here - txs: function(_cb) { - client.query( - 'SELECT * FROM transactions WHERE id=$1', - [txId], - _cb - ); - }, - bills: function(_cb) { - client.query( - 'SELECT * FROM bills WHERE transaction_id=$1 ORDER BY device_time DESC', - [txId], - _cb - ); - } - }, function(err, results) { +function query(queryStr, values, cb) { + client.query(queryStr, values, cb); +} + +function silentQuery(queryStr, values, cb) { + client.query(queryStr, values, function(err) { + cb(err); + }); +} + +function billsAndTxs(txid, currencyCode, deviceFingerprint, cb) { + var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + + 'FROM bills ' + + 'WHERE transaction_id=$1 AND currency_code=$2 AND device_fingerprint=$3'; + var billsValues = [txid, currencyCode, deviceFingerprint]; + var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' + + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + + 'FROM transactions ' + + 'WHERE txid=$1 AND currency_code=$2 AND device_fingerprint=$3'; + var txValues = billsValues; // They happen to be the same + + async.parallel([ + async.apply(query, billsQuery, billsValues), + async.apply(query, txQuery, txValues) + ], function(err, results) { if (err) return cb(err); - // No bills == nothing to do - if (results.bills.rows.length === 0) - return cb(); - - var lastBill = results.bills.rows[0]; - - var newTx = { - txId: txId, - satoshis: lastBill.total_satoshis, - fiat: lastBill.total_fiat, - deviceFingerprint: lastBill.device_fingerprint, - toAddress: lastBill.to_address, - currencyCode: lastBill.currency_code - }; - - // if there are txs, substract already sent amount - if (results.txs.rows.length > 0) { - newTx.partial_id = results.txs.rows.length + 1; - newTx.satoshis = lastBill.total_satoshis; - newTx.fiat = lastBill.total_fiat; - - results.txs.rows.forEach(function(tx) { - // try sending again only in case of a fail due to insufficientFunds - if (tx.status !== 'insufficientFunds') { - newTx.satoshis -= tx.satoshis; - newTx.fiat -= tx.fiat; - } - }); - } - - // Nothing to send == nothing to do - if (newTx.satoshis <= 0) { - if (newTx.satoshis < 0) - logger.error('Negative tx amount (%d) for txId: %s', newTx.satoshis, txId); - - return cb(); - } - - cb(null, newTx); + // Note: PG SUM function returns int8, which is returned as a string, so + // we need to parse, since we know these won't be huge numbers. + cb(null, { + billsFiat: parseInt(results[0].rows[0].fiat), + billsSatoshis: parseInt(results[0].rows[0].satoshis), + txFiat: parseInt(results[1].rows[0].fiat), + txSatoshis: parseInt(results[1].rows[0].satoshis) + }); }); -}; +} +function computeSendAmount(tx, totals) { + var result = { + fiat: (tx.fiat || totals.billsFiat) - totals.txFiat, + satoshis: (tx.satoshis || totals.billsSatoshis) - totals.txSatoshis + }; + if (result.fiat < 0 || result.satoshis < 0) { + logger.warn({tx: tx, totals: totals, result: result}, + 'computeSendAmount result < 0, this shouldn\'t happen'); + result.fiat = 0; + result.satoshis = 0; + } + return result; +} + +function insertTx(deviceFingerprint, tx, totals, cb) { + var sendAmount = computeSendAmount(tx, totals); + if (sendAmount.satoshis === 0) return cb(); + + var fields = [ + 'txid', + 'status', + 'tx_type', + 'device_fingerprint', + 'to_address', + 'satoshis', + 'currency_code', + 'fiat' + ]; + + var values = [ + tx.txId, + _.isNumber(tx.fiat) ? 'machineSend' : 'timeout', + tx.tx_type || 'buy', + deviceFingerprint, + tx.toAddress, + sendAmount.satoshis, + tx.currencyCode, + sendAmount.fiat + ]; + + query(getInsertQuery('transactions', fields), values, function(err, result) { + // unique violation shouldn't happen, since then sendAmount would be 0 + if (err) return cb(err); + cb(null, sendAmount.satoshis); + }); +} + +function processTx(deviceFingerprint, tx, cb) { + async.waterfall([ + async.apply(silentQuery, 'BEGIN'), + async.apply(billsAndTxs, tx.currencyCode, deviceFingerprint), + async.apply(insertTx, deviceFingerprint, tx) + ], function(err, satoshisToSend) { + // if (err) DO some rollback + silentQuery('COMMIT', function() { + client.end(); + cb(null, satoshisToSend); + }); + }); +} + +/* exports.insertTx = function insertTx(deviceFingerprint, tx, cb) { var fields = [ 'id', @@ -185,65 +235,25 @@ exports.insertTx = function insertTx(deviceFingerprint, tx, cb) { values.push(tx.is_completed); } - // First attampt an INSERT - // If it worked, go ahead with transaction - client.query(getInsertQuery('transactions', fields), - values, - function(err) { - if (err) { - if (PG_ERRORS[err.code] === 'uniqueViolation') { - var _err = new Error(err); - _err.name = 'UniqueViolation'; - return cb(_err); - } + // ---------------- - return cb(err); - } - - cb(); + async.waterfall([ + async.apply(query, 'BEGIN'), + async.apply(query, 'BEGIN'), + ]) + client.query('BEGIN', function(err, result) { + if(err) return rollback(client); + client.query('INSERT INTO account(money) VALUES(100) WHERE id = $1', [1], function(err, result) { + if(err) return rollback(client); + client.query('INSERT INTO account(money) VALUES(-100) WHERE id = $1', [2], function(err, result) { + if(err) return rollback(client); + //disconnect after successful commit + client.query('COMMIT', client.end.bind(client)); }); + }); +}); }; - -// `@data` can contain `partial_id`, `is_completed`, `hash`, or `error` -exports.changeTxStatus = function changeTxStatus(txId, newStatus, data, cb) { - data = data || {}; - cb = typeof cb === 'function' ? cb : function() {}; - - var query = 'UPDATE transactions SET status=$1'; - var values = [ - newStatus - ]; - - var n = 2; - - if (newStatus === 'error') { - query += ', error=$' + n++; - values.push(data.error); - } - - // set tx_hash (if available) - if (typeof data.hash !== 'undefined') { - query += ', tx_hash=$' + n++; - values.push(data.hash); - } - - // indicates if tx was finished by a `/send` call (and not timeout) - if (typeof data.is_completed !== 'undefined') { - query += ', is_completed=$' + n++; - values.push(data.is_completed); - } - - - query += ' WHERE id=$' + n++; - values.push(txId); - - var partial_id = parseInt(data.partial_id); - if (partial_id > 1) { - query += ' AND partial_id=$' + n++; - values.push(partial_id); - } - client.query(query, values, cb); -}; +*/ exports.decrementCartridges = function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) { @@ -260,3 +270,10 @@ exports.fillCartridges = 'WHERE fingerprint = $3'; client.query(query, [cartridge1, cartridge2, fingerprint], cb); }; + +var tx = {fiat: 100, satoshis: 10090000}; +exports.init('psql://lamassu:lamassu@localhost/lamassu'); +billsAndTxs('5ef9c631-d948-4f0f-bf22-d2a563f5cd26', 'USD', 'AB:9C:09:AA:7B:48:51:9A:0E:13:59:4E:5E:69:D0:74:E5:0F:4A:66', + function(err, result) { console.dir(err); console.dir(result); + console.dir(computeSendAmount(tx, result)); + });