From 8cf5f2a4539a1ebd127a2dbb06cedd9edbbcf755 Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Tue, 18 Nov 2014 15:18:13 -0500 Subject: [PATCH] WIP most partial tx functionality working --- lib/plugins.js | 26 +++++++++++++------ lib/postgresql_interface.js | 52 +++++++++++++++++++++++++++++-------- lib/routes.js | 4 +-- 3 files changed, 61 insertions(+), 21 deletions(-) diff --git a/lib/plugins.js b/lib/plugins.js index 5e10f014..b9923835 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -7,7 +7,8 @@ var logger = require('./logger'); var SATOSHI_FACTOR = 1e8; var POLLING_RATE = 60 * 1000; // poll each minute - +var REAP_RATE = 5 * 1000; +var PENDING_TIMEOUT = 70 * 1000; var RECOMMENDED_FEE = 10000; var TX_0CONF_WAIT_TIME = 20 * 1000; // wait 20 seconds var MIN_CONFIDENCE = 0.7; @@ -195,11 +196,13 @@ function _sendBitcoins(toAddress, satoshis, cb) { function executeTx(deviceFingerprint, tx, cb) { db.addTx(deviceFingerprint, tx, function(err, result) { if (err) return cb(err); - if (!result) return cb(null, {statusCode: 204}); var satoshisToSend = result.satoshisToSend; var dbTxId = result.id; - return _sendBitcoins(tx.toAddress, satoshisToSend, function(err, txHash) { + if (satoshisToSend === 0) + return cb(null, {statusCode: 204, txId: tx.txId, txHash: null}); + + _sendBitcoins(tx.toAddress, satoshisToSend, function(err, txHash) { var fee = null; // Need to fill this out in plugins db.addDigitalTx(dbTxId, err, txHash, fee); @@ -208,7 +211,8 @@ function executeTx(deviceFingerprint, tx, cb) { pollBalance(); cb(null, { statusCode: 201, // Created - txHash: txHash + txHash: txHash, + txId: tx.txId }); }); }); @@ -227,7 +231,8 @@ function reapTx(row) { } function reapTxs() { - db.pendingTxs(function(err, results) { + db.pendingTxs(PENDING_TIMEOUT, function(err, results) { + if (err) return logger.warn(err); var rows = results.rows; var rowCount = rows.length; for (var i = 0; i < rowCount; i++) { @@ -237,8 +242,13 @@ function reapTxs() { }); } -exports.trade = function trade(rawTrade, deviceFingerprint, cb) { - db.addPendingTx(deviceFingerprint, rawTrade); +exports.trade = function trade(deviceFingerprint, rawTrade, cb) { + var tx = { + txId: rawTrade.txId, + toAddress: rawTrade.toAddress, + currencyCode: rawTrade.currency + }; + db.addPendingTx(deviceFingerprint, tx); // add bill to trader queue (if trader is enabled) if (traderPlugin) { @@ -440,7 +450,7 @@ exports.startPolling = function startPolling() { rateInterval = setInterval(pollRate, POLLING_RATE); if (!reapTxInterval) - reapTxInterval = setInterval(reapTxs, POLLING_RATE); + reapTxInterval = setInterval(reapTxs, REAP_RATE); startTrader(); }; diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index ec9c3030..b66bd686 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -88,7 +88,7 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) { connect(function(err, client, done) { if (err) return cb(err); - client.query(client, getInsertQuery('bills', fields), values, function(err) { + query(client, getInsertQuery('bills', fields), values, function(err) { done(); if (err && PG_ERRORS[err.code] === 'uniqueViolation') return cb(null, {code: 204}); @@ -109,15 +109,18 @@ exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event) }; function query(client, queryStr, values, cb) { + console.dir([queryStr, values]); client.query(queryStr, values, cb); } function silentQuery(client, queryStr, values, cb) { + console.dir([queryStr, values]); client.query(queryStr, values, function(err) { cb(err); }); } +// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set function billsAndTxs(client, txid, currencyCode, deviceFingerprint, cb) { var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + @@ -154,27 +157,40 @@ function computeSendAmount(tx, totals) { }; if (result.fiat < 0 || result.satoshis < 0) { logger.warn({tx: tx, totals: totals, result: result}, - 'computeSendAmount result < 0, this shouldn\'t happen'); + 'computeSendAmount result < 0, this shouldn\'t happen. Maybe timeout arrived after machineSend.'); result.fiat = 0; result.satoshis = 0; } return result; } +exports.pendingTxs = function pendingTxs(timeoutMS, cb) { + connect(function(err, client, done) { + var sql = 'SELECT * FROM transactions ' + + 'WHERE status=$1 AND ' + + 'EXTRACT(EPOCH FROM now() - created) > $2 ' + + 'ORDER BY created ASC'; + var timeoutS = timeoutMS / 1000; + var values = ['pending', timeoutS]; + query(client, sql, values, function(err, results) { + done(); + cb(err, results); + }); + }); +}; + function removePendingTx(client, tx, cb) { - silentQuery(client, 'DELETE FROM TRANSACTIONS WHERE txid=$1 AND status=$2', - [tx.txid, 'pending'], cb); + silentQuery(client, 'DELETE FROM transactions WHERE txid=$1 AND status=$2', + [tx.txId, 'pending'], cb); } function maybeInsertTx(client, deviceFingerprint, tx, totals, cb) { var sendAmount = computeSendAmount(tx, totals); - if (sendAmount.satoshis === 0) return cb(); - var status = _.isNumber(tx.fiat) ? 'machineSend' : 'timeout'; var satoshis = sendAmount.satoshis; var fiat = sendAmount.fiat; insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, function(err, results) { - // unique violation shouldn't happen, since then sendAmount would be 0 + // TODO: Don't worry about unique violation if (err) return cb(err); cb(null, {id: results.rows[0].id, satoshisToSend: sendAmount.satoshis}); }); @@ -220,21 +236,21 @@ exports.addPendingTx = function addPendingTx(deviceFingerprint, tx) { }); }; -// Calling function should only send bitcoins if result !== null +// Calling function should only send bitcoins if result.satoshisToSend > 0 exports.addTx = function addTx(deviceFingerprint, tx, cb) { connect(function(err, client, done) { if (err) return cb(err); async.waterfall([ - async.apply(silentQuery, client, 'BEGIN'), + async.apply(silentQuery, client, 'BEGIN', null), async.apply(removePendingTx, client, tx), - async.apply(billsAndTxs, client, tx.currencyCode, deviceFingerprint), + async.apply(billsAndTxs, client, tx.txId, tx.currencyCode, deviceFingerprint), async.apply(maybeInsertTx, client, deviceFingerprint, tx) ], function(err, result) { if (err) { rollback(client, done); return cb(err); } - silentQuery(client, 'COMMIT', function() { + silentQuery(client, 'COMMIT', null, function() { done(); cb(null, result); }); @@ -242,6 +258,11 @@ exports.addTx = function addTx(deviceFingerprint, tx, cb) { }); }; +exports.addDigitalTx = function addDigitalTx(dbTxId, err, txHash, fee) { + console.log('DEBUG: adding digitaltx'); + console.dir([dbTxId, err, txHash, fee]); +}; + /* exports.decrementCartridges = function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) { @@ -259,3 +280,12 @@ exports.fillCartridges = client.query(query, [cartridge1, cartridge2, fingerprint], cb); }; */ + +/* DEBUG +exports.init('psql://lamassu:lamassu@localhost/lamassu'); + +var fp = 'AB:9C:09:AA:7B:48:51:9A:0E:13:59:4E:5E:69:D0:74:E5:0F:4A:66'; +var txId = '5ef9c631-d948-4f0f-bf22-d2a563f5cd26'; +var tx = {txId: txId, currencyCode: 'USD', toAddress: '1xxx'}; +exports.addTx(fp, tx, function(err, result) { pg.end(); console.dir(result); }); +*/ diff --git a/lib/routes.js b/lib/routes.js index 6af33da5..6b435795 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -81,7 +81,7 @@ function poll(req, res) { } function trade(req, res) { - plugins.trade(req.body, getFingerprint(req), function(err, data) { + plugins.trade(getFingerprint(req), req.body, function(err, data) { var statusCode = data && data.code !== null ? data.code : 201; res.json(statusCode, {err: null}); }); @@ -95,7 +95,7 @@ function send(req, res) { res.json({ errType: err && err.name, err: err && err.message, - txHash: status && status.txId, + txHash: status && status.txHash, txId: status && status.txId }); });