diff --git a/.flowconfig b/.flowconfig new file mode 100644 index 00000000..88e6b033 --- /dev/null +++ b/.flowconfig @@ -0,0 +1,3 @@ +[ignore] + +[include] diff --git a/lib/app.js b/lib/app.js index 6d21ec5a..4dda97d3 100644 --- a/lib/app.js +++ b/lib/app.js @@ -8,6 +8,7 @@ var routes = require('./routes'); var plugins = require('./plugins'); var db = require('./postgresql_interface'); var logger = require('./logger'); +require('longjohn'); module.exports = function(options) { var app = express(); diff --git a/lib/plugins.js b/lib/plugins.js index c3665549..ed42f285 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -1,5 +1,7 @@ +/* @flow weak */ 'use strict'; +var _ = require('lodash'); var async = require('async'); var logger = require('./logger'); @@ -191,7 +193,7 @@ function _sendBitcoins(toAddress, satoshis, cb) { walletPlugin.sendBitcoins(toAddress, satoshis, transactionFee, cb); } -function executeTx(session, tx, cb) { +function executeTx(session, tx, authority, cb) { db.addOutgoingTx(session, tx, function(err, satoshisToSend) { if (err) return cb(err); @@ -200,7 +202,7 @@ function executeTx(session, tx, cb) { _sendBitcoins(tx.toAddress, satoshisToSend, function(_err, txHash) { var fee = null; // Need to fill this out in plugins - db.sentCoins(session, tx, satoshisToSend, fee, _err, txHash); + db.sentCoins(session, tx, authority, satoshisToSend, fee, _err, txHash); if (_err) return cb(err); @@ -215,7 +217,7 @@ function executeTx(session, tx, cb) { } function reapOutgoingTx(session, tx) { - executeTx(session, tx, function(err) { + executeTx(session, tx, 'timeout', function(err) { if (err) logger.error(err); }); } @@ -224,7 +226,9 @@ function reapIncomingTx(session, tx) { infoPlugin.checkAddress(tx.toAddress, function(err, status, satoshisReceived) { if (status === 'notSeen') return; - db.addIncomingTx(session, tx, status, satoshisReceived); + db.addIncomingTx(session, tx, status, satoshisReceived, function(err) { + if (err) logger.error(err); + }); }); } @@ -282,7 +286,7 @@ exports.trade = function trade(session, rawTrade, cb) { }; exports.sendBitcoins = function sendBitcoins(session, rawTx, cb) { - executeTx(session, rawTx, cb); + executeTx(session, rawTx, 'machine', cb); }; exports.cashOut = function cashOut(session, tx, cb) { @@ -293,14 +297,17 @@ exports.cashOut = function cashOut(session, tx, cb) { walletPlugin.newAddress(tmpInfo, function(err, address) { if (err) return cb(err); - db.addInitialIncoming(session, tx, address, function(_err) { + var newTx = _.clone(tx); + newTx.toAddress = address; + db.addInitialIncoming(session, newTx, function(_err) { cb(_err, address); }); }); }; exports.dispenseStatus = function dispenseStatus(session, cb) { - db.fetchDispenseStatus(session, cb); + console.log('DEBUG1'); + db.dispenseStatus(session, cb); }; exports.dispenseAck = function dispenseAck(session, tx) { diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 0f3ea472..2931accc 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -1,13 +1,16 @@ +/* @flow weak */ 'use strict'; +// TODO: Consider using serializable transactions for true ACID + var pg = require('pg'); -var async = require('async'); +var async = require('async'); var _ = require('lodash'); var logger = require('./logger'); var PG_ERRORS = { - 23505: 'uniqueViolation' + '23505': 'uniqueViolation' }; var conString = null; @@ -98,14 +101,30 @@ exports.recordDeviceEvent = function recordDeviceEvent(session, event) { }; function query(client, queryStr, values, cb) { - console.dir([queryStr, values]); - client.query(queryStr, values, cb); + if (!cb) { + cb = values; + values = []; + } + +// console.log(queryStr); +// console.log(values); +// console.trace(); // DEBUG + + client.query(queryStr, values, function(err, results) { + if (err) return cb(new Error(err)); + cb(null, results); + }); } function silentQuery(client, queryStr, values, cb) { - console.dir([queryStr, values]); + if (!cb) { + cb = values; + values = []; + } + client.query(queryStr, values, function(err) { - cb(err); + if (err) cb(new Error(err)); + cb(); }); } @@ -158,7 +177,7 @@ function computeSendAmount(tx, totals) { exports.removeOldPending = function removeOldPending(timeoutMS) { connect(function(err, client, done) { var sql = 'DELETE FROM pending_transactions ' + - 'WHERE incoming AND extract(EPOCH FROM now() - created) > $1)'; + 'WHERE incoming AND extract(EPOCH FROM now() - created) > $1'; var timeoutS = timeoutMS / 1000; var values = [timeoutS]; query(client, sql, values, function(err) { @@ -170,9 +189,9 @@ exports.removeOldPending = function removeOldPending(timeoutMS) { exports.pendingTxs = function pendingTxs(timeoutMS, cb) { connect(function(err, client, done) { - var sql = 'SELECT *, extract(EPOCH FROM now() - created) AS age ' + + var sql = 'SELECT * ' + 'FROM pending_transactions ' + - 'WHERE (incoming OR age > $2) ' + + 'WHERE (incoming OR extract(EPOCH FROM now() - created) > $1) ' + 'ORDER BY created ASC'; var timeoutS = timeoutMS / 1000; var values = [timeoutS]; @@ -195,7 +214,7 @@ function insertOutgoingTx(client, session, tx, totals, cb) { var authority = tx.fiat ? 'machine' : 'timeout'; var satoshis = sendAmount.satoshis; var fiat = sendAmount.fiat; - insertTx(client, session, tx, satoshis, fiat, stage, authority, + insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, function(err) { if (err) return cb(err); @@ -212,10 +231,20 @@ function insertOutgoingCompleteTx(client, session, tx, cb) { var authority = 'machine'; var satoshis = tx.satoshis; var fiat = tx.fiat; - insertTx(client, session, tx, satoshis, fiat, stage, authority, cb); + insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, cb); } -function insertTx(client, session, tx, satoshis, fiat, stage, +function insertIncoming(client, session, tx, satoshis, fiat, stage, authority, + cb) { + insertTx(client, session, true, tx, satoshis, fiat, stage, authority, cb); +} + +function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, + cb) { + insertTx(client, session, false, tx, satoshis, fiat, stage, authority, cb); +} + +function insertTx(client, session, incoming, tx, satoshis, fiat, stage, authority, cb) { var fields = [ 'session_id', @@ -235,7 +264,7 @@ function insertTx(client, session, tx, satoshis, fiat, stage, session.id, stage, authority, - tx.incoming, + incoming, session.fingerprint, tx.toAddress, satoshis, @@ -252,7 +281,8 @@ function insertTx(client, session, tx, satoshis, fiat, stage, }); } -function addPendingTx(session, incoming, currencyCode, toAddress, cb) { +function addPendingTx(client, session, incoming, currencyCode, toAddress, cb) { + console.log('DEBUG5: %s', incoming); connect(function(err, client, done) { if (err) return cb(err); var fields = ['device_fingerprint', 'session_id', 'incoming', @@ -264,7 +294,7 @@ function addPendingTx(session, incoming, currencyCode, toAddress, cb) { done(); // If pending tx already exists, do nothing - if (_err && PG_ERRORS[err.code] !== 'uniqueViolation') + if (_err && PG_ERRORS[_err.code] !== 'uniqueViolation') logger.error(err); cb(_err); @@ -295,36 +325,37 @@ exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) { }); }; -exports.sentCoins = function sentCoins(session, tx, satoshis, fee, error, - txHash) { +exports.sentCoins = function sentCoins(session, tx, authority, satoshis, fee, + error, txHash) { connect(function(err, client, done) { if (err) return logger.error(err); var newTx = _.clone(tx); newTx.txHash = txHash; newTx.error = error; - insertTx(client, session, newTx, satoshis, newTx.fiat, - 'partial_send', function(_err) { + insertOutgoing(client, session, newTx, satoshis, newTx.fiat, 'partial_send', + authority, function(_err) { done(); if (err) logger.error(_err); }); }); }; +function maybeRemovePending(client, session, authority, cb) { + if (authority === 'published') return cb(); + removePendingTx(client, session, cb); +} + exports.addIncomingTx = function addIncomingTx(session, tx, authority, satoshisReceived, cb) { - connect(function(err, client, done) { - function maybeRemovePending(client, session, authority, cb) { - if (authority === 'published') return cb(); - removePendingTx(client, session, cb); - } + connect(function(err, client, done) { if (err) return cb(err); async.waterfall([ async.apply(silentQuery, client, 'BEGIN', null), async.apply(maybeRemovePending, client, session, authority), - async.apply(insertTx, client, session, tx, satoshisReceived, 0, 'deposit', - authority) + async.apply(insertIncoming, client, session, tx, satoshisReceived, 0, + 'deposit', authority) ], function(err) { if (err) { rollback(client, done); @@ -338,15 +369,15 @@ exports.addIncomingTx = function addIncomingTx(session, tx, authority, }); }; -exports.addInitialIncoming = function addInitialIncoming(session, tx, address, - cb) { +exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) { + console.log('DEBUG1: %s', tx.currencyCode); connect(function(err, client, done) { if (err) return cb(err); async.waterfall([ async.apply(silentQuery, client, 'BEGIN', null), async.apply(addPendingTx, client, session, true, tx.currencyCode, tx.toAddress), - async.apply(insertTx, client, session, tx, tx.satoshis, tx.fiat, + async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat, 'initial_request', 'pending') ], function(err) { if (err) { @@ -361,7 +392,7 @@ exports.addInitialIncoming = function addInitialIncoming(session, tx, address, }); }; -function lastTxStatus(client, session, sessionId, cb) { +function lastTxStatus(client, session, cb) { var sql = 'SELECT satoshis, authority FROM transactions ' + 'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' + 'ORDER BY id DESC LIMIT 1'; @@ -382,9 +413,10 @@ function initialRequest(client, session, cb) { exports.dispenseStatus = function dispenseStatus(session, cb) { connect(function(err, client, done) { if (err) return cb(err); + async.parallel([ - async.apply(client, initialRequest, session), - async.apply(client, lastTxStatus, session) + async.apply(initialRequest, client, session), + async.apply(lastTxStatus, client, session) ], function(_err, results) { done(); if (_err) return cb(_err); @@ -446,7 +478,7 @@ exports.addDispense = function addDispense(session, tx) { if (err) return logger.error(err); async.waterfall([ - async.apply(insertTx, client, session, tx, 0, tx.fiat, + async.apply(insertIncoming, client, session, tx, 0, tx.fiat, 'deposit', 'authorized'), async.apply(lastDispenseCount, client, session), async.apply(insertDispense, client, tx) diff --git a/lib/routes.js b/lib/routes.js index 208e6875..6fb1483a 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -51,15 +51,15 @@ function poll(req, res) { if (!fiatRate) logger.warn('No bid rate, using ask rate'); - var fiatBalance = plugins.fiatBalance(fingerprint); + var fiatBalance = plugins.fiatBalance(); if (fiatBalance === null) return res.json({err: 'No balance available'}); var config = plugins.getCachedConfig(); var complianceSettings = config.exchanges.settings.compliance; var fiatCommission = config.exchanges.settings.fiatCommission || config.exchanges.settings.commission; - var sessionId = req.get('session-id'); + console.log('DEBUG0'); plugins.dispenseStatus(session(req), function(err, dispenseStatus) { if (err) return logger.error(err); var response = {