This commit is contained in:
Josh Harvey 2014-11-25 17:10:38 -05:00
parent 0b438a62e5
commit f38b02df94
5 changed files with 85 additions and 42 deletions

View file

@ -1,13 +1,16 @@
/* @flow weak */
'use strict';
// TODO: Consider using serializable transactions for true ACID
var pg = require('pg');
var async = require('async');
var async = require('async');
var _ = require('lodash');
var logger = require('./logger');
var PG_ERRORS = {
23505: 'uniqueViolation'
'23505': 'uniqueViolation'
};
var conString = null;
@ -98,14 +101,30 @@ exports.recordDeviceEvent = function recordDeviceEvent(session, event) {
};
function query(client, queryStr, values, cb) {
console.dir([queryStr, values]);
client.query(queryStr, values, cb);
if (!cb) {
cb = values;
values = [];
}
// console.log(queryStr);
// console.log(values);
// console.trace(); // DEBUG
client.query(queryStr, values, function(err, results) {
if (err) return cb(new Error(err));
cb(null, results);
});
}
function silentQuery(client, queryStr, values, cb) {
console.dir([queryStr, values]);
if (!cb) {
cb = values;
values = [];
}
client.query(queryStr, values, function(err) {
cb(err);
if (err) cb(new Error(err));
cb();
});
}
@ -158,7 +177,7 @@ function computeSendAmount(tx, totals) {
exports.removeOldPending = function removeOldPending(timeoutMS) {
connect(function(err, client, done) {
var sql = 'DELETE FROM pending_transactions ' +
'WHERE incoming AND extract(EPOCH FROM now() - created) > $1)';
'WHERE incoming AND extract(EPOCH FROM now() - created) > $1';
var timeoutS = timeoutMS / 1000;
var values = [timeoutS];
query(client, sql, values, function(err) {
@ -170,9 +189,9 @@ exports.removeOldPending = function removeOldPending(timeoutMS) {
exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
connect(function(err, client, done) {
var sql = 'SELECT *, extract(EPOCH FROM now() - created) AS age ' +
var sql = 'SELECT * ' +
'FROM pending_transactions ' +
'WHERE (incoming OR age > $2) ' +
'WHERE (incoming OR extract(EPOCH FROM now() - created) > $1) ' +
'ORDER BY created ASC';
var timeoutS = timeoutMS / 1000;
var values = [timeoutS];
@ -195,7 +214,7 @@ function insertOutgoingTx(client, session, tx, totals, cb) {
var authority = tx.fiat ? 'machine' : 'timeout';
var satoshis = sendAmount.satoshis;
var fiat = sendAmount.fiat;
insertTx(client, session, tx, satoshis, fiat, stage, authority,
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
function(err) {
if (err) return cb(err);
@ -212,10 +231,20 @@ function insertOutgoingCompleteTx(client, session, tx, cb) {
var authority = 'machine';
var satoshis = tx.satoshis;
var fiat = tx.fiat;
insertTx(client, session, tx, satoshis, fiat, stage, authority, cb);
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, cb);
}
function insertTx(client, session, tx, satoshis, fiat, stage,
function insertIncoming(client, session, tx, satoshis, fiat, stage, authority,
cb) {
insertTx(client, session, true, tx, satoshis, fiat, stage, authority, cb);
}
function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
cb) {
insertTx(client, session, false, tx, satoshis, fiat, stage, authority, cb);
}
function insertTx(client, session, incoming, tx, satoshis, fiat, stage,
authority, cb) {
var fields = [
'session_id',
@ -235,7 +264,7 @@ function insertTx(client, session, tx, satoshis, fiat, stage,
session.id,
stage,
authority,
tx.incoming,
incoming,
session.fingerprint,
tx.toAddress,
satoshis,
@ -252,7 +281,8 @@ function insertTx(client, session, tx, satoshis, fiat, stage,
});
}
function addPendingTx(session, incoming, currencyCode, toAddress, cb) {
function addPendingTx(client, session, incoming, currencyCode, toAddress, cb) {
console.log('DEBUG5: %s', incoming);
connect(function(err, client, done) {
if (err) return cb(err);
var fields = ['device_fingerprint', 'session_id', 'incoming',
@ -264,7 +294,7 @@ function addPendingTx(session, incoming, currencyCode, toAddress, cb) {
done();
// If pending tx already exists, do nothing
if (_err && PG_ERRORS[err.code] !== 'uniqueViolation')
if (_err && PG_ERRORS[_err.code] !== 'uniqueViolation')
logger.error(err);
cb(_err);
@ -295,36 +325,37 @@ exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) {
});
};
exports.sentCoins = function sentCoins(session, tx, satoshis, fee, error,
txHash) {
exports.sentCoins = function sentCoins(session, tx, authority, satoshis, fee,
error, txHash) {
connect(function(err, client, done) {
if (err) return logger.error(err);
var newTx = _.clone(tx);
newTx.txHash = txHash;
newTx.error = error;
insertTx(client, session, newTx, satoshis, newTx.fiat,
'partial_send', function(_err) {
insertOutgoing(client, session, newTx, satoshis, newTx.fiat, 'partial_send',
authority, function(_err) {
done();
if (err) logger.error(_err);
});
});
};
function maybeRemovePending(client, session, authority, cb) {
if (authority === 'published') return cb();
removePendingTx(client, session, cb);
}
exports.addIncomingTx = function addIncomingTx(session, tx, authority,
satoshisReceived, cb) {
connect(function(err, client, done) {
function maybeRemovePending(client, session, authority, cb) {
if (authority === 'published') return cb();
removePendingTx(client, session, cb);
}
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(maybeRemovePending, client, session, authority),
async.apply(insertTx, client, session, tx, satoshisReceived, 0, 'deposit',
authority)
async.apply(insertIncoming, client, session, tx, satoshisReceived, 0,
'deposit', authority)
], function(err) {
if (err) {
rollback(client, done);
@ -338,15 +369,15 @@ exports.addIncomingTx = function addIncomingTx(session, tx, authority,
});
};
exports.addInitialIncoming = function addInitialIncoming(session, tx, address,
cb) {
exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) {
console.log('DEBUG1: %s', tx.currencyCode);
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(addPendingTx, client, session, true, tx.currencyCode,
tx.toAddress),
async.apply(insertTx, client, session, tx, tx.satoshis, tx.fiat,
async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat,
'initial_request', 'pending')
], function(err) {
if (err) {
@ -361,7 +392,7 @@ exports.addInitialIncoming = function addInitialIncoming(session, tx, address,
});
};
function lastTxStatus(client, session, sessionId, cb) {
function lastTxStatus(client, session, cb) {
var sql = 'SELECT satoshis, authority FROM transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
'ORDER BY id DESC LIMIT 1';
@ -382,9 +413,10 @@ function initialRequest(client, session, cb) {
exports.dispenseStatus = function dispenseStatus(session, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.parallel([
async.apply(client, initialRequest, session),
async.apply(client, lastTxStatus, session)
async.apply(initialRequest, client, session),
async.apply(lastTxStatus, client, session)
], function(_err, results) {
done();
if (_err) return cb(_err);
@ -446,7 +478,7 @@ exports.addDispense = function addDispense(session, tx) {
if (err) return logger.error(err);
async.waterfall([
async.apply(insertTx, client, session, tx, 0, tx.fiat,
async.apply(insertIncoming, client, session, tx, 0, tx.fiat,
'deposit', 'authorized'),
async.apply(lastDispenseCount, client, session),
async.apply(insertDispense, client, tx)