diff --git a/lib/plugins.js b/lib/plugins.js index f5dccecb..363c2e5f 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -19,6 +19,9 @@ var ALERT_SEND_INTERVAL = 60 * 60 * 1000 var POLLING_RATE = 60 * 1000 // poll each minute var REAP_RATE = 2 * 1000 var PENDING_TIMEOUT = 70 * 1000 +var INCOMING_TX_INTERVAL = 5 * 1000 +var LIVE_INCOMING_TX_INTERVAL = 30 * 1000 +var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000 if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000 @@ -439,6 +442,35 @@ exports.fiatBalance = function fiatBalance (cryptoCode) { return fiatTransferBalance.round(3).toNumber() } +function processTxStatus (tx) { + const cryptoCode = tx.cryptoCode + const walletPlugin = walletPlugins[cryptoCode] + + walletPlugin.getStatus(tx, function (err, status) { + if (err) return logger.error(err) + if (tx.status === status) return + db.updateTxStatus(tx, status, function (_err) { + if (_err) return logger.error(err) + }) + }) +} + +function monitorLiveIncoming () { + const statuses = ['notSeen', 'published'] + db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { + if (err) return + txs.forEach(processTxStatus) + }) +} + +function monitorIncoming () { + const statuses = ['notSeen', 'published', 'authorized', 'rejected'] + db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { + if (err) return + txs.forEach(processTxStatus) + }) +} + /* * Polling livecycle */ @@ -452,6 +484,8 @@ exports.startPolling = function startPolling () { }) setInterval(reapTxs, REAP_RATE) + setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) + setInterval(monitorIncoming, INCOMING_TX_INTERVAL) } function startTrader (cryptoCode) { diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 8c80d205..0701b781 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -555,19 +555,6 @@ function updateDispense (client, session, dispensed, cb) { }) } -exports.updateAuthorized = function updateAuthorized (session, cb) { - var sql = 'UPDATE transactions SET dispense=$1 ' + - 'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND session_id=$5' - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var values = [true, 'initial_request', 'deposit', session.fingerprint, session.id] - query(client, sql, values, function (err) { - done(err) - cb(err) - }) - }) -} - exports.addDispense = function addDispense (session, tx, cartridges) { connect(function (cerr, client, done) { if (cerr) return @@ -647,6 +634,35 @@ exports.machineEvents = function machineEvents (cb) { }) } +exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) { + var sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age ' + + 'FROM transactions ' + + 'WHERE incoming=$1 age<$2 AND status IN $3' + + connect(function (cerr, client, done) { + if (cerr) return cb(cerr) + + query(client, sql, [true, age, statuses], function (err, results) { + done() + if (err) return cb(err) + cb(null, results) + }) + }) +} + +exports.updateTxStatus = function updateTxStatus (tx, status, cb) { + var sql = 'UPDATE transactions SET status=$1 WHERE id=$2' + + connect(function (cerr, client, done) { + if (cerr) return cb(cerr) + var values = [status, tx.id] + query(client, sql, values, function (err) { + done(err) + cb(err) + }) + }) +} + /* exports.init('postgres://lamassu:lamassu@localhost/lamassu') connect(function(err, client, done) { diff --git a/migrations/008-add-two-way.js b/migrations/008-add-two-way.js index 1d96df5d..c8c1d6a8 100644 --- a/migrations/008-add-two-way.js +++ b/migrations/008-add-two-way.js @@ -1,9 +1,15 @@ var db = require('./db') +function singleQuotify (item) { return '\'' + item + '\'' } + exports.up = function (next) { + var statuses = ['notSeen', 'published', 'authorized', 'confirmed', 'rejected'] + .map(singleQuotify).join(',') + var sql = [ + 'create type status_stage AS enum (' + statuses + ')', 'alter table transactions add dispensed boolean NOT NULL DEFAULT false', - 'alter table transactions add authorized boolean NOT NULL DEFAULT false' + 'alter table transactions add status status_stage NOT NULL DEFAULT \'notSeen\'' ] db.multi(sql, next) } diff --git a/todo.txt b/todo.txt index 2e4394cf..0ff41731 100644 --- a/todo.txt +++ b/todo.txt @@ -1 +1,5 @@ - update tx statuses in background + - pull all unpublished and unauthorized txs in last x seconds + - pull all unconfirmed txs + - for each tx: poll wallet plugin to check status + - update db