From ad658692299c29e95e16ad1193dd0e93b296a207 Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Thu, 27 Nov 2014 14:06:08 -0500 Subject: [PATCH] WIP lots of fixes --- lib/plugins.js | 12 +-- lib/postgresql_interface.js | 118 +++++++++++++++++--------- lib/routes.js | 1 + migrations/004-transactions-reload.js | 7 +- 4 files changed, 89 insertions(+), 49 deletions(-) diff --git a/lib/plugins.js b/lib/plugins.js index 54bf3da6..fa2cab2a 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -7,7 +7,7 @@ var logger = require('./logger'); var SATOSHI_FACTOR = 1e8; var POLLING_RATE = 60 * 1000; // poll each minute -var REAP_RATE = 5 * 1000; +var REAP_RATE = 2 * 1000; var PENDING_TIMEOUT = 70 * 1000; // TODO: might have to update this if user is allowed to extend monitoring time @@ -53,7 +53,7 @@ function loadPlugin(name, config) { trader: ['balance', 'purchase', 'sell'], wallet: ['balance', 'sendBitcoins', 'newAddress'], idVerifier: ['verifyUser', 'verifyTransaction'], - info: ['getAddressLastTx', 'getTx'] + info: ['checkAddress'] }; var plugin = null; @@ -223,8 +223,9 @@ function reapOutgoingTx(session, tx) { } function reapIncomingTx(session, tx) { - infoPlugin.checkAddress(tx.toAddress, function(err, status, + infoPlugin.checkAddress(tx.toAddress, tx.satoshis, function(err, status, satoshisReceived, txHash) { + if (err) return logger.error(err); if (status === 'notSeen') return; var newTx = _.clone(tx); newTx.txHash = txHash; @@ -237,6 +238,7 @@ function reapIncomingTx(session, tx) { function reapTx(row) { var session = {fingerprint: row.device_fingerprint, id: row.session_id}; var tx = { + satoshis: row.satoshis, toAddress: row.to_address, currencyCode: row.currency_code, incoming: row.incoming @@ -282,7 +284,8 @@ exports.trade = function trade(session, rawTrade, cb) { }; async.parallel([ - async.apply(db.addPendingTx, session, tx), + async.apply(db.addOutgoingPending, session, tx.currencyCode, tx.toAddress, + tx.satoshis), async.apply(db.recordBill, session, rawTrade) ], cb); }; @@ -308,7 +311,6 @@ exports.cashOut = function cashOut(session, tx, cb) { }; exports.dispenseStatus = function dispenseStatus(session, cb) { - console.log('DEBUG1'); db.dispenseStatus(session, cb); }; diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index f319a7cd..f596f900 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -5,13 +5,18 @@ var pg = require('pg'); var async = require('async'); +var util = require('util'); var _ = require('lodash'); var logger = require('./logger'); -var PG_ERRORS = { - '23505': 'uniqueViolation' -}; +function inspect(rec) { + console.log(util.inspect(rec, {depth: null, colors: true})); +} + +function isUniqueViolation(err) { + return err.code === '23505'; +} var conString = null; @@ -56,29 +61,29 @@ function connect(cb) { // logs inputted bill and overall tx status (if available) exports.recordBill = function recordBill(session, rec, cb) { var fields = [ + 'id', 'device_fingerprint', 'currency_code', 'to_address', 'session_id', 'device_time', - 'satoshis', 'denomination' ]; var values = [ + rec.uuid, session.fingerprint, rec.currency, rec.toAddress, session.id, rec.deviceTime, - rec.satoshis, rec.fiat ]; - connect(function(err, client, done) { - if (err) return cb(err); + connect(function(cerr, client, done) { + if (cerr) return cb(cerr); query(client, getInsertQuery('bills', fields), values, function(err) { done(); // TODO: Handle unique violations more cleanly for idempotency @@ -110,7 +115,7 @@ function query(client, queryStr, values, cb) { if (err) { console.log(queryStr); console.log(values); - return cb(new Error(err)); + return cb(err); } cb(null, results); }); @@ -123,7 +128,11 @@ function silentQuery(client, queryStr, values, cb) { } client.query(queryStr, values, function(err) { - if (err) cb(new Error(err)); + if (err) { + console.log(queryStr); + console.log(values); + cb(err); + } cb(); }); } @@ -132,12 +141,13 @@ function silentQuery(client, queryStr, values, cb) { function billsAndTxs(client, session, cb) { var sessionId = session.id; var fingerprint = session.fingerprint; - var billsQuery = 'SELECT SUM(denomination) as fiat, ' + - 'SUM(satoshis) AS satoshis ' + + var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'FROM bills ' + 'WHERE device_fingerprint=$1 AND session_id=$2'; var billsValues = [fingerprint, sessionId]; - var txQuery = 'SELECT SUM(fiat) AS fiat, SUM(satoshis) AS satoshis ' + + var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' + + 'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'FROM transactions ' + 'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3'; var txValues = [fingerprint, sessionId, 'partial_request']; @@ -236,7 +246,8 @@ function insertOutgoingCompleteTx(client, session, tx, cb) { function insertIncoming(client, session, tx, satoshis, fiat, stage, authority, cb) { - insertTx(client, session, true, tx, satoshis, fiat, stage, authority, cb); + var realSatoshis = satoshis || 0; + insertTx(client, session, true, tx, realSatoshis, fiat, stage, authority, cb); } function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, @@ -281,44 +292,46 @@ function insertTx(client, session, incoming, tx, satoshis, fiat, stage, }); } -function addPendingTx(client, session, incoming, currencyCode, toAddress, cb) { - console.log('DEBUG5: %s', incoming); - connect(function(err, client, done) { - if (err) return cb(err); - var fields = ['device_fingerprint', 'session_id', 'incoming', - 'currency_code', 'to_address']; - var sql = getInsertQuery('pending_transactions', fields); - var values = [session.fingerprint, session.id, incoming, currencyCode, - toAddress]; - query(client, sql, values, function(_err) { - done(); +function addPendingTx(client, session, incoming, currencyCode, toAddress, + satoshis, cb) { + var fields = ['device_fingerprint', 'session_id', 'incoming', + 'currency_code', 'to_address', 'satoshis']; + var sql = getInsertQuery('pending_transactions', fields); + var values = [session.fingerprint, session.id, incoming, currencyCode, + toAddress, satoshis]; + query(client, sql, values, function(err) { - // If pending tx already exists, do nothing - if (_err && PG_ERRORS[_err.code] !== 'uniqueViolation') - logger.error(err); + // If pending tx already exists, do nothing + if (err && !isUniqueViolation(err)) return cb(err); - cb(_err); - }); + cb(); }); } +function buildOutgoingTx(client, session, tx, cb) { + async.waterfall([ + async.apply(billsAndTxs, client, session), + async.apply(insertOutgoingTx, client, session, tx) + ], cb); +} + // Calling function should only send bitcoins if result.satoshisToSend > 0 exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) { connect(function(err, client, done) { if (err) return cb(err); - async.waterfall([ - async.apply(silentQuery, client, 'BEGIN', null), + async.series([ + async.apply(silentQuery, client, 'BEGIN'), async.apply(insertOutgoingCompleteTx, client, session, tx), - async.apply(removePendingTx, client, tx.sessionId), - async.apply(billsAndTxs, client, session, tx.currencyCode), - async.apply(insertOutgoingTx, client, session, tx), - ], function(err, satoshisToSend) { + async.apply(removePendingTx, client, session), + async.apply(buildOutgoingTx, client, session, tx) + ], function(err, results) { if (err) { rollback(client, done); return cb(err); } - silentQuery(client, 'COMMIT', null, function() { + silentQuery(client, 'COMMIT', function() { done(); + var satoshisToSend = results[3]; cb(null, satoshisToSend); }); }); @@ -369,14 +382,26 @@ exports.addIncomingTx = function addIncomingTx(session, tx, authority, }); }; +exports.addOutgoingPending = function addOutgoingPending(session, currencyCode, + toAddress, satoshis, cb) { + connect(function(cerr, client, done) { + if (cerr) return cb(cerr); + + addPendingTx(client, session, false, currencyCode, toAddress, satoshis, + function(err) { + done(); + cb(err); + }); + }); +}; + exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) { - console.log('DEBUG1: %s', tx.currencyCode); connect(function(err, client, done) { if (err) return cb(err); async.waterfall([ async.apply(silentQuery, client, 'BEGIN', null), async.apply(addPendingTx, client, session, true, tx.currencyCode, - tx.toAddress), + tx.toAddress, tx.satoshis), async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat, 'initial_request', 'pending') ], function(err) { @@ -426,14 +451,15 @@ exports.dispenseStatus = function dispenseStatus(session, cb) { (results[1].rows[0].stage == 'deposit'); if (!pending) return cb(null, null); - var requiredSatoshis = results[0].rows[0].requiredSatoshis; + var requestedTx = results[0].rows[0]; + var requiredSatoshis = requestedTx.requiredSatoshis; var lastTx = results[1].rows[0]; // TODO: handle multiple deposits var status = (lastTx.satoshis < requiredSatoshis) ? 'insufficientFunds' : lastTx.authority; - cb(null, status); + cb(null, {status: status, fiat: requestedTx.fiat}); }); }); }; @@ -470,7 +496,6 @@ function insertDispense(client, session, tx, transactionId, counts, cb) { dispense1, reject1, count1, dispense2, reject2, count2, false, tx.error ]; - console.dir(values); // DEBUG client.query(sql, values, cb); } @@ -489,3 +514,14 @@ exports.addDispense = function addDispense(session, tx) { }); }); }; + +/* +exports.init('postgres://lamassu:lamassu@localhost/lamassu'); +connect(function(err, client, done) { + var sql = 'select * from transactions where id=$1'; + query(client, sql, [130], function(_err, results) { + done(); + console.dir(results.rows[0]); + }); +}); +*/ diff --git a/lib/routes.js b/lib/routes.js index 6fb1483a..c07170a4 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -85,6 +85,7 @@ function poll(req, res) { function trade(req, res) { plugins.trade(session(req), req.body, function(err) { var statusCode = err ? 500 : 201; + console.dir(err); // DEBUG res.json(statusCode, {err: err}); }); } diff --git a/migrations/004-transactions-reload.js b/migrations/004-transactions-reload.js index c1467ea6..2b608cf3 100644 --- a/migrations/004-transactions-reload.js +++ b/migrations/004-transactions-reload.js @@ -8,8 +8,8 @@ exports.up = function(next) { var stages = ['initial_request', 'partial_request', 'final_request', 'partial_send', 'deposit', 'dispense_request', 'dispense']. map(singleQuotify).join(','); - var authorizations = ['timeout', 'machine', 'pending', 'published', - 'authorized', 'rejected'].map(singleQuotify).join(','); + var authorizations = ['timeout', 'machine', 'pending', 'rejected', + 'published', 'authorized', 'confirmed'].map(singleQuotify).join(','); var sqls = [ 'CREATE TYPE transaction_stage AS ENUM (' + stages + ')', @@ -21,7 +21,7 @@ exports.up = function(next) { 'device_fingerprint text, ' + 'to_address text NOT NULL, ' + 'satoshis integer NOT NULL DEFAULT 0, ' + - 'fiat decimal NOT NULL DEFAULT 0, ' + + 'fiat integer NOT NULL DEFAULT 0, ' + 'currency_code text NOT NULL, ' + 'fee integer NOT NULL DEFAULT 0, ' + 'incoming boolean NOT NULL, ' + @@ -40,6 +40,7 @@ exports.up = function(next) { 'incoming boolean NOT NULL, ' + 'currency_code text NOT NULL, ' + 'to_address text NOT NULL, ' + + 'satoshis integer NOT NULL, ' + 'created timestamp NOT NULL DEFAULT now() ' + ')',