WIP lots of fixes

This commit is contained in:
Josh Harvey 2014-11-27 14:06:08 -05:00
parent e4a4e556a2
commit ad65869229
4 changed files with 89 additions and 49 deletions

View file

@ -7,7 +7,7 @@ var logger = require('./logger');
var SATOSHI_FACTOR = 1e8; var SATOSHI_FACTOR = 1e8;
var POLLING_RATE = 60 * 1000; // poll each minute var POLLING_RATE = 60 * 1000; // poll each minute
var REAP_RATE = 5 * 1000; var REAP_RATE = 2 * 1000;
var PENDING_TIMEOUT = 70 * 1000; var PENDING_TIMEOUT = 70 * 1000;
// TODO: might have to update this if user is allowed to extend monitoring time // TODO: might have to update this if user is allowed to extend monitoring time
@ -53,7 +53,7 @@ function loadPlugin(name, config) {
trader: ['balance', 'purchase', 'sell'], trader: ['balance', 'purchase', 'sell'],
wallet: ['balance', 'sendBitcoins', 'newAddress'], wallet: ['balance', 'sendBitcoins', 'newAddress'],
idVerifier: ['verifyUser', 'verifyTransaction'], idVerifier: ['verifyUser', 'verifyTransaction'],
info: ['getAddressLastTx', 'getTx'] info: ['checkAddress']
}; };
var plugin = null; var plugin = null;
@ -223,8 +223,9 @@ function reapOutgoingTx(session, tx) {
} }
function reapIncomingTx(session, tx) { function reapIncomingTx(session, tx) {
infoPlugin.checkAddress(tx.toAddress, function(err, status, infoPlugin.checkAddress(tx.toAddress, tx.satoshis, function(err, status,
satoshisReceived, txHash) { satoshisReceived, txHash) {
if (err) return logger.error(err);
if (status === 'notSeen') return; if (status === 'notSeen') return;
var newTx = _.clone(tx); var newTx = _.clone(tx);
newTx.txHash = txHash; newTx.txHash = txHash;
@ -237,6 +238,7 @@ function reapIncomingTx(session, tx) {
function reapTx(row) { function reapTx(row) {
var session = {fingerprint: row.device_fingerprint, id: row.session_id}; var session = {fingerprint: row.device_fingerprint, id: row.session_id};
var tx = { var tx = {
satoshis: row.satoshis,
toAddress: row.to_address, toAddress: row.to_address,
currencyCode: row.currency_code, currencyCode: row.currency_code,
incoming: row.incoming incoming: row.incoming
@ -282,7 +284,8 @@ exports.trade = function trade(session, rawTrade, cb) {
}; };
async.parallel([ async.parallel([
async.apply(db.addPendingTx, session, tx), async.apply(db.addOutgoingPending, session, tx.currencyCode, tx.toAddress,
tx.satoshis),
async.apply(db.recordBill, session, rawTrade) async.apply(db.recordBill, session, rawTrade)
], cb); ], cb);
}; };
@ -308,7 +311,6 @@ exports.cashOut = function cashOut(session, tx, cb) {
}; };
exports.dispenseStatus = function dispenseStatus(session, cb) { exports.dispenseStatus = function dispenseStatus(session, cb) {
console.log('DEBUG1');
db.dispenseStatus(session, cb); db.dispenseStatus(session, cb);
}; };

View file

@ -5,13 +5,18 @@
var pg = require('pg'); var pg = require('pg');
var async = require('async'); var async = require('async');
var util = require('util');
var _ = require('lodash'); var _ = require('lodash');
var logger = require('./logger'); var logger = require('./logger');
var PG_ERRORS = { function inspect(rec) {
'23505': 'uniqueViolation' console.log(util.inspect(rec, {depth: null, colors: true}));
}; }
function isUniqueViolation(err) {
return err.code === '23505';
}
var conString = null; var conString = null;
@ -56,29 +61,29 @@ function connect(cb) {
// logs inputted bill and overall tx status (if available) // logs inputted bill and overall tx status (if available)
exports.recordBill = function recordBill(session, rec, cb) { exports.recordBill = function recordBill(session, rec, cb) {
var fields = [ var fields = [
'id',
'device_fingerprint', 'device_fingerprint',
'currency_code', 'currency_code',
'to_address', 'to_address',
'session_id', 'session_id',
'device_time', 'device_time',
'satoshis', 'satoshis',
'denomination' 'denomination'
]; ];
var values = [ var values = [
rec.uuid,
session.fingerprint, session.fingerprint,
rec.currency, rec.currency,
rec.toAddress, rec.toAddress,
session.id, session.id,
rec.deviceTime, rec.deviceTime,
rec.satoshis, rec.satoshis,
rec.fiat rec.fiat
]; ];
connect(function(err, client, done) { connect(function(cerr, client, done) {
if (err) return cb(err); if (cerr) return cb(cerr);
query(client, getInsertQuery('bills', fields), values, function(err) { query(client, getInsertQuery('bills', fields), values, function(err) {
done(); done();
// TODO: Handle unique violations more cleanly for idempotency // TODO: Handle unique violations more cleanly for idempotency
@ -110,7 +115,7 @@ function query(client, queryStr, values, cb) {
if (err) { if (err) {
console.log(queryStr); console.log(queryStr);
console.log(values); console.log(values);
return cb(new Error(err)); return cb(err);
} }
cb(null, results); cb(null, results);
}); });
@ -123,7 +128,11 @@ function silentQuery(client, queryStr, values, cb) {
} }
client.query(queryStr, values, function(err) { client.query(queryStr, values, function(err) {
if (err) cb(new Error(err)); if (err) {
console.log(queryStr);
console.log(values);
cb(err);
}
cb(); cb();
}); });
} }
@ -132,12 +141,13 @@ function silentQuery(client, queryStr, values, cb) {
function billsAndTxs(client, session, cb) { function billsAndTxs(client, session, cb) {
var sessionId = session.id; var sessionId = session.id;
var fingerprint = session.fingerprint; var fingerprint = session.fingerprint;
var billsQuery = 'SELECT SUM(denomination) as fiat, ' + var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
'SUM(satoshis) AS satoshis ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'FROM bills ' + 'FROM bills ' +
'WHERE device_fingerprint=$1 AND session_id=$2'; 'WHERE device_fingerprint=$1 AND session_id=$2';
var billsValues = [fingerprint, sessionId]; var billsValues = [fingerprint, sessionId];
var txQuery = 'SELECT SUM(fiat) AS fiat, SUM(satoshis) AS satoshis ' + var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'FROM transactions ' + 'FROM transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3'; 'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3';
var txValues = [fingerprint, sessionId, 'partial_request']; var txValues = [fingerprint, sessionId, 'partial_request'];
@ -236,7 +246,8 @@ function insertOutgoingCompleteTx(client, session, tx, cb) {
function insertIncoming(client, session, tx, satoshis, fiat, stage, authority, function insertIncoming(client, session, tx, satoshis, fiat, stage, authority,
cb) { cb) {
insertTx(client, session, true, tx, satoshis, fiat, stage, authority, cb); var realSatoshis = satoshis || 0;
insertTx(client, session, true, tx, realSatoshis, fiat, stage, authority, cb);
} }
function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
@ -281,44 +292,46 @@ function insertTx(client, session, incoming, tx, satoshis, fiat, stage,
}); });
} }
function addPendingTx(client, session, incoming, currencyCode, toAddress, cb) { function addPendingTx(client, session, incoming, currencyCode, toAddress,
console.log('DEBUG5: %s', incoming); satoshis, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
var fields = ['device_fingerprint', 'session_id', 'incoming', var fields = ['device_fingerprint', 'session_id', 'incoming',
'currency_code', 'to_address']; 'currency_code', 'to_address', 'satoshis'];
var sql = getInsertQuery('pending_transactions', fields); var sql = getInsertQuery('pending_transactions', fields);
var values = [session.fingerprint, session.id, incoming, currencyCode, var values = [session.fingerprint, session.id, incoming, currencyCode,
toAddress]; toAddress, satoshis];
query(client, sql, values, function(_err) { query(client, sql, values, function(err) {
done();
// If pending tx already exists, do nothing // If pending tx already exists, do nothing
if (_err && PG_ERRORS[_err.code] !== 'uniqueViolation') if (err && !isUniqueViolation(err)) return cb(err);
logger.error(err);
cb(_err); cb();
});
}); });
} }
function buildOutgoingTx(client, session, tx, cb) {
async.waterfall([
async.apply(billsAndTxs, client, session),
async.apply(insertOutgoingTx, client, session, tx)
], cb);
}
// Calling function should only send bitcoins if result.satoshisToSend > 0 // Calling function should only send bitcoins if result.satoshisToSend > 0
exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) { exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) {
connect(function(err, client, done) { connect(function(err, client, done) {
if (err) return cb(err); if (err) return cb(err);
async.waterfall([ async.series([
async.apply(silentQuery, client, 'BEGIN', null), async.apply(silentQuery, client, 'BEGIN'),
async.apply(insertOutgoingCompleteTx, client, session, tx), async.apply(insertOutgoingCompleteTx, client, session, tx),
async.apply(removePendingTx, client, tx.sessionId), async.apply(removePendingTx, client, session),
async.apply(billsAndTxs, client, session, tx.currencyCode), async.apply(buildOutgoingTx, client, session, tx)
async.apply(insertOutgoingTx, client, session, tx), ], function(err, results) {
], function(err, satoshisToSend) {
if (err) { if (err) {
rollback(client, done); rollback(client, done);
return cb(err); return cb(err);
} }
silentQuery(client, 'COMMIT', null, function() { silentQuery(client, 'COMMIT', function() {
done(); done();
var satoshisToSend = results[3];
cb(null, satoshisToSend); cb(null, satoshisToSend);
}); });
}); });
@ -369,14 +382,26 @@ exports.addIncomingTx = function addIncomingTx(session, tx, authority,
}); });
}; };
exports.addOutgoingPending = function addOutgoingPending(session, currencyCode,
toAddress, satoshis, cb) {
connect(function(cerr, client, done) {
if (cerr) return cb(cerr);
addPendingTx(client, session, false, currencyCode, toAddress, satoshis,
function(err) {
done();
cb(err);
});
});
};
exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) { exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) {
console.log('DEBUG1: %s', tx.currencyCode);
connect(function(err, client, done) { connect(function(err, client, done) {
if (err) return cb(err); if (err) return cb(err);
async.waterfall([ async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null), async.apply(silentQuery, client, 'BEGIN', null),
async.apply(addPendingTx, client, session, true, tx.currencyCode, async.apply(addPendingTx, client, session, true, tx.currencyCode,
tx.toAddress), tx.toAddress, tx.satoshis),
async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat, async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat,
'initial_request', 'pending') 'initial_request', 'pending')
], function(err) { ], function(err) {
@ -426,14 +451,15 @@ exports.dispenseStatus = function dispenseStatus(session, cb) {
(results[1].rows[0].stage == 'deposit'); (results[1].rows[0].stage == 'deposit');
if (!pending) return cb(null, null); if (!pending) return cb(null, null);
var requiredSatoshis = results[0].rows[0].requiredSatoshis; var requestedTx = results[0].rows[0];
var requiredSatoshis = requestedTx.requiredSatoshis;
var lastTx = results[1].rows[0]; var lastTx = results[1].rows[0];
// TODO: handle multiple deposits // TODO: handle multiple deposits
var status = (lastTx.satoshis < requiredSatoshis) ? var status = (lastTx.satoshis < requiredSatoshis) ?
'insufficientFunds' : 'insufficientFunds' :
lastTx.authority; lastTx.authority;
cb(null, status); cb(null, {status: status, fiat: requestedTx.fiat});
}); });
}); });
}; };
@ -470,7 +496,6 @@ function insertDispense(client, session, tx, transactionId, counts, cb) {
dispense1, reject1, count1, dispense2, reject2, count2, dispense1, reject1, count1, dispense2, reject2, count2,
false, tx.error false, tx.error
]; ];
console.dir(values); // DEBUG
client.query(sql, values, cb); client.query(sql, values, cb);
} }
@ -489,3 +514,14 @@ exports.addDispense = function addDispense(session, tx) {
}); });
}); });
}; };
/*
exports.init('postgres://lamassu:lamassu@localhost/lamassu');
connect(function(err, client, done) {
var sql = 'select * from transactions where id=$1';
query(client, sql, [130], function(_err, results) {
done();
console.dir(results.rows[0]);
});
});
*/

View file

@ -85,6 +85,7 @@ function poll(req, res) {
function trade(req, res) { function trade(req, res) {
plugins.trade(session(req), req.body, function(err) { plugins.trade(session(req), req.body, function(err) {
var statusCode = err ? 500 : 201; var statusCode = err ? 500 : 201;
console.dir(err); // DEBUG
res.json(statusCode, {err: err}); res.json(statusCode, {err: err});
}); });
} }

View file

@ -8,8 +8,8 @@ exports.up = function(next) {
var stages = ['initial_request', 'partial_request', 'final_request', var stages = ['initial_request', 'partial_request', 'final_request',
'partial_send', 'deposit', 'dispense_request', 'dispense']. 'partial_send', 'deposit', 'dispense_request', 'dispense'].
map(singleQuotify).join(','); map(singleQuotify).join(',');
var authorizations = ['timeout', 'machine', 'pending', 'published', var authorizations = ['timeout', 'machine', 'pending', 'rejected',
'authorized', 'rejected'].map(singleQuotify).join(','); 'published', 'authorized', 'confirmed'].map(singleQuotify).join(',');
var sqls = [ var sqls = [
'CREATE TYPE transaction_stage AS ENUM (' + stages + ')', 'CREATE TYPE transaction_stage AS ENUM (' + stages + ')',
@ -21,7 +21,7 @@ exports.up = function(next) {
'device_fingerprint text, ' + 'device_fingerprint text, ' +
'to_address text NOT NULL, ' + 'to_address text NOT NULL, ' +
'satoshis integer NOT NULL DEFAULT 0, ' + 'satoshis integer NOT NULL DEFAULT 0, ' +
'fiat decimal NOT NULL DEFAULT 0, ' + 'fiat integer NOT NULL DEFAULT 0, ' +
'currency_code text NOT NULL, ' + 'currency_code text NOT NULL, ' +
'fee integer NOT NULL DEFAULT 0, ' + 'fee integer NOT NULL DEFAULT 0, ' +
'incoming boolean NOT NULL, ' + 'incoming boolean NOT NULL, ' +
@ -40,6 +40,7 @@ exports.up = function(next) {
'incoming boolean NOT NULL, ' + 'incoming boolean NOT NULL, ' +
'currency_code text NOT NULL, ' + 'currency_code text NOT NULL, ' +
'to_address text NOT NULL, ' + 'to_address text NOT NULL, ' +
'satoshis integer NOT NULL, ' +
'created timestamp NOT NULL DEFAULT now() ' + 'created timestamp NOT NULL DEFAULT now() ' +
')', ')',