implement confirmation sms, bug fixes

This commit is contained in:
Josh Harvey 2016-05-09 18:40:31 +03:00
parent 380f47082e
commit 014033b73e
5 changed files with 134 additions and 18 deletions

View file

@ -22,6 +22,9 @@ var PENDING_TIMEOUT = 70 * 1000
var INCOMING_TX_INTERVAL = 5 * 1000
var LIVE_INCOMING_TX_INTERVAL = 30 * 1000
var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000
var UNNOTIFIED_INTERVAL = 60 * 1000
var MAX_NOTIFY_AGE = 48 * 60 * 60 * 1000
var MIN_NOTIFY_AGE = 5 * 60 * 1000
if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000
@ -442,21 +445,42 @@ exports.fiatBalance = function fiatBalance (cryptoCode) {
}
function processTxStatus (tx) {
const cryptoCode = tx.cryptoCode
const walletPlugin = walletPlugins[cryptoCode]
return new Promise((resolve, reject) => {
const cryptoCode = tx.cryptoCode
const walletPlugin = walletPlugins[cryptoCode]
walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms, function (err, res) {
if (err) return logger.error(err)
console.log('DEBUG5')
console.log(res)
var status = res.status
if (tx.status === status) return
db.updateTxStatus(tx, status, function (_err) {
if (_err) return logger.error(err)
walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms, function (err, res) {
if (err) {
logger.error(err)
return resolve() // Resolve on error because we ignore errors
}
var status = res.status
if (tx.status === status) return resolve()
db.updateTxStatus(tx, status, function (_err) {
if (_err) logger.error(err)
resolve()
})
})
})
}
function notifyConfirmation (tx) {
logger.debug('notifyConfirmation\n%j', tx)
const phone = tx.phone
const rec = {
sms: {
toNumber: phone,
body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.'
}
}
return smsPlugin.sendMessage(rec)
.then(() => db.updateNotify(tx))
}
function monitorLiveIncoming () {
const statuses = ['notSeen', 'published']
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
@ -469,7 +493,17 @@ function monitorIncoming () {
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected']
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
if (err) return
txs.forEach(processTxStatus)
const promises = txs.map(tx => processTxStatus(tx))
return Promise.all(promises)
.then(monitorUnnotified)
})
}
function monitorUnnotified () {
db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE, function (err, txs) {
if (err) return
txs.forEach(notifyConfirmation)
})
}
@ -488,6 +522,11 @@ exports.startPolling = function startPolling () {
setInterval(reapTxs, REAP_RATE)
setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL)
setInterval(monitorIncoming, INCOMING_TX_INTERVAL)
setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL)
monitorLiveIncoming()
monitorIncoming()
monitorUnnotified()
}
function startTrader (cryptoCode) {
@ -763,6 +802,10 @@ exports.updatePhone = function updatePhone (session, tx) {
return db.addIncomingPhone(session, tx)
}
exports.registerRedeem = function registerRedeem (session) {
return db.updateRedeem(session)
}
exports.fetchPhoneTx = function fetchPhoneTx (phone) {
db.fetchPhoneTxs(phone)
.then(txs => {

View file

@ -511,12 +511,14 @@ function normalizeTxs (txs) {
tx.txHash = tx.tx_hash
tx.cryptoCode = tx.crypto_code
tx.cryptoAtoms = new BigNumber(tx.satoshis)
tx.sessionId = tx.session_id
tx.to_address = undefined
tx.currency_code = undefined
tx.tx_hash = undefined
tx.crypto_code = undefined
tx.satoshis = undefined
tx.session_id = undefined
return tx
})
@ -525,12 +527,14 @@ function normalizeTxs (txs) {
exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) {
var sql = 'SELECT * FROM transactions ' +
'WHERE phone=$1 AND dispensed=$2 ' +
'AND (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1'
'AND (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1 ' +
'AND stage=$3 AND authority=$4 AND incoming=$5'
return new Promise((resolve, reject) => {
connect(function (cerr, client, done) {
if (cerr) return reject(cerr)
query(client, sql, [phone, false, dispenseTimeout], function (err, results) {
var values = [phone, false, dispenseTimeout, 'initial_request', 'pending', true]
query(client, sql, values, function (err, results) {
done()
if (err) return reject(err)
resolve(normalizeTxs(results.rows))
@ -542,12 +546,12 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) {
exports.fetchTx = function fetchTx (session) {
var sql = 'SELECT * FROM transactions ' +
'WHERE device_fingerprint=$1 AND session_id=$2 ' +
'AND stage=$3 AND authority=$4'
'AND stage=$3 AND authority=$4 and incoming=$5'
return new Promise((resolve, reject) => {
connect(function (cerr, client, done) {
if (cerr) return reject(cerr)
var values = [session.fingerprint, session.id, 'initial_request', 'pending']
var values = [session.fingerprint, session.id, 'initial_request', 'pending', true]
query(client, sql, values, function (err, results) {
done()
if (err) return reject(err)
@ -578,8 +582,9 @@ exports.addDispenseRequest = function addDispenseRequest (session, tx) {
function updateDispense (client, session, dispensed, cb) {
var sql = 'UPDATE transactions SET dispensed=$1 ' +
'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND session_id=$5'
var values = [dispensed, 'initial_request', 'pending', session.fingerprint, session.id]
'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND ' +
'session_id=$5 AND incoming=$6'
var values = [dispensed, 'initial_request', 'pending', session.fingerprint, session.id, true]
query(client, sql, values, function (err) {
cb(err)
})
@ -686,6 +691,28 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) {
})
}
exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) {
var sql = 'SELECT * ' +
'FROM transactions ' +
'WHERE incoming=$1 AND ' +
'((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$2 ' +
'AND stage=$3 AND authority=$4 AND notified=$5 AND dispensed=$6 ' +
'AND phone IS NOT NULL ' +
"AND status IN ('instant', 'confirmed') " +
'AND (redeem=$7 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$8)'
connect(function (cerr, client, done) {
if (cerr) return cb(cerr)
var values = [true, age, 'initial_request', 'pending', false, false, true, waitPeriod]
query(client, sql, values, function (err, results) {
done()
if (err) return cb(err)
cb(null, normalizeTxs(results.rows))
})
})
}
exports.updateTxStatus = function updateTxStatus (tx, status, cb) {
var sql = 'UPDATE transactions SET status=$1 WHERE id=$2'
@ -699,6 +726,38 @@ exports.updateTxStatus = function updateTxStatus (tx, status, cb) {
})
}
exports.updateRedeem = function updateRedeem (session, cb) {
var sql = 'UPDATE transactions SET redeem=$1 ' +
'WHERE incoming=$1 AND device_fingerprint=$2 AND session_id=$3 ' +
'AND stage=$4 AND authority=$5'
connect(function (cerr, client, done) {
if (cerr) return cb(cerr)
var values = [true, true, session.fingerprint, session.id, 'initial_request', 'pending']
query(client, sql, values, function (err) {
done(err)
cb(err)
})
})
}
exports.updateNotify = function updateNotify (tx) {
var sql = 'UPDATE transactions SET notified=$1 ' +
'WHERE id=$2'
return new Promise((resolve, reject) => {
connect(function (cerr, client, done) {
if (cerr) return reject(cerr)
var values = [true, tx.id]
query(client, sql, values, function (err) {
done(err)
if (err) return reject(err)
resolve()
})
})
})
}
/*
exports.init('postgres://lamassu:lamassu@localhost/lamassu')
connect(function(err, client, done) {

View file

@ -242,6 +242,15 @@ function fetchPhoneTx (req, res) {
})
}
function registerRedeem (req, res) {
return plugins.registerRedeem(session(req))
.then(r => res.json())
.catch(err => {
logger.error(err)
res.send(500)
})
}
function waitForDispense (req, res) {
return plugins.fetchTx(session(req))
.then(tx => {
@ -292,6 +301,7 @@ function init (localConfig) {
app.post('/phone_code', authMiddleware, phoneCode)
app.post('/update_phone', authMiddleware, updatePhone)
app.get('/phone_tx', authMiddleware, fetchPhoneTx)
app.post('/register_redeem', authMiddleware, registerRedeem)
app.get('/await_dispense', authMiddleware, waitForDispense)
app.post('/dispense', authMiddleware, dispense)