const _ = require('lodash/fp') const pgp = require('pg-promise')() const db = require('./db') const billMath = require('./bill-math') const T = require('./time') const logger = require('./logger') const plugins = require('./plugins') const helper = require('./cash-out-helper') module.exports = { post, monitorLiveIncoming, monitorStaleIncoming, monitorUnnotified, cancel } const UPDATEABLE_FIELDS = ['txHash', 'status', 'dispense', 'dispenseConfirmed', 'notified', 'redeem', 'phone', 'error', 'swept'] const STALE_INCOMING_TX_AGE = T.week const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes const MAX_NOTIFY_AGE = 2 * T.days const MIN_NOTIFY_AGE = 5 * T.minutes const INSUFFICIENT_FUNDS_CODE = 570 const toObj = helper.toObj const toDb = helper.toDb function httpError (msg, code) { const err = new Error(msg) err.name = 'HTTPError' err.code = code || 500 return err } function post (tx, pi) { const TransactionMode = pgp.txMode.TransactionMode const isolationLevel = pgp.txMode.isolationLevel const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) function transaction (t) { const sql = 'select * from cash_out_txs where id=$1' return t.oneOrNone(sql, [tx.id]) .then(toObj) .then(oldTx => { return preProcess(oldTx, tx, pi) .then(preProcessedTx => upsert(oldTx, preProcessedTx)) }) } transaction.txMode = tmSRD return db.tx(transaction) .then(txVector => { const [, newTx] = txVector return postProcess(txVector, pi) .then(changes => update(newTx, changes)) }) } function logError (action, err, tx) { return logAction(action, { error: err.message, error_code: err.name }, tx) } function mapDispense (tx) { const bills = tx.bills if (_.isEmpty(bills)) return {} return { provisioned_1: bills[0].provisioned, provisioned_2: bills[1].provisioned, dispensed_1: bills[0].dispensed, dispensed_2: bills[1].dispensed, rejected_1: bills[0].rejected, rejected_2: bills[1].rejected, denomination_1: bills[0].denomination, denomination_2: bills[1].denomination } } function logDispense (tx) { const baseRec = {error: tx.error, error_code: tx.errorCode} const rec = _.merge(mapDispense(tx), baseRec) const action = tx.dispenseConfirmed ? 'dispense' : 'dispenseError' return logAction(action, rec, tx) } function logActionById (action, _rec, txId) { const rec = _.assign(_rec, {action, tx_id: txId, redeem: false}) const sql = pgp.helpers.insert(rec, null, 'cash_out_actions') return db.none(sql) } function logAction (action, _rec, tx) { const rec = _.assign(_rec, {action, tx_id: tx.id, redeem: !!tx.redeem}) const sql = pgp.helpers.insert(rec, null, 'cash_out_actions') return db.none(sql) .then(_.constant(tx)) } function nilEqual (a, b) { if (_.isNil(a) && _.isNil(b)) return true return undefined } function diff (oldTx, newTx) { let updatedTx = {} UPDATEABLE_FIELDS.forEach(fieldKey => { if (oldTx && _.isEqualWith(nilEqual, oldTx[fieldKey], newTx[fieldKey])) return // We never null out an existing field if (oldTx && _.isNil(newTx[fieldKey])) return updatedTx[fieldKey] = newTx[fieldKey] }) return updatedTx } function upsert (oldTx, tx) { if (!oldTx) { return insert(tx) .then(newTx => [oldTx, newTx]) } return update(tx, diff(oldTx, tx)) .then(newTx => [oldTx, newTx]) } function insert (tx) { const dbTx = toDb(tx) const sql = pgp.helpers.insert(dbTx, null, 'cash_out_txs') + ' returning *' return db.one(sql) .then(toObj) } function update (tx, changes) { if (_.isEmpty(changes)) return Promise.resolve(tx) const dbChanges = toDb(changes) const sql = pgp.helpers.update(dbChanges, null, 'cash_out_txs') + pgp.as.format(' where id=$1', [tx.id]) const newTx = _.merge(tx, changes) return db.none(sql) .then(() => newTx) } function nextHd (isHd, tx) { if (!isHd) return Promise.resolve(tx) return db.one("select nextval('hd_indices_seq') as hd_index") .then(row => _.set('hdIndex', row.hd_index, tx)) } function updateCassettes (tx) { const sql = `update devices set cassette1 = cassette1 - $1, cassette2 = cassette2 - $2 where device_id = $3` const values = [ tx.bills[0].dispensed + tx.bills[0].rejected, tx.bills[1].dispensed + tx.bills[1].rejected, tx.deviceId ] return db.none(sql, values) } function wasJustAuthorized (oldTx, newTx, isZeroConf) { const isAuthorized = () => _.includes(oldTx.status, ['notSeen', 'published']) && _.includes(newTx.status, ['authorized', 'instant', 'confirmed']) const isConfirmed = () => _.includes(oldTx.status, ['notSeen', 'published', 'authorized']) && _.includes(newTx.status, ['instant', 'confirmed']) return isZeroConf ? isAuthorized() : isConfirmed() } function preProcess (oldTx, newTx, pi) { if (!oldTx) { return pi.isHd(newTx) .then(isHd => nextHd(isHd, newTx)) .then(newTxHd => { return pi.newAddress(newTxHd) .then(_.set('toAddress', _, newTxHd)) }) .then(addressedTx => { const rec = {to_address: addressedTx.toAddress} return logAction('provisionAddress', rec, addressedTx) }) .catch(err => { return logError('provisionAddress', err, newTx) .then(() => { throw err }) }) } return Promise.resolve(updateStatus(oldTx, newTx)) .then(updatedTx => { if (!oldTx) return updatedTx if (updatedTx.status !== oldTx.status) { const isZeroConf = pi.isZeroConf(updatedTx) if (wasJustAuthorized(oldTx, updatedTx, isZeroConf)) pi.sell(updatedTx) const rec = { to_address: updatedTx.toAddress, tx_hash: updatedTx.txHash } return logAction(updatedTx.status, rec, updatedTx) } if (!oldTx.dispenseConfirmed && updatedTx.dispenseConfirmed) { return logDispense(updatedTx) .then(updateCassettes(updatedTx)) } if (!oldTx.phone && newTx.phone) { return logAction('addPhone', {}, updatedTx) } if (!oldTx.redeem && newTx.redeem) { return logAction('redeemLater', {}, updatedTx) } return updatedTx }) } function postProcess (txVector, pi) { const [oldTx, newTx] = txVector if ((newTx.dispense && !oldTx.dispense) || (newTx.redeem && !oldTx.redeem)) { return pi.buildAvailableCassettes(newTx.id) .then(cassettes => { const bills = billMath.makeChange(cassettes.cassettes, newTx.fiat) console.log('DEBUG130: %j', cassettes.cassettes) if (!bills) throw httpError('Out of bills', INSUFFICIENT_FUNDS_CODE) return bills }) .then(bills => { const provisioned1 = bills[0].provisioned const provisioned2 = bills[1].provisioned const denomination1 = bills[0].denomination const denomination2 = bills[1].denomination const rec = { provisioned_1: provisioned1, provisioned_2: provisioned2, denomination_1: denomination1, denomination_2: denomination2 } return logAction('provisionNotes', rec, newTx) .then(_.constant({bills})) }) .catch(err => { return logError('provisionNotesError', err, newTx) .then(() => { throw err }) }) } return Promise.resolve({}) } function updateStatus (oldTx, newTx) { return _.set('status', ratchetStatus(oldTx.status, newTx.status), newTx) } function ratchetStatus (oldStatus, newStatus) { const statusOrder = ['notSeen', 'published', 'rejected', 'authorized', 'instant', 'confirmed'] if (oldStatus === newStatus) return oldStatus if (newStatus === 'insufficientFunds') return newStatus const idx = Math.max(statusOrder.indexOf(oldStatus), statusOrder.indexOf(newStatus)) return statusOrder[idx] } function fetchOpenTxs (statuses, age) { const sql = `select * from cash_out_txs where ((extract(epoch from (now() - created))) * 1000)<$1 and status in ($2^)` const statusClause = _.map(pgp.as.text, statuses).join(',') return db.any(sql, [age, statusClause]) .then(rows => rows.map(toObj)) } function processTxStatus (tx, settings) { const pi = plugins(settings, tx.deviceId) return pi.getStatus(tx) .then(res => _.set('status', res.status, tx)) .then(_tx => post(_tx, pi)) } function monitorLiveIncoming (settings) { const statuses = ['notSeen', 'published', 'insufficientFunds'] return fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(tx => processTxStatus(tx, settings)))) .catch(logger.error) } function monitorStaleIncoming (settings) { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] return fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(tx => processTxStatus(tx, settings)))) .catch(logger.error) } function monitorUnnotified (settings) { const sql = `select * from cash_out_txs where ((extract(epoch from (now() - created))) * 1000)<$1 and notified=$2 and dispense=$3 and phone is not null and status in ('instant', 'confirmed') and (redeem=$4 or ((extract(epoch from (now() - created))) * 1000)>$5)` const notify = tx => plugins(settings, tx.deviceId).notifyConfirmation(tx) return db.any(sql, [MAX_NOTIFY_AGE, false, false, true, MIN_NOTIFY_AGE]) .then(rows => _.map(toObj, rows)) .then(txs => Promise.all(txs.map(notify))) .catch(logger.error) } function cancel (txId) { const updateRec = { 'dispense_time': 'now()^', error: 'Operator cancel', dispense: true } return Promise.resolve() .then(() => { return pgp.helpers.update(updateRec, null, 'cash_out_txs') + pgp.as.format(' where id=$1', [txId]) }) .then(sql => db.result(sql, false)) .then(res => { if (res.rowCount !== 1) throw new Error('No such tx-id') }) .then(() => logActionById('operatorCompleted', {}, txId)) }