Add transaction versioning, tx cancellation

This commit is contained in:
Josh Harvey 2017-08-27 20:41:54 +03:00
parent 4a97535dec
commit 13933c3fb2
15 changed files with 2223 additions and 1468 deletions

View file

@ -4,13 +4,15 @@ const db = require('./db')
const BN = require('./bn')
const plugins = require('./plugins')
const logger = require('./logger')
const pp = require('./pp')
const T = require('./time')
const E = require('./error')
module.exports = {post, monitorPending}
const PENDING_INTERVAL = '1 day'
const PENDING_INTERVAL = '60 minutes'
const PENDING_INTERVAL_MS = 60 * T.minutes
const MAX_PENDING = 10
module.exports = {post, monitorPending, cancel, PENDING_INTERVAL}
function atomic (machineTx, pi) {
const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel
@ -22,6 +24,8 @@ function atomic (machineTx, pi) {
return t.oneOrNone(sql, [machineTx.id])
.then(row => {
if (row && row.tx_version >= machineTx.txVersion) throw new E.StaleTxError('Stale tx')
return t.any(sql2, [machineTx.id])
.then(billRows => {
const dbTx = toObj(row)
@ -29,10 +33,8 @@ function atomic (machineTx, pi) {
return preProcess(dbTx, machineTx, pi)
.then(preProcessedTx => upsert(dbTx, preProcessedTx))
.then(r => {
pp('DEBUG701.5')(r)
return insertNewBills(billRows, machineTx)
.then(newBills => _.set('newBills', newBills, r))
.then(pp('DEBUG702'))
})
})
})
@ -44,7 +46,6 @@ function atomic (machineTx, pi) {
}
function post (machineTx, pi) {
console.log('DEBUG700: %j', machineTx)
return db.tx(atomic(machineTx, pi))
.then(r => {
const updatedTx = r.tx
@ -67,11 +68,11 @@ function isMonotonic (oldField, newField, fieldKey) {
if (oldField.isBigNumber) return oldField.lte(newField)
if (_.isNumber(oldField)) return oldField <= newField
throw new Error(`Unexpected value: ${oldField}`)
throw new Error(`Unexpected value [${fieldKey}]: ${oldField}, ${newField}`)
}
function ensureRatchet (oldField, newField, fieldKey) {
const monotonic = ['cryptoAtoms', 'fiat', 'send', 'sendConfirmed', 'operatorCompleted', 'timedout']
const monotonic = ['cryptoAtoms', 'fiat', 'cashInFeeCrypto', 'send', 'sendConfirmed', 'operatorCompleted', 'timedout', 'txVersion']
const free = ['sendPending', 'error', 'errorCode']
if (_.isNil(oldField)) return true
@ -121,16 +122,11 @@ function toObj (row) {
keys.forEach(key => {
const objKey = _.camelCase(key)
if (key === 'crypto_atoms' || key === 'fiat' || key === 'cash_in_fee') {
if (_.includes(key, ['crypto_atoms', 'fiat', 'cash_in_fee', 'cash_in_fee_crypto'])) {
newObj[objKey] = BN(row[key])
return
}
if (key === 'device_time') {
newObj[objKey] = parseInt(row[key], 10)
return
}
newObj[objKey] = row[key]
})
@ -200,14 +196,13 @@ function update (tx, changes) {
}
function registerTrades (pi, newBills) {
console.log('DEBUG600: %j', newBills)
_.forEach(bill => pi.buy(bill), newBills)
}
function logAction (rec, tx) {
const action = {
tx_id: tx.id,
action: rec.sendConfirmed ? 'sendCoins' : 'sendCoinsError',
action: rec.action || (rec.sendConfirmed ? 'sendCoins' : 'sendCoinsError'),
error: rec.error,
error_code: rec.errorCode,
tx_hash: rec.txHash
@ -219,13 +214,22 @@ function logAction (rec, tx) {
.then(_.constant(rec))
}
function logActionById (action, _rec, txId) {
const rec = _.assign(_rec, {action, tx_id: txId})
const sql = pgp.helpers.insert(rec, null, 'cash_in_actions')
return db.none(sql)
}
function isClearToSend (oldTx, newTx) {
const now = Date.now()
return newTx.send &&
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed))
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed)) &&
(newTx.created > now - PENDING_INTERVAL_MS)
}
function postProcess (r, pi) {
console.log('DEBUG701: %j', r)
registerTrades(pi, r.newBills)
if (isClearToSend(r.dbTx, r.tx)) {
@ -303,3 +307,22 @@ function monitorPending (settings) {
.then(rows => Promise.all(_.map(processPending, rows)))
.catch(logger.error)
}
function cancel (txId) {
const updateRec = {
error: 'Operator cancel',
error_code: 'operatorCancel',
operator_completed: true
}
return Promise.resolve()
.then(() => {
return pgp.helpers.update(updateRec, null, 'cash_in_txs') +
pgp.as.format(' where id=$1', [txId])
})
.then(sql => db.result(sql, false))
.then(res => {
if (res.rowCount !== 1) throw new Error('No such tx-id')
})
.then(() => logActionById('operatorCompleted', {}, txId))
}