Fix cash-in-tx bugs

This commit is contained in:
Josh Harvey 2017-04-25 20:29:12 +03:00
parent 999c834ed7
commit 9783591332
2 changed files with 83 additions and 47 deletions

View file

@ -12,7 +12,7 @@ module.exports = {post, monitorPending}
const PENDING_INTERVAL = '1 day' const PENDING_INTERVAL = '1 day'
const MAX_PENDING = 10 const MAX_PENDING = 10
function post (tx, pi) { function atomic (machineTx, pi) {
const TransactionMode = pgp.txMode.TransactionMode const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel const isolationLevel = pgp.txMode.isolationLevel
const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable})
@ -21,17 +21,17 @@ function post (tx, pi) {
const sql = 'select * from cash_in_txs where id=$1' const sql = 'select * from cash_in_txs where id=$1'
const sql2 = 'select * from bills where cash_in_txs_id=$1' const sql2 = 'select * from bills where cash_in_txs_id=$1'
return t.oneOrNone(sql, [tx.id]) return t.oneOrNone(sql, [machineTx.id])
.then(row => { .then(row => {
return t.any(sql2, [tx.id]) return t.any(sql2, [machineTx.id])
.then(billRows => { .then(billRows => {
const oldTx = toObj(row) const dbTx = toObj(row)
return preProcess(oldTx, tx, pi) return preProcess(dbTx, machineTx, pi)
.then(preProcessedTx => upsert(oldTx, preProcessedTx)) .then(preProcessedTx => upsert(dbTx, preProcessedTx))
.then(vector => { .then(vector => {
return insertNewBills(billRows, tx) return insertNewBills(billRows, machineTx)
.then(newBills => _.concat(vector, [newBills])) .then(_.constant(_.concat(vector, machineTx.bills)))
}) })
}) })
}) })
@ -39,14 +39,23 @@ function post (tx, pi) {
transaction.txMode = tmSRD transaction.txMode = tmSRD
return db.tx(transaction) return transaction
}
function post (machineTx, pi) {
const pp = require('./pp')
pp('DEBUG98')(machineTx)
return db.tx(atomic(machineTx, pi))
.then(txVector => { .then(txVector => {
const [oldTx,, newBills] = txVector const [, updatedTx] = txVector
const oldBills = oldTx ? oldTx.bills : []
return postProcess(txVector, pi) return postProcess(txVector, pi)
.then(changes => update(oldTx, changes)) .then(_.tap(pp('DEBUG99')))
.then(tx => _.merge({bills: _.concat(oldBills, newBills)}, tx)) .then(changes => update(updatedTx, changes))
.then(_.tap(pp('DEBUG100')))
.then(tx => _.set('bills', machineTx.bills, tx))
.then(_.tap(pp('DEBUG101')))
}) })
} }
@ -146,19 +155,19 @@ function convertBigNumFields (obj) {
return _.mapKeys(convertKey, mapValuesWithKey(convert, obj)) return _.mapKeys(convertKey, mapValuesWithKey(convert, obj))
} }
function pullNewBills (billRows, tx) { function pullNewBills (billRows, machineTx) {
if (_.isEmpty(tx.bills)) return [] if (_.isEmpty(machineTx.bills)) return []
const toBill = _.mapKeys(_.camelCase) const toBill = _.mapKeys(_.camelCase)
const bills = _.map(toBill, billRows) const bills = _.map(toBill, billRows)
return _.differenceBy(_.get('id'), tx.bills, bills) return _.differenceBy(_.get('id'), machineTx.bills, bills)
} }
const massage = _.flow(_.omit(['direction', 'bills']), convertBigNumFields, _.mapKeys(_.snakeCase)) const massage = _.flow(_.omit(['direction', 'bills']), convertBigNumFields, _.mapKeys(_.snakeCase))
function insertNewBills (billRows, tx) { function insertNewBills (billRows, machineTx) {
const bills = pullNewBills(billRows, tx) const bills = pullNewBills(billRows, machineTx)
if (_.isEmpty(bills)) return Promise.resolve([]) if (_.isEmpty(bills)) return Promise.resolve([])
const dbBills = _.map(massage, bills) const dbBills = _.map(massage, bills)
@ -166,17 +175,16 @@ function insertNewBills (billRows, tx) {
const sql = pgp.helpers.insert(dbBills, columns, 'bills') const sql = pgp.helpers.insert(dbBills, columns, 'bills')
return db.none(sql) return db.none(sql)
.then(() => bills)
} }
function upsert (oldTx, newTx) { function upsert (dbTx, preProcessedTx) {
if (!oldTx) { if (!dbTx) {
return insert(newTx) return insert(preProcessedTx)
.then(tx => [oldTx, tx]) .then(tx => [dbTx, tx])
} }
return update(oldTx, diff(oldTx, newTx)) return update(dbTx, diff(dbTx, preProcessedTx))
.then(tx => [oldTx, tx]) .then(tx => [dbTx, tx])
} }
function insert (tx) { function insert (tx) {
@ -198,8 +206,7 @@ function update (tx, changes) {
.then(toObj) .then(toObj)
} }
function registerTrades (pi, txVector) { function registerTrades (pi, newBills) {
const newBills = _.last(txVector)
_.forEach(bill => pi.buy(bill), newBills) _.forEach(bill => pi.buy(bill), newBills)
} }
@ -218,17 +225,18 @@ function logAction (rec, tx) {
.then(_.constant(rec)) .then(_.constant(rec))
} }
function isClearToSend (oldTx, newTx) {
return newTx.send &&
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed))
}
function postProcess (txVector, pi) { function postProcess (txVector, pi) {
const [oldTx, newTx] = txVector const [dbTx, updatedTx, newBills] = txVector
registerTrades(pi, txVector) registerTrades(pi, newBills)
const isClearToSend = newTx.send && if (isClearToSend(dbTx, updatedTx)) {
!oldTx.sendPending && return pi.sendCoins(updatedTx)
!oldTx.sendConfirmed
if (isClearToSend) {
return pi.sendCoins(newTx)
.then(txHash => ({ .then(txHash => ({
txHash, txHash,
sendConfirmed: true, sendConfirmed: true,
@ -237,23 +245,46 @@ function postProcess (txVector, pi) {
error: null, error: null,
errorCode: null errorCode: null
})) }))
.catch(err => ({ .catch(err => {
// Important: We don't know what kind of error this is
// so not safe to assume that funds weren't sent.
// Therefore, don't set sendPending to false except for
// errors (like InsufficientFundsError) that are guaranteed
// not to send.
const sendPending = err.name !== 'InsufficientFundsError'
return {
sendTime: 'now()^', sendTime: 'now()^',
error: err.message, error: err.message,
errorCode: err.name, errorCode: err.name,
sendPending: false sendPending
})) }
.then(r => logAction(r, newTx)) })
.then(r => logAction(r, updatedTx))
} }
return Promise.resolve({}) return Promise.resolve({})
} }
function preProcess (oldTx, newTx, pi) { function preProcess (dbTx, machineTx, pi) {
// Note: The way this works is if we're clear to send,
// we mark the transaction as sendPending.
//
// If another process is trying to also mark this as sendPending
// that means that it saw the tx as sendPending=false.
// But if that's true, then it must be serialized before this
// (otherwise it would see sendPending=true), and therefore we can't
// be seeing sendPending=false (a pre-condition of clearToSend()).
// Therefore, one of the conflicting transactions will error,
// which is what we want.
return new Promise(resolve => { return new Promise(resolve => {
if (!oldTx) return resolve(newTx) if (!dbTx) return resolve(machineTx)
if (newTx.send && !oldTx.send) return resolve(_.set('sendPending', true, newTx))
return resolve(newTx) if (isClearToSend(dbTx, machineTx)) {
return resolve(_.set('sendPending', true, machineTx))
}
return resolve(machineTx)
}) })
} }

View file

@ -1 +1,6 @@
module.exports = o => console.log(require('util').inspect(o, {depth: null, colors: true})) module.exports = function (label) {
return function (o) {
console.log(label)
console.log(require('util').inspect(o, {depth: null, colors: true}))
}
}