From a375adb8b98ef1a8700316001cbf4663513824d3 Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Thu, 8 Dec 2016 17:56:50 +0200 Subject: [PATCH] WIPP --- lib/plugins.js | 980 ++++++++++++++++++++--------------------- lib/poller.js | 12 +- lib/settings-loader.js | 20 + 3 files changed, 516 insertions(+), 496 deletions(-) diff --git a/lib/plugins.js b/lib/plugins.js index 7396f118..4e311446 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -10,7 +10,6 @@ const logger = require('./logger') const notifier = require('./notifier') const T = require('./time') const configManager = require('./config-manager') -const settingsLoader = require('./settings-loader') const ticker = require('./ticker') const wallet = require('./wallet') const exchange = require('./exchange') @@ -38,550 +37,545 @@ const coins = { let alertFingerprint = null let lastAlertTime = null -function buildRates (deviceId, tickers) { - const settings = settingsLoader.settings() - const config = configManager.machineScoped(deviceId, settings.config) - const cryptoCodes = config.cryptoCurrencies - const cashOut = config.cashOutEnabled +function plugins (settings) { + function buildRates (deviceId, tickers) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.cryptoCurrencies + const cashOut = config.cashOutEnabled - const cashInCommission = new BigNumber(config.cashInCommission).div(100).plus(1) - const cashOutCommission = cashOut && new BigNumber(config.cashOutCommission).div(100).plus(1) + const cashInCommission = new BigNumber(config.cashInCommission).div(100).plus(1) + const cashOutCommission = cashOut && new BigNumber(config.cashOutCommission).div(100).plus(1) - const rates = {} + const rates = {} - cryptoCodes.forEach((cryptoCode, i) => { - const rateRec = tickers[i] - if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) - const rate = rateRec.rates - rates[cryptoCode] = { - cashIn: rate.ask.times(cashInCommission), - cashOut: cashOut ? rate.bid.div(cashOutCommission) : undefined - } - }) - - return rates -} - -function buildBalances (deviceId, balanceRecs) { - const settings = settingsLoader.settings() - const config = configManager.machineScoped(deviceId, settings.config) - const cryptoCodes = config.cryptoCurrencies - - const balances = {} - - cryptoCodes.forEach((cryptoCode, i) => { - const balanceRec = balanceRecs[i] - if (!balanceRec) return logger.warn('No balance for ' + cryptoCode + ' yet') - if (Date.now() - balanceRec.timestamp > STALE_BALANCE) return logger.warn('Stale balance for ' + cryptoCode) - - balances[cryptoCode] = balanceRec.balance - }) - - return balances -} - -function buildCartridges (cartridges, virtualCartridges, rec) { - return { - cartridges: [ - { - denomination: parseInt(cartridges[0], 10), - count: parseInt(rec.counts[0], 10) - }, - { - denomination: parseInt(cartridges[1], 10), - count: parseInt(rec.counts[1], 10) + cryptoCodes.forEach((cryptoCode, i) => { + const rateRec = tickers[i] + if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) + const rate = rateRec.rates + rates[cryptoCode] = { + cashIn: rate.ask.times(cashInCommission), + cashOut: cashOut ? rate.bid.div(cashOutCommission) : undefined } - ], - virtualCartridges + }) + + return rates } -} -function pollQueries (deviceTime, deviceId, deviceRec) { - const settings = settingsLoader.settings() - const config = configManager.machineScoped(deviceId, settings.config) - const fiatCode = config.fiatCurrency - const cryptoCodes = config.cryptoCurrencies - const cartridges = [ config.topCashOutDenomination, - config.bottomCashOutDenomination ] - const virtualCartridges = [config.virtualCashOutDenomination] + function buildBalances (deviceId, balanceRecs) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.cryptoCurrencies - const tickerPromises = cryptoCodes.map(c => ticker.getRates(fiatCode, c)) - const balancePromises = cryptoCodes.map(wallet.balance) - const pingPromise = recordPing(deviceId, deviceTime, deviceRec) + const balances = {} - const promises = [dbm.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) + cryptoCodes.forEach((cryptoCode, i) => { + const balanceRec = balanceRecs[i] + if (!balanceRec) return logger.warn('No balance for ' + cryptoCode + ' yet') + if (Date.now() - balanceRec.timestamp > STALE_BALANCE) return logger.warn('Stale balance for ' + cryptoCode) - return Promise.all(promises) - .then(arr => { - const cartridgeCounts = arr[0] - const tickers = arr.slice(2, cryptoCodes.length + 2) - const balances = arr.slice(cryptoCodes.length + 2) + balances[cryptoCode] = balanceRec.balance + }) + return balances + } + + function buildCartridges (cartridges, virtualCartridges, rec) { return { - cartridges: buildCartridges(cartridges, virtualCartridges, cartridgeCounts), - rates: buildRates(deviceId, tickers), - balances: buildBalances(deviceId, balances) + cartridges: [ + { + denomination: parseInt(cartridges[0], 10), + count: parseInt(rec.counts[0], 10) + }, + { + denomination: parseInt(cartridges[1], 10), + count: parseInt(rec.counts[1], 10) + } + ], + virtualCartridges } - }) -} + } -// NOTE: This will fail if we have already sent coins because there will be -// a dbm unique dbm record in the table already. -function executeTx (deviceId, tx) { - return dbm.addOutgoingTx(deviceId, tx) - .then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) - .then(txHash => { - const fee = null // Need to fill this out in plugins - const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} + function pollQueries (deviceTime, deviceId, deviceRec) { + const config = configManager.machineScoped(deviceId, settings.config) + const fiatCode = config.fiatCurrency + const cryptoCodes = config.cryptoCurrencies + const cartridges = [ config.topCashOutDenomination, + config.bottomCashOutDenomination ] + const virtualCartridges = [config.virtualCashOutDenomination] - return dbm.sentCoins(tx, toSend, fee, null, txHash) - .then(() => ({ - statusCode: 201, // Created - txHash, - txId: tx.id - })) - }) -} + const tickerPromises = cryptoCodes.map(c => ticker.getRates(fiatCode, c)) + const balancePromises = cryptoCodes.map(wallet.balance) + const pingPromise = recordPing(deviceId, deviceTime, deviceRec) -function trade (deviceId, rawTrade) { - // TODO: move this to dbm, too - // add bill to trader queue (if trader is enabled) - const cryptoCode = rawTrade.cryptoCode - const fiatCode = rawTrade.fiatCode - const cryptoAtoms = rawTrade.cryptoAtoms + const promises = [dbm.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) - return dbm.recordBill(deviceId, rawTrade) - .then(() => { + return Promise.all(promises) + .then(arr => { + const cartridgeCounts = arr[0] + const tickers = arr.slice(2, cryptoCodes.length + 2) + const balances = arr.slice(cryptoCodes.length + 2) + + return { + cartridges: buildCartridges(cartridges, virtualCartridges, cartridgeCounts), + rates: buildRates(deviceId, tickers), + balances: buildBalances(deviceId, balances) + } + }) + } + + // NOTE: This will fail if we have already sent coins because there will be + // a dbm unique dbm record in the table already. + function executeTx (deviceId, tx) { + return dbm.addOutgoingTx(deviceId, tx) + .then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) + .then(txHash => { + const fee = null // Need to fill this out in plugins + const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} + + return dbm.sentCoins(tx, toSend, fee, null, txHash) + .then(() => ({ + statusCode: 201, // Created + txHash, + txId: tx.id + })) + }) + } + + function trade (deviceId, rawTrade) { + // TODO: move this to dbm, too + // add bill to trader queue (if trader is enabled) + const cryptoCode = rawTrade.cryptoCode + const fiatCode = rawTrade.fiatCode + const cryptoAtoms = rawTrade.cryptoAtoms + + return dbm.recordBill(deviceId, rawTrade) + .then(() => { + const market = [fiatCode, cryptoCode].join('') + + if (!exchange.active(cryptoCode)) return + + logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms) + if (!tradesQueues[market]) tradesQueues[market] = [] + tradesQueues[market].push({ + fiatCode, + cryptoAtoms, + cryptoCode, + timestamp: Date.now() + }) + }) + } + + function stateChange (deviceId, deviceTime, rec) { + const event = { + id: rec.uuid, + deviceId: deviceId, + eventType: 'stateChange', + note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}), + deviceTime: deviceTime + } + return dbm.machineEvent(event) + } + + function recordPing (deviceId, deviceTime, rec) { + const event = { + id: uuid.v4(), + deviceId: deviceId, + eventType: 'ping', + note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), + deviceTime: deviceTime + } + return dbm.machineEvent(event) + } + + function sendCoins (deviceId, rawTx) { + return executeTx(deviceId, rawTx) + } + + function cashOut (deviceId, tx) { + const cryptoCode = tx.cryptoCode + + const serialPromise = wallet.supportsHD + ? dbm.nextCashOutSerialHD(tx.id, cryptoCode) + : Promise.resolve() + + return serialPromise + .then(serialNumber => { + const info = { + label: 'TX ' + Date.now(), + account: 'deposit', + serialNumber + } + + return wallet.newAddress(cryptoCode, info) + .then(address => { + const newTx = R.assoc('toAddress', address, tx) + + return dbm.addInitialIncoming(deviceId, newTx, address) + .then(() => address) + }) + }) + } + + function dispenseAck (deviceId, tx) { + const config = configManager.machineScoped(deviceId, settings.config) + const cartridges = [ config.topCashOutDenomination, + config.bottomCashOutDenomination ] + + return dbm.addDispense(deviceId, tx, cartridges) + } + + function fiatBalance (fiatCode, cryptoCode, deviceId) { + const config = configManager.scoped(cryptoCode, deviceId, settings.config) + + return Promise.all([ticker.getRates(fiatCode, cryptoCode), wallet.balance(cryptoCode)]) + .then(([rates, balanceRec]) => { + const rawRate = rates.rates.ask + const commission = (new BigNumber(config.cashInCommission).div(100)).plus(1) + const balance = balanceRec.balance + + if (!rawRate || !balance) return null + + // The rate is actually our commission times real rate. + const rate = rawRate.times(commission) + + // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide + // all our balances by it to provide a safety margin. + const lowBalanceMargin = (new BigNumber(config.lowBalanceMargin).div(100)).plus(1) + + const unitScale = new BigNumber(10).pow(coins[cryptoCode].unitScale) + const fiatTransferBalance = balance.div(unitScale).times(rate).div(lowBalanceMargin) + + return {timestamp: balanceRec.timestamp, balance: fiatTransferBalance.round(3).toNumber()} + }) + } + + function processTxStatus (tx) { + return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) + .then(res => dbm.updateTxStatus(tx, res.status)) + } + + function notifyConfirmation (tx) { + logger.debug('notifyConfirmation') + + const phone = tx.phone + const rec = { + sms: { + toNumber: phone, + body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.' + } + } + + return sms.sendMessage(rec) + .then(() => dbm.updateNotify(tx)) + } + + function monitorLiveIncoming () { + const statuses = ['notSeen', 'published', 'insufficientFunds'] + + return dbm.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) + .then(txs => Promise.all(txs.map(processTxStatus))) + .catch(logger.error) + } + + function monitorIncoming () { + const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] + + return dbm.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) + .then(txs => Promise.all(txs.map(processTxStatus))) + .catch(logger.error) + } + + function monitorUnnotified () { + dbm.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) + .then(txs => Promise.all(txs.map(notifyConfirmation))) + .catch(logger.error) + } + + function pong () { + db.none('insert into server_events (event_type) values ($1)', ['ping']) + .catch(logger.error) + } + + function pongClear () { + const sql = `delete from server_events + where event_type=$1 + and created < now() - interval $2` + + db.none(sql, ['ping', PONG_TTL]) + .catch(logger.error) + } + + /* + * Trader functions + */ + + function consolidateTrades (cryptoCode, fiatCode) { const market = [fiatCode, cryptoCode].join('') - if (!exchange.active(cryptoCode)) return + const marketTradesQueues = tradesQueues[market] + if (!marketTradesQueues || marketTradesQueues.length === 0) return null - logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms) - if (!tradesQueues[market]) tradesQueues[market] = [] - tradesQueues[market].push({ + logger.debug('[%s] tradesQueues size: %d', market, marketTradesQueues.length) + logger.debug('[%s] tradesQueues head: %j', market, marketTradesQueues[0]) + + const t1 = Date.now() + + const filtered = marketTradesQueues + .filter(tradeEntry => { + console.log('DEBUG33: %j, %s, %s, %s', tradeEntry, t1, tradeEntry.timestamp, TRADE_TTL) + return t1 - tradeEntry.timestamp < TRADE_TTL + }) + + const filteredCount = marketTradesQueues.length - filtered.length + + if (filteredCount > 0) { + tradesQueues[market] = filtered + logger.debug('[%s] expired %d trades', market, filteredCount) + } + + if (filtered.length === 0) return null + + const cryptoAtoms = filtered + .reduce((prev, current) => prev.plus(current.cryptoAtoms), new BigNumber(0)) + + const timestamp = filtered.map(r => r.timestamp).reduce((acc, r) => Math.max(acc, r), 0) + + const consolidatedTrade = { fiatCode, cryptoAtoms, cryptoCode, - timestamp: Date.now() + timestamp + } + + tradesQueues[market] = [] + + logger.debug('[%s] consolidated: %j', market, consolidatedTrade) + return consolidatedTrade + } + + function executeTrades () { + return dbm.devices() + .then(devices => { + const deviceIds = devices.map(device => device.device_id) + const lists = deviceIds.map(deviceId => { + const config = configManager.machineScoped(deviceId, settings.config) + const fiatCode = config.fiatCurrency + const cryptoCodes = config.cryptoCurrencies + + return cryptoCodes.map(cryptoCode => ({fiatCode, cryptoCode})) + }) + + const tradesPromises = R.uniq(R.flatten(lists)) + .map(r => executeTradesForMarket(settings, r.fiatCode, r.cryptoCode)) + + return Promise.all(tradesPromises) }) - }) -} - -function stateChange (deviceId, deviceTime, rec) { - const event = { - id: rec.uuid, - deviceId: deviceId, - eventType: 'stateChange', - note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}), - deviceTime: deviceTime + .catch(logger.error) } - return dbm.machineEvent(event) -} -function recordPing (deviceId, deviceTime, rec) { - const event = { - id: uuid.v4(), - deviceId: deviceId, - eventType: 'ping', - note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), - deviceTime: deviceTime - } - return dbm.machineEvent(event) -} + function executeTradesForMarket (settings, fiatCode, cryptoCode) { + if (!exchange.active(cryptoCode)) return -function sendCoins (deviceId, rawTx) { - return executeTx(deviceId, rawTx) -} + const market = [fiatCode, cryptoCode].join('') + logger.debug('[%s] checking for trades', market) -function cashOut (deviceId, tx) { - const cryptoCode = tx.cryptoCode + const tradeEntry = consolidateTrades(cryptoCode, fiatCode) + if (tradeEntry === null) return logger.debug('[%s] no trades', market) - const serialPromise = wallet.supportsHD - ? dbm.nextCashOutSerialHD(tx.id, cryptoCode) - : Promise.resolve() - - return serialPromise - .then(serialNumber => { - const info = { - label: 'TX ' + Date.now(), - account: 'deposit', - serialNumber + if (tradeEntry.cryptoAtoms.eq(0)) { + logger.debug('[%s] rejecting 0 trade', market) + return } - return wallet.newAddress(cryptoCode, info) - .then(address => { - const newTx = R.assoc('toAddress', address, tx) + logger.debug('[%s] making a trade: %d', market, tradeEntry.cryptoAtoms.toString()) - return dbm.addInitialIncoming(deviceId, newTx, address) - .then(() => address) + return exchange.buy(tradeEntry.cryptoAtoms, tradeEntry.fiatCode, tradeEntry.cryptoCode) + .then(() => logger.debug('[%s] Successful trade.', market)) + .catch(err => { + tradesQueues[market].push(tradeEntry) + if (err.name === 'NoExchangeError') return logger.debug(err.message) + logger.error(err) }) - }) -} - -function dispenseAck (deviceId, tx) { - const settings = settingsLoader.settings() - const config = configManager.machineScoped(deviceId, settings.config) - const cartridges = [ config.topCashOutDenomination, - config.bottomCashOutDenomination ] - - return dbm.addDispense(deviceId, tx, cartridges) -} - -function fiatBalance (fiatCode, cryptoCode, deviceId) { - const settings = settingsLoader.settings() - const config = configManager.scoped(cryptoCode, deviceId, settings.config) - - return Promise.all([ticker.getRates(fiatCode, cryptoCode), wallet.balance(cryptoCode)]) - .then(([rates, balanceRec]) => { - const rawRate = rates.rates.ask - const commission = (new BigNumber(config.cashInCommission).div(100)).plus(1) - const balance = balanceRec.balance - - if (!rawRate || !balance) return null - - // The rate is actually our commission times real rate. - const rate = rawRate.times(commission) - - // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide - // all our balances by it to provide a safety margin. - const lowBalanceMargin = (new BigNumber(config.lowBalanceMargin).div(100)).plus(1) - - const unitScale = new BigNumber(10).pow(coins[cryptoCode].unitScale) - const fiatTransferBalance = balance.div(unitScale).times(rate).div(lowBalanceMargin) - - return {timestamp: balanceRec.timestamp, balance: fiatTransferBalance.round(3).toNumber()} - }) -} - -function processTxStatus (tx) { - return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) - .then(res => dbm.updateTxStatus(tx, res.status)) -} - -function notifyConfirmation (tx) { - logger.debug('notifyConfirmation') - - const phone = tx.phone - const rec = { - sms: { - toNumber: phone, - body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.' - } } - return sms.sendMessage(rec) - .then(() => dbm.updateNotify(tx)) -} + function sendMessage (rec) { + const config = configManager.unscoped(settings.config) -function monitorLiveIncoming () { - const statuses = ['notSeen', 'published', 'insufficientFunds'] + let promises = [] + if (config.notificationsEmailEnabled) promises.push(email.sendMessage(rec)) + if (config.notificationsSMSEnabled) promises.push(sms.sendMessage(rec)) - return dbm.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) - .then(txs => Promise.all(txs.map(processTxStatus))) - .catch(logger.error) -} - -function monitorIncoming () { - const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - - return dbm.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) - .then(txs => Promise.all(txs.map(processTxStatus))) - .catch(logger.error) -} - -function monitorUnnotified () { - dbm.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) - .then(txs => Promise.all(txs.map(notifyConfirmation))) - .catch(logger.error) -} - -function pong () { - db.none('insert into server_events (event_type) values ($1)', ['ping']) - .catch(logger.error) -} - -function pongClear () { - const sql = `delete from server_events - where event_type=$1 - and created < now() - interval $2` - - db.none(sql, ['ping', PONG_TTL]) - .catch(logger.error) -} - -/* - * Trader functions - */ - -function consolidateTrades (cryptoCode, fiatCode) { - const market = [fiatCode, cryptoCode].join('') - - const marketTradesQueues = tradesQueues[market] - if (!marketTradesQueues || marketTradesQueues.length === 0) return null - - logger.debug('[%s] tradesQueues size: %d', market, marketTradesQueues.length) - logger.debug('[%s] tradesQueues head: %j', market, marketTradesQueues[0]) - - const t1 = Date.now() - - const filtered = marketTradesQueues - .filter(tradeEntry => { - console.log('DEBUG33: %j, %s, %s, %s', tradeEntry, t1, tradeEntry.timestamp, TRADE_TTL) - return t1 - tradeEntry.timestamp < TRADE_TTL - }) - - const filteredCount = marketTradesQueues.length - filtered.length - - if (filteredCount > 0) { - tradesQueues[market] = filtered - logger.debug('[%s] expired %d trades', market, filteredCount) + return Promise.all(promises) } - if (filtered.length === 0) return null - - const cryptoAtoms = filtered - .reduce((prev, current) => prev.plus(current.cryptoAtoms), new BigNumber(0)) - - const timestamp = filtered.map(r => r.timestamp).reduce((acc, r) => Math.max(acc, r), 0) - - const consolidatedTrade = { - fiatCode, - cryptoAtoms, - cryptoCode, - timestamp - } - - tradesQueues[market] = [] - - logger.debug('[%s] consolidated: %j', market, consolidatedTrade) - return consolidatedTrade -} - -function executeTrades () { - const settings = settingsLoader.settings() - - return dbm.devices() - .then(devices => { - const deviceIds = devices.map(device => device.device_id) - const lists = deviceIds.map(deviceId => { - const config = configManager.machineScoped(deviceId, settings.config) - const fiatCode = config.fiatCurrency - const cryptoCodes = config.cryptoCurrencies - - return cryptoCodes.map(cryptoCode => ({fiatCode, cryptoCode})) - }) - - const tradesPromises = R.uniq(R.flatten(lists)) - .map(r => executeTradesForMarket(settings, r.fiatCode, r.cryptoCode)) - - return Promise.all(tradesPromises) - }) - .catch(logger.error) -} - -function executeTradesForMarket (settings, fiatCode, cryptoCode) { - if (!exchange.active(cryptoCode)) return - - const market = [fiatCode, cryptoCode].join('') - logger.debug('[%s] checking for trades', market) - - const tradeEntry = consolidateTrades(cryptoCode, fiatCode) - if (tradeEntry === null) return logger.debug('[%s] no trades', market) - - if (tradeEntry.cryptoAtoms.eq(0)) { - logger.debug('[%s] rejecting 0 trade', market) - return - } - - logger.debug('[%s] making a trade: %d', market, tradeEntry.cryptoAtoms.toString()) - - return exchange.buy(tradeEntry.cryptoAtoms, tradeEntry.fiatCode, tradeEntry.cryptoCode) - .then(() => logger.debug('[%s] Successful trade.', market)) - .catch(err => { - tradesQueues[market].push(tradeEntry) - if (err.name === 'NoExchangeError') return logger.debug(err.message) - logger.error(err) - }) -} - -function sendMessage (rec) { - const settings = settingsLoader.settings() - const config = configManager.unscoped(settings.config) - - let promises = [] - if (config.notificationsEmailEnabled) promises.push(email.sendMessage(rec)) - if (config.notificationsSMSEnabled) promises.push(sms.sendMessage(rec)) - - return Promise.all(promises) -} - -function sendNoAlerts () { - const subject = '[Lamassu] All clear' - const rec = { - sms: { - body: subject - }, - email: { - subject, - body: 'No errors are reported for your machines.' - } - } - return sendMessage(rec) -} - -function checkNotification () { - return notifier.checkStatus() - .then(alertRec => { - const currentAlertFingerprint = notifier.alertFingerprint(alertRec) - if (!currentAlertFingerprint) { - const inAlert = !!alertFingerprint - alertFingerprint = null - lastAlertTime = null - if (inAlert) return sendNoAlerts() - } - - const alertChanged = currentAlertFingerprint === alertFingerprint && - lastAlertTime - Date.now() < ALERT_SEND_INTERVAL - if (alertChanged) return - - const subject = notifier.alertSubject(alertRec) + function sendNoAlerts () { + const subject = '[Lamassu] All clear' const rec = { sms: { body: subject }, email: { subject, - body: notifier.printEmailAlerts(alertRec) + body: 'No errors are reported for your machines.' } } - alertFingerprint = currentAlertFingerprint - lastAlertTime = Date.now() - return sendMessage(rec) - }) - .then(results => { - if (results && results.length > 0) logger.debug('Successfully sent alerts') - }) - .catch(logger.error) -} - -function checkDeviceBalances (deviceId) { - const settings = settingsLoader.settings() - const config = configManager.machineScoped(deviceId, settings.config) - const cryptoCodes = config.cryptoCurrencies - const fiatCode = config.fiatCurrency - const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c, deviceId)) - - return Promise.all(fiatBalancePromises) - .then(arr => { - return arr.map((balance, i) => ({ - fiatBalance: balance, - cryptoCode: cryptoCodes[i], - fiatCode, - deviceId - })) - }) -} - -function checkBalances () { - return dbm.devices() - .then(devices => { - const deviceIds = devices.map(r => r.device_id) - const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(deviceId)) - - return Promise.all(deviceBalancePromises) - .then(arr => { - const toMarket = r => r.fiatBalance + r.cryptoCode - const min = R.minBy(r => r.fiatBalance) - return R.values(R.reduceBy(min, Infinity, toMarket, R.flatten(arr))) - }) - }) -} - -function startCheckingNotification (config) { - notifier.init(checkBalances) - checkNotification() - setInterval(checkNotification, CHECK_NOTIFICATION_INTERVAL) -} - -function randomCode () { - return new BigNumber(crypto.randomBytes(3).toString('hex'), 16).shift(-6).toFixed(6).slice(-6) -} - -function getPhoneCode (phone) { - const code = argv.mockSms - ? '123' - : randomCode() - - const rec = { - sms: { - toNumber: phone, - body: 'Your cryptomat code: ' + code - } } - return sms.sendMessage(rec) - .then(() => code) -} + function checkNotification () { + return notifier.checkStatus() + .then(alertRec => { + const currentAlertFingerprint = notifier.alertFingerprint(alertRec) + if (!currentAlertFingerprint) { + const inAlert = !!alertFingerprint + alertFingerprint = null + lastAlertTime = null + if (inAlert) return sendNoAlerts() + } -function fetchPhoneTx (phone) { - return dbm.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) - .then(txs => { - const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) - if (confirmedTxs.length > 0) { - const maxTx = R.reduce((acc, val) => { - return !acc || val.cryptoAtoms.gt(acc.cryptoAtoms) ? val : acc - }, null, confirmedTxs) + const alertChanged = currentAlertFingerprint === alertFingerprint && + lastAlertTime - Date.now() < ALERT_SEND_INTERVAL + if (alertChanged) return - return {tx: maxTx} + const subject = notifier.alertSubject(alertRec) + const rec = { + sms: { + body: subject + }, + email: { + subject, + body: notifier.printEmailAlerts(alertRec) + } + } + alertFingerprint = currentAlertFingerprint + lastAlertTime = Date.now() + + return sendMessage(rec) + }) + .then(results => { + if (results && results.length > 0) logger.debug('Successfully sent alerts') + }) + .catch(logger.error) + } + + function checkDeviceBalances (deviceId) { + const config = configManager.machineScoped(deviceId, settings.config) + const cryptoCodes = config.cryptoCurrencies + const fiatCode = config.fiatCurrency + const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(fiatCode, c, deviceId)) + + return Promise.all(fiatBalancePromises) + .then(arr => { + return arr.map((balance, i) => ({ + fiatBalance: balance, + cryptoCode: cryptoCodes[i], + fiatCode, + deviceId + })) + }) + } + + function checkBalances () { + return dbm.devices() + .then(devices => { + const deviceIds = devices.map(r => r.device_id) + const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(deviceId)) + + return Promise.all(deviceBalancePromises) + .then(arr => { + const toMarket = r => r.fiatBalance + r.cryptoCode + const min = R.minBy(r => r.fiatBalance) + return R.values(R.reduceBy(min, Infinity, toMarket, R.flatten(arr))) + }) + }) + } + + function startCheckingNotification (config) { + notifier.init(checkBalances) + checkNotification() + setInterval(checkNotification, CHECK_NOTIFICATION_INTERVAL) + } + + function randomCode () { + return new BigNumber(crypto.randomBytes(3).toString('hex'), 16).shift(-6).toFixed(6).slice(-6) + } + + function getPhoneCode (phone) { + const code = argv.mockSms + ? '123' + : randomCode() + + const rec = { + sms: { + toNumber: phone, + body: 'Your cryptomat code: ' + code + } } - if (txs.length > 0) return {pending: true} - return {} - }) + return sms.sendMessage(rec) + .then(() => code) + } + + function fetchPhoneTx (phone) { + return dbm.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) + .then(txs => { + const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) + if (confirmedTxs.length > 0) { + const maxTx = R.reduce((acc, val) => { + return !acc || val.cryptoAtoms.gt(acc.cryptoAtoms) ? val : acc + }, null, confirmedTxs) + + return {tx: maxTx} + } + + if (txs.length > 0) return {pending: true} + return {} + }) + } + + function sweepHD (row) { + const cryptoCode = row.crypto_code + + return wallet.sweep(row.hd_serial) + .then(txHash => { + if (txHash) { + logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) + return dbm.markSwept(row.tx_id) + } + }) + .catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message)) + } + + function sweepLiveHD () { + return dbm.fetchLiveHD() + .then(rows => Promise.all(rows.map(sweepHD))) + .catch(err => logger.error(err)) + } + + function sweepOldHD () { + return dbm.fetchOldHD() + .then(rows => Promise.all(rows.map(sweepHD))) + .catch(err => logger.error(err)) + } + + return { + pollQueries, + trade, + stateChange, + sendCoins, + cashOut, + dispenseAck, + startCheckingNotification, + getPhoneCode, + fetchPhoneTx, + executeTrades, + pong, + pongClear, + monitorLiveIncoming, + monitorIncoming, + monitorUnnotified, + sweepLiveHD, + sweepOldHD + } } -function sweepHD (row) { - const cryptoCode = row.crypto_code - - return wallet.sweep(row.hd_serial) - .then(txHash => { - if (txHash) { - logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) - return dbm.markSwept(row.tx_id) - } - }) - .catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message)) -} - -function sweepLiveHD () { - return dbm.fetchLiveHD() - .then(rows => Promise.all(rows.map(sweepHD))) - .catch(err => logger.error(err)) -} - -function sweepOldHD () { - return dbm.fetchOldHD() - .then(rows => Promise.all(rows.map(sweepHD))) - .catch(err => logger.error(err)) -} - -module.exports = { - pollQueries, - trade, - stateChange, - sendCoins, - cashOut, - dispenseAck, - startCheckingNotification, - getPhoneCode, - fetchPhoneTx, - executeTrades, - pong, - pongClear, - monitorLiveIncoming, - monitorIncoming, - monitorUnnotified, - sweepLiveHD, - sweepOldHD -} +module.exports = plugins diff --git a/lib/poller.js b/lib/poller.js index 302fc662..477c3d46 100644 --- a/lib/poller.js +++ b/lib/poller.js @@ -10,8 +10,14 @@ const TRADE_INTERVAL = 10 * T.seconds const PONG_INTERVAL = 10 * T.seconds const PONG_CLEAR_INTERVAL = 1 * T.day -function start () { - let pi = plugins +let pi + +function reload (settings) { + pi = plugins(settings) +} + +function start (settings) { + reload(settings) pi.executeTrades() pi.pong() @@ -32,4 +38,4 @@ function start () { setInterval(() => pi.pongClear(), PONG_CLEAR_INTERVAL) } -module.exports = {start} +module.exports = {start, reload} diff --git a/lib/settings-loader.js b/lib/settings-loader.js index 3cfb7462..ca17efd4 100644 --- a/lib/settings-loader.js +++ b/lib/settings-loader.js @@ -12,6 +12,14 @@ function load (versionId) { })) } +function loadLatest (versionId) { + return Promise.all([loadLatestConfig(), loadAccounts()]) + .then(([config, accounts]) => ({ + config, + accounts + })) +} + function loadConfig (versionId) { const sql = `select data from user_config @@ -21,6 +29,17 @@ function loadConfig (versionId) { .then(row => row ? row.data.config : []) } +function loadLatestConfig () { + const sql = `select data + from user_config + where type=$1 + order by versionId desc + limit 1` + + return db.oneOrNone(sql, ['config']) + .then(row => row ? row.data.config : []) +} + function loadAccounts () { const toFields = fieldArr => R.fromPairs(R.map(r => [r.code, r.value], fieldArr)) const toPairs = r => [r.code, toFields(r.fields)] @@ -45,5 +64,6 @@ module.exports = { settings, loadConfig, load, + loadLatest, save }