Chore: move tx code into atomic function

This commit is contained in:
csrapr 2021-04-15 14:17:42 +01:00 committed by Josh Harvey
parent 8cd1389de3
commit 4f12bcf4cf
4 changed files with 12 additions and 14 deletions

View file

@ -1,6 +1,7 @@
const _ = require('lodash/fp') const _ = require('lodash/fp')
const pgp = require('pg-promise')() const pgp = require('pg-promise')()
const db = require('../db')
const E = require('../error') const E = require('../error')
const cashInLow = require('./cash-in-low') const cashInLow = require('./cash-in-low')
@ -8,6 +9,9 @@ const cashInLow = require('./cash-in-low')
module.exports = { atomic } module.exports = { atomic }
function atomic (machineTx, pi) { function atomic (machineTx, pi) {
const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel
const mode = new TransactionMode({ tiLevel: isolationLevel.serializable })
function transaction (t) { function transaction (t) {
const sql = 'select * from cash_in_txs where id=$1' const sql = 'select * from cash_in_txs where id=$1'
const sql2 = 'select * from bills where cash_in_txs_id=$1' const sql2 = 'select * from bills where cash_in_txs_id=$1'
@ -29,7 +33,7 @@ function atomic (machineTx, pi) {
}) })
}) })
} }
return transaction return db.tx({ mode }, transaction)
} }
function insertNewBills (t, billRows, machineTx) { function insertNewBills (t, billRows, machineTx) {

View file

@ -19,11 +19,7 @@ const MAX_PENDING = 10
module.exports = { post, monitorPending, cancel, PENDING_INTERVAL } module.exports = { post, monitorPending, cancel, PENDING_INTERVAL }
function post (machineTx, pi) { function post (machineTx, pi) {
const TransactionMode = pgp.txMode.TransactionMode return cashInAtomic.atomic(machineTx, pi)
const isolationLevel = pgp.txMode.isolationLevel
const mode = new TransactionMode({ tiLevel: isolationLevel.serializable })
return db.tx({ mode }, cashInAtomic.atomic(machineTx, pi))
.then(r => { .then(r => {
const updatedTx = r.tx const updatedTx = r.tx
let blacklisted = false let blacklisted = false

View file

@ -1,6 +1,7 @@
const _ = require('lodash/fp') const _ = require('lodash/fp')
const pgp = require('pg-promise')() const pgp = require('pg-promise')()
const db = require('../db')
const E = require('../error') const E = require('../error')
const socket = require('../socket-client') const socket = require('../socket-client')
const logger = require('../logger') const logger = require('../logger')
@ -14,9 +15,11 @@ const toObj = helper.toObj
module.exports = { atomic } module.exports = { atomic }
function atomic (tx, pi, fromClient) { function atomic (tx, pi, fromClient) {
const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel
const mode = new TransactionMode({ tiLevel: isolationLevel.serializable })
function transaction (t) { function transaction (t) {
const sql = 'select * from cash_out_txs where id=$1' const sql = 'select * from cash_out_txs where id=$1'
return t.oneOrNone(sql, [tx.id]) return t.oneOrNone(sql, [tx.id])
.then(toObj) .then(toObj)
.then(oldTx => { .then(oldTx => {
@ -27,8 +30,7 @@ function atomic (tx, pi, fromClient) {
.then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx)) .then(preProcessedTx => cashOutLow.upsert(t, oldTx, preProcessedTx))
}) })
} }
return db.tx({ mode }, transaction)
return transaction
} }
function preProcess (t, oldTx, newTx, pi) { function preProcess (t, oldTx, newTx, pi) {

View file

@ -37,11 +37,7 @@ function selfPost (tx, pi) {
} }
function post (tx, pi, fromClient = true) { function post (tx, pi, fromClient = true) {
const TransactionMode = pgp.txMode.TransactionMode return cashOutAtomic.atomic(tx, pi, fromClient)
const isolationLevel = pgp.txMode.isolationLevel
const mode = new TransactionMode({ tiLevel: isolationLevel.serializable })
return db.tx({ mode }, cashOutAtomic.atomic(tx, pi, fromClient))
.then(txVector => { .then(txVector => {
const [, newTx, justAuthorized] = txVector const [, newTx, justAuthorized] = txVector
return postProcess(txVector, justAuthorized, pi) return postProcess(txVector, justAuthorized, pi)