WIP 1st stage refactor to new DB

This commit is contained in:
Josh Harvey 2014-11-20 17:26:17 -05:00
parent dc6740ddf2
commit 0cec3670a9
3 changed files with 96 additions and 64 deletions

View file

@ -188,19 +188,17 @@ function _sendBitcoins(toAddress, satoshis, cb) {
}
function executeTx(deviceFingerprint, tx, cb) {
db.addTx(deviceFingerprint, tx, function(err, result) {
db.addOutgoingTx(deviceFingerprint, tx, function(err, satoshisToSend) {
if (err) return cb(err);
var satoshisToSend = result.satoshisToSend;
var dbTxId = result.id;
if (satoshisToSend === 0)
return cb(null, {statusCode: 204, txId: tx.txId, txHash: null});
_sendBitcoins(tx.toAddress, satoshisToSend, function(err, txHash) {
_sendBitcoins(tx.toAddress, satoshisToSend, function(_err, txHash) {
var fee = null; // Need to fill this out in plugins
db.addDigitalTx(dbTxId, err, txHash, fee);
db.sentCoins(tx, satoshisToSend, fee, _err, txHash);
if (err) return cb(err);
if (_err) return cb(err);
pollBalance();
cb(null, {
@ -212,16 +210,16 @@ function executeTx(deviceFingerprint, tx, cb) {
});
}
function reapIncomingTx(deviceFingerprint, tx) {
function reapOutgoingTx(deviceFingerprint, tx) {
executeTx(deviceFingerprint, tx, function(err) {
if (err) logger.error(err);
});
}
function reapOutgoingTx(deviceFingerprint, tx) {
function reapIncomingTx(deviceFingerprint, tx) {
infoPlugin.checkAddress(tx.toAddress, function(err, status, satoshisReceived) {
if (status === 'notSeen') return;
db.addOutgoingTx(deviceFingerprint, tx, status, satoshisReceived);
db.addIngoingTx(deviceFingerprint, tx, status, satoshisReceived);
});
}
@ -319,7 +317,7 @@ exports.cashOut = function cashOut(deviceFingerprint, tx, cb) {
return cb(new Error(err));
tx.toAddress = address;
tx.incoming = false;
tx.incoming = true;
db.addPendingTx(deviceFingerprint, tx, function(err) {
cb(err, address);

View file

@ -2,7 +2,6 @@
var pg = require('pg');
var async = require('async');
var _ = require('lodash');
var logger = require('./logger');
@ -110,17 +109,16 @@ function silentQuery(client, queryStr, values, cb) {
}
// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set
function billsAndTxs(client, sessionId, currencyCode, deviceFingerprint, cb) {
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
function billsAndTxs(client, sessionId, cb) {
var billsQuery = 'SELECT SUM(denomination) as fiat, ' +
'SUM(satoshis) AS satoshis ' +
'FROM bills ' +
'WHERE transaction_id=$1 AND currency_code=$2 AND device_fingerprint=$3';
var billsValues = [sessionId, currencyCode, deviceFingerprint];
var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'WHERE session_id=$1';
var billsValues = [sessionId];
var txQuery = 'SELECT SUM(fiat) AS fiat, SUM(satoshis) AS satoshis ' +
'FROM transactions ' +
'WHERE session_id=$1 AND currency_code=$2 AND device_fingerprint=$3';
var txValues = billsValues; // They happen to be the same
'WHERE session_id=$1 AND stage=$2';
var txValues = [sessionId, 'partialRequest'];
async.parallel([
async.apply(query, client, billsQuery, billsValues),
@ -155,12 +153,11 @@ function computeSendAmount(tx, totals) {
exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
connect(function(err, client, done) {
var sql = 'SELECT * FROM transactions ' +
'WHERE status=$1 AND ' +
'(NOT incoming OR EXTRACT(EPOCH FROM now() - created > $2) ' +
var sql = 'SELECT * FROM pending_transactions ' +
'WHERE (incoming OR EXTRACT(EPOCH FROM now() - created > $2) ' +
'ORDER BY created ASC';
var timeoutS = timeoutMS / 1000;
var values = ['pending', timeoutS];
var values = [timeoutS];
query(client, sql, values, function(err, results) {
done();
cb(err, results);
@ -169,26 +166,42 @@ exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
};
function removePendingTx(client, tx, cb) {
silentQuery(client, 'DELETE FROM transactions WHERE session_id=$1 AND status=$2',
[tx.txId, 'pending'], cb);
var sql = 'DELETE FROM pending_transactions WHERE session_id=$1';
silentQuery(client, sql, [tx.txId], cb);
}
function maybeInsertTx(client, deviceFingerprint, tx, totals, cb) {
function insertOutgoingTx(client, deviceFingerprint, tx, totals, cb) {
var sendAmount = computeSendAmount(tx, totals);
var status = _.isNumber(tx.fiat) ? 'machineSend' : 'timeout';
var stage = 'partial_request';
var source = tx.fiat ? 'machine' : 'timeout';
var satoshis = sendAmount.satoshis;
var fiat = sendAmount.fiat;
insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, function(err, results) {
// TODO: Don't worry about unique violation
insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage, source,
function(err) {
if (err) return cb(err);
cb(null, {id: results.rows[0].id, satoshisToSend: sendAmount.satoshis});
cb(null, satoshis);
});
}
function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
function insertOutgoingCompleteTx(client, deviceFingerprint, tx, cb) {
// Only relevant for machine source transactions, not timeouts
if (!tx.fiat) return cb();
var stage = 'final_request';
var source = 'machine';
var satoshis = tx.satoshis;
var fiat = tx.fiat;
insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage, source, cb);
}
function insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage,
source, cb) {
var fields = [
'session_id',
'status',
'stage',
'source',
'incoming',
'device_fingerprint',
'to_address',
@ -199,8 +212,9 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
var values = [
tx.txId,
status,
tx.incoming === false ? false : true,
stage,
source,
tx.incoming,
deviceFingerprint,
tx.toAddress,
satoshis,
@ -211,32 +225,60 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
query(client, getInsertQuery('transactions', fields, true), values, cb);
}
exports.addPendingTx = function addPendingTx(deviceFingerprint, tx, cb) {
exports.addPendingTx = function addPendingTx(deviceFingerprint, sessionId,
incoming, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
insertTx(client, deviceFingerprint, tx, tx.satoshis, tx.fiat, 'pending',
function(err) {
done();
var fields = ['session_id', 'incoming'];
var sql = getInsertQuery('pending_transactions', fields);
query(client, sql, [sessionId, incoming], function(_err) {
done();
// If pending tx already exists, do nothing
if (err && PG_ERRORS[err.code] !== 'uniqueViolation') {
logger.error(err);
return cb(err);
}
cb();
});
// If pending tx already exists, do nothing
if (_err && PG_ERRORS[err.code] !== 'uniqueViolation')
logger.error(err);
cb(_err);
});
});
};
// Calling function should only send bitcoins if result.satoshisToSend > 0
exports.addTx = function addTx(deviceFingerprint, tx, cb) {
exports.addOutgoingTx = function addOutgoingTx(deviceFingerprint, tx, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(insertOutgoingCompleteTx, client, deviceFingerprint, tx)
async.apply(removePendingTx, client, tx),
async.apply(billsAndTxs, client, tx.txId, tx.currencyCode, deviceFingerprint),
async.apply(maybeInsertTx, client, deviceFingerprint, tx)
async.apply(insertOutgoingTx, client, deviceFingerprint, tx),
], function(err, satoshisToSend) {
if (err) {
rollback(client, done);
return cb(err);
}
silentQuery(client, 'COMMIT', null, function() {
done();
cb(null, satoshisToSend);
});
});
});
};
function removeIncomingPendingTx(client, tx, status, cb) {
if (status !== 'published') return removePendingTx(client, tx, cb);
cb();
}
exports.addIncomingTx = function addIncomingTx(deviceFingerprint, tx, status,
satoshisReceived, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(removeOutgoingPendingTx, client, tx, status),
async.apply(insertTx, client, tx, satoshisReceived, 0, 'deposit')
], function(err, result) {
if (err) {
rollback(client, done);
@ -250,19 +292,6 @@ exports.addTx = function addTx(deviceFingerprint, tx, cb) {
});
};
exports.addDigitalTx = function addDigitalTx(dbTxId, err, txHash) {
var keys = ['transaction_id', 'tx_hash', 'error'];
var values = [dbTxId, txHash, err && err.message];
var sql = getInsertQuery('digital_transactions', keys);
connect(function(err, client, done) {
query(client, sql, values, function(_err) {
done(_err);
if (_err) logger.error(_err);
});
});
};
/*
exports.decrementCartridges =
function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) {

View file

@ -5,9 +5,9 @@ var db = require('./db');
function singleQuotify(item) { return '\'' + item + '\''; }
exports.up = function(next){
var stages = ['partial_request', 'partial_send', 'complete',
'initial_request', 'deposit', 'supplemental_request', 'dispense_request',
'dispense'].map(singleQuotify).join(',');
var stages = ['initial_request', 'partial_request', 'final_request',
'partial_send', 'deposit', 'dispense_request', 'dispense'].
map(singleQuotify).join(',');
var sources = ['timeout', 'machine', 'published', 'authorized', 'rejected'].
map(singleQuotify).join(',');
var sqls = [
@ -23,7 +23,12 @@ exports.up = function(next){
'ALTER TABLE transactions ADD COLUMN incoming boolean DEFAULT false',
'ALTER TABLE transactions ADD COLUMN stage transaction_stage NULL',
'ALTER TABLE transactions ADD COLUMN source transaction_source NULL',
'ALTER TABLE transactions ADD COLUMN fee integer NOT NULL DEFAULT 0',
'ALTER TABLE transactions ADD COLUMN error text NULL',
'ALTER TABLE transactions ALTER COLUMN fiat SET DEFAULT 0',
'ALTER TABLE transactions ALTER COLUMN satoshis SET DEFAULT 0',
'ALTER TABLE transactions ALTER COLUMN fiat SET NOT NULL',
'ALTER TABLE transactions ALTER COLUMN satoshis SET NOT NULL',
'ALTER TABLE transactions ADD CONSTRAINT transactions_unique_source ' +
'UNIQUE (session_id,to_address,stage,source)',