WIP migrations

This commit is contained in:
Josh Harvey 2014-11-20 00:40:27 -05:00
parent e650e591d5
commit 7a2ab111d6
3 changed files with 38 additions and 107 deletions

View file

@ -7,9 +7,6 @@ var SATOSHI_FACTOR = 1e8;
var POLLING_RATE = 60 * 1000; // poll each minute var POLLING_RATE = 60 * 1000; // poll each minute
var REAP_RATE = 5 * 1000; var REAP_RATE = 5 * 1000;
var PENDING_TIMEOUT = 70 * 1000; var PENDING_TIMEOUT = 70 * 1000;
var RECOMMENDED_FEE = 10000;
var TX_0CONF_WAIT_TIME = 20 * 1000; // wait 20 seconds
var MIN_CONFIDENCE = 0.7;
var db = null; var db = null;
@ -215,16 +212,29 @@ function executeTx(deviceFingerprint, tx, cb) {
}); });
} }
function reapIncomingTx(deviceFingerprint, tx) {
executeTx(deviceFingerprint, tx, function(err) {
if (err) logger.error(err);
});
}
function reapOutgoingTx(deviceFingerprint, tx) {
infoPlugin.checkAddress(tx.toAddress, function(err, status, satoshisReceived) {
if (status === 'notSeen') return;
db.addOutgoingTx(deviceFingerprint, tx, status, satoshisReceived);
});
}
function reapTx(row) { function reapTx(row) {
var deviceFingerprint = row.device_fingerprint; var deviceFingerprint = row.device_fingerprint;
var tx = { var tx = {
txId: row.txid, txId: row.txid,
toAddress: row.to_address, toAddress: row.to_address,
currencyCode: row.currency_code currencyCode: row.currency_code,
incoming: row.incoming
}; };
executeTx(deviceFingerprint, tx, function(err) { if (row.incoming) reapIncomingTx(deviceFingerprint, tx);
if (err) logger.error(err); else reapOutgoingTx(deviceFingerprint, tx);
});
} }
function reapTxs() { function reapTxs() {
@ -253,6 +263,8 @@ exports.trade = function trade(deviceFingerprint, rawTrade, cb) {
var tx = { var tx = {
txId: rawTrade.txId, txId: rawTrade.txId,
fiat: 0,
satoshis: 0,
toAddress: rawTrade.toAddress, toAddress: rawTrade.toAddress,
currencyCode: rawTrade.currency currencyCode: rawTrade.currency
}; };
@ -297,82 +309,6 @@ function _setDispenseStatus(deviceFingerprint, tx, status, deposit) {
dispenseStatuses[deviceFingerprint] = statusObject; dispenseStatuses[deviceFingerprint] = statusObject;
} }
function _checkTx(deviceFingerprint, tx, txInfo) {
// accept if tx is already confirmed
if (txInfo.confirmations > 0) {
_setDispenseStatus(deviceFingerprint, tx, 'authorizedDeposit', txInfo.amount);
return true;
}
// NOTE: we can put some heuristics here
// consider authorization raported by the 'info' plugin
if (txInfo.authorized === true && txInfo.confidence >= MIN_CONFIDENCE) {
_setDispenseStatus(deviceFingerprint, tx, 'authorizedDeposit', txInfo.amount);
return true;
}
// SHOULD TAKE MUCH MORE FACTORS INTO ACCOUNT HERE
// accept txs with recommended fee and with at least 20s of propagation time
if (txInfo.fees >= RECOMMENDED_FEE && txInfo.tsReceived + TX_0CONF_WAIT_TIME < Date.now()) {
_setDispenseStatus(deviceFingerprint, tx, 'authorizedDeposit', txInfo.amount);
return true;
}
return false;
}
// this is invoked only when tx is fresh enough AND is for a right amount
function _monitorTx(deviceFingerprint, tx) {
infoPlugin.getTx(tx.txHash, tx.toAddress, function(err, txInfo) {
if (err) {
logger.error(err);
return setTimeout(_monitorTx, 300, deviceFingerprint, tx);
}
if (_checkTx(deviceFingerprint, tx, txInfo))
return;
setTimeout(_monitorTx, 300, deviceFingerprint, tx);
});
}
function _monitorAddress(deviceFingerprint, tx) {
if (!tx) throw new Error('No tx');
infoPlugin.getAddressLastTx(tx.toAddress, function(err, txInfo) {
if (err) {
logger.error(err);
return setTimeout(_monitorAddress, 300, deviceFingerprint, tx);
}
// no tx occured at all or deposit address was reused; some previous tx was returned
if (!txInfo || txInfo.tsReceived < tx.created) {
return setTimeout(_monitorAddress, 300, deviceFingerprint, tx);
}
// when sent TX is not enough
if (txInfo.amount < tx.satoshis)
return _setDispenseStatus(deviceFingerprint, tx, 'insufficientDeposit', txInfo.amount);
// store txHash for later reference
tx.txHash = txInfo.txHash;
// warn about dangerous TX
if (txInfo.fees < RECOMMENDED_FEE)
logger.warn('TXs w/o fee can take forever to confirm!');
// make sure tx isn't already in an acceptable state
if (_checkTx(deviceFingerprint, tx, txInfo))
return;
// update tx status and save first txHash
_setDispenseStatus(deviceFingerprint, tx, 'fullDeposit', txInfo.amount);
// start monitoring TX (instead of an address)
setTimeout(_monitorTx, 300, deviceFingerprint, tx);
});
}
exports.cashOut = function cashOut(deviceFingerprint, tx, cb) { exports.cashOut = function cashOut(deviceFingerprint, tx, cb) {
var tmpInfo = { var tmpInfo = {
label: 'TX ' + Date.now(), label: 'TX ' + Date.now(),
@ -383,19 +319,10 @@ exports.cashOut = function cashOut(deviceFingerprint, tx, cb) {
return cb(new Error(err)); return cb(new Error(err));
tx.toAddress = address; tx.toAddress = address;
tx.tx_type = 'sell'; tx.incoming = false;
db.insertTx(deviceFingerprint, tx, function(err) { db.addPendingTx(deviceFingerprint, tx, function(err) {
if (err) cb(err, address);
return cb(new Error(err));
_setDispenseStatus(deviceFingerprint, tx, 'noDeposit');
// start watching address for incoming txs
_monitorAddress(deviceFingerprint, tx);
// return address to the machine
return cb(null, address);
}); });
}); });
}; };

View file

@ -110,16 +110,16 @@ function silentQuery(client, queryStr, values, cb) {
} }
// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set // OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set
function billsAndTxs(client, txid, currencyCode, deviceFingerprint, cb) { function billsAndTxs(client, sessionId, currencyCode, deviceFingerprint, cb) {
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' + var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'FROM bills ' + 'FROM bills ' +
'WHERE transaction_id=$1 AND currency_code=$2 AND device_fingerprint=$3'; 'WHERE transaction_id=$1 AND currency_code=$2 AND device_fingerprint=$3';
var billsValues = [txid, currencyCode, deviceFingerprint]; var billsValues = [sessionId, currencyCode, deviceFingerprint];
var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' + var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
'COALESCE(SUM(satoshis), 0) AS satoshis ' + 'COALESCE(SUM(satoshis), 0) AS satoshis ' +
'FROM transactions ' + 'FROM transactions ' +
'WHERE txid=$1 AND currency_code=$2 AND device_fingerprint=$3'; 'WHERE session_id=$1 AND currency_code=$2 AND device_fingerprint=$3';
var txValues = billsValues; // They happen to be the same var txValues = billsValues; // They happen to be the same
async.parallel([ async.parallel([
@ -157,7 +157,7 @@ exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
connect(function(err, client, done) { connect(function(err, client, done) {
var sql = 'SELECT * FROM transactions ' + var sql = 'SELECT * FROM transactions ' +
'WHERE status=$1 AND ' + 'WHERE status=$1 AND ' +
'EXTRACT(EPOCH FROM now() - created) > $2 ' + '(NOT incoming OR EXTRACT(EPOCH FROM now() - created > $2) ' +
'ORDER BY created ASC'; 'ORDER BY created ASC';
var timeoutS = timeoutMS / 1000; var timeoutS = timeoutMS / 1000;
var values = ['pending', timeoutS]; var values = ['pending', timeoutS];
@ -169,7 +169,7 @@ exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
}; };
function removePendingTx(client, tx, cb) { function removePendingTx(client, tx, cb) {
silentQuery(client, 'DELETE FROM transactions WHERE txid=$1 AND status=$2', silentQuery(client, 'DELETE FROM transactions WHERE session_id=$1 AND status=$2',
[tx.txId, 'pending'], cb); [tx.txId, 'pending'], cb);
} }
@ -187,9 +187,9 @@ function maybeInsertTx(client, deviceFingerprint, tx, totals, cb) {
function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) { function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
var fields = [ var fields = [
'txid', 'session_id',
'status', 'status',
'tx_type', 'incoming',
'device_fingerprint', 'device_fingerprint',
'to_address', 'to_address',
'satoshis', 'satoshis',
@ -200,7 +200,7 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
var values = [ var values = [
tx.txId, tx.txId,
status, status,
tx.tx_type || 'buy', tx.incoming === false ? false : true,
deviceFingerprint, deviceFingerprint,
tx.toAddress, tx.toAddress,
satoshis, satoshis,
@ -211,16 +211,19 @@ function insertTx(client, deviceFingerprint, tx, satoshis, fiat, status, cb) {
query(client, getInsertQuery('transactions', fields, true), values, cb); query(client, getInsertQuery('transactions', fields, true), values, cb);
} }
exports.addPendingTx = function addPendingTx(deviceFingerprint, tx) { exports.addPendingTx = function addPendingTx(deviceFingerprint, tx, cb) {
connect(function(err, client, done) { connect(function(err, client, done) {
if (err) return; if (err) return cb(err);
insertTx(client, deviceFingerprint, tx, 0, 0, 'pending', insertTx(client, deviceFingerprint, tx, tx.satoshis, tx.fiat, 'pending',
function(err) { function(err) {
done(); done();
// If pending tx already exists, do nothing // If pending tx already exists, do nothing
if (err && PG_ERRORS[err.code] !== 'uniqueViolation') if (err && PG_ERRORS[err.code] !== 'uniqueViolation') {
logger.error(err); logger.error(err);
return cb(err);
}
cb();
}); });
}); });
}; };

View file

@ -12,6 +12,7 @@ exports.up = function(next){
'ALTER TABLE transactions ADD PRIMARY KEY (id)', 'ALTER TABLE transactions ADD PRIMARY KEY (id)',
'CREATE INDEX ON transactions (session_id)', 'CREATE INDEX ON transactions (session_id)',
'ALTER TABLE transactions ADD CONSTRAINT transactions_session_status UNIQUE (session_id,status)', 'ALTER TABLE transactions ADD CONSTRAINT transactions_session_status UNIQUE (session_id,status)',
'ALTER TABLE transactions ADD COLUMN incoming boolean',
'CREATE TABLE digital_transactions ( ' + 'CREATE TABLE digital_transactions ( ' +
'id serial PRIMARY KEY, ' + 'id serial PRIMARY KEY, ' +