From 493e669e6da7d7e147d6505bc9e02440bf4ae73d Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Sun, 27 Nov 2016 16:54:08 +0200 Subject: [PATCH] WIPP --- lib/exchange.js | 19 +++-- lib/plugins.js | 197 ++++++++++++++++++++++++++++-------------------- lib/routes.js | 139 ++++++++++++++-------------------- lib/ticker.js | 20 ++--- lib/wallet.js | 19 +++-- 5 files changed, 198 insertions(+), 196 deletions(-) diff --git a/lib/exchange.js b/lib/exchange.js index be5e9201..45f09ed5 100644 --- a/lib/exchange.js +++ b/lib/exchange.js @@ -1,9 +1,8 @@ -const settingsLoader = require('./settings-loader') const configManager = require('./config-manager') -function fetchExchange (cryptoCode) { - return settingsLoader.settings() - .then(settings => { +function fetchExchange (settings, cryptoCode) { + return Promise.resolve() + .then(() => { const plugin = configManager.cryptoScoped(cryptoCode, settings.config).cryptoServices.wallet if (!plugin) throw new Error('No exchange plugin defined for: ' + cryptoCode) const account = settings.accounts.plugin @@ -13,18 +12,18 @@ function fetchExchange (cryptoCode) { }) } -function buy (cryptoAtoms, fiatCode, cryptoCode) { - return fetchExchange(cryptoCode) +function buy (settings, cryptoAtoms, fiatCode, cryptoCode) { + return fetchExchange(settings, cryptoCode) .then(r => r.exchange.buy(r.account, cryptoAtoms, fiatCode, cryptoCode)) } -function sell (cryptoAtoms, fiatCode, cryptoCode) { - return fetchExchange(cryptoCode) +function sell (settings, cryptoAtoms, fiatCode, cryptoCode) { + return fetchExchange(settings, cryptoCode) .then(r => r.exchange.sell(r.account, cryptoAtoms, fiatCode, cryptoCode)) } -function active (cryptoCode) { - return fetchExchange(cryptoCode) +function active (settings, cryptoCode) { + return fetchExchange(settings, cryptoCode) .then(() => true) .catch(() => false) } diff --git a/lib/plugins.js b/lib/plugins.js index 67310c0d..15500130 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -1,5 +1,3 @@ -'use strict' - const uuid = require('uuid') const R = require('ramda') const BigNumber = require('bignumber.js') @@ -12,16 +10,14 @@ const db = require('./postgresql_interface') const logger = require('./logger') const notifier = require('./notifier') const T = require('./time') -const settingsLoader = require('./settings') const configManager = require('./config-manager') +const settingsLoader = require('./settings-loader') const ticker = require('./ticker') const wallet = require('./wallet') const exchange = require('./exchange') const sms = require('./sms') const email = require('./email') -const tradeIntervals = {} - const CHECK_NOTIFICATION_INTERVAL = T.minute const ALERT_SEND_INTERVAL = T.hour const INCOMING_TX_INTERVAL = 30 * T.seconds @@ -36,6 +32,8 @@ const SWEEP_LIVE_HD_INTERVAL = T.minute const SWEEP_OLD_HD_INTERVAL = 2 * T.minutes const TRADE_INTERVAL = T.minute const TRADE_TTL = 5 * T.minutes +const STALE_TICKER = 3 * 60 * 1000 +const STALE_BALANCE = 3 * 60 * 1000 const tradesQueues = {} @@ -47,13 +45,46 @@ const coins = { let alertFingerprint = null let lastAlertTime = null -function getConfig (machineId) { - const config = settingsLoader.settings().config - return configManager.machineScoped(machineId, config) +exports.logEvent = db.recordDeviceEvent + +function buildRates (settings, deviceId, tickers) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.currencies.cryptoCurrencies + + const cashInCommission = new BigNumber(config.commissions.cashInCommission).div(100).plus(1) + const cashOutCommission = new BigNumber(config.commissions.cashOutCommission).div(100).plus(1) + + const rates = {} + + cryptoCodes.forEach((cryptoCode, i) => { + const rateRec = tickers[i] + if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) + const rate = rateRec.rates + rates[cryptoCode] = { + cashIn: rate.ask.times(cashInCommission), + cashOut: rate.bid.div(cashOutCommission) + } + }) + + return rates } -exports.getConfig = getConfig -exports.logEvent = db.recordDeviceEvent +function buildBalances (settings, deviceId, balanceRecs) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.currencies.cryptoCurrencies + + const balances = {} + + cryptoCodes.forEach((cryptoCode, i) => { + const balanceRec = balanceRecs[i] + if (!balanceRec) return logger.warn('No balance for ' + cryptoCode + ' yet') + if (Date.now() - balanceRec.timestamp > STALE_BALANCE) return logger.warn('Stale balance for ' + cryptoCode) + + balances[cryptoCode] = balanceRec.balance + }) + + return balances +} function buildCartridges (cartridges, virtualCartridges, rec) { return { @@ -71,16 +102,32 @@ function buildCartridges (cartridges, virtualCartridges, rec) { } } -exports.pollQueries = function pollQueries (deviceId) { - const config = getConfig(deviceId) +exports.pollQueries = function pollQueries (settings, deviceTime, deviceId, deviceRec) { + const config = configManager.machineScoped(deviceId, settings.config) + const fiatCode = config.currencies.fiatCurrency + const cryptoCodes = config.currencies.cryptoCurrencies const cartridges = [ config.currencies.topCashOutDenomination, config.currencies.bottomCashOutDenomination ] const virtualCartridges = [config.currencies.virtualCashOutDenomination] - return db.cartridgeCounts(deviceId) - .then(result => ({ - cartridges: buildCartridges(cartridges, virtualCartridges, result) - })) + const tickerPromises = cryptoCodes.map(c => ticker.getRates(settings, fiatCode, c)) + const balancePromises = cryptoCodes.map(c => wallet.balance(settings, c)) + const pingPromise = recordPing(deviceId, deviceTime, deviceRec) + + const promises = [db.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) + + return Promise.all(promises) + .then(arr => { + const cartridgeCounts = arr[0] + const tickers = arr.slice(2, cryptoCodes.length + 2) + const balances = arr.slice(cryptoCodes.length + 2) + + return { + cartridges: buildCartridges(cartridges, virtualCartridges, cartridgeCounts), + rates: buildRates(settings, deviceId, tickers), + balances: buildBalances(settings, deviceId, balances) + } + }) } // NOTE: This will fail if we have already sent coins because there will be @@ -137,7 +184,7 @@ exports.stateChange = function stateChange (deviceId, deviceTime, rec, cb) { return db.machineEvent(event) } -exports.recordPing = function recordPing (deviceId, deviceTime, rec, cb) { +function recordPing (deviceId, deviceTime, rec) { const event = { id: uuid.v4(), deviceId: deviceId, @@ -177,17 +224,16 @@ exports.cashOut = function cashOut (deviceId, tx) { }) } -exports.dispenseAck = function (deviceId, tx) { - const config = getConfig(deviceId) +exports.dispenseAck = function (settings, deviceId, tx) { + const config = configManager.machineScoped(deviceId, settings.config) const cartridges = [ config.currencies.topCashOutDenomination, config.currencies.bottomCashOutDenomination ] return db.addDispense(deviceId, tx, cartridges) } -exports.fiatBalance = function fiatBalance (fiatCode, cryptoCode, deviceId) { - const _config = settingsLoader.settings().config - const config = configManager.scoped(cryptoCode, deviceId, _config) +function fiatBalance (settings, fiatCode, cryptoCode, deviceId) { + const config = configManager.scoped(cryptoCode, deviceId, settings.config) return Promise.all([ticker.ticker(cryptoCode), wallet.balance(cryptoCode)]) .then(([rates, balanceRec]) => { @@ -257,11 +303,7 @@ function monitorUnnotified () { exports.startPolling = function startPolling () { executeTrades() - const cryptoCodes = getAllCryptoCodes() - cryptoCodes.forEach(cryptoCode => { - startTrader(cryptoCode) - }) - + setInterval(executeTrades, TRADE_INTERVAL) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorIncoming, INCOMING_TX_INTERVAL) setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) @@ -275,14 +317,6 @@ exports.startPolling = function startPolling () { sweepOldHD() } -function startTrader (cryptoCode) { - if (tradeIntervals[cryptoCode]) return - - logger.debug('[%s] startTrader', cryptoCode) - - tradeIntervals[cryptoCode] = setInterval(() => executeTrades(cryptoCode), TRADE_INTERVAL) -} - /* * Trader functions */ @@ -325,7 +359,30 @@ function consolidateTrades (cryptoCode, fiatCode) { return consolidatedTrade } -function executeTrades (cryptoCode, fiatCode) { +function executeTrades () { + return settingsLoader() + .then(settings => { + const config = settings.config + return db.devices() + .then(devices => { + const deviceIds = devices.map(device => device.device_id) + const lists = deviceIds.map(deviceId => { + const currencies = configManager.machineScoped(deviceId, config).currencies + const fiatCode = currencies.fiatCurrency + const cryptoCodes = currencies.cryptoCurrencies + return cryptoCodes.map(cryptoCode => ({fiatCode, cryptoCode})) + }) + + const tradesPromises = R.uniq(R.flatten(lists)) + .map(r => executeTradesForMarket(settings, r.fiatCode, r.cryptoCode)) + + return Promise.all(tradesPromises) + }) + }) + .catch(logger.error) +} + +function executeTradesForMarket (settings, fiatCode, cryptoCode) { const market = [fiatCode, cryptoCode].join('') logger.debug('[%s] checking for trades', market) @@ -405,59 +462,35 @@ function checkNotification () { }) } -function getCryptoCodes (deviceId) { - return settingsLoader.settings() - .then(settings => { - return configManager.machineScoped(deviceId, settings.config).currencies.cryptoCurrencies - }) -} -exports.getCryptoCodes = getCryptoCodes +function checkDeviceBalances (settings, deviceId) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.currencies.cryptoCurrencies + const fiatCode = config.currencies.fiatCurrency + const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(settings, fiatCode, c, deviceId)) -// Get union of all cryptoCodes from all machines -function getAllCryptoCodes () { - return Promise.all([db.devices(), settingsLoader.settings()]) - .then(([rows, settings]) => { - return rows.reduce((acc, r) => { - const cryptoCodes = configManager.machineScoped(r.device_id, settings.config).currencies.cryptoCurrencies - cryptoCodes.forEach(c => acc.add(c)) - return acc - }, new Set()) + return Promise.all(fiatBalancePromises) + .then(arr => { + return arr.map((balance, i) => ({ + fiatBalance: balance, + cryptoCode: cryptoCodes[i], + fiatCode, + deviceId + })) }) } -function getAllMarkets () { - return Promise.all([db.devices(), settingsLoader.settings()]) - .then(([rows, settings]) => { - return rows.reduce((acc, r) => { - const currencies = configManager.machineScoped(r.device_id, settings.config).currencies - const cryptoCodes = currencies.cryptoCurrencies - const fiatCodes = currencies.fiatCodes - fiatCodes.forEach(fiatCode => cryptoCodes.forEach(cryptoCode => acc.add(fiatCode + cryptoCode))) - return acc - }, new Set()) - }) -} - -function checkBalances () { - return Promise.all([getAllMarkets(), db.devices()]) - .then(([markets, devices]) => { +function checkBalances (settings) { + return db.devices() + .then(devices => { const deviceIds = devices.map(r => r.device_id) - const balances = [] + const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(settings, deviceId)) - markets.forEach(market => { - const fiatCode = market.fiatCode - const cryptoCode = market.cryptoCode - const minBalance = deviceIds.map(deviceId => { - const fiatBalanceRec = exports.fiatBalance(fiatCode, cryptoCode, deviceId) - return fiatBalanceRec ? fiatBalanceRec.balance : Infinity - }) - .reduce((min, cur) => Math.min(min, cur), Infinity) - - const rec = {fiatBalance: minBalance, cryptoCode, fiatCode} - balances.push(rec) + return Promise.all(deviceBalancePromises) + .then(arr => { + const toMarket = r => r.fiatBalance + r.cryptoCode + const min = R.minBy(r => r.fiatBalance) + return R.reduceBy(min, Infinity, toMarket, R.flatten(arr)) }) - - return balances }) } diff --git a/lib/routes.js b/lib/routes.js index 08f013fd..ec913731 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -8,6 +8,7 @@ const logger = require('./logger') const configManager = require('./config-manager') const db = require('./db') const pairing = require('./pairing') +const settingsLoader = require('./settings') let plugins @@ -15,68 +16,31 @@ module.exports = { init } -const STALE_TICKER = 3 * 60 * 1000 -const STALE_BALANCE = 3 * 60 * 1000 const CLOCK_SKEW = 60 * 1000 const REQUEST_TTL = 3 * 60 * 1000 const pids = {} const reboots = {} -function buildRates (deviceId) { - const cryptoCodes = plugins.getCryptoCodes() - const config = plugins.getConfig(deviceId) - - const cashInCommission = new BigNumber(config.commissions.cashInCommission).div(100).plus(1) - const cashOutCommission = new BigNumber(config.commissions.cashOutCommission).div(100).plus(1) - - const rates = {} - cryptoCodes.forEach(cryptoCode => { - const _rate = plugins.getDeviceRate(cryptoCode) - if (!_rate) return logger.warn('No rate for ' + cryptoCode + ' yet') - if (Date.now() - _rate.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) - const rate = _rate.rates - rates[cryptoCode] = { - cashIn: rate.ask.times(cashInCommission), - cashOut: rate.bid.div(cashOutCommission) - } +function loadSettings (req, res, next) { + settingsLoader.settings() + .then(settings => { + req.settings = settings + next() }) - - return rates + .catch(next) } -function buildBalances (deviceId) { - const cryptoCodes = plugins.getCryptoCodes(deviceId) - const fiatCode = plugins.getFiatCode(deviceId) - const _balances = {} - - cryptoCodes.forEach(cryptoCode => { - const balanceRec = plugins.fiatBalance(fiatCode, cryptoCode, deviceId) - if (!balanceRec) return logger.warn('No balance for ' + cryptoCode + ' yet') - if (Date.now() - balanceRec.timestamp > STALE_BALANCE) return logger.warn('Stale balance for ' + cryptoCode) - - _balances[cryptoCode] = balanceRec.balance - }) - - return _balances -} - -function poll (req, res) { +function poll (req, res, next) { const deviceId = req.deviceId const deviceTime = req.deviceTime const pid = req.query.pid + const settings = req.settings + const config = configManager.machineScoped(deviceId, settings.config) pids[deviceId] = {pid, ts: Date.now()} - let rates = {} - let balances = {} - - rates = buildRates(deviceId) - balances = buildBalances(deviceId) - - const config = plugins.getConfig(deviceId) - - plugins.pollQueries(deviceId) + plugins.pollQueries(settings, deviceTime, deviceId, req.query) .then(results => { const cartridges = results.cartridges @@ -102,8 +66,8 @@ function poll (req, res) { zeroConfLimit: config.commissions.zeroConfLimit, fiatTxLimit: config.limits.cashOutTransactionLimit, reboot, - rates, - balances, + rates: results.rates, + balances: results.balances, coins: config.currencies.cryptoCurrencies } @@ -111,32 +75,31 @@ function poll (req, res) { response.idVerificationLimit = config.compliance.idVerificationLimit } - res.json(response) + return res.json(response) }) - .catch(e => { console.log(e); logger.error(e) }) - - plugins.recordPing(deviceId, deviceTime, req.query) - .catch(logger.error) + .catch(next) } function trade (req, res, next) { const tx = req.body tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - plugins.trade(req.deviceId, tx) + plugins.trade(req.settings, req.deviceId, tx) .then(() => cacheAndRespond(req, res)) + .catch(next) } -function stateChange (req, res) { - plugins.stateChange(req.deviceId, req.deviceTime, req.body) +function stateChange (req, res, next) { + plugins.stateChange(req.settings, req.deviceId, req.deviceTime, req.body) .then(() => cacheAndRespond(req, res)) + .catch(next) } function send (req, res, next) { const tx = req.body tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - return plugins.sendCoins(req.deviceId, tx) + return plugins.sendCoins(req.settings, req.deviceId, tx) .then(status => { const body = {txId: status && status.txId} return cacheAndRespond(req, res, body) @@ -144,33 +107,38 @@ function send (req, res, next) { .catch(next) } -function cashOut (req, res) { +function cashOut (req, res, next) { logger.info({tx: req.body, cmd: 'cashOut'}) const tx = req.body tx.cryptoAtoms = new BigNumber(tx.cryptoAtoms) - return plugins.cashOut(req.deviceId, tx) + return plugins.cashOut(req.settings, req.deviceId, tx) .then(cryptoAddress => cacheAndRespond(req, res, {toAddress: cryptoAddress})) + .catch(next) } -function dispenseAck (req, res) { - plugins.dispenseAck(req.deviceId, req.body.tx) +function dispenseAck (req, res, next) { + plugins.dispenseAck(req.settings, req.deviceId, req.body.tx) .then(() => cacheAndRespond(req, res)) + .catch(next) } -function deviceEvent (req, res) { +function deviceEvent (req, res, next) { plugins.logEvent(req.deviceId, req.body) .then(() => cacheAndRespond(req, res)) + .catch(next) } -function verifyUser (req, res) { - plugins.verifyUser(req.body) +function verifyUser (req, res, next) { + plugins.verifyUser(req.settings, req.body) .then(idResult => cacheAndRespond(req, res, idResult)) + .catch(next) } -function verifyTx (req, res) { - plugins.verifyTransaction(req.body) +function verifyTx (req, res, next) { + plugins.verifyTransaction(req.settings, req.body) .then(idResult => cacheAndRespond(req, res, idResult)) + .catch(next) } function ca (req, res) { @@ -193,36 +161,38 @@ function pair (req, res, next) { .catch(next) } -function phoneCode (req, res) { +function phoneCode (req, res, next) { const phone = req.body.phone - logger.debug('Phone code requested for: ' + phone) - - return plugins.getPhoneCode(phone) + return plugins.getPhoneCode(req.settings, phone) .then(code => cacheAndRespond(req, res, {code})) .catch(err => { if (err.name === 'BadNumberError') throw httpError('Bad number', 410) throw err }) + .catch(next) } -function updatePhone (req, res) { +function updatePhone (req, res, next) { const notified = req.query.notified === 'true' const tx = req.body return plugins.updatePhone(tx, notified) .then(r => cacheAndRespond(req, res, r)) + .catch(next) } -function fetchPhoneTx (req, res) { +function fetchPhoneTx (req, res, next) { return plugins.fetchPhoneTx(req.query.phone) .then(r => res.json(r)) + .catch(next) } -function registerRedeem (req, res) { +function registerRedeem (req, res, next) { const txId = req.params.txId return plugins.registerRedeem(txId) .then(() => cacheAndRespond(req, res)) + .catch(next) } function waitForDispense (req, res, next) { @@ -238,11 +208,12 @@ function waitForDispense (req, res, next) { .catch(next) } -function dispense (req, res) { +function dispense (req, res, next) { const tx = req.body.tx return plugins.requestDispense(tx) .then(dispenseRec => cacheAndRespond(req, res, dispenseRec)) + .catch(next) } function isUniqueViolation (err) { @@ -363,19 +334,19 @@ function init (opts) { app.post('/pair', pair) app.get('/ca', ca) - app.get('/poll', authMiddleware, poll) + app.get('/poll', authMiddleware, loadSettings, poll) - app.post('/trade', authMiddleware, trade) - app.post('/send', authMiddleware, send) - app.post('/state', authMiddleware, stateChange) - app.post('/cash_out', authMiddleware, cashOut) - app.post('/dispense_ack', authMiddleware, dispenseAck) + app.post('/trade', authMiddleware, loadSettings, trade) + app.post('/send', authMiddleware, loadSettings, send) + app.post('/state', authMiddleware, loadSettings, stateChange) + app.post('/cash_out', authMiddleware, loadSettings, cashOut) + app.post('/dispense_ack', authMiddleware, loadSettings, dispenseAck) app.post('/event', authMiddleware, deviceEvent) - app.post('/verify_user', authMiddleware, verifyUser) - app.post('/verify_transaction', authMiddleware, verifyTx) + app.post('/verify_user', authMiddleware, loadSettings, verifyUser) + app.post('/verify_transaction', authMiddleware, loadSettings, verifyTx) - app.post('/phone_code', authMiddleware, phoneCode) + app.post('/phone_code', authMiddleware, loadSettings, phoneCode) app.post('/update_phone', authMiddleware, updatePhone) app.get('/phone_tx', authMiddleware, fetchPhoneTx) app.post('/register_redeem/:txId', authMiddleware, registerRedeem) diff --git a/lib/ticker.js b/lib/ticker.js index 30a89d3d..7ca7c8f7 100644 --- a/lib/ticker.js +++ b/lib/ticker.js @@ -1,20 +1,20 @@ const mem = require('mem') -const settingsLoader = require('./settings-loader') const configManager = require('./config-manager') const FETCH_INTERVAL = 10000 -function getRates (fiatCode, cryptoCode) { +function getRates (settings, fiatCode, cryptoCode) { return Promise.resolve() .then(() => { - return settingsLoader.settings() - .then(settings => { - const config = settings.config - const plugin = configManager.cryptoScoped(cryptoCode, config).cryptoServices.ticker - const account = settings.accounts.plugin - const ticker = require('lamassu-' + plugin) + const config = settings.config + const plugin = configManager.cryptoScoped(cryptoCode, config).cryptoServices.ticker + const account = settings.accounts.plugin + const ticker = require('lamassu-' + plugin) - return ticker.ticker(account, fiatCode, cryptoCode) - }) + return ticker.ticker(account, fiatCode, cryptoCode) + .then(r => ({ + rates: r.rates, + timestamp: Date.now() + })) }) } diff --git a/lib/wallet.js b/lib/wallet.js index 64277de2..b8752e17 100644 --- a/lib/wallet.js +++ b/lib/wallet.js @@ -1,12 +1,11 @@ const mem = require('mem') -const settingsLoader = require('./settings-loader') const configManager = require('./config-manager') const FETCH_INTERVAL = 5000 -function fetchWallet (cryptoCode) { - return settingsLoader.settings() - .then(settings => { +function fetchWallet (settings, cryptoCode) { + return Promise.resolve() + .then(() => { const plugin = configManager.cryptoScoped(cryptoCode, settings.config).cryptoServices.wallet const account = settings.accounts.plugin const wallet = require('lamassu-' + plugin) @@ -15,8 +14,8 @@ function fetchWallet (cryptoCode) { }) } -function balance (cryptoCode) { - return fetchWallet(cryptoCode) +function balance (settings, cryptoCode) { + return fetchWallet(settings, cryptoCode) .then(r => r.wallet.balance(r.account)) .then(balance => ({balance, timestamp: Date.now()})) } @@ -32,13 +31,13 @@ function sendCoins (toAddress, cryptoAtoms, cryptoCode) { }) } -function newAddress (cryptoCode, info) { - return fetchWallet(cryptoCode) +function newAddress (settings, cryptoCode, info) { + return fetchWallet(settings, cryptoCode) .then(r => r.wallet.newAddress(r.account, cryptoCode, info)) } -function getStatus (toAdress, cryptoAtoms, cryptoCode) { - return fetchWallet(cryptoCode) +function getStatus (settings, toAdress, cryptoAtoms, cryptoCode) { + return fetchWallet(settings, cryptoCode) .then(r => r.wallet.getStatus(r.account, toAdress, cryptoAtoms, cryptoCode)) }