WIP
This commit is contained in:
parent
f189438094
commit
59db6d803c
2 changed files with 2 additions and 243 deletions
|
|
@ -9,16 +9,12 @@ BigNumber.config({CRYPTO: true})
|
|||
var logger = require('./logger')
|
||||
var notifier = require('./notifier')
|
||||
|
||||
var argv = require('minimist')(process.argv.slice(2))
|
||||
|
||||
var uuid = require('node-uuid')
|
||||
var tradeIntervals = {}
|
||||
|
||||
var CHECK_NOTIFICATION_INTERVAL = 60 * 1000
|
||||
var ALERT_SEND_INTERVAL = 60 * 60 * 1000
|
||||
var POLLING_RATE = 60 * 1000 // poll each minute
|
||||
var REAP_RATE = 2 * 1000
|
||||
var PENDING_TIMEOUT = 70 * 1000
|
||||
var INCOMING_TX_INTERVAL = 5 * 1000
|
||||
var LIVE_INCOMING_TX_INTERVAL = 30 * 1000
|
||||
var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000
|
||||
|
|
@ -27,11 +23,6 @@ var MAX_NOTIFY_AGE = 48 * 60 * 60 * 1000
|
|||
var MIN_NOTIFY_AGE = 5 * 60 * 1000
|
||||
var TRANSACTION_EXPIRATION = 48 * 60 * 60 * 1000
|
||||
|
||||
if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000
|
||||
|
||||
// TODO: might have to update this if user is allowed to extend monitoring time
|
||||
var DEPOSIT_TIMEOUT = 130 * 1000
|
||||
|
||||
var db = null
|
||||
|
||||
var cryptoCodes = null
|
||||
|
|
@ -319,40 +310,6 @@ function executeTx (session, tx, authority, cb) {
|
|||
})
|
||||
}
|
||||
|
||||
function reapOutgoingTx (session, tx) {
|
||||
executeTx(session, tx, 'timeout', function (err) {
|
||||
if (err) logger.error(err)
|
||||
})
|
||||
}
|
||||
|
||||
function reapTx (row) {
|
||||
var session = {fingerprint: row.device_fingerprint, id: row.session_id}
|
||||
var tx = {
|
||||
fiat: 0,
|
||||
cryptoAtoms: new BigNumber(row.cryptoAtoms),
|
||||
toAddress: row.toAddress,
|
||||
currencyCode: row.currencyCode,
|
||||
cryptoCode: row.cryptoCode,
|
||||
incoming: row.incoming
|
||||
}
|
||||
if (!row.incoming) reapOutgoingTx(session, tx)
|
||||
}
|
||||
|
||||
function reapTxs () {
|
||||
db.removeOldPending(DEPOSIT_TIMEOUT)
|
||||
|
||||
// NOTE: No harm in processing old pending tx, we don't need to wait for
|
||||
// removeOldPending to complete.
|
||||
db.pendingTxs(PENDING_TIMEOUT, function (err, rows) {
|
||||
if (err) return logger.warn(err)
|
||||
var rowCount = rows.length
|
||||
for (var i = 0; i < rowCount; i++) {
|
||||
var row = rows[i]
|
||||
reapTx(row)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: Run these in parallel and return success
|
||||
exports.trade = function trade (session, rawTrade, cb) {
|
||||
// TODO: move this to DB, too
|
||||
|
|
@ -369,10 +326,7 @@ exports.trade = function trade (session, rawTrade, cb) {
|
|||
})
|
||||
}
|
||||
|
||||
async.parallel([
|
||||
async.apply(db.addOutgoingPending, session, rawTrade.currency, rawTrade.cryptoCode, rawTrade.toAddress),
|
||||
async.apply(db.recordBill, session, rawTrade)
|
||||
], cb)
|
||||
db.recordBill(session, rawTrade, cb)
|
||||
}
|
||||
|
||||
exports.stateChange = function stateChange (session, rec, cb) {
|
||||
|
|
@ -519,7 +473,6 @@ exports.startPolling = function startPolling () {
|
|||
startTrader(cryptoCode)
|
||||
})
|
||||
|
||||
setInterval(reapTxs, REAP_RATE)
|
||||
setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL)
|
||||
setInterval(monitorIncoming, INCOMING_TX_INTERVAL)
|
||||
setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL)
|
||||
|
|
|
|||
|
|
@ -153,110 +153,6 @@ function silentQuery (client, queryStr, values, cb) {
|
|||
})
|
||||
}
|
||||
|
||||
// OPTIMIZE: No need to query bills if tx.fiat and tx.cryptoAtoms are set
|
||||
function billsAndTxs (client, session, cb) {
|
||||
var sessionId = session.id
|
||||
var fingerprint = session.fingerprint
|
||||
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
|
||||
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
||||
'FROM bills ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||
var billsValues = [fingerprint, sessionId]
|
||||
var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
|
||||
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
||||
'FROM transactions ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3'
|
||||
var txValues = [fingerprint, sessionId, 'partial_request']
|
||||
|
||||
async.parallel([
|
||||
async.apply(query, client, billsQuery, billsValues),
|
||||
async.apply(query, client, txQuery, txValues)
|
||||
], function (err, results) {
|
||||
if (err) return cb(err)
|
||||
|
||||
// Note: PG SUM function returns int8, which is returned as a string, so
|
||||
// we need to parse, since we know these won't be huge numbers.
|
||||
cb(null, {
|
||||
billsFiat: parseInt(results[0].rows[0].fiat, 10),
|
||||
billsCryptoAtoms: new BigNumber(results[0].rows[0].satoshis),
|
||||
txFiat: parseInt(results[1].rows[0].fiat, 10),
|
||||
txCryptoAtoms: new BigNumber(results[1].rows[0].satoshis)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function computeSendAmount (tx, totals) {
|
||||
var fiatRemaining = (tx.fiat || totals.billsFiat) - totals.txFiat
|
||||
|
||||
var cryptoAtomsRemaining = tx.cryptoAtoms.eq(0)
|
||||
? totals.billsCryptoAtoms.minus(totals.txCryptoAtoms)
|
||||
: tx.cryptoAtoms.minus(totals.txCryptoAtoms)
|
||||
|
||||
var result = {
|
||||
fiat: fiatRemaining,
|
||||
cryptoAtoms: cryptoAtomsRemaining
|
||||
}
|
||||
if (result.fiat < 0 || result.cryptoAtoms.lt(0)) {
|
||||
logger.warn({tx: tx, totals: totals, result: result},
|
||||
"computeSendAmount result < 0, this shouldn't happen. " +
|
||||
'Maybe timeout arrived after machineSend.')
|
||||
result.fiat = 0
|
||||
result.cryptoAtoms = new BigNumber(0)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
exports.removeOldPending = function removeOldPending (timeoutMS) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return
|
||||
var sql = 'DELETE FROM pending_transactions ' +
|
||||
'WHERE incoming AND extract(EPOCH FROM now() - updated) > $1'
|
||||
var timeoutS = timeoutMS / 1000
|
||||
var values = [timeoutS]
|
||||
query(client, sql, values, function (err) {
|
||||
done()
|
||||
if (err) logger.error(err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
exports.pendingTxs = function pendingTxs (timeoutMS, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
var sql = 'SELECT * ' +
|
||||
'FROM pending_transactions ' +
|
||||
'WHERE (incoming OR extract(EPOCH FROM now() - updated) > $1) ' +
|
||||
'ORDER BY updated ASC'
|
||||
var timeoutS = timeoutMS / 1000
|
||||
var values = [timeoutS]
|
||||
query(client, sql, values, function (err, results) {
|
||||
done()
|
||||
cb(err, normalizeTxs(results.rows))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function removePendingTx (client, session, cb) {
|
||||
var sql = 'DELETE FROM pending_transactions ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||
silentQuery(client, sql, [session.fingerprint, session.id], cb)
|
||||
}
|
||||
|
||||
function insertOutgoingTx (client, session, tx, totals, cb) {
|
||||
var sendAmount = computeSendAmount(tx, totals)
|
||||
var stage = 'partial_request'
|
||||
var authority = tx.fiat ? 'machine' : 'timeout'
|
||||
var cryptoAtoms = sendAmount.cryptoAtoms
|
||||
var fiat = sendAmount.fiat
|
||||
if (cryptoAtoms.eq(0)) return cb(null, {cryptoAtoms: new BigNumber(0), fiat: 0})
|
||||
|
||||
insertOutgoing(client, session, tx, cryptoAtoms, fiat, stage, authority,
|
||||
function (err) {
|
||||
if (err) return cb(err)
|
||||
cb(null, {cryptoAtoms: cryptoAtoms, fiat: fiat})
|
||||
})
|
||||
}
|
||||
|
||||
function insertOutgoingCompleteTx (client, session, tx, cb) {
|
||||
// Only relevant for machine source transactions, not timeouts
|
||||
if (!tx.fiat) return cb()
|
||||
|
|
@ -319,18 +215,6 @@ function insertTx (client, session, incoming, tx, cryptoAtoms, fiat, stage,
|
|||
})
|
||||
}
|
||||
|
||||
function refreshPendingTx (client, session, cb) {
|
||||
var sql = 'UPDATE pending_transactions SET updated=now() ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
query(client, sql, [session.fingerprint, session.id], function (err) {
|
||||
done(err)
|
||||
cb(err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function addPendingTx (client, session, incoming, currencyCode, cryptoCode, toAddress,
|
||||
cryptoAtoms, cb) {
|
||||
var fields = ['device_fingerprint', 'session_id', 'incoming',
|
||||
|
|
@ -343,34 +227,11 @@ function addPendingTx (client, session, incoming, currencyCode, cryptoCode, toAd
|
|||
})
|
||||
}
|
||||
|
||||
function buildOutgoingTx (client, session, tx, cb) {
|
||||
async.waterfall([
|
||||
async.apply(billsAndTxs, client, session),
|
||||
async.apply(insertOutgoingTx, client, session, tx)
|
||||
], cb)
|
||||
}
|
||||
|
||||
// Calling function should only send bitcoins if result.cryptoAtomsToSend > 0
|
||||
exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
async.series([
|
||||
async.apply(silentQuery, client, 'BEGIN'),
|
||||
async.apply(silentQuery, client, 'LOCK TABLE transactions'),
|
||||
async.apply(insertOutgoingCompleteTx, client, session, tx),
|
||||
async.apply(removePendingTx, client, session),
|
||||
async.apply(buildOutgoingTx, client, session, tx)
|
||||
], function (err, results) {
|
||||
if (err) {
|
||||
rollback(client, done)
|
||||
return cb(err)
|
||||
}
|
||||
silentQuery(client, 'COMMIT', [], function () {
|
||||
done()
|
||||
var toSend = results[4]
|
||||
cb(null, toSend)
|
||||
})
|
||||
})
|
||||
insertOutgoingCompleteTx(client, session, tx, cb)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -390,61 +251,6 @@ exports.sentCoins = function sentCoins (session, tx, authority, toSend, fee,
|
|||
})
|
||||
}
|
||||
|
||||
function ensureNotFinal (client, session, cb) {
|
||||
var sql = 'SELECT id FROM transactions ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
|
||||
'AND stage=$4' +
|
||||
'LIMIT 1'
|
||||
var values = [session.fingerprint, session.id, false, 'final_request']
|
||||
|
||||
client.query(sql, values, function (err, results) {
|
||||
var error
|
||||
if (err) return cb(err)
|
||||
if (results.rows.length > 0) {
|
||||
error = new Error('Final request already exists')
|
||||
error.name = 'staleBill'
|
||||
error.severity = 'low'
|
||||
return cb(error)
|
||||
}
|
||||
cb()
|
||||
})
|
||||
}
|
||||
|
||||
exports.addOutgoingPending = function addOutgoingPending (session, currencyCode,
|
||||
cryptoCode, toAddress, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
|
||||
async.series([
|
||||
async.apply(silentQuery, client, 'BEGIN', null),
|
||||
async.apply(ensureNotFinal, client, session),
|
||||
async.apply(addPendingTx, client, session, false, currencyCode, cryptoCode, toAddress,
|
||||
0)
|
||||
], function (err) {
|
||||
if (err) {
|
||||
return rollback(client, function (rberr) {
|
||||
done(rberr)
|
||||
|
||||
if (isUniqueViolation(err)) {
|
||||
// Pending tx exists, refresh it.
|
||||
return refreshPendingTx(client, session, cb)
|
||||
}
|
||||
if (err.name === 'staleBill') {
|
||||
logger.info('Received a bill insert after send coins request')
|
||||
return cb()
|
||||
}
|
||||
logger.error(err)
|
||||
return cb(err)
|
||||
})
|
||||
}
|
||||
silentQuery(client, 'COMMIT', null, function () {
|
||||
done()
|
||||
cb()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue