'use strict' var fs = require('fs') var R = require('ramda') var async = require('async') var HKDF = require('node-hkdf-sync') var BigNumber = require('bignumber.js') BigNumber.config({CRYPTO: true}) var db = require('./postgresql_interface') var logger = require('./logger') var notifier = require('./notifier') var uuid = require('node-uuid') var tradeIntervals = {} var CHECK_NOTIFICATION_INTERVAL = 60 * 1000 var ALERT_SEND_INTERVAL = 60 * 60 * 1000 var POLLING_RATE = 60 * 1000 // poll each minute var INCOMING_TX_INTERVAL = 5 * 1000 var LIVE_INCOMING_TX_INTERVAL = 30 * 1000 var STALE_INCOMING_TX_AGE = 7 * 24 * 60 * 60 * 1000 var UNNOTIFIED_INTERVAL = 60 * 1000 var MAX_NOTIFY_AGE = 48 * 60 * 60 * 1000 var MIN_NOTIFY_AGE = 5 * 60 * 1000 var TRANSACTION_EXPIRATION = 48 * 60 * 60 * 1000 var SWEEP_LIVE_HD_INTERVAL = 60 * 1000 var SWEEP_OLD_HD_INTERVAL = 60 * 60 * 1000 var TRADE_INTERVAL = 60 * 1000 var cryptoCodes = null var tickerPlugins = {} var traderPlugins = {} var walletPlugins = {} var idVerifierPlugin = null var infoPlugin = null var emailPlugin = null var smsPlugin = null var hkdf = null var currentlyUsedPlugins = {} var cachedConfig = null var deviceCurrency = 'USD' var lastBalances = {} var lastRates = {} var tradesQueues = {} var coins = { BTC: {unitScale: 8}, ETH: {unitScale: 18} } var alertFingerprint = null var lastAlertTime = null exports.init = function init (connectionString) { const masterSeed = fs.readFileSync('seeds/seed.txt', 'utf8').trim() hkdf = new HKDF('sha256', 'lamassu-server-salt', masterSeed) db.init(connectionString) } function loadPlugin (name, config) { // plugins definitions var moduleMethods = { ticker: ['ticker'], trader: ['purchase', 'sell'], wallet: ['balance', 'sendBitcoins', 'newAddress'], idVerifier: ['verifyUser', 'verifyTransaction'], info: ['checkAddress'], email: ['sendMessage'] } var plugin = null // each used plugin MUST be installed try { plugin = require('lamassu-' + name) } catch (_) { throw new Error(name + ' module is not installed. ' + 'Try running \'npm install --save lamassu-' + name + '\' first') } // each plugin MUST implement those if (typeof plugin.SUPPORTED_MODULES !== 'undefined') { if (plugin.SUPPORTED_MODULES === 'string') { plugin.SUPPORTED_MODULES = [plugin.SUPPORTED_MODULES] } } if (!(plugin.SUPPORTED_MODULES instanceof Array)) { throw new Error('\'' + name + '\' fails to implement *required* ' + '\'SUPPORTED_MODULES\' constant') } plugin.SUPPORTED_MODULES.forEach(function (moduleName) { moduleMethods[moduleName].forEach(function (methodName) { if (typeof plugin[methodName] !== 'function') { throw new Error('\'' + name + '\' declares \'' + moduleName + '\', but fails to implement \'' + methodName + '\' method') } }) }) // each plugin SHOULD implement those if (typeof plugin.NAME === 'undefined') { logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'NAME\' field')) } if (typeof plugin.config !== 'function') { logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'config\' method')) plugin.config = function () {} } else if (config !== null) { plugin.config(config, logger) // only when plugin supports it, and config is passed } return plugin } function loadOrConfigPlugin (pluginHandle, pluginType, cryptoCode, options, onChangeCallback) { cryptoCode = cryptoCode || 'BTC' var currentName = cryptoCode === 'any' || cryptoCode === 'BTC' ? cachedConfig.exchanges.plugins.current[pluginType] : cachedConfig.exchanges.plugins.current[cryptoCode][pluginType] currentlyUsedPlugins[cryptoCode] = currentlyUsedPlugins[cryptoCode] || {} var pluginChanged = currentlyUsedPlugins[cryptoCode][pluginType] !== currentName if (!currentName) pluginHandle = null else { // some plugins may be disabled var pluginConfig = cachedConfig.exchanges.plugins.settings[currentName] || {} const mergedConfig = R.merge(pluginConfig, options) if (pluginHandle && !pluginChanged) pluginHandle.config(mergedConfig) else { pluginHandle = loadPlugin(currentName, mergedConfig) currentlyUsedPlugins[cryptoCode] = currentlyUsedPlugins[cryptoCode] || {} currentlyUsedPlugins[cryptoCode][pluginType] = currentName logger.debug('[%s] plugin(%s) loaded: %s', cryptoCode, pluginType, pluginHandle.NAME || currentName) } } if (typeof onChangeCallback === 'function') onChangeCallback(pluginHandle) return pluginHandle } exports.loadOrConfigPlugin = loadOrConfigPlugin // Note: this whole function gets called every time there's a config update exports.configure = function configure (config) { if (config.exchanges.settings.lowBalanceMargin < 1) { throw new Error('\'settings.lowBalanceMargin\' has to be >= 1') } cachedConfig = config deviceCurrency = config.exchanges.settings.currency cryptoCodes = config.exchanges.settings.coins || ['BTC', 'ETH'] cryptoCodes.forEach(function (cryptoCode) { // TICKER [required] configure (or load) loadOrConfigPlugin( tickerPlugins[cryptoCode], 'ticker', cryptoCode, {currency: deviceCurrency}, function onTickerChange (newTicker) { tickerPlugins[cryptoCode] = newTicker pollRate(cryptoCode) } ) // Give each crypto a different derived seed so as not to allow any // plugin to spend another plugin's funds const cryptoSeed = hkdf.derive(cryptoCode, 32) loadOrConfigPlugin( walletPlugins[cryptoCode], 'transfer', cryptoCode, {masterSeed: cryptoSeed}, function onWalletChange (newWallet) { walletPlugins[cryptoCode] = newWallet pollBalance(cryptoCode) } ) tradesQueues[cryptoCode] = tradesQueues[cryptoCode] || [] loadOrConfigPlugin( traderPlugins[cryptoCode], 'trader', cryptoCode, null, function onTraderChange (newTrader) { traderPlugins[cryptoCode] = newTrader if (newTrader === null) stopTrader(cryptoCode) else startTrader(cryptoCode) } ) }) // ID VERIFIER [optional] configure (or load) idVerifierPlugin = loadOrConfigPlugin( idVerifierPlugin, 'idVerifier' ) infoPlugin = loadOrConfigPlugin( infoPlugin, 'info' ) emailPlugin = loadOrConfigPlugin( emailPlugin, 'email' ) smsPlugin = loadOrConfigPlugin( smsPlugin, 'sms' ) } exports.getConfig = function getConfig () { return cachedConfig } exports.logEvent = function event (session, rawEvent) { return db.recordDeviceEvent(session, rawEvent) } function buildCartridges (cartridges, virtualCartridges, rec) { return { cartridges: [ { denomination: parseInt(cartridges[0], 10), count: parseInt(rec.counts[0], 10) }, { denomination: parseInt(cartridges[1], 10), count: parseInt(rec.counts[1], 10) } ], virtualCartridges: virtualCartridges, id: rec.id } } exports.pollQueries = function pollQueries (session, cb) { var cartridges = cachedConfig.exchanges.settings.cartridges if (!cartridges) return cb(null, {}) var virtualCartridges = cachedConfig.exchanges.settings.virtualCartridges return db.cartridgeCounts(session) .then(result => ({ cartridges: buildCartridges(cartridges, virtualCartridges, result) })) } function _sendCoins (toAddress, cryptoAtoms, cryptoCode) { return new Promise((resolve, reject) => { _sendCoinsCb(toAddress, cryptoAtoms, cryptoCode, (err, txHash) => { if (err) return reject(err) return resolve(txHash) }) }) } function _sendCoinsCb (toAddress, cryptoAtoms, cryptoCode, cb) { var walletPlugin = walletPlugins[cryptoCode] var transactionFee = cachedConfig.exchanges.settings.transactionFee logger.debug('Sending coins [%s] to: %s', cryptoCode, toAddress) if (cryptoCode === 'BTC') { walletPlugin.sendBitcoins(toAddress, cryptoAtoms.truncated().toNumber(), transactionFee, cb) } else { walletPlugin.sendBitcoins(toAddress, cryptoAtoms, cryptoCode, transactionFee, cb) } } function executeTx (session, tx) { return db.addOutgoingTx(session, tx) .then(() => _sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) .then(txHash => { const fee = null // Need to fill this out in plugins const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} return db.sentCoins(session, tx, toSend, fee, null, txHash) .then(() => pollBalance(tx.cryptoCode)) .then(() => ({ statusCode: 201, // Created txHash: txHash, txId: tx.txId })) }) } // TODO: Run these in parallel and return success exports.trade = function trade (session, rawTrade) { // TODO: move this to DB, too // add bill to trader queue (if trader is enabled) var cryptoCode = rawTrade.cryptoCode || 'BTC' var traderPlugin = traderPlugins[cryptoCode] if (traderPlugin) { logger.debug('[%s] Pushing trade: %d', cryptoCode, rawTrade.cryptoAtoms) tradesQueues[cryptoCode].push({ currency: rawTrade.currency, cryptoAtoms: rawTrade.cryptoAtoms, cryptoCode: cryptoCode }) } return db.recordBill(session, rawTrade) } exports.stateChange = function stateChange (session, rec, cb) { var event = { id: rec.uuid, fingerprint: session.fingerprint, eventType: 'stateChange', note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, sessionId: session.id}), deviceTime: session.deviceTime } return db.machineEvent(event) } exports.recordPing = function recordPing (session, rec, cb) { var event = { id: uuid.v4(), fingerprint: session.fingerprint, eventType: 'ping', note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', sessionId: session.id}), deviceTime: session.deviceTime } return db.machineEvent(event) } exports.sendCoins = function sendCoins (session, rawTx) { var _session = {id: rawTx.sessionId || session.id, fingerprint: session.fingerprint} return executeTx(_session, rawTx) } exports.cashOut = function cashOut (session, tx) { var cryptoCode = tx.cryptoCode || 'BTC' var walletPlugin = walletPlugins[cryptoCode] return db.nextCashOutSerialHD(tx.sessionId, cryptoCode) .then(serialNumber => new Promise((resolve, reject) => { const tmpInfo = { label: 'TX ' + Date.now(), account: 'deposit', serialNumber: serialNumber } walletPlugin.newAddress(tmpInfo, function (err, address) { if (err) return reject(err) const newTx = R.assoc('toAddress', address, tx) return db.addInitialIncoming(session, newTx, address) .then(() => resolve(address)) }) })) } exports.dispenseAck = function dispenseAck (session, rec) { return db.addDispense(session, rec.tx, rec.cartridges) } exports.fiatBalance = function fiatBalance (cryptoCode) { var deviceRate = exports.getDeviceRate(cryptoCode) if (!deviceRate) return null var rawRate = deviceRate.rates.ask var commission = cachedConfig.exchanges.settings.commission var lastBalanceRec = lastBalances[cryptoCode] if (!lastBalanceRec) return null var lastBalance = lastBalanceRec.balance if (!rawRate || !lastBalance) return null // The rate is actually our commission times real rate. var rate = rawRate.times(commission) // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide // all our balances by it to provide a safety margin. var lowBalanceMargin = cachedConfig.exchanges.settings.lowBalanceMargin var unitScale = new BigNumber(10).pow(coins[cryptoCode].unitScale) var fiatTransferBalance = lastBalance.div(unitScale).times(rate).div(lowBalanceMargin) return {timestamp: lastBalanceRec.timestamp, balance: fiatTransferBalance.round(3).toNumber()} } function processTxStatus (tx) { const cryptoCode = tx.cryptoCode const walletPlugin = walletPlugins[cryptoCode] if (!walletPlugin) return console.error('No wallet plugins for: ' + cryptoCode) return walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms) .then(res => db.updateTxStatus(tx, res.status)) } function notifyConfirmation (tx) { logger.debug('notifyConfirmation') const phone = tx.phone const rec = { sms: { toNumber: phone, body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.' } } return smsPlugin.sendMessage(rec) .then(() => db.updateNotify(tx)) } function monitorLiveIncoming () { const statuses = ['notSeen', 'published', 'insufficientFunds'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(err => logger.error(err)) } function monitorIncoming () { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(err => logger.error(err)) } function monitorUnnotified () { db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) .then(txs => Promise.all(txs.map(notifyConfirmation))) .catch(err => logger.error(err)) } /* * Polling livecycle */ exports.startPolling = function startPolling () { executeTrades() cryptoCodes.forEach(function (cryptoCode) { setInterval(async.apply(pollBalance, cryptoCode), POLLING_RATE) setInterval(async.apply(pollRate, cryptoCode), POLLING_RATE) startTrader(cryptoCode) }) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorIncoming, INCOMING_TX_INTERVAL) setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) setInterval(sweepLiveHD, SWEEP_LIVE_HD_INTERVAL) setInterval(sweepOldHD, SWEEP_OLD_HD_INTERVAL) monitorLiveIncoming() monitorIncoming() monitorUnnotified() sweepLiveHD() } function startTrader (cryptoCode) { // Always start trading, even if we don't have a trade exchange configured, // since configuration can always change in `Trader#configure`. // `Trader#executeTrades` returns early if we don't have a trade exchange // configured at the moment. var traderPlugin = traderPlugins[cryptoCode] if (!traderPlugin || tradeIntervals[cryptoCode]) return logger.debug('[%s] startTrader', cryptoCode) tradeIntervals[cryptoCode] = setInterval( function () { executeTrades(cryptoCode) }, TRADE_INTERVAL ) } function stopTrader (cryptoCode) { if (!tradeIntervals[cryptoCode]) return logger.debug('[%s] stopTrader', cryptoCode) clearInterval(tradeIntervals[cryptoCode]) tradeIntervals[cryptoCode] = null tradesQueues[cryptoCode] = [] } function pollBalance (cryptoCode, cb) { logger.debug('[%s] collecting balance', cryptoCode) var walletPlugin = walletPlugins[cryptoCode] walletPlugin.balance(function (err, balance) { if (err) { logger.error('[%s] Error loading balance: %s', cryptoCode, err.message) return cb && cb(err) } logger.debug('[%s] Balance update: %j', cryptoCode, balance) lastBalances[cryptoCode] = {timestamp: Date.now(), balance: new BigNumber(balance[cryptoCode])} return cb && cb(null, lastBalances) }) } function pollRate (cryptoCode, cb) { var tickerPlugin = tickerPlugins[cryptoCode] logger.debug('[%s] polling for rates (%s)', cryptoCode, tickerPlugin.NAME) var currencies = deviceCurrency if (typeof currencies === 'string') currencies = [currencies] var tickerF = cryptoCode === 'BTC' ? async.apply(tickerPlugin.ticker, currencies) : async.apply(tickerPlugin.ticker, currencies, cryptoCode) tickerF(function (err, resRates) { if (err) { logger.error(err) return cb && cb(err) } resRates.timestamp = Date.now() var rates = resRates[deviceCurrency].rates if (rates) { rates.ask = rates.ask && new BigNumber(rates.ask) rates.bid = rates.bid && new BigNumber(rates.bid) } logger.debug('[%s] got rates: %j', cryptoCode, resRates) lastRates[cryptoCode] = resRates return cb && cb(null, lastRates) }) } /* * Getters | Helpers */ exports.getDeviceRate = function getDeviceRate (cryptoCode) { var lastRate = lastRates[cryptoCode] if (!lastRate) return null return lastRate[deviceCurrency] } /* * Trader functions */ function purchase (trade, cb) { var cryptoCode = trade.cryptoCode var traderPlugin = traderPlugins[cryptoCode] var opts = { cryptoCode: cryptoCode, fiat: deviceCurrency } traderPlugin.purchase(trade.cryptoAtoms, opts, function (err) { if (err) return cb(err) pollBalance(cryptoCode) if (typeof cb === 'function') cb() }) } function consolidateTrades (cryptoCode) { // NOTE: value in cryptoAtoms stays the same no matter the currency if (tradesQueues[cryptoCode].length === 0) return null logger.debug('tradesQueues size: %d', tradesQueues[cryptoCode].length) logger.debug('tradesQueues head: %j', tradesQueues[cryptoCode][0]) var cryptoAtoms = tradesQueues[cryptoCode].reduce(function (prev, current) { return prev.plus(current.cryptoAtoms) }, new BigNumber(0)) var consolidatedTrade = { currency: deviceCurrency, cryptoAtoms: cryptoAtoms, cryptoCode: cryptoCode } tradesQueues[cryptoCode] = [] logger.debug('[%s] consolidated: %j', cryptoCode, consolidatedTrade) return consolidatedTrade } function executeTrades (cryptoCode) { var traderPlugin = traderPlugins[cryptoCode] if (!traderPlugin) return logger.debug('[%s] checking for trades', cryptoCode) var trade = consolidateTrades(cryptoCode) if (trade === null) return logger.debug('[%s] no trades', cryptoCode) if (trade.cryptoAtoms.eq(0)) { logger.debug('[%s] rejecting 0 trade', cryptoCode) return } logger.debug('making a trade: %d', trade.cryptoAtoms.toString()) purchase(trade, function (err) { if (err) { tradesQueues[cryptoCode].push(trade) if (err.name !== 'orderTooSmall') return logger.error(err) else return logger.debug(err) } logger.debug('Successful trade.') }) } /* * ID Verifier functions */ exports.verifyUser = function verifyUser (data, cb) { idVerifierPlugin.verifyUser(data, cb) } exports.verifyTx = function verifyTx (data, cb) { idVerifierPlugin.verifyTransaction(data, cb) } exports.getcryptoCodes = function getcryptoCodes () { return cryptoCodes } function sendMessage (rec) { var pluginTypes = JSON.parse(cachedConfig.exchanges.plugins.current.notify) var pluginPromises = pluginTypes.map(function (pluginType) { if (pluginType === 'email') return emailPlugin.sendMessage(rec) if (pluginType === 'sms') return smsPlugin.sendMessage(rec) throw new Error('No such plugin type: ' + pluginType) }) return Promise.all(pluginPromises) } exports.sendMessage = sendMessage function sendNoAlerts () { var subject = '[Lamassu] All clear' var rec = { sms: { body: subject }, email: { subject: subject, body: 'No errors are reported for your machines.' } } return sendMessage(rec) } function checkNotification () { return notifier.checkStatus() .then(function (alertRec) { var fingerprint = notifier.alertFingerprint(alertRec) if (!fingerprint) { var inAlert = !!alertFingerprint alertFingerprint = null lastAlertTime = null if (inAlert) return sendNoAlerts() } var alertChanged = fingerprint === alertFingerprint && lastAlertTime - Date.now() < ALERT_SEND_INTERVAL if (alertChanged) return var subject = notifier.alertSubject(alertRec) var rec = { sms: { body: subject }, email: { subject: subject, body: notifier.printEmailAlerts(alertRec) } } alertFingerprint = fingerprint lastAlertTime = Date.now() return sendMessage(rec) }) .then(function () { logger.debug('Successfully sent alerts') }) .catch(function (err) { logger.error(err) }) } function checkBalances () { var cryptoCodes = exports.getcryptoCodes() var balances = [] cryptoCodes.forEach(function (cryptoCode) { var balanceRec = exports.fiatBalance(cryptoCode) if (!balanceRec) return var rec = {fiatBalance: balanceRec.balance, cryptoCode: cryptoCode, fiatCode: deviceCurrency} balances.push(rec) }) return balances } exports.startCheckingNotification = function startCheckingNotification () { var config = cachedConfig.exchanges.plugins.settings.notifier notifier.init(db, checkBalances, config) checkNotification() setInterval(checkNotification, CHECK_NOTIFICATION_INTERVAL) } exports.getPhoneCode = function getPhoneCode (phone) { const code = BigNumber.random().toFixed(6).slice(2) const rec = { sms: { toNumber: phone, body: 'Your cryptomat code: ' + code } } return smsPlugin.sendMessage(rec) .then(() => code) } exports.updatePhone = function updatePhone (session, tx, notified) { return db.addIncomingPhone(session, tx, notified) } exports.registerRedeem = function registerRedeem (session) { return db.updateRedeem(session) } exports.fetchPhoneTx = function fetchPhoneTx (phone) { return db.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) .then(txs => { const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) if (confirmedTxs.length > 0) { const maxTx = R.reduce((acc, val) => { return !acc || val.cryptoAtoms.gt(acc.cryptoAtoms) ? val : acc }, null, confirmedTxs) return {tx: maxTx} } if (txs.length > 0) return {pending: true} return {} }) } exports.fetchTx = function fetchTx (session) { return db.fetchTx(session) } exports.requestDispense = function requestDispense (tx) { return db.addDispenseRequest(tx) } exports.cachedResponse = function (session, path, method) { return db.cachedResponse(session, path, method) } exports.cacheResponse = function (session, path, method, body) { return db.cacheResponse(session, path, method, body) } function sweepHD (row) { const cryptoCode = row.crypto_code const walletPlugin = walletPlugins[cryptoCode] return walletPlugin.sweep(row.hd_serial) .then(txHash => { if (txHash) { logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) return db.markSwept(row.session_id) } }) .catch(err => logger.error(err)) } function sweepLiveHD () { return db.fetchLiveHD() .then(rows => Promise.all(rows.map(sweepHD))) } function sweepOldHD () { return db.fetchOldHD() .then(rows => Promise.all(rows.map(sweepHD))) }