diff --git a/lib/cash-in-tx.js b/lib/cash-in-tx.js index 689d0987..1c608433 100644 --- a/lib/cash-in-tx.js +++ b/lib/cash-in-tx.js @@ -3,17 +3,19 @@ const pgp = require('pg-promise')() const db = require('./db') const BN = require('./bn') -module.exports = {postCashIn} +module.exports = {post} -const UPDATEABLE_FIELDS = ['fee', 'txHash', 'phone', 'error'] +const UPDATEABLE_FIELDS = ['fee', 'txHash', 'phone', 'error', 'send'] -function postCashIn (tx, pi) { +function post (tx, pi) { const TransactionMode = pgp.txMode.TransactionMode const isolationLevel = pgp.txMode.isolationLevel const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) function transaction (t) { const sql = 'select * from cash_in_txs where id=$1' + + console.log('DEBUG888: %j', tx) return t.oneOrNone(sql, [tx.id]) .then(row => upsert(row, tx)) } @@ -21,15 +23,18 @@ function postCashIn (tx, pi) { transaction.txMode = tmSRD return db.tx(transaction) - .then(txVector => postProcess(txVector, pi)) - .then(changes => update(tx.id, changes)) + .then(txVector => { + const [, newTx] = txVector + postProcess(txVector, pi) + .then(changes => update(newTx, changes)) + }) } function diff (oldTx, newTx) { let updatedTx = {} UPDATEABLE_FIELDS.forEach(fieldKey => { - if (_.isEqual(oldTx[fieldKey], newTx[fieldKey])) return + if (oldTx && _.isEqual(oldTx[fieldKey], newTx[fieldKey])) return updatedTx[fieldKey] = newTx[fieldKey] }) @@ -37,6 +42,8 @@ function diff (oldTx, newTx) { } function toObj (row) { + if (!row) return null + const keys = _.keys(row) let newObj = {} @@ -56,31 +63,48 @@ function toObj (row) { function upsert (row, tx) { const oldTx = toObj(row) - if (oldTx) return insert(tx) - return update(tx.id, diff(oldTx, tx)) + // insert bills + + if (!oldTx) { + return insert(tx) + .then(newTx => [oldTx, newTx]) + } + + return update(tx, diff(oldTx, tx)) + .then(newTx => [oldTx, newTx]) } function insert (tx) { - const dbTx = _.mapKeys(_.snakeCase, tx) + const dbTx = _.mapKeys(_.snakeCase, _.omit(['direction', 'bills'], tx)) - const sql = pgp.helpers.insert(dbTx, null, 'cash_in_txs') - return db.none(sql) + const sql = pgp.helpers.insert(dbTx, null, 'cash_in_txs') + ' returning *' + return db.one(sql) + .then(toObj) } -function update (txId, changes) { - const dbChanges = _.mapKeys(_.snakeCase, changes) - const sql = pgp.helpers.update(dbChanges, null, 'cash_in_txs') + - pgp.as.format(' where id=$1', [txId]) +function update (tx, changes) { + if (_.isEmpty(changes)) return Promise.resolve(tx) - return db.none(sql) + const dbChanges = _.mapKeys(_.snakeCase, _.omit(['direction', 'bills'], changes)) + console.log('DEBUG893: %j', dbChanges) + const sql = pgp.helpers.update(dbChanges, null, 'cash_in_txs') + + pgp.as.format(' where id=$1', [tx.id]) + ' returning *' + + return db.one(sql) + .then(toObj) } function postProcess (txVector, pi) { const [oldTx, newTx] = txVector - if (newTx.sent && !oldTx.sent) { + if (newTx.send && !oldTx.send) { return pi.sendCoins(newTx) .then(txHash => ({txHash})) - .catch(error => ({error})) + .catch(error => { + console.log('DEBUG895: %j', error) + return {error} + }) } + + return Promise.resolve({}) } diff --git a/lib/plugins.js b/lib/plugins.js index 736e4c14..f05c9f63 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -30,8 +30,8 @@ const coins = { ETH: {unitScale: 18} } -function plugins (settings) { - function buildRates (deviceId, tickers) { +function plugins (settings, deviceId) { + function buildRates (tickers) { const config = configManager.machineScoped(deviceId, settings.config) const cryptoCodes = config.cryptoCurrencies const cashOut = config.cashOutEnabled @@ -41,6 +41,7 @@ function plugins (settings) { const rates = {} + console.log('DEBUG444: %j', tickers) cryptoCodes.forEach((cryptoCode, i) => { const rateRec = tickers[i] if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) @@ -54,7 +55,7 @@ function plugins (settings) { return rates } - function buildBalances (deviceId, balanceRecs) { + function buildBalances (balanceRecs) { const config = configManager.machineScoped(deviceId, settings.config) const cryptoCodes = config.cryptoCurrencies @@ -97,7 +98,7 @@ function plugins (settings) { .then(row => row.id) } - function pollQueries (deviceTime, deviceId, deviceRec) { + function pollQueries (deviceTime, deviceRec) { const config = configManager.machineScoped(deviceId, settings.config) const fiatCode = config.fiatCurrency const cryptoCodes = config.cryptoCurrencies @@ -106,8 +107,8 @@ function plugins (settings) { const virtualCartridges = [config.virtualCashOutDenomination] const tickerPromises = cryptoCodes.map(c => ticker.getRates(settings, fiatCode, c)) - const balancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c, deviceId)) - const pingPromise = recordPing(deviceId, deviceTime, deviceRec) + const balancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c)) + const pingPromise = recordPing(deviceTime, deviceRec) const currentConfigVersionPromise = fetchCurrentConfigVersion() const promises = [ @@ -125,8 +126,8 @@ function plugins (settings) { return { cartridges: buildCartridges(cartridges, virtualCartridges, cartridgeCounts), - rates: buildRates(deviceId, tickers), - balances: buildBalances(deviceId, balances), + rates: buildRates(tickers), + balances: buildBalances(balances), currentConfigVersion } }) @@ -134,23 +135,12 @@ function plugins (settings) { // NOTE: This will fail if we have already sent coins because there will be // a unique dbm record in the table already. - function sendCoins (deviceId, tx) { - return dbm.addOutgoingTx(deviceId, tx) - .then(() => wallet.sendCoins(settings, tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) - .then(txHash => { - const fee = null // Need to fill this out in plugins - const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} - - return dbm.sentCoins(tx, toSend, fee, null, txHash) - .then(() => ({ - statusCode: 201, // Created - txHash, - txId: tx.id - })) - }) + function sendCoins (tx) { + console.log('DEBUG50: %j', settings) + return wallet.sendCoins(settings, tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) } - function trade (deviceId, rawTrade) { + function trade (rawTrade) { // TODO: move this to dbm, too // add bill to trader queue (if trader is enabled) const cryptoCode = rawTrade.cryptoCode @@ -174,18 +164,18 @@ function plugins (settings) { }) } - function recordPing (deviceId, deviceTime, rec) { + function recordPing (deviceTime, rec) { const event = { id: uuid.v4(), - deviceId: deviceId, + deviceId, eventType: 'ping', note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), - deviceTime: deviceTime + deviceTime } return dbm.machineEvent(event) } - function cashOut (deviceId, tx) { + function cashOut (tx) { const cryptoCode = tx.cryptoCode const serialPromise = wallet.supportsHD @@ -210,7 +200,7 @@ function plugins (settings) { }) } - function dispenseAck (deviceId, tx) { + function dispenseAck (tx) { const config = configManager.machineScoped(deviceId, settings.config) const cartridges = [ config.topCashOutDenomination, config.bottomCashOutDenomination ] @@ -218,7 +208,7 @@ function plugins (settings) { return dbm.addDispense(deviceId, tx, cartridges) } - function fiatBalance (fiatCode, cryptoCode, deviceId) { + function fiatBalance (fiatCode, cryptoCode) { const config = configManager.scoped(cryptoCode, deviceId, settings.config) return Promise.all([ @@ -404,11 +394,11 @@ function plugins (settings) { return Promise.all(promises) } - function checkDeviceBalances (deviceId) { - const config = configManager.machineScoped(deviceId, settings.config) + function checkDeviceBalances (_deviceId) { + const config = configManager.machineScoped(_deviceId, settings.config) const cryptoCodes = config.cryptoCurrencies const fiatCode = config.fiatCurrency - const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c, deviceId)) + const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c)) return Promise.all(fiatBalancePromises) .then(arr => { @@ -416,7 +406,7 @@ function plugins (settings) { fiatBalance: balance, cryptoCode: cryptoCodes[i], fiatCode, - deviceId + _deviceId })) }) } diff --git a/lib/routes.js b/lib/routes.js index e0fbf18a..35fe056a 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -3,15 +3,12 @@ const morgan = require('morgan') const helmet = require('helmet') const bodyParser = require('body-parser') -const BigNumber = require('bignumber.js') const _ = require('lodash/fp') const express = require('express') const options = require('./options') const logger = require('./logger') const configManager = require('./config-manager') -const db = require('./db') -const dbm = require('./postgresql_interface') const pairing = require('./pairing') const settingsLoader = require('./settings-loader') const plugins = require('./plugins') @@ -35,11 +32,11 @@ function poll (req, res, next) { const pid = req.query.pid const settings = req.settings const config = configManager.machineScoped(deviceId, settings.config) - const pi = plugins(settings) + const pi = plugins(settings, deviceId) pids[deviceId] = {pid, ts: Date.now()} - pi.pollQueries(deviceTime, deviceId, req.query) + pi.pollQueries(deviceTime, req.query) .then(results => { const cartridges = results.cartridges @@ -82,77 +79,38 @@ function poll (req, res, next) { } function postTx (req, res, next) { - return Tx.post(req.body) + console.log('DEBUG60: %j', req.settings) + const pi = plugins(req.settings, req.deviceId) + + return Tx.post(_.set('deviceId', req.deviceId, req.body), pi) .then(tx => res.json(tx)) .catch(next) } -function trade (req, res, next) { - const tx = req.body - const pi = plugins(req.settings) - - tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - - pi.trade(req.deviceId, tx) - .then(() => cacheAndRespond(req, res)) - .catch(next) -} - function stateChange (req, res, next) { helpers.stateChange(req.deviceId, req.deviceTime, req.body) - .then(() => cacheAndRespond(req, res)) - .catch(next) -} - -function send (req, res, next) { - const pi = plugins(req.settings) - const tx = req.body - tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - - return pi.sendCoins(req.deviceId, tx) - .then(status => { - const body = {txId: status && status.txId} - return cacheAndRespond(req, res, body) - }) - .catch(next) -} - -function cashOut (req, res, next) { - const pi = plugins(req.settings) - logger.info({tx: req.body, cmd: 'cashOut'}) - const tx = req.body - tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - - return pi.cashOut(req.deviceId, tx) - .then(cryptoAddress => cacheAndRespond(req, res, {toAddress: cryptoAddress})) - .catch(next) -} - -function dispenseAck (req, res, next) { - const pi = plugins(req.settings) - pi.dispenseAck(req.deviceId, req.body.tx) - .then(() => cacheAndRespond(req, res)) + .then(() => respond(req, res)) .catch(next) } function deviceEvent (req, res, next) { - const pi = plugins(req.settings) - pi.logEvent(req.deviceId, req.body) - .then(() => cacheAndRespond(req, res)) + const pi = plugins(req.settings, req.deviceId) + pi.logEvent(req.body) + .then(() => respond(req, res)) .catch(next) } function verifyUser (req, res, next) { - const pi = plugins(req.settings) + const pi = plugins(req.settings, req.deviceId) pi.verifyUser(req.body) - .then(idResult => cacheAndRespond(req, res, idResult)) + .then(idResult => respond(req, res, idResult)) .catch(next) } function verifyTx (req, res, next) { - const pi = plugins(req.settings) + const pi = plugins(req.settings, req.deviceId) pi.verifyTransaction(req.body) - .then(idResult => cacheAndRespond(req, res, idResult)) + .then(idResult => respond(req, res, idResult)) .catch(next) } @@ -177,11 +135,11 @@ function pair (req, res, next) { } function phoneCode (req, res, next) { - const pi = plugins(req.settings) + const pi = plugins(req.settings, req.deviceId) const phone = req.body.phone return pi.getPhoneCode(phone) - .then(code => cacheAndRespond(req, res, {code})) + .then(code => respond(req, res, {code})) .catch(err => { if (err.name === 'BadNumberError') throw httpError('Bad number', 410) throw err @@ -189,88 +147,6 @@ function phoneCode (req, res, next) { .catch(next) } -function updatePhone (req, res, next) { - const notified = req.query.notified === 'true' - const tx = req.body - - return dbm.updatePhone(tx, notified) - .then(r => cacheAndRespond(req, res, r)) - .catch(next) -} - -function fetchPhoneTx (req, res, next) { - return helpers.fetchPhoneTx(req.query.phone) - .then(r => res.json(r)) - .catch(next) -} - -function registerRedeem (req, res, next) { - const txId = req.params.txId - return dbm.registerRedeem(txId) - .then(() => cacheAndRespond(req, res)) - .catch(next) -} - -function waitForDispense (req, res, next) { - logger.debug('waitForDispense') - return dbm.fetchTx(req.params.txId) - .then(tx => { - logger.debug('tx fetched') - logger.debug(tx) - if (!tx) return res.sendStatus(404) - if (tx.status === req.query.status) return res.sendStatus(304) - res.json({tx}) - }) - .catch(next) -} - -function dispense (req, res, next) { - const tx = req.body.tx - - return dbm.addDispenseRequest(tx) - .then(dispenseRec => cacheAndRespond(req, res, dispenseRec)) - .catch(next) -} - -function isUniqueViolation (err) { - return err.code === '23505' -} - -function cacheAction (req, res, next) { - const requestId = req.headers['request-id'] - if (!requestId) return next() - - const sql = `insert into idempotents (request_id, device_id, body, status, pending) - values ($1, $2, $3, $4, $5)` - - const deviceId = req.deviceId - - db.none(sql, [requestId, deviceId, {}, 204, true]) - .then(() => next()) - .catch(err => { - if (!isUniqueViolation(err)) throw err - - const sql2 = 'select body, status, pending from idempotents where request_id=$1' - return db.one(sql2, [requestId]) - .then(row => { - if (row.pending) return res.status(204).end() - return res.status(row.status).json(row.body) - }) - }) -} - -function updateCachedAction (req, body, status) { - const requestId = req.headers['request-id'] - if (!requestId) return Promise.resolve() - - const sql = `update idempotents set body=$1, status=$2, pending=$3 - where request_id=$4 and device_id=$5 and pending=$6` - - const deviceId = req.deviceId - - return db.none(sql, [body, status, false, requestId, deviceId, true]) -} - function errorHandler (err, req, res, next) { const statusCode = err.name === 'HttpError' ? err.code || 500 @@ -280,22 +156,14 @@ function errorHandler (err, req, res, next) { logger.error(err) - return updateCachedAction(req, json, statusCode) - .then(() => res.status(statusCode).json(json)) + return res.status(statusCode).json(json) } -function cacheAndRespond (req, res, _body, _status) { +function respond (req, res, _body, _status) { const status = _status || 200 const body = _body || {} - return updateCachedAction(req, body, status) - .then(() => res.status(status).json(body)) -} - -function pruneIdempotents () { - const sql = "delete from idempotents where created < now() - interval '24 hours'" - - return db.none(sql) + return res.status(status).json(body) } function httpError (msg, code) { @@ -339,14 +207,11 @@ const skip = options.logLevel === 'debug' const configRequiredRoutes = [ '/poll', - '/trade', - '/send', - '/cash_out', - '/dispense_ack', '/event', '/verify_user', '/verify_transaction', - '/phone_code' + '/phone_code', + '/tx' ] const app = express() @@ -364,7 +229,6 @@ app.use(populateDeviceId) if (!devMode) app.use(authorize) app.use(configRequiredRoutes, populateSettings) app.use(filterOldRequests) -app.post('*', cacheAction) app.get('/poll', poll) app.post('/state', stateChange) @@ -435,6 +299,4 @@ function populateSettings (req, res, next) { .catch(next) } -setInterval(pruneIdempotents, 60000) - module.exports = {app, localApp} diff --git a/lib/tx.js b/lib/tx.js index dd1283a2..44cb3c48 100644 --- a/lib/tx.js +++ b/lib/tx.js @@ -1,35 +1,8 @@ -const db = require('./db') -const pgp = require('pg-promise')() +const CashInTx = require('./cash-in-tx') -function postCashIn (tx) { - const TransactionMode = pgp.txMode.TransactionMode - const isolationLevel = pgp.txMode.isolationLevel - const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) - - function transaction (t) { - const sql = 'select * from cash_in_txs where id=$1' - return t.oneOrNone(sql, [tx.id]) - .then(row => { - const newTx = executeCashInTxChange(tx, row) - - if (row) return updateCashInTx(newTx) - insertCashInTx(newTx) - }) - } - - transaction.txMode = tmSRD - - return db.tx(transaction) - // retry failed -} - -function postCashOut (tx) { - throw new Error('not implemented') -} - -function post (tx) { - if (tx.direction === 'cashIn') return postCashIn(tx) - if (tx.direction === 'cashOut') return postCashOut(tx) +function post (tx, pi) { + if (tx.direction === 'cashIn') return CashInTx.post(tx, pi) + if (tx.direction === 'cashOut') throw new Error('not implemented') return Promise.reject(new Error('No such tx direction: %s', tx.direction)) } diff --git a/lib/wallet.js b/lib/wallet.js index 05cf92dd..062a4094 100644 --- a/lib/wallet.js +++ b/lib/wallet.js @@ -6,10 +6,21 @@ const FETCH_INTERVAL = 5000 function fetchWallet (settings, cryptoCode) { return Promise.resolve() .then(() => { + console.log('DEBUG44') + console.log('DEBUG44.0.0: %j', cryptoCode) + try { + console.log('DEBUG44.0: %j', configManager.cryptoScoped(cryptoCode, settings.config).wallet) + } catch (err) { + console.log('DEBUG44.0.e: %s', err.stack) + } const plugin = configManager.cryptoScoped(cryptoCode, settings.config).wallet + console.log('DEBUG44.1') const account = settings.accounts[plugin] + console.log('DEBUG44.2') const wallet = require('lamassu-' + plugin) + console.log('DEBUG45: %j', {wallet, account}) + return {wallet, account} }) } @@ -21,11 +32,15 @@ function balance (settings, cryptoCode) { } function sendCoins (settings, toAddress, cryptoAtoms, cryptoCode) { + console.log('DEBUG40') return fetchWallet(settings, cryptoCode) .then(r => { + console.log('DEBUG41') return r.wallet.sendCoins(r.account, toAddress, cryptoAtoms, cryptoCode) .then(res => { + console.log('DEBUG42') mem.clear(module.exports.balance) + console.log('DEBUG43: %j', res) return res }) }) diff --git a/migrations/022-add_cash_in_sent.js b/migrations/022-add_cash_in_sent.js index c3b1c982..13e80201 100644 --- a/migrations/022-add_cash_in_sent.js +++ b/migrations/022-add_cash_in_sent.js @@ -2,7 +2,8 @@ var db = require('./db') exports.up = function (next) { var sql = [ - 'alter table cash_in_txs add column sent boolean not null default false' + 'alter table cash_in_txs add column send boolean not null default false', + 'alter table cash_in_txs rename currency_code to fiat_code' ] db.multi(sql, next) }