const uuid = require('uuid') const R = require('ramda') const BigNumber = require('bignumber.js') // Needed for BigNumber for now global.crypto = require('crypto') BigNumber.config({CRYPTO: true}) const db = require('./postgresql_interface') const logger = require('./logger') const notifier = require('./notifier') const T = require('./time') const configManager = require('./config-manager') const settingsLoader = require('./settings-loader') const ticker = require('./ticker') const wallet = require('./wallet') const exchange = require('./exchange') const sms = require('./sms') const email = require('./email') const CHECK_NOTIFICATION_INTERVAL = T.minute const ALERT_SEND_INTERVAL = T.hour const INCOMING_TX_INTERVAL = 30 * T.seconds const LIVE_INCOMING_TX_INTERVAL = 5 * T.seconds const STALE_INCOMING_TX_AGE = T.week const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes const UNNOTIFIED_INTERVAL = T.minute const MAX_NOTIFY_AGE = 2 * T.days const MIN_NOTIFY_AGE = 5 * T.minutes const TRANSACTION_EXPIRATION = 2 * T.days const SWEEP_LIVE_HD_INTERVAL = T.minute const SWEEP_OLD_HD_INTERVAL = 2 * T.minutes const TRADE_INTERVAL = T.minute const TRADE_TTL = 5 * T.minutes const STALE_TICKER = 3 * 60 * 1000 const STALE_BALANCE = 3 * 60 * 1000 const tradesQueues = {} const coins = { BTC: {unitScale: 8}, ETH: {unitScale: 18} } let alertFingerprint = null let lastAlertTime = null exports.logEvent = db.recordDeviceEvent function buildRates (settings, deviceId, tickers) { const config = configManager.machineScoped(deviceId, settings.config) const cryptoCodes = config.currencies.cryptoCurrencies const cashInCommission = new BigNumber(config.commissions.cashInCommission).div(100).plus(1) const cashOutCommission = new BigNumber(config.commissions.cashOutCommission).div(100).plus(1) const rates = {} cryptoCodes.forEach((cryptoCode, i) => { const rateRec = tickers[i] if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode) const rate = rateRec.rates rates[cryptoCode] = { cashIn: rate.ask.times(cashInCommission), cashOut: rate.bid.div(cashOutCommission) } }) return rates } function buildBalances (settings, deviceId, balanceRecs) { const config = configManager.machineScoped(deviceId, settings.config) const cryptoCodes = config.currencies.cryptoCurrencies const balances = {} cryptoCodes.forEach((cryptoCode, i) => { const balanceRec = balanceRecs[i] if (!balanceRec) return logger.warn('No balance for ' + cryptoCode + ' yet') if (Date.now() - balanceRec.timestamp > STALE_BALANCE) return logger.warn('Stale balance for ' + cryptoCode) balances[cryptoCode] = balanceRec.balance }) return balances } function buildCartridges (cartridges, virtualCartridges, rec) { return { cartridges: [ { denomination: parseInt(cartridges[0], 10), count: parseInt(rec.counts[0], 10) }, { denomination: parseInt(cartridges[1], 10), count: parseInt(rec.counts[1], 10) } ], virtualCartridges } } exports.pollQueries = function pollQueries (settings, deviceTime, deviceId, deviceRec) { const config = configManager.machineScoped(deviceId, settings.config) const fiatCode = config.currencies.fiatCurrency const cryptoCodes = config.currencies.cryptoCurrencies const cartridges = [ config.currencies.topCashOutDenomination, config.currencies.bottomCashOutDenomination ] const virtualCartridges = [config.currencies.virtualCashOutDenomination] const tickerPromises = cryptoCodes.map(c => ticker.getRates(settings, fiatCode, c)) const balancePromises = cryptoCodes.map(c => wallet.balance(settings, c)) const pingPromise = recordPing(deviceId, deviceTime, deviceRec) const promises = [db.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) return Promise.all(promises) .then(arr => { const cartridgeCounts = arr[0] const tickers = arr.slice(2, cryptoCodes.length + 2) const balances = arr.slice(cryptoCodes.length + 2) return { cartridges: buildCartridges(cartridges, virtualCartridges, cartridgeCounts), rates: buildRates(settings, deviceId, tickers), balances: buildBalances(settings, deviceId, balances) } }) } // NOTE: This will fail if we have already sent coins because there will be // a db unique db record in the table already. function executeTx (deviceId, tx) { return db.addOutgoingTx(deviceId, tx) .then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) .then(txHash => { const fee = null // Need to fill this out in plugins const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} return db.sentCoins(tx, toSend, fee, null, txHash) .then(() => ({ statusCode: 201, // Created txHash, txId: tx.id })) }) } // TODO: Run these in parallel and return success exports.trade = function trade (deviceId, rawTrade) { // TODO: move this to DB, too // add bill to trader queue (if trader is enabled) const cryptoCode = rawTrade.cryptoCode const fiatCode = rawTrade.fiatCode const cryptoAtoms = rawTrade.cryptoAtoms return db.recordBill(deviceId, rawTrade) .then(() => exchange.active(cryptoCode)) .then(active => { if (!active) return const market = [fiatCode, cryptoCode].join('') logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms) tradesQueues[market].push({ fiatCode, cryptoAtoms, cryptoCode, timestamp: Date.now() }) }) } exports.stateChange = function stateChange (deviceId, deviceTime, rec, cb) { const event = { id: rec.uuid, deviceId: deviceId, eventType: 'stateChange', note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}), deviceTime: deviceTime } return db.machineEvent(event) } function recordPing (deviceId, deviceTime, rec) { const event = { id: uuid.v4(), deviceId: deviceId, eventType: 'ping', note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), deviceTime: deviceTime } return db.machineEvent(event) } exports.sendCoins = function sendCoins (deviceId, rawTx) { return executeTx(deviceId, rawTx) } exports.cashOut = function cashOut (deviceId, tx) { const cryptoCode = tx.cryptoCode const serialPromise = wallet.supportsHD ? db.nextCashOutSerialHD(tx.id, cryptoCode) : Promise.resolve() return serialPromise .then(serialNumber => { const info = { label: 'TX ' + Date.now(), account: 'deposit', serialNumber } return wallet.newAddress(cryptoCode, info) .then(address => { const newTx = R.assoc('toAddress', address, tx) return db.addInitialIncoming(deviceId, newTx, address) .then(() => address) }) }) } exports.dispenseAck = function (settings, deviceId, tx) { const config = configManager.machineScoped(deviceId, settings.config) const cartridges = [ config.currencies.topCashOutDenomination, config.currencies.bottomCashOutDenomination ] return db.addDispense(deviceId, tx, cartridges) } function fiatBalance (settings, fiatCode, cryptoCode, deviceId) { const config = configManager.scoped(cryptoCode, deviceId, settings.config) return Promise.all([ticker.ticker(cryptoCode), wallet.balance(cryptoCode)]) .then(([rates, balanceRec]) => { const rawRate = rates[cryptoCode].rates.ask const commission = (new BigNumber(config.commissions.cashInCommission).div(100)).plus(1) const balance = balanceRec.balance if (!rawRate || !balance) return null // The rate is actually our commission times real rate. const rate = rawRate.times(commission) // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide // all our balances by it to provide a safety margin. const lowBalanceMargin = (new BigNumber(config.commissions.lowBalanceMargin).div(100)).plus(1) const unitScale = new BigNumber(10).pow(coins[cryptoCode].unitScale) const fiatTransferBalance = balance.div(unitScale).times(rate).div(lowBalanceMargin) return {timestamp: balanceRec.timestamp, balance: fiatTransferBalance.round(3).toNumber()} }) } function processTxStatus (tx) { return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) .then(res => db.updateTxStatus(tx, res.status)) } function notifyConfirmation (tx) { logger.debug('notifyConfirmation') const phone = tx.phone const rec = { sms: { toNumber: phone, body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.' } } return sms.sendMessage(rec) .then(() => db.updateNotify(tx)) } function monitorLiveIncoming () { const statuses = ['notSeen', 'published', 'insufficientFunds'] db.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(err => logger.error(err)) } function monitorIncoming () { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(err => logger.error(err)) } function monitorUnnotified () { db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) .then(txs => Promise.all(txs.map(notifyConfirmation))) .catch(err => logger.error(err)) } /* * Polling livecycle */ exports.startPolling = function startPolling () { executeTrades() setInterval(executeTrades, TRADE_INTERVAL) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorIncoming, INCOMING_TX_INTERVAL) setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) setInterval(sweepLiveHD, SWEEP_LIVE_HD_INTERVAL) setInterval(sweepOldHD, SWEEP_OLD_HD_INTERVAL) monitorLiveIncoming() monitorIncoming() monitorUnnotified() sweepLiveHD() sweepOldHD() } /* * Trader functions */ function buy (trade) { return exchange.buy(trade.cryptoAtoms, trade.fiatCode, trade.cryptoCode) } function consolidateTrades (cryptoCode, fiatCode) { const market = [fiatCode, cryptoCode].join('') if (tradesQueues[market].length === 0) return null logger.debug('[%s] tradesQueues size: %d', market, tradesQueues[market].length) logger.debug('[%s] tradesQueues head: %j', market, tradesQueues[market][0]) const t0 = Date.now() const filtered = tradesQueues[market] .filter(trade => t0 - trade.timestamp < TRADE_TTL) const filteredCount = tradesQueues[market].length - filtered.length if (filteredCount > 0) { tradesQueues[market] = filtered logger.debug('[%s] expired %d trades', market, filteredCount) } const cryptoAtoms = filtered .reduce((prev, current) => prev.plus(current.cryptoAtoms), new BigNumber(0)) const consolidatedTrade = { fiatCode, cryptoAtoms, cryptoCode } tradesQueues[market] = [] logger.debug('[%s] consolidated: %j', market, consolidatedTrade) return consolidatedTrade } function executeTrades () { return settingsLoader() .then(settings => { const config = settings.config return db.devices() .then(devices => { const deviceIds = devices.map(device => device.device_id) const lists = deviceIds.map(deviceId => { const currencies = configManager.machineScoped(deviceId, config).currencies const fiatCode = currencies.fiatCurrency const cryptoCodes = currencies.cryptoCurrencies return cryptoCodes.map(cryptoCode => ({fiatCode, cryptoCode})) }) const tradesPromises = R.uniq(R.flatten(lists)) .map(r => executeTradesForMarket(settings, r.fiatCode, r.cryptoCode)) return Promise.all(tradesPromises) }) }) .catch(logger.error) } function executeTradesForMarket (settings, fiatCode, cryptoCode) { const market = [fiatCode, cryptoCode].join('') logger.debug('[%s] checking for trades', market) const trade = consolidateTrades(cryptoCode, fiatCode) if (trade === null) return logger.debug('[%s] no trades', market) if (trade.cryptoAtoms.eq(0)) { logger.debug('[%s] rejecting 0 trade', market) return } logger.debug('[%s] making a trade: %d', market, trade.cryptoAtoms.toString()) return buy(trade) .catch(err => { tradesQueues[market].push(trade) logger.error(err) }) .then(() => { logger.debug('[%s] Successful trade.', market) }) } function sendMessage (rec) { return Promise.all([sms.sendMessage(rec), email.sendMessage(rec)]) } exports.sendMessage = sendMessage function sendNoAlerts () { const subject = '[Lamassu] All clear' const rec = { sms: { body: subject }, email: { subject, body: 'No errors are reported for your machines.' } } return sendMessage(rec) } function checkNotification () { return notifier.checkStatus() .then(alertRec => { const currentAlertFingerprint = notifier.alertFingerprint(alertRec) if (!currentAlertFingerprint) { const inAlert = !!alertFingerprint alertFingerprint = null lastAlertTime = null if (inAlert) return sendNoAlerts() } const alertChanged = currentAlertFingerprint === alertFingerprint && lastAlertTime - Date.now() < ALERT_SEND_INTERVAL if (alertChanged) return const subject = notifier.alertSubject(alertRec) const rec = { sms: { body: subject }, email: { subject, body: notifier.printEmailAlerts(alertRec) } } alertFingerprint = currentAlertFingerprint lastAlertTime = Date.now() return sendMessage(rec) }) .then(results => { if (results && results.length > 0) logger.debug('Successfully sent alerts') }) .catch(err => { logger.error(err) }) } function checkDeviceBalances (settings, deviceId) { const config = configManager.machineScoped(deviceId, settings.config) const cryptoCodes = config.currencies.cryptoCurrencies const fiatCode = config.currencies.fiatCurrency const fiatBalancePromises = cryptoCodes.map(c => fiatBalance(settings, fiatCode, c, deviceId)) return Promise.all(fiatBalancePromises) .then(arr => { return arr.map((balance, i) => ({ fiatBalance: balance, cryptoCode: cryptoCodes[i], fiatCode, deviceId })) }) } function checkBalances (settings) { return db.devices() .then(devices => { const deviceIds = devices.map(r => r.device_id) const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(settings, deviceId)) return Promise.all(deviceBalancePromises) .then(arr => { const toMarket = r => r.fiatBalance + r.cryptoCode const min = R.minBy(r => r.fiatBalance) return R.reduceBy(min, Infinity, toMarket, R.flatten(arr)) }) }) } exports.startCheckingNotification = function startCheckingNotification (config) { notifier.init(db, checkBalances, config.notifications) checkNotification() setInterval(checkNotification, CHECK_NOTIFICATION_INTERVAL) } exports.getPhoneCode = function getPhoneCode (phone) { return sms.name() .then(name => { const code = name === 'MockSMS' ? '123' : BigNumber.random().toFixed(6).slice(2) const rec = { sms: { toNumber: phone, body: 'Your cryptomat code: ' + code } } return sms.sendMessage(rec) .then(() => code) }) } exports.updatePhone = db.addIncomingPhone exports.registerRedeem = db.updateRedeem exports.fetchPhoneTx = function fetchPhoneTx (phone) { return db.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) .then(txs => { const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) if (confirmedTxs.length > 0) { const maxTx = R.reduce((acc, val) => { return !acc || val.cryptoAtoms.gt(acc.cryptoAtoms) ? val : acc }, null, confirmedTxs) return {tx: maxTx} } if (txs.length > 0) return {pending: true} return {} }) } exports.requestDispense = function requestDispense (tx) { return db.addDispenseRequest(tx) } exports.fetchTx = db.fetchTx function sweepHD (row) { const cryptoCode = row.crypto_code return wallet.sweep(row.hd_serial) .then(txHash => { if (txHash) { logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) return db.markSwept(row.tx_id) } }) .catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message)) } function sweepLiveHD () { return db.fetchLiveHD() .then(rows => Promise.all(rows.map(sweepHD))) .catch(err => logger.error(err)) } function sweepOldHD () { return db.fetchOldHD() .then(rows => Promise.all(rows.map(sweepHD))) .catch(err => logger.error(err)) }