major refactoring to use session object

This commit is contained in:
Josh Harvey 2014-11-25 11:54:03 -05:00
parent fe008a9db6
commit 0b438a62e5
4 changed files with 94 additions and 120 deletions

View file

@ -182,8 +182,8 @@ exports.getCachedConfig = function getCachedConfig() {
return cachedConfig;
};
exports.logEvent = function event(rawEvent, deviceFingerprint) {
db.recordDeviceEvent(deviceFingerprint, rawEvent);
exports.logEvent = function event(session, rawEvent) {
db.recordDeviceEvent(session, rawEvent);
};
function _sendBitcoins(toAddress, satoshis, cb) {
@ -191,8 +191,8 @@ function _sendBitcoins(toAddress, satoshis, cb) {
walletPlugin.sendBitcoins(toAddress, satoshis, transactionFee, cb);
}
function executeTx(deviceFingerprint, tx, cb) {
db.addOutgoingTx(deviceFingerprint, tx, function(err, satoshisToSend) {
function executeTx(session, tx, cb) {
db.addOutgoingTx(session, tx, function(err, satoshisToSend) {
if (err) return cb(err);
if (satoshisToSend === 0)
@ -200,7 +200,7 @@ function executeTx(deviceFingerprint, tx, cb) {
_sendBitcoins(tx.toAddress, satoshisToSend, function(_err, txHash) {
var fee = null; // Need to fill this out in plugins
db.sentCoins(tx, satoshisToSend, fee, _err, txHash);
db.sentCoins(session, tx, satoshisToSend, fee, _err, txHash);
if (_err) return cb(err);
@ -214,30 +214,29 @@ function executeTx(deviceFingerprint, tx, cb) {
});
}
function reapOutgoingTx(deviceFingerprint, tx) {
executeTx(deviceFingerprint, tx, function(err) {
function reapOutgoingTx(session, tx) {
executeTx(session, tx, function(err) {
if (err) logger.error(err);
});
}
function reapIncomingTx(deviceFingerprint, tx) {
function reapIncomingTx(session, tx) {
infoPlugin.checkAddress(tx.toAddress, function(err, status,
satoshisReceived) {
if (status === 'notSeen') return;
db.addIncomingTx(deviceFingerprint, tx, status, satoshisReceived);
db.addIncomingTx(session, tx, status, satoshisReceived);
});
}
function reapTx(row) {
var deviceFingerprint = row.device_fingerprint;
var session = {fingerprint: row.device_fingerprint, id: row.session_id};
var tx = {
txId: row.txid,
toAddress: row.to_address,
currencyCode: row.currency_code,
incoming: row.incoming
};
if (row.incoming) reapIncomingTx(deviceFingerprint, tx);
else reapOutgoingTx(deviceFingerprint, tx);
if (row.incoming) reapIncomingTx(session, tx);
else reapOutgoingTx(session, tx);
}
function reapTxs() {
@ -257,7 +256,7 @@ function reapTxs() {
}
// TODO: Run these in parallel and return success
exports.trade = function trade(deviceFingerprint, rawTrade, cb) {
exports.trade = function trade(session, rawTrade, cb) {
// TODO: move this to DB, too
// add bill to trader queue (if trader is enabled)
@ -277,16 +276,16 @@ exports.trade = function trade(deviceFingerprint, rawTrade, cb) {
};
async.parallel([
async.apply(db.addPendingTx, deviceFingerprint, tx),
async.apply(db.recordBill, deviceFingerprint, rawTrade)
async.apply(db.addPendingTx, session, tx),
async.apply(db.recordBill, session, rawTrade)
], cb);
};
exports.sendBitcoins = function sendBitcoins(deviceFingerprint, rawTx, cb) {
executeTx(deviceFingerprint, rawTx, cb);
exports.sendBitcoins = function sendBitcoins(session, rawTx, cb) {
executeTx(session, rawTx, cb);
};
exports.cashOut = function cashOut(deviceFingerprint, tx, cb) {
exports.cashOut = function cashOut(session, tx, cb) {
var tmpInfo = {
label: 'TX ' + Date.now(),
account: 'deposit'
@ -294,19 +293,18 @@ exports.cashOut = function cashOut(deviceFingerprint, tx, cb) {
walletPlugin.newAddress(tmpInfo, function(err, address) {
if (err) return cb(err);
db.addInitialIncoming(deviceFingerprint, tx, address, function(_err) {
db.addInitialIncoming(session, tx, address, function(_err) {
cb(_err, address);
});
});
};
exports.dispenseStatus = function dispenseStatus(deviceFingerprint, sessionId,
cb) {
db.fetchDispenseStatus(deviceFingerprint, sessionId, cb);
exports.dispenseStatus = function dispenseStatus(session, cb) {
db.fetchDispenseStatus(session, cb);
};
exports.dispenseAck = function dispenseAck(deviceFingerprint, tx) {
db.addDispense(deviceFingerprint, tx);
exports.dispenseAck = function dispenseAck(session, tx) {
db.addDispense(session, tx);
};
exports.fiatBalance = function fiatBalance() {

View file

@ -51,12 +51,12 @@ function connect(cb) {
}
// logs inputted bill and overall tx status (if available)
exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
exports.recordBill = function recordBill(session, rec, cb) {
var fields = [
'device_fingerprint',
'currency_code',
'to_address',
'transaction_id',
'session_id',
'device_time',
'satoshis',
@ -64,10 +64,10 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
];
var values = [
deviceFingerprint,
session.fingerprint,
rec.currency,
rec.toAddress,
rec.txId,
session.id,
rec.deviceTime,
rec.satoshis,
@ -86,13 +86,12 @@ exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
});
};
exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint,
event) {
exports.recordDeviceEvent = function recordDeviceEvent(session, event) {
connect(function(err, client, done) {
if (err) return;
var sql = 'INSERT INTO device_events (device_fingerprint, event_type, ' +
'note, device_time) VALUES ($1, $2, $3, $4)';
var values = [deviceFingerprint, event.eventType, event.note,
var values = [session.fingerprint, event.eventType, event.note,
event.deviceTime];
client.query(sql, values, done);
});
@ -111,16 +110,18 @@ function silentQuery(client, queryStr, values, cb) {
}
// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set
function billsAndTxs(client, sessionId, cb) {
function billsAndTxs(client, session, cb) {
var sessionId = session.id;
var fingerprint = session.fingerprint;
var billsQuery = 'SELECT SUM(denomination) as fiat, ' +
'SUM(satoshis) AS satoshis ' +
'FROM bills ' +
'WHERE session_id=$1';
var billsValues = [sessionId];
'WHERE device_fingerprint=$1 AND session_id=$2';
var billsValues = [fingerprint, sessionId];
var txQuery = 'SELECT SUM(fiat) AS fiat, SUM(satoshis) AS satoshis ' +
'FROM transactions ' +
'WHERE session_id=$1 AND stage=$2';
var txValues = [sessionId, 'partial_request'];
'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3';
var txValues = [fingerprint, sessionId, 'partial_request'];
async.parallel([
async.apply(query, client, billsQuery, billsValues),
@ -182,18 +183,19 @@ exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
});
};
function removePendingTx(client, sessionId, cb) {
var sql = 'DELETE FROM pending_transactions WHERE session_id=$1';
silentQuery(client, sql, [sessionId], cb);
function removePendingTx(client, session, cb) {
var sql = 'DELETE FROM pending_transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2';
silentQuery(client, sql, [session.fingerprint, session.id], cb);
}
function insertOutgoingTx(client, deviceFingerprint, tx, totals, cb) {
function insertOutgoingTx(client, session, tx, totals, cb) {
var sendAmount = computeSendAmount(tx, totals);
var stage = 'partial_request';
var authority = tx.fiat ? 'machine' : 'timeout';
var satoshis = sendAmount.satoshis;
var fiat = sendAmount.fiat;
insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage, authority,
insertTx(client, session, tx, satoshis, fiat, stage, authority,
function(err) {
if (err) return cb(err);
@ -201,7 +203,7 @@ function insertOutgoingTx(client, deviceFingerprint, tx, totals, cb) {
});
}
function insertOutgoingCompleteTx(client, deviceFingerprint, tx, cb) {
function insertOutgoingCompleteTx(client, session, tx, cb) {
// Only relevant for machine source transactions, not timeouts
if (!tx.fiat) return cb();
@ -210,10 +212,10 @@ function insertOutgoingCompleteTx(client, deviceFingerprint, tx, cb) {
var authority = 'machine';
var satoshis = tx.satoshis;
var fiat = tx.fiat;
insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage, authority, cb);
insertTx(client, session, tx, satoshis, fiat, stage, authority, cb);
}
function insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage,
function insertTx(client, session, tx, satoshis, fiat, stage,
authority, cb) {
var fields = [
'session_id',
@ -230,11 +232,11 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage,
];
var values = [
tx.txId,
session.id,
stage,
authority,
tx.incoming,
deviceFingerprint,
session.fingerprint,
tx.toAddress,
satoshis,
tx.currencyCode,
@ -250,13 +252,14 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, stage,
});
}
function addPendingTx(deviceFingerprint, sessionId,
incoming, currencyCode, toAddress, cb) {
function addPendingTx(session, incoming, currencyCode, toAddress, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
var fields = ['session_id', 'incoming', 'currency_code', 'to_address'];
var fields = ['device_fingerprint', 'session_id', 'incoming',
'currency_code', 'to_address'];
var sql = getInsertQuery('pending_transactions', fields);
var values = [sessionId, incoming, currencyCode, toAddress];
var values = [session.fingerprint, session.id, incoming, currencyCode,
toAddress];
query(client, sql, values, function(_err) {
done();
@ -270,16 +273,15 @@ function addPendingTx(deviceFingerprint, sessionId,
}
// Calling function should only send bitcoins if result.satoshisToSend > 0
exports.addOutgoingTx = function addOutgoingTx(deviceFingerprint, tx, cb) {
exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(insertOutgoingCompleteTx, client, deviceFingerprint, tx),
async.apply(insertOutgoingCompleteTx, client, session, tx),
async.apply(removePendingTx, client, tx.sessionId),
async.apply(billsAndTxs, client, tx.txId, tx.currencyCode,
deviceFingerprint),
async.apply(insertOutgoingTx, client, deviceFingerprint, tx),
async.apply(billsAndTxs, client, session, tx.currencyCode),
async.apply(insertOutgoingTx, client, session, tx),
], function(err, satoshisToSend) {
if (err) {
rollback(client, done);
@ -293,14 +295,15 @@ exports.addOutgoingTx = function addOutgoingTx(deviceFingerprint, tx, cb) {
});
};
exports.sentCoins = function sentCoins(tx, satoshis, fee, error, txHash) {
exports.sentCoins = function sentCoins(session, tx, satoshis, fee, error,
txHash) {
connect(function(err, client, done) {
if (err) return logger.error(err);
var newTx = _.clone(tx);
newTx.txHash = txHash;
newTx.error = error;
insertTx(client, newTx.deviceFingerprint, newTx, satoshis, newTx.fiat,
insertTx(client, session, newTx, satoshis, newTx.fiat,
'partial_send', function(_err) {
done();
if (err) logger.error(_err);
@ -308,19 +311,20 @@ exports.sentCoins = function sentCoins(tx, satoshis, fee, error, txHash) {
});
};
exports.addIncomingTx = function addIncomingTx(deviceFingerprint, tx, authority,
exports.addIncomingTx = function addIncomingTx(session, tx, authority,
satoshisReceived, cb) {
connect(function(err, client, done) {
function maybeRemovePending(client, sessionId, authority, cb) {
function maybeRemovePending(client, session, authority, cb) {
if (authority === 'published') return cb();
removePendingTx(client, sessionId, cb);
removePendingTx(client, session, cb);
}
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(maybeRemovePending, client, tx.sessionId, authority),
async.apply(insertTx, client, tx, satoshisReceived, 0, 'deposit', authority)
async.apply(maybeRemovePending, client, session, authority),
async.apply(insertTx, client, session, tx, satoshisReceived, 0, 'deposit',
authority)
], function(err) {
if (err) {
rollback(client, done);
@ -334,15 +338,15 @@ exports.addIncomingTx = function addIncomingTx(deviceFingerprint, tx, authority,
});
};
exports.addInitialIncoming = function addInitialIncoming(deviceFingerprint, tx,
address, cb) {
exports.addInitialIncoming = function addInitialIncoming(session, tx, address,
cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.waterfall([
async.apply(silentQuery, client, 'BEGIN', null),
async.apply(addPendingTx, client, deviceFingerprint, tx.sessionId,
true, tx.currencyCode, tx.toAddress),
async.apply(insertTx, client, tx, tx.satoshis, tx.fiat,
async.apply(addPendingTx, client, session, true, tx.currencyCode,
tx.toAddress),
async.apply(insertTx, client, session, tx, tx.satoshis, tx.fiat,
'initial_request', 'pending')
], function(err) {
if (err) {
@ -357,34 +361,30 @@ exports.addInitialIncoming = function addInitialIncoming(deviceFingerprint, tx,
});
};
function lastTxStatus(client, deviceFingerprint, sessionId, cb) {
function lastTxStatus(client, session, sessionId, cb) {
var sql = 'SELECT satoshis, authority FROM transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
'ORDER BY id DESC LIMIT 1';
var values = [deviceFingerprint, sessionId, true];
var values = [session.fingerprint, session.id, true];
query(client, sql, values, cb);
}
function initialRequest(client, deviceFingerprint, sessionId, cb) {
function initialRequest(client, session, cb) {
var sql = 'SELECT fiat, satoshis FROM transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
'AND stage=$4';
var values = [deviceFingerprint, sessionId, true, 'initial_request'];
var values = [session.fingerprint, session.id, true, 'initial_request'];
query(client, sql, values, cb);
}
exports.dispenseStatus = function dispenseStatus(deviceFingerprint, sessionId,
cb) {
// NOTE: select for both device_fingerprint and session_id for security.
// Don't want to allow operators to read other machines' transactions.
exports.dispenseStatus = function dispenseStatus(session, cb) {
connect(function(err, client, done) {
if (err) return cb(err);
async.parallel([
async.apply(client, initialRequest, deviceFingerprint, sessionId),
async.apply(client, lastTxStatus, deviceFingerprint, sessionId)
async.apply(client, initialRequest, session),
async.apply(client, lastTxStatus, session)
], function(_err, results) {
done();
if (_err) return cb(_err);
@ -406,11 +406,11 @@ exports.dispenseStatus = function dispenseStatus(deviceFingerprint, sessionId,
});
};
function lastDispenseCount(client, deviceFingerprint, transactionId, cb) {
function lastDispenseCount(client, session, transactionId, cb) {
var sql = 'SELECT count1, count2 FROM dispenses ' +
'WHERE device_fingerprint=$1 ' +
'ORDER BY id DESC LIMIT 1';
client.query(sql, [deviceFingerprint], function(err, results) {
client.query(sql, [session.fingerprint], function(err, results) {
if (err) return cb(err);
if (results.rows.length === 0) return cb(null, [0, 0]);
cb(null, transactionId, [results.rows[0].count1, results.rows[0].count2]);
@ -441,14 +441,14 @@ function insertDispense(client, tx, transactionId, counts, cb) {
client.query(sql, values, cb);
}
exports.addDispense = function addDispense(deviceFingerprint, tx) {
exports.addDispense = function addDispense(session, tx) {
connect(function(err, client, done) {
if (err) return logger.error(err);
async.waterfall([
async.apply(insertTx, client, deviceFingerprint, tx, 0, tx.fiat,
async.apply(insertTx, client, session, tx, 0, tx.fiat,
'deposit', 'authorized'),
async.apply(lastDispenseCount, client, deviceFingerprint),
async.apply(lastDispenseCount, client, session),
async.apply(insertDispense, client, tx)
], function(_err) {
done();
@ -456,29 +456,3 @@ exports.addDispense = function addDispense(deviceFingerprint, tx) {
});
});
};
/*
exports.decrementCartridges =
function decrementCartridges(fingerprint, cartridge1, cartridge2, cb) {
var query = 'UPDATE devices SET cartridge_1_bills = cartridge_1_bills - $1, ' +
'cartridge_2_bills = cartridge_2_bills - $2 ' +
'WHERE fingerprint = $3';
client.query(query, [cartridge1, cartridge2, fingerprint], cb);
};
exports.fillCartridges =
function fillCartridges(fingerprint, cartridge1, cartridge2, cb) {
var query = 'UPDATE devices SET cartridge_1_bills = $1, ' +
'cartridge_2_bills = $2 ' +
'WHERE fingerprint = $3';
client.query(query, [cartridge1, cartridge2, fingerprint], cb);
};
*/
/*
exports.init('psql://lamassu:lamassu@localhost/lamassu');
var fp = 'AB:9C:09:AA:7B:48:51:9A:0E:13:59:4E:5E:69:D0:74:E5:0F:4A:66';
var txId = '5ef9c631-d948-4f0f-bf22-d2a563f5cd26';
var tx = {txId: txId, currencyCode: 'USD', toAddress: '1xxx'};
exports.addDigitalTx(198, new Error('insufficient funds'), null, function(err, result) { pg.end(); console.dir(result); });
*/

View file

@ -28,9 +28,6 @@ try {
function poll(req, res) {
var rateRec = plugins.getDeviceRate();
var balanceRec = plugins.getBalance();
var fingerprint = getFingerprint(req);
logger.debug('poll request from: %s', fingerprint);
// `rateRec` and `balanceRec` are both objects, so there's no danger
// of misinterpreting rate or balance === 0 as 'Server initializing'.
@ -63,7 +60,7 @@ function poll(req, res) {
config.exchanges.settings.commission;
var sessionId = req.get('session-id');
plugins.dispenseStatus(fingerprint, sessionId, function(err, dispenseStatus) {
plugins.dispenseStatus(session(req), function(err, dispenseStatus) {
if (err) return logger.error(err);
var response = {
err: null,
@ -86,14 +83,14 @@ function poll(req, res) {
}
function trade(req, res) {
plugins.trade(getFingerprint(req), req.body, function(err) {
plugins.trade(session(req), req.body, function(err) {
var statusCode = err ? 500 : 201;
res.json(statusCode, {err: err});
});
}
function send(req, res) {
plugins.sendBitcoins(getFingerprint(req), req.body, function(err, status) {
plugins.sendBitcoins(session(req), req.body, function(err, status) {
// TODO: use status.statusCode here after confirming machine compatibility
// FIX: (joshm) set txHash to status.txId instead of previous status.txHash which wasn't being set
// Need to clean up txHash vs txId
@ -108,7 +105,7 @@ function send(req, res) {
function cashOut(req, res) {
logger.info({tx: req.body, cmd: 'cashOut'});
plugins.cashOut(getFingerprint(req), req.body, function(err, bitcoinAddress) {
plugins.cashOut(session(req), req.body, function(err, bitcoinAddress) {
if (err) logger.error(err);
res.json({
@ -120,12 +117,12 @@ function cashOut(req, res) {
}
function dispenseAck(req, res) {
plugins.dispenseAck(getFingerprint(req), req.body);
plugins.dispenseAck(session(req), req.body);
res.json(200);
}
function deviceEvent(req, res) {
plugins.logEvent(req.body, getFingerprint(req));
plugins.logEvent(session(req), req.body);
res.json({err: null});
}
@ -195,6 +192,10 @@ function init(localConfig) {
return app;
}
function session(req) {
return {fingerprint: getFingerprint(req), id: req.get('session-id')};
}
function getFingerprint(req) {
return typeof req.connection.getPeerCertificate === 'function' &&
req.connection.getPeerCertificate().fingerprint;

View file

@ -17,7 +17,7 @@ exports.up = function(next) {
'CREATE TABLE transactions ( ' +
'id serial PRIMARY KEY, ' +
'session_id uuid UNIQUE NOT NULL, ' +
'session_id uuid NOT NULL, ' +
'device_fingerprint text, ' +
'to_address text NOT NULL, ' +
'satoshis integer NOT NULL DEFAULT 0, ' +
@ -35,6 +35,7 @@ exports.up = function(next) {
'CREATE TABLE pending_transactions ( ' +
'id serial PRIMARY KEY, ' +
'device_fingerprint text NOT NULL, ' +
'session_id uuid UNIQUE NOT NULL, ' +
'incoming boolean NOT NULL, ' +
'currency_code text NOT NULL, ' +