diff --git a/lib/plugins.js b/lib/plugins.js index f938da63..39d1b326 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -22,6 +22,9 @@ var PENDING_TIMEOUT = 70 * 1000 var INCOMING_TX_INTERVAL = 5 * 1000 var LIVE_INCOMING_TX_INTERVAL = 30 * 1000 var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000 +var UNNOTIFIED_INTERVAL = 60 * 1000 +var MAX_NOTIFY_AGE = 48 * 60 * 60 * 1000 +var MIN_NOTIFY_AGE = 5 * 60 * 1000 if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000 @@ -442,21 +445,42 @@ exports.fiatBalance = function fiatBalance (cryptoCode) { } function processTxStatus (tx) { - const cryptoCode = tx.cryptoCode - const walletPlugin = walletPlugins[cryptoCode] + return new Promise((resolve, reject) => { + const cryptoCode = tx.cryptoCode + const walletPlugin = walletPlugins[cryptoCode] - walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms, function (err, res) { - if (err) return logger.error(err) - console.log('DEBUG5') - console.log(res) - var status = res.status - if (tx.status === status) return - db.updateTxStatus(tx, status, function (_err) { - if (_err) return logger.error(err) + walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms, function (err, res) { + if (err) { + logger.error(err) + return resolve() // Resolve on error because we ignore errors + } + + var status = res.status + if (tx.status === status) return resolve() + + db.updateTxStatus(tx, status, function (_err) { + if (_err) logger.error(err) + resolve() + }) }) }) } +function notifyConfirmation (tx) { + logger.debug('notifyConfirmation\n%j', tx) + + const phone = tx.phone + const rec = { + sms: { + toNumber: phone, + body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.' + } + } + + return smsPlugin.sendMessage(rec) + .then(() => db.updateNotify(tx)) +} + function monitorLiveIncoming () { const statuses = ['notSeen', 'published'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { @@ -469,7 +493,17 @@ function monitorIncoming () { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { if (err) return - txs.forEach(processTxStatus) + const promises = txs.map(tx => processTxStatus(tx)) + + return Promise.all(promises) + .then(monitorUnnotified) + }) +} + +function monitorUnnotified () { + db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE, function (err, txs) { + if (err) return + txs.forEach(notifyConfirmation) }) } @@ -488,6 +522,11 @@ exports.startPolling = function startPolling () { setInterval(reapTxs, REAP_RATE) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorIncoming, INCOMING_TX_INTERVAL) + setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) + + monitorLiveIncoming() + monitorIncoming() + monitorUnnotified() } function startTrader (cryptoCode) { @@ -763,6 +802,10 @@ exports.updatePhone = function updatePhone (session, tx) { return db.addIncomingPhone(session, tx) } +exports.registerRedeem = function registerRedeem (session) { + return db.updateRedeem(session) +} + exports.fetchPhoneTx = function fetchPhoneTx (phone) { db.fetchPhoneTxs(phone) .then(txs => { diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 12d9e249..1e442426 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -511,12 +511,14 @@ function normalizeTxs (txs) { tx.txHash = tx.tx_hash tx.cryptoCode = tx.crypto_code tx.cryptoAtoms = new BigNumber(tx.satoshis) + tx.sessionId = tx.session_id tx.to_address = undefined tx.currency_code = undefined tx.tx_hash = undefined tx.crypto_code = undefined tx.satoshis = undefined + tx.session_id = undefined return tx }) @@ -525,12 +527,14 @@ function normalizeTxs (txs) { exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { var sql = 'SELECT * FROM transactions ' + 'WHERE phone=$1 AND dispensed=$2 ' + - 'AND (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1' + 'AND (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1 ' + + 'AND stage=$3 AND authority=$4 AND incoming=$5' return new Promise((resolve, reject) => { connect(function (cerr, client, done) { if (cerr) return reject(cerr) - query(client, sql, [phone, false, dispenseTimeout], function (err, results) { + var values = [phone, false, dispenseTimeout, 'initial_request', 'pending', true] + query(client, sql, values, function (err, results) { done() if (err) return reject(err) resolve(normalizeTxs(results.rows)) @@ -542,12 +546,12 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { exports.fetchTx = function fetchTx (session) { var sql = 'SELECT * FROM transactions ' + 'WHERE device_fingerprint=$1 AND session_id=$2 ' + - 'AND stage=$3 AND authority=$4' + 'AND stage=$3 AND authority=$4 and incoming=$5' return new Promise((resolve, reject) => { connect(function (cerr, client, done) { if (cerr) return reject(cerr) - var values = [session.fingerprint, session.id, 'initial_request', 'pending'] + var values = [session.fingerprint, session.id, 'initial_request', 'pending', true] query(client, sql, values, function (err, results) { done() if (err) return reject(err) @@ -578,8 +582,9 @@ exports.addDispenseRequest = function addDispenseRequest (session, tx) { function updateDispense (client, session, dispensed, cb) { var sql = 'UPDATE transactions SET dispensed=$1 ' + - 'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND session_id=$5' - var values = [dispensed, 'initial_request', 'pending', session.fingerprint, session.id] + 'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND ' + + 'session_id=$5 AND incoming=$6' + var values = [dispensed, 'initial_request', 'pending', session.fingerprint, session.id, true] query(client, sql, values, function (err) { cb(err) }) @@ -686,6 +691,28 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) { }) } +exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) { + var sql = 'SELECT * ' + + 'FROM transactions ' + + 'WHERE incoming=$1 AND ' + + '((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$2 ' + + 'AND stage=$3 AND authority=$4 AND notified=$5 AND dispensed=$6 ' + + 'AND phone IS NOT NULL ' + + "AND status IN ('instant', 'confirmed') " + + 'AND (redeem=$7 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$8)' + + connect(function (cerr, client, done) { + if (cerr) return cb(cerr) + + var values = [true, age, 'initial_request', 'pending', false, false, true, waitPeriod] + query(client, sql, values, function (err, results) { + done() + if (err) return cb(err) + cb(null, normalizeTxs(results.rows)) + }) + }) +} + exports.updateTxStatus = function updateTxStatus (tx, status, cb) { var sql = 'UPDATE transactions SET status=$1 WHERE id=$2' @@ -699,6 +726,38 @@ exports.updateTxStatus = function updateTxStatus (tx, status, cb) { }) } +exports.updateRedeem = function updateRedeem (session, cb) { + var sql = 'UPDATE transactions SET redeem=$1 ' + + 'WHERE incoming=$1 AND device_fingerprint=$2 AND session_id=$3 ' + + 'AND stage=$4 AND authority=$5' + + connect(function (cerr, client, done) { + if (cerr) return cb(cerr) + var values = [true, true, session.fingerprint, session.id, 'initial_request', 'pending'] + query(client, sql, values, function (err) { + done(err) + cb(err) + }) + }) +} + +exports.updateNotify = function updateNotify (tx) { + var sql = 'UPDATE transactions SET notified=$1 ' + + 'WHERE id=$2' + + return new Promise((resolve, reject) => { + connect(function (cerr, client, done) { + if (cerr) return reject(cerr) + var values = [true, tx.id] + query(client, sql, values, function (err) { + done(err) + if (err) return reject(err) + resolve() + }) + }) + }) +} + /* exports.init('postgres://lamassu:lamassu@localhost/lamassu') connect(function(err, client, done) { diff --git a/lib/routes.js b/lib/routes.js index a1b58aab..52c918aa 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -242,6 +242,15 @@ function fetchPhoneTx (req, res) { }) } +function registerRedeem (req, res) { + return plugins.registerRedeem(session(req)) + .then(r => res.json()) + .catch(err => { + logger.error(err) + res.send(500) + }) +} + function waitForDispense (req, res) { return plugins.fetchTx(session(req)) .then(tx => { @@ -292,6 +301,7 @@ function init (localConfig) { app.post('/phone_code', authMiddleware, phoneCode) app.post('/update_phone', authMiddleware, updatePhone) app.get('/phone_tx', authMiddleware, fetchPhoneTx) + app.post('/register_redeem', authMiddleware, registerRedeem) app.get('/await_dispense', authMiddleware, waitForDispense) app.post('/dispense', authMiddleware, dispense) diff --git a/migrations/008-add-two-way.js b/migrations/008-add-two-way.js index 8da9b9ae..4f9a00d6 100644 --- a/migrations/008-add-two-way.js +++ b/migrations/008-add-two-way.js @@ -10,6 +10,8 @@ exports.up = function (next) { var sql = [ 'create type status_stage AS enum (' + statuses + ')', 'alter table transactions add dispensed boolean NOT NULL DEFAULT false', + 'alter table transactions add notified boolean NOT NULL DEFAULT false', + 'alter table transactions add redeem boolean NOT NULL DEFAULT false', 'alter table transactions add status status_stage NOT NULL DEFAULT \'notSeen\'' ] db.multi(sql, next) diff --git a/todo.txt b/todo.txt index 24401ab7..2d841d29 100644 --- a/todo.txt +++ b/todo.txt @@ -1,2 +1,4 @@ - change satoshis to crypto_atoms in db (ask neal about this) -- start testing + +- on upgrade, make sure we're not sending out lots of notifications +- fix plugin reloading, in current version and master