Merge branch 'v5' into v5
This commit is contained in:
commit
d254e28e96
15 changed files with 2223 additions and 1468 deletions
|
|
@ -4,13 +4,15 @@ const db = require('./db')
|
|||
const BN = require('./bn')
|
||||
const plugins = require('./plugins')
|
||||
const logger = require('./logger')
|
||||
const pp = require('./pp')
|
||||
const T = require('./time')
|
||||
const E = require('./error')
|
||||
|
||||
module.exports = {post, monitorPending}
|
||||
|
||||
const PENDING_INTERVAL = '1 day'
|
||||
const PENDING_INTERVAL = '60 minutes'
|
||||
const PENDING_INTERVAL_MS = 60 * T.minutes
|
||||
const MAX_PENDING = 10
|
||||
|
||||
module.exports = {post, monitorPending, cancel, PENDING_INTERVAL}
|
||||
|
||||
function atomic (machineTx, pi) {
|
||||
const TransactionMode = pgp.txMode.TransactionMode
|
||||
const isolationLevel = pgp.txMode.isolationLevel
|
||||
|
|
@ -22,6 +24,8 @@ function atomic (machineTx, pi) {
|
|||
|
||||
return t.oneOrNone(sql, [machineTx.id])
|
||||
.then(row => {
|
||||
if (row && row.tx_version >= machineTx.txVersion) throw new E.StaleTxError('Stale tx')
|
||||
|
||||
return t.any(sql2, [machineTx.id])
|
||||
.then(billRows => {
|
||||
const dbTx = toObj(row)
|
||||
|
|
@ -29,10 +33,8 @@ function atomic (machineTx, pi) {
|
|||
return preProcess(dbTx, machineTx, pi)
|
||||
.then(preProcessedTx => upsert(dbTx, preProcessedTx))
|
||||
.then(r => {
|
||||
pp('DEBUG701.5')(r)
|
||||
return insertNewBills(billRows, machineTx)
|
||||
.then(newBills => _.set('newBills', newBills, r))
|
||||
.then(pp('DEBUG702'))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
@ -44,7 +46,6 @@ function atomic (machineTx, pi) {
|
|||
}
|
||||
|
||||
function post (machineTx, pi) {
|
||||
console.log('DEBUG700: %j', machineTx)
|
||||
return db.tx(atomic(machineTx, pi))
|
||||
.then(r => {
|
||||
const updatedTx = r.tx
|
||||
|
|
@ -67,11 +68,11 @@ function isMonotonic (oldField, newField, fieldKey) {
|
|||
if (oldField.isBigNumber) return oldField.lte(newField)
|
||||
if (_.isNumber(oldField)) return oldField <= newField
|
||||
|
||||
throw new Error(`Unexpected value: ${oldField}`)
|
||||
throw new Error(`Unexpected value [${fieldKey}]: ${oldField}, ${newField}`)
|
||||
}
|
||||
|
||||
function ensureRatchet (oldField, newField, fieldKey) {
|
||||
const monotonic = ['cryptoAtoms', 'fiat', 'send', 'sendConfirmed', 'operatorCompleted', 'timedout']
|
||||
const monotonic = ['cryptoAtoms', 'fiat', 'cashInFeeCrypto', 'send', 'sendConfirmed', 'operatorCompleted', 'timedout', 'txVersion']
|
||||
const free = ['sendPending', 'error', 'errorCode', 'customerId']
|
||||
|
||||
if (_.isNil(oldField)) return true
|
||||
|
|
@ -121,16 +122,11 @@ function toObj (row) {
|
|||
|
||||
keys.forEach(key => {
|
||||
const objKey = _.camelCase(key)
|
||||
if (key === 'crypto_atoms' || key === 'fiat' || key === 'cash_in_fee') {
|
||||
if (_.includes(key, ['crypto_atoms', 'fiat', 'cash_in_fee', 'cash_in_fee_crypto'])) {
|
||||
newObj[objKey] = BN(row[key])
|
||||
return
|
||||
}
|
||||
|
||||
if (key === 'device_time') {
|
||||
newObj[objKey] = parseInt(row[key], 10)
|
||||
return
|
||||
}
|
||||
|
||||
newObj[objKey] = row[key]
|
||||
})
|
||||
|
||||
|
|
@ -200,14 +196,13 @@ function update (tx, changes) {
|
|||
}
|
||||
|
||||
function registerTrades (pi, newBills) {
|
||||
console.log('DEBUG600: %j', newBills)
|
||||
_.forEach(bill => pi.buy(bill), newBills)
|
||||
}
|
||||
|
||||
function logAction (rec, tx) {
|
||||
const action = {
|
||||
tx_id: tx.id,
|
||||
action: rec.sendConfirmed ? 'sendCoins' : 'sendCoinsError',
|
||||
action: rec.action || (rec.sendConfirmed ? 'sendCoins' : 'sendCoinsError'),
|
||||
error: rec.error,
|
||||
error_code: rec.errorCode,
|
||||
tx_hash: rec.txHash
|
||||
|
|
@ -219,13 +214,22 @@ function logAction (rec, tx) {
|
|||
.then(_.constant(rec))
|
||||
}
|
||||
|
||||
function logActionById (action, _rec, txId) {
|
||||
const rec = _.assign(_rec, {action, tx_id: txId})
|
||||
const sql = pgp.helpers.insert(rec, null, 'cash_in_actions')
|
||||
|
||||
return db.none(sql)
|
||||
}
|
||||
|
||||
function isClearToSend (oldTx, newTx) {
|
||||
const now = Date.now()
|
||||
|
||||
return newTx.send &&
|
||||
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed))
|
||||
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed)) &&
|
||||
(newTx.created > now - PENDING_INTERVAL_MS)
|
||||
}
|
||||
|
||||
function postProcess (r, pi) {
|
||||
console.log('DEBUG701: %j', r)
|
||||
registerTrades(pi, r.newBills)
|
||||
|
||||
if (isClearToSend(r.dbTx, r.tx)) {
|
||||
|
|
@ -303,3 +307,22 @@ function monitorPending (settings) {
|
|||
.then(rows => Promise.all(_.map(processPending, rows)))
|
||||
.catch(logger.error)
|
||||
}
|
||||
|
||||
function cancel (txId) {
|
||||
const updateRec = {
|
||||
error: 'Operator cancel',
|
||||
error_code: 'operatorCancel',
|
||||
operator_completed: true
|
||||
}
|
||||
|
||||
return Promise.resolve()
|
||||
.then(() => {
|
||||
return pgp.helpers.update(updateRec, null, 'cash_in_txs') +
|
||||
pgp.as.format(' where id=$1', [txId])
|
||||
})
|
||||
.then(sql => db.result(sql, false))
|
||||
.then(res => {
|
||||
if (res.rowCount !== 1) throw new Error('No such tx-id')
|
||||
})
|
||||
.then(() => logActionById('operatorCompleted', {}, txId))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue