diff --git a/lib/cash-in-tx.js b/lib/cash-in-tx.js index 6eee0b1f..07319764 100644 --- a/lib/cash-in-tx.js +++ b/lib/cash-in-tx.js @@ -12,7 +12,7 @@ module.exports = {post, monitorPending} const PENDING_INTERVAL = '1 day' const MAX_PENDING = 10 -function post (tx, pi) { +function atomic (machineTx, pi) { const TransactionMode = pgp.txMode.TransactionMode const isolationLevel = pgp.txMode.isolationLevel const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) @@ -21,17 +21,17 @@ function post (tx, pi) { const sql = 'select * from cash_in_txs where id=$1' const sql2 = 'select * from bills where cash_in_txs_id=$1' - return t.oneOrNone(sql, [tx.id]) + return t.oneOrNone(sql, [machineTx.id]) .then(row => { - return t.any(sql2, [tx.id]) + return t.any(sql2, [machineTx.id]) .then(billRows => { - const oldTx = toObj(row) + const dbTx = toObj(row) - return preProcess(oldTx, tx, pi) - .then(preProcessedTx => upsert(oldTx, preProcessedTx)) + return preProcess(dbTx, machineTx, pi) + .then(preProcessedTx => upsert(dbTx, preProcessedTx)) .then(vector => { - return insertNewBills(billRows, tx) - .then(newBills => _.concat(vector, [newBills])) + return insertNewBills(billRows, machineTx) + .then(_.constant(_.concat(vector, machineTx.bills))) }) }) }) @@ -39,14 +39,23 @@ function post (tx, pi) { transaction.txMode = tmSRD - return db.tx(transaction) + return transaction +} + +function post (machineTx, pi) { + const pp = require('./pp') + pp('DEBUG98')(machineTx) + + return db.tx(atomic(machineTx, pi)) .then(txVector => { - const [oldTx,, newBills] = txVector - const oldBills = oldTx ? oldTx.bills : [] + const [, updatedTx] = txVector return postProcess(txVector, pi) - .then(changes => update(oldTx, changes)) - .then(tx => _.merge({bills: _.concat(oldBills, newBills)}, tx)) + .then(_.tap(pp('DEBUG99'))) + .then(changes => update(updatedTx, changes)) + .then(_.tap(pp('DEBUG100'))) + .then(tx => _.set('bills', machineTx.bills, tx)) + .then(_.tap(pp('DEBUG101'))) }) } @@ -146,19 +155,19 @@ function convertBigNumFields (obj) { return _.mapKeys(convertKey, mapValuesWithKey(convert, obj)) } -function pullNewBills (billRows, tx) { - if (_.isEmpty(tx.bills)) return [] +function pullNewBills (billRows, machineTx) { + if (_.isEmpty(machineTx.bills)) return [] const toBill = _.mapKeys(_.camelCase) const bills = _.map(toBill, billRows) - return _.differenceBy(_.get('id'), tx.bills, bills) + return _.differenceBy(_.get('id'), machineTx.bills, bills) } const massage = _.flow(_.omit(['direction', 'bills']), convertBigNumFields, _.mapKeys(_.snakeCase)) -function insertNewBills (billRows, tx) { - const bills = pullNewBills(billRows, tx) +function insertNewBills (billRows, machineTx) { + const bills = pullNewBills(billRows, machineTx) if (_.isEmpty(bills)) return Promise.resolve([]) const dbBills = _.map(massage, bills) @@ -166,17 +175,16 @@ function insertNewBills (billRows, tx) { const sql = pgp.helpers.insert(dbBills, columns, 'bills') return db.none(sql) - .then(() => bills) } -function upsert (oldTx, newTx) { - if (!oldTx) { - return insert(newTx) - .then(tx => [oldTx, tx]) +function upsert (dbTx, preProcessedTx) { + if (!dbTx) { + return insert(preProcessedTx) + .then(tx => [dbTx, tx]) } - return update(oldTx, diff(oldTx, newTx)) - .then(tx => [oldTx, tx]) + return update(dbTx, diff(dbTx, preProcessedTx)) + .then(tx => [dbTx, tx]) } function insert (tx) { @@ -198,8 +206,7 @@ function update (tx, changes) { .then(toObj) } -function registerTrades (pi, txVector) { - const newBills = _.last(txVector) +function registerTrades (pi, newBills) { _.forEach(bill => pi.buy(bill), newBills) } @@ -218,17 +225,18 @@ function logAction (rec, tx) { .then(_.constant(rec)) } +function isClearToSend (oldTx, newTx) { + return newTx.send && + (!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed)) +} + function postProcess (txVector, pi) { - const [oldTx, newTx] = txVector + const [dbTx, updatedTx, newBills] = txVector - registerTrades(pi, txVector) + registerTrades(pi, newBills) - const isClearToSend = newTx.send && - !oldTx.sendPending && - !oldTx.sendConfirmed - - if (isClearToSend) { - return pi.sendCoins(newTx) + if (isClearToSend(dbTx, updatedTx)) { + return pi.sendCoins(updatedTx) .then(txHash => ({ txHash, sendConfirmed: true, @@ -237,23 +245,46 @@ function postProcess (txVector, pi) { error: null, errorCode: null })) - .catch(err => ({ - sendTime: 'now()^', - error: err.message, - errorCode: err.name, - sendPending: false - })) - .then(r => logAction(r, newTx)) + .catch(err => { + // Important: We don't know what kind of error this is + // so not safe to assume that funds weren't sent. + // Therefore, don't set sendPending to false except for + // errors (like InsufficientFundsError) that are guaranteed + // not to send. + const sendPending = err.name !== 'InsufficientFundsError' + + return { + sendTime: 'now()^', + error: err.message, + errorCode: err.name, + sendPending + } + }) + .then(r => logAction(r, updatedTx)) } return Promise.resolve({}) } -function preProcess (oldTx, newTx, pi) { +function preProcess (dbTx, machineTx, pi) { + // Note: The way this works is if we're clear to send, + // we mark the transaction as sendPending. + // + // If another process is trying to also mark this as sendPending + // that means that it saw the tx as sendPending=false. + // But if that's true, then it must be serialized before this + // (otherwise it would see sendPending=true), and therefore we can't + // be seeing sendPending=false (a pre-condition of clearToSend()). + // Therefore, one of the conflicting transactions will error, + // which is what we want. return new Promise(resolve => { - if (!oldTx) return resolve(newTx) - if (newTx.send && !oldTx.send) return resolve(_.set('sendPending', true, newTx)) - return resolve(newTx) + if (!dbTx) return resolve(machineTx) + + if (isClearToSend(dbTx, machineTx)) { + return resolve(_.set('sendPending', true, machineTx)) + } + + return resolve(machineTx) }) } diff --git a/lib/pp.js b/lib/pp.js index 8aa7b76c..e82754fd 100644 --- a/lib/pp.js +++ b/lib/pp.js @@ -1 +1,6 @@ -module.exports = o => console.log(require('util').inspect(o, {depth: null, colors: true})) +module.exports = function (label) { + return function (o) { + console.log(label) + console.log(require('util').inspect(o, {depth: null, colors: true})) + } +}