This commit is contained in:
Josh Harvey 2016-05-07 02:07:50 +03:00
parent e7960c217c
commit 808a8ab4a4
4 changed files with 74 additions and 14 deletions

View file

@ -19,6 +19,9 @@ var ALERT_SEND_INTERVAL = 60 * 60 * 1000
var POLLING_RATE = 60 * 1000 // poll each minute
var REAP_RATE = 2 * 1000
var PENDING_TIMEOUT = 70 * 1000
var INCOMING_TX_INTERVAL = 5 * 1000
var LIVE_INCOMING_TX_INTERVAL = 30 * 1000
var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000
if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000
@ -439,6 +442,35 @@ exports.fiatBalance = function fiatBalance (cryptoCode) {
return fiatTransferBalance.round(3).toNumber()
}
function processTxStatus (tx) {
const cryptoCode = tx.cryptoCode
const walletPlugin = walletPlugins[cryptoCode]
walletPlugin.getStatus(tx, function (err, status) {
if (err) return logger.error(err)
if (tx.status === status) return
db.updateTxStatus(tx, status, function (_err) {
if (_err) return logger.error(err)
})
})
}
function monitorLiveIncoming () {
const statuses = ['notSeen', 'published']
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
if (err) return
txs.forEach(processTxStatus)
})
}
function monitorIncoming () {
const statuses = ['notSeen', 'published', 'authorized', 'rejected']
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
if (err) return
txs.forEach(processTxStatus)
})
}
/*
* Polling livecycle
*/
@ -452,6 +484,8 @@ exports.startPolling = function startPolling () {
})
setInterval(reapTxs, REAP_RATE)
setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL)
setInterval(monitorIncoming, INCOMING_TX_INTERVAL)
}
function startTrader (cryptoCode) {

View file

@ -555,19 +555,6 @@ function updateDispense (client, session, dispensed, cb) {
})
}
exports.updateAuthorized = function updateAuthorized (session, cb) {
var sql = 'UPDATE transactions SET dispense=$1 ' +
'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND session_id=$5'
connect(function (cerr, client, done) {
if (cerr) return cb(cerr)
var values = [true, 'initial_request', 'deposit', session.fingerprint, session.id]
query(client, sql, values, function (err) {
done(err)
cb(err)
})
})
}
exports.addDispense = function addDispense (session, tx, cartridges) {
connect(function (cerr, client, done) {
if (cerr) return
@ -647,6 +634,35 @@ exports.machineEvents = function machineEvents (cb) {
})
}
exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) {
var sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age ' +
'FROM transactions ' +
'WHERE incoming=$1 age<$2 AND status IN $3'
connect(function (cerr, client, done) {
if (cerr) return cb(cerr)
query(client, sql, [true, age, statuses], function (err, results) {
done()
if (err) return cb(err)
cb(null, results)
})
})
}
exports.updateTxStatus = function updateTxStatus (tx, status, cb) {
var sql = 'UPDATE transactions SET status=$1 WHERE id=$2'
connect(function (cerr, client, done) {
if (cerr) return cb(cerr)
var values = [status, tx.id]
query(client, sql, values, function (err) {
done(err)
cb(err)
})
})
}
/*
exports.init('postgres://lamassu:lamassu@localhost/lamassu')
connect(function(err, client, done) {

View file

@ -1,9 +1,15 @@
var db = require('./db')
function singleQuotify (item) { return '\'' + item + '\'' }
exports.up = function (next) {
var statuses = ['notSeen', 'published', 'authorized', 'confirmed', 'rejected']
.map(singleQuotify).join(',')
var sql = [
'create type status_stage AS enum (' + statuses + ')',
'alter table transactions add dispensed boolean NOT NULL DEFAULT false',
'alter table transactions add authorized boolean NOT NULL DEFAULT false'
'alter table transactions add status status_stage NOT NULL DEFAULT \'notSeen\''
]
db.multi(sql, next)
}

View file

@ -1 +1,5 @@
- update tx statuses in background
- pull all unpublished and unauthorized txs in last x seconds
- pull all unconfirmed txs
- for each tx: poll wallet plugin to check status
- update db