improve cash-in error handling

This commit is contained in:
Josh Harvey 2017-04-23 16:37:25 +03:00
parent 86adfd0a63
commit 84a93599f5
9 changed files with 263 additions and 47 deletions

View file

@ -2,13 +2,17 @@ const _ = require('lodash/fp')
const pgp = require('pg-promise')()
const db = require('./db')
const BN = require('./bn')
const plugins = require('./plugins')
const logger = require('./logger')
const mapValuesWithKey = _.mapValues.convert({cap: false})
module.exports = {post}
module.exports = {post, monitorPending}
const UPDATEABLE_FIELDS = ['fee', 'txHash', 'phone', 'error', 'send',
'cryptoAtoms', 'fiat', 'timedout']
const PENDING_INTERVAL = '1 day'
const MAX_PENDING = 10
function post (tx, pi) {
const TransactionMode = pgp.txMode.TransactionMode
@ -24,7 +28,10 @@ function post (tx, pi) {
.then(row => {
return t.any(sql2, [tx.id])
.then(billRows => {
return upsert(row, tx)
const oldTx = toObj(row)
return preProcess(oldTx, tx, pi)
.then(preProcessedTx => upsert(oldTx, preProcessedTx))
.then(vector => {
return insertNewBills(billRows, tx)
.then(newBills => _.concat(vector, [newBills]))
@ -122,10 +129,7 @@ function insertNewBills (billRows, tx) {
.then(() => bills)
}
function upsert (row, tx) {
console.log('DEBUG501: %j', row)
const oldTx = toObj(row)
function upsert (oldTx, tx) {
if (!oldTx) {
console.log('DEBUG500: %j', tx)
return insert(tx)
@ -162,24 +166,81 @@ function registerTrades (pi, txVector) {
_.forEach(bill => pi.buy(bill), newBills)
}
function logAction (rec, tx) {
const action = {
tx_id: tx.id,
action: 'sendCoins',
error: rec.error,
error_code: rec.errorCode,
tx_hash: rec.txHash,
success: rec.sendConfirmed === true
}
const sql = pgp.helpers.insert(action, null, 'cash_in_actions')
return db.none(sql)
.then(_.constant(rec))
}
function postProcess (txVector, pi) {
const [oldTx, newTx] = txVector
registerTrades(pi, txVector)
if (newTx.send && !oldTx.send) {
const isClearToSend = newTx.send &&
!oldTx.sendPending &&
!oldTx.sendConfirmed
if (isClearToSend) {
return pi.sendCoins(newTx)
.then(txHash => ({
txHash,
sendConfirmed: true,
sendTime: 'now()^'
sendTime: 'now()^',
sendPending: false,
error: null,
errorCode: null
}))
.catch(err => ({
sendTime: 'now()^',
error: err.message,
errorCode: err.name
errorCode: err.name,
sendPending: false
}))
.then(r => logAction(r, newTx))
}
return Promise.resolve({})
}
function preProcess (oldTx, newTx, pi) {
return new Promise(resolve => {
if (!oldTx) return resolve(newTx)
if (newTx.send && !oldTx.send) return resolve(_.set('sendPending', true, newTx))
return resolve(newTx)
})
}
function monitorPending (settings) {
const sql = `select * from cash_in_txs
where created > now() - interval $1
and send
and not send_confirmed
and not send_pending
and not operator_completed
order by created
limit $2`
const processPending = row => {
const tx = toObj(row)
const pi = plugins(settings, tx.deviceId)
return post(tx, pi)
.catch(logger.error)
}
return db.any(sql, [PENDING_INTERVAL, MAX_PENDING])
.then(_.tap(console.log))
.then(rows => Promise.all(_.map(processPending, rows)))
.catch(logger.error)
}

View file

@ -49,7 +49,60 @@ function post (tx, pi) {
const [, newTx] = txVector
return postProcess(txVector, pi)
.then(changes => update(newTx, changes))
.then(savedTx => {
return logAction(tx, savedTx)
.then(_.constant(savedTx))
})
})
}
function logError (action, err) {
return logAction({
action,
error: err.message,
error_code: err.name
})
}
function mapDispense (tx) {
const bills = tx.bills
if (_.isEmpty(bills)) return {}
const dispense = {
provisioned_1: bills[0].provisioned,
provisioned_2: bills[1].provisioned,
dispensed_1: bills[0].actualDispense,
dispensed_2: bills[1].actualDispense,
rejected_1: bills[0].rejected,
rejected_2: bills[1].rejected,
denomination_1: bills[0].denomination,
denomination_2: bills[1].denomination
}
return dispense
}
function logDispense (tx) {
const baseRec = {error: tx.error, errorCode: tx.errorCode}
const rec = _.merge(mapDispense(tx), baseRec)
return logAction('dispense', rec, tx)
}
function logActionById (action, _rec, txId) {
const rec = _.assign(_rec, {action, tx_id: txId, redeem: false})
const sql = pgp.helpers.insert(rec, null, 'cash_out_actions')
return db.none(sql)
}
function logAction (action, _rec, tx) {
const rec = _.assign(_rec, {action, tx_id: tx.id, redeem: tx.redeem})
const sql = pgp.helpers.insert(rec, null, 'cash_out_actions')
return db.none(sql)
.then(_.constant(tx))
}
function nilEqual (a, b) {
@ -95,8 +148,6 @@ function toObj (row) {
}
function upsert (oldTx, tx) {
// insert bills
if (!oldTx) {
return insert(tx)
.then(newTx => [oldTx, newTx])
@ -106,24 +157,6 @@ function upsert (oldTx, tx) {
.then(newTx => [oldTx, newTx])
}
function mapDispense (tx) {
const bills = tx.bills
if (_.isEmpty(bills)) return tx
const extra = {
dispensed1: bills[0].actualDispense,
dispensed2: bills[1].actualDispense,
rejected1: bills[0].rejected,
rejected2: bills[1].rejected,
denomination1: bills[0].denomination,
denomination2: bills[1].denomination,
'dispenseTime^': 'NOW()'
}
return _.assign(tx, extra)
}
function convertBigNumFields (obj) {
const convert = (value, key) => _.includes(key, ['cryptoAtoms', 'fiat'])
? value.toString()
@ -141,7 +174,7 @@ function convertField (key) {
}
function toDb (tx) {
const massager = _.flow(convertBigNumFields, mapDispense, _.omit(['direction', 'bills']), _.mapKeys(convertField))
const massager = _.flow(convertBigNumFields, _.omit(['direction', 'bills']), _.mapKeys(convertField))
return massager(tx)
}
@ -175,17 +208,47 @@ function nextHd (isHd, tx) {
.then(row => _.set('hdIndex', row.hd_index, tx))
}
function preProcess (tx, newTx, pi) {
if (!tx) {
function preProcess (oldTx, newTx, pi) {
if (!oldTx) {
return pi.isHd(newTx)
.then(isHd => nextHd(isHd, newTx))
.then(newTxHd => {
return pi.newAddress(newTxHd)
.then(_.set('toAddress', _, newTxHd))
})
.then(addressedTx => {
const rec = {toAddress: addressedTx.toAddress}
return logAction('provisionAddress', rec, addressedTx)
})
.catch(err => {
return logError('provisionAddress', err)
.then(() => { throw err })
})
}
return Promise.resolve(updateStatus(tx, newTx))
return Promise.resolve(updateStatus(oldTx, newTx))
.then(updatedTx => {
if (!oldTx) return updatedTx
if (updatedTx.status !== oldTx.status) {
return logAction(updatedTx.status, {}, updatedTx)
}
if (_.isNil(oldTx.dispenseConfirmed) && _.isBoolean(updatedTx.dispenseConfirmed)) {
return logDispense(updatedTx)
.then(pi.updateCassettes(updatedTx))
}
if (!oldTx.phone && newTx.phone) {
return logAction('addPhone', {}, updatedTx)
}
if (!oldTx.redeem && newTx.redeem) {
return logAction('redeemLater', {}, updatedTx)
}
return updatedTx
})
}
function postProcess (txVector, pi) {
@ -197,6 +260,14 @@ function postProcess (txVector, pi) {
pi.sell(newTx)
return _.set('bills', billMath.makeChange(cartridges.cartridges, newTx.fiat), newTx)
})
.then(tx => {
const rec = {provisioned_1: tx.bills[0], provisioned_2: tx.bills[1]}
return logAction('provisionNotes', rec, tx)
})
.catch(err => {
return logError('provisionNotes', err)
.then(() => { throw err })
})
}
return Promise.resolve(newTx)
@ -277,7 +348,7 @@ function monitorUnnotified (settings) {
function cancel (txId) {
const updateRec = {
'dispense_time': 'now()^',
dispense_error: 'Operator cancel',
error: 'Operator cancel',
dispensed: true
}
@ -290,4 +361,5 @@ function cancel (txId) {
.then(res => {
if (res.rowCount !== 1) throw new Error('No such tx-id')
})
.then(() => logActionById('operatorCompleted', {}, txId))
}

View file

@ -509,6 +509,20 @@ function plugins (settings, deviceId) {
.catch(err => logger.error(err))
}
function updateCassettes (tx) {
// Note: This is the only place we update config from outside admin,
// so should be safe even though it's not an atomic operation.
//
// However, we should make all config changes atomic in the future.
const config = configManager.machineScoped(deviceId, settings.config)
config.topCashOutDenomination -= tx.bills[0].actualDispense +
tx.bills[0].rejected
config.bottomCashOutDenomination -= tx.bills[1].actualDispense +
tx.bills[1].rejected
// save
}
return {
pollQueries,
sendCoins,
@ -525,6 +539,7 @@ function plugins (settings, deviceId) {
sendMessage,
checkBalances,
buildCartridges,
updateCassettes,
buy,
sell
}

View file

@ -19,17 +19,27 @@ function balance (account, cryptoCode) {
})
}
function isInsufficient (cryptoCode) {
if (cryptoCode === 'BTC') return BN(1e5 * 10)
if (cryptoCode === 'ETH') return BN(1e18 * 0.25)
// Note: This makes it easier to test insufficient funds errors
let sendCount = 0
function isInsufficient (cryptoAtoms, cryptoCode) {
if (cryptoCode === 'BTC') return cryptoAtoms.gt(1e5 * 10 * sendCount)
if (cryptoCode === 'ETH') return cryptoAtoms.gt(1e18 * 0.25 * sendCount)
throw new Error('Unsupported crypto: ' + cryptoCode)
}
function sendCoins (account, toAddress, cryptoAtoms, cryptoCode) {
sendCount++
return new Promise((resolve, reject) => {
setTimeout(() => {
if (isInsufficient(cryptoAtoms, cryptoCode)) {
console.log('[%s] DEBUG: Mock wallet insufficient funds: %s',
cryptoCode, cryptoAtoms.toString())
return reject(new E.InsufficientFundsError())
}
console.log('[%s] DEBUG: Mock wallet sending %s cryptoAtoms to %s',
cryptoCode, cryptoAtoms.toString(), toAddress)
if (isInsufficient(cryptoCode)) return reject(new E.InsufficientFundsError())
return resolve('<txHash>')
}, 2000)
})

View file

@ -3,6 +3,7 @@ const notifier = require('./notifier')
const T = require('./time')
const logger = require('./logger')
const cashOutTx = require('./cash-out-tx')
const cashInTx = require('./cash-in-tx')
const INCOMING_TX_INTERVAL = 30 * T.seconds
const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds
@ -12,6 +13,7 @@ const TRADE_INTERVAL = 10 * T.seconds
const PONG_INTERVAL = 10 * T.seconds
const PONG_CLEAR_INTERVAL = 1 * T.day
const CHECK_NOTIFICATION_INTERVAL = 30 * T.seconds
const PENDING_INTERVAL = 10 * T.seconds
let _pi, _settings
@ -40,6 +42,7 @@ function start (__settings) {
setInterval(() => cashOutTx.monitorLiveIncoming(settings()), LIVE_INCOMING_TX_INTERVAL)
setInterval(() => cashOutTx.monitorStaleIncoming(settings()), INCOMING_TX_INTERVAL)
setInterval(() => cashOutTx.monitorUnnotified(settings()), UNNOTIFIED_INTERVAL)
setInterval(() => cashInTx.monitorPending(settings()), PENDING_INTERVAL)
setInterval(() => pi().sweepHd(), SWEEP_HD_INTERVAL)
setInterval(() => pi().pong(), PONG_INTERVAL)
setInterval(() => pi().pongClear(), PONG_CLEAR_INTERVAL)

View file

@ -15,7 +15,6 @@ const plugins = require('./plugins')
const helpers = require('./route-helpers')
const poller = require('./poller')
const Tx = require('./tx')
const E = require('./error')
const argv = require('minimist')(process.argv.slice(2))
@ -104,14 +103,7 @@ function postTx (req, res, next) {
return Tx.post(_.set('deviceId', req.deviceId, req.body), pi)
.then(tx => {
if (tx.errorCode) {
console.log('DEBUG100: %s, %s', tx.errorCode, E.InsufficientFundsError.name)
if (tx.errorCode === E.InsufficientFundsError.code) {
throw httpError(tx.error, 570)
}
throw httpError(tx.error, 500)
}
if (tx.errorCode) throw httpError(tx.error, 500)
return res.json(tx)
})
.catch(next)

View file

@ -7,6 +7,8 @@ exports.up = function (next) {
'alter table cash_in_txs add column timedout boolean not null default false',
'alter table cash_in_txs add column send_time timestamptz',
'alter table cash_in_txs add column error_code text',
'alter table cash_in_txs add column operator_completed boolean not null default false',
'alter table cash_in_txs add column send_pending boolean not null default false',
'alter table cash_out_txs add column device_time bigint not null',
'alter table cash_out_txs add column timedout boolean not null default false'
]

View file

@ -0,0 +1,21 @@
var db = require('./db')
exports.up = function (next) {
var sql = [
`create table cash_in_actions (
id serial primary key,
tx_id uuid references cash_in_txs not null,
action text not null,
error text,
error_code text,
tx_hash text,
success boolean not null,
created timestamptz not null default now()
)`
]
db.multi(sql, next)
}
exports.down = function (next) {
next()
}

View file

@ -0,0 +1,40 @@
var db = require('./db')
exports.up = function (next) {
var sql = [
`create table cash_out_actions (
id serial primary key,
tx_id uuid references cash_out_txs not null,
action text not null,
to_address text,
error text,
error_code text,
tx_hash text,
success boolean not null,
provisioned_1 integer,
provisioned_2 integer,
dispensed_1 integer,
dispensed_2 integer,
rejected_1 integer,
rejected_2 integer,
denomination_1 integer,
denomination_2 integer,
redeem boolean not null default false,
device_time bigint,
created timestamptz not null default now()
)`,
'alter table cash_out_txs drop column dispensed_1',
'alter table cash_out_txs drop column dispensed_2',
'alter table cash_out_txs drop column rejected_1',
'alter table cash_out_txs drop column rejected_2',
'alter table cash_out_txs drop column denomination_1',
'alter table cash_out_txs drop column denomination_2',
'alter table cash_out_txs drop column dispense_error',
'alter table cash_out_txs add column dispense_confirmed boolean default false'
]
db.multi(sql, next)
}
exports.down = function (next) {
next()
}