'use strict' var _ = require('lodash') var async = require('async') var BigNumber = require('bignumber.js') var logger = require('./logger') var argv = require('minimist')(process.argv.slice(2)) var tradeInterval = null var SATOSHI_FACTOR = 1e8 var POLLING_RATE = 60 * 1000 // poll each minute var REAP_RATE = 2 * 1000 var PENDING_TIMEOUT = 70 * 1000 if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000 // TODO: might have to update this if user is allowed to extend monitoring time var DEPOSIT_TIMEOUT = 130 * 1000 var db = null var cryptoCoins = null var tickerPlugins = {} var traderPlugin = null var walletPlugins = {} var idVerifierPlugin = null var infoPlugin = null var currentlyUsedPlugins = {} var cachedConfig = null var deviceCurrency = 'USD' var lastBalances = {} var lastRates = {} var tradesQueue = [] // that's basically a constructor exports.init = function init (databaseHandle) { if (!databaseHandle) { throw new Error('\'db\' is required') } db = databaseHandle } function loadPlugin (name, config) { // plugins definitions var moduleMethods = { ticker: ['ticker'], trader: ['balance', 'purchase', 'sell'], wallet: ['balance', 'sendBitcoins', 'newAddress'], idVerifier: ['verifyUser', 'verifyTransaction'], info: ['checkAddress'] } var plugin = null // each used plugin MUST be installed try { plugin = require('lamassu-' + name) } catch (_) { try { require('plugins/' + name) } catch (_) { throw new Error(name + ' module is not installed. ' + 'Try running \'npm install --save lamassu-' + name + '\' first') } } // each plugin MUST implement those if (typeof plugin.SUPPORTED_MODULES !== 'undefined') { if (plugin.SUPPORTED_MODULES === 'string') { plugin.SUPPORTED_MODULES = [plugin.SUPPORTED_MODULES] } } if (!(plugin.SUPPORTED_MODULES instanceof Array)) { throw new Error('\'' + name + '\' fails to implement *required* ' + '\'SUPPORTED_MODULES\' constant') } plugin.SUPPORTED_MODULES.forEach(function (moduleName) { moduleMethods[moduleName].forEach(function (methodName) { if (typeof plugin[methodName] !== 'function') { throw new Error('\'' + name + '\' declares \'' + moduleName + '\', but fails to implement \'' + methodName + '\' method') } }) }) // each plugin SHOULD implement those if (typeof plugin.NAME === 'undefined') { logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'NAME\' field')) } if (typeof plugin.config !== 'function') { logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'config\' method')) plugin.config = function () {} } else if (config !== null) { plugin.config(config) // only when plugin supports it, and config is passed } return plugin } function loadOrConfigPlugin (pluginHandle, pluginType, cryptoCoin, currency, onChangeCallback) { if (!cryptoCoin) cryptoCoin = 'any' var currentName = cryptoCoin === 'any' || cryptoCoin === 'BTC' ? cachedConfig.exchanges.plugins.current[pluginType] : cachedConfig.exchanges.plugins.current[cryptoCoin][pluginType] var pluginChanged = currentlyUsedPlugins[cryptoCoin][pluginType] !== currentName if (!currentName) pluginHandle = null else { // some plugins may be disabled var pluginConfig = cachedConfig.exchanges.plugins.settings[currentName] || {} if (currency) pluginConfig.currency = currency if (pluginHandle && !pluginChanged) pluginHandle.config(pluginConfig) else { pluginHandle = loadPlugin(currentName, pluginConfig) currentlyUsedPlugins[cryptoCoin] = currentlyUsedPlugins[cryptoCoin] || {} currentlyUsedPlugins[cryptoCoin][pluginType] = currentName logger.debug('[%s] plugin(%s) loaded: %s', cryptoCoin, pluginType, pluginHandle.NAME || currentName) } } if (typeof onChangeCallback === 'function') onChangeCallback(pluginHandle, currency) return pluginHandle } exports.configure = function configure (config) { if (config.exchanges.settings.lowBalanceMargin < 1) { throw new Error('\'settings.lowBalanceMargin\' has to be >= 1') } cachedConfig = config deviceCurrency = config.exchanges.settings.currency cryptoCoins = config.exchanges.settings.coins || ['BTC'] cryptoCoins.forEach(function (cryptoCoin) { // TICKER [required] configure (or load) loadOrConfigPlugin( tickerPlugins[cryptoCoin], 'ticker', cryptoCoin, deviceCurrency, // device currency function onTickerChange (newTicker) { tickerPlugins[cryptoCoin] = newTicker pollRate(cryptoCoin) } ) // WALLET [required] configure (or load) loadOrConfigPlugin( walletPlugins[cryptoCoin], 'transfer', cryptoCoin, null, function onWalletChange (newWallet) { walletPlugins[cryptoCoin] = newWallet pollBalance(cryptoCoin) } ) }) // TRADER [optional] configure (or load) traderPlugin = loadOrConfigPlugin( traderPlugin, 'trade', null, null, function onTraderChange (newTrader) { traderPlugin = newTrader if (newTrader === null) stopTrader() else startTrader() } ) // ID VERIFIER [optional] configure (or load) idVerifierPlugin = loadOrConfigPlugin( idVerifierPlugin, 'idVerifier' ) infoPlugin = loadOrConfigPlugin( infoPlugin, 'info' ) } exports.getConfig = function getConfig () { return cachedConfig } exports.logEvent = function event (session, rawEvent) { db.recordDeviceEvent(session, rawEvent) } function buildCartridges (cartridges, virtualCartridges, rec) { return { cartridges: [ { denomination: parseInt(cartridges[0], 10), count: parseInt(rec.counts[0], 10) }, { denomination: parseInt(cartridges[1], 10), count: parseInt(rec.counts[1], 10) } ], virtualCartridges: virtualCartridges, id: rec.id } } exports.pollQueries = function pollQueries (session, cb) { var cartridges = cachedConfig.exchanges.settings.cartridges if (!cartridges) return cb(null, {}) var virtualCartridges = cachedConfig.exchanges.settings.virtualCartridges db.cartridgeCounts(session, function (err, result) { if (err) return cb(err) return cb(null, { cartridges: buildCartridges(cartridges, virtualCartridges, result) }) }) } function _sendCoins (toAddress, cryptoUnits, cryptoCoin, cb) { var walletPlugin = walletPlugins[cryptoCoin] var transactionFee = cachedConfig.exchanges.settings.transactionFee if (cryptoCoin === 'BTC') { walletPlugin.sendBitcoins(toAddress, cryptoUnits, transactionFee, cb) } else { walletPlugin.sendCoins(toAddress, cryptoUnits, cryptoCoin, transactionFee, cb) } } function executeTx (session, tx, authority, cb) { db.addOutgoingTx(session, tx, function (err, toSend) { if (err) return cb(err) var cryptoUnitsToSend = toSend.cryptoUnits if (cryptoUnitsToSend === 0) { return cb(null, {statusCode: 204, txId: tx.txId, txHash: null}) } _sendCoins(tx.toAddress, cryptoUnitsToSend, function (_err, txHash) { var fee = null // Need to fill this out in plugins if (_err) toSend = {cryptoUnits: new BigNumber(0), fiat: 0} db.sentCoins(session, tx, authority, toSend, fee, _err, txHash) if (_err) return cb(_err) pollBalance('BTC') cb(null, { statusCode: 201, // Created txHash: txHash, txId: tx.txId }) }) }) } function reapOutgoingTx (session, tx) { executeTx(session, tx, 'timeout', function (err) { if (err) logger.error(err) }) } function reapTx (row) { var session = {fingerprint: row.device_fingerprint, id: row.session_id} var tx = { fiat: 0, satoshis: row.satoshis, toAddress: row.to_address, currencyCode: row.currency_code, incoming: row.incoming } if (!row.incoming) reapOutgoingTx(session, tx) } function reapTxs () { db.removeOldPending(DEPOSIT_TIMEOUT) // NOTE: No harm in processing old pending tx, we don't need to wait for // removeOldPending to complete. db.pendingTxs(PENDING_TIMEOUT, function (err, results) { if (err) return logger.warn(err) var rows = results.rows var rowCount = rows.length for (var i = 0; i < rowCount; i++) { var row = rows[i] reapTx(row) } }) } // TODO: Run these in parallel and return success exports.trade = function trade (session, rawTrade, cb) { // TODO: move this to DB, too // add bill to trader queue (if trader is enabled) if (traderPlugin) { tradesQueue.push({ currency: rawTrade.currency, satoshis: rawTrade.satoshis }) } if (!rawTrade.toAddress) { var newRawTrade = _.cloneDeep(rawTrade) newRawTrade.toAddress = 'remit' return db.recordBill(session, newRawTrade, cb) } var tx = { txId: rawTrade.txId, fiat: 0, satoshis: 0, toAddress: rawTrade.toAddress, currencyCode: rawTrade.currency } async.parallel([ async.apply(db.addOutgoingPending, session, tx.currencyCode, tx.toAddress), async.apply(db.recordBill, session, rawTrade) ], cb) } exports.sendCoins = function sendCoins (session, rawTx, cb) { executeTx(session, rawTx, 'machine', cb) } exports.cashOut = function cashOut (session, tx, cb) { var tmpInfo = { label: 'TX ' + Date.now(), account: 'deposit' } var cryptoCoin = tx.coin ? tx.coin.unitCode : 'BTC' var walletPlugin = walletPlugins[cryptoCoin] walletPlugin.newAddress(tmpInfo, function (err, address) { if (err) return cb(err) var newTx = _.clone(tx) newTx.toAddress = address db.addInitialIncoming(session, newTx, function (_err) { cb(_err, address) }) }) } exports.dispenseAck = function dispenseAck (session, rec) { db.addDispense(session, rec.tx, rec.cartridges) } exports.fiatBalance = function fiatBalance (cryptoCoin) { var rawRate = exports.getDeviceRate(cryptoCoin).rates.ask var commission = cachedConfig.exchanges.settings.commission var lastBalance = lastBalances[cryptoCoin] if (!rawRate || !lastBalance) return null // The rate is actually our commission times real rate. var rate = commission * rawRate // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide // all our balances by it to provide a safety margin. var lowBalanceMargin = cachedConfig.exchanges.settings.lowBalanceMargin // `balance.transferBalance` is the balance of our transfer account (the one // we use to send Bitcoins to clients) in satoshis. var transferBalance = lastBalance.transferBalance var fiatTransferBalance = (transferBalance * rate) / lowBalanceMargin return fiatTransferBalance } /* * Polling livecycle */ exports.startPolling = function startPolling () { executeTrades() cryptoCoins.forEach(function (coin) { setInterval(async.apply(pollBalance, coin), POLLING_RATE) setInterval(async.apply(pollRate, coin), POLLING_RATE) }) setInterval(reapTxs, REAP_RATE) startTrader() } function startTrader () { // Always start trading, even if we don't have a trade exchange configured, // since configuration can always change in `Trader#configure`. // `Trader#executeTrades` returns early if we don't have a trade exchange // configured at the moment. if (traderPlugin && !tradeInterval) { tradeInterval = setInterval( executeTrades, cachedConfig.exchanges.settings.tradeInterval ) } } function stopTrader () { if (tradeInterval) { clearInterval(tradeInterval) tradeInterval = null tradesQueue = [] } } function pollBalance (cryptoCoin, cb) { logger.debug('[%s] collecting balance', cryptoCoin) var walletPlugin = walletPlugins[cryptoCoin] walletPlugin.balance(function (err, balance) { if (err) { logger.error(err) return cb && cb(err) } logger.debug('[%s] Balance update:', cryptoCoin, balance) balance.timestamp = Date.now() lastBalances[cryptoCoin] = balance return cb && cb(null, lastBalances) }) } function pollRate (cryptoCoin, cb) { logger.debug('[%s] polling for rates (%s)', cryptoCoin, tickerPlugin.NAME) var tickerPlugin = tickerPlugins[cryptoCoin] var currencies = deviceCurrency if (typeof currencies === 'string') currencies = [currencies] var tickerF = cryptoCoin === 'BTC' ? async.apply(tickerPlugin.ticker, currencies) : async.apply(tickerPlugin.ticker, currencies, cryptoCoin) tickerF(function (err, resRates) { if (err) { logger.error(err) return cb && cb(err) } logger.debug('got rates: %j', resRates) resRates.timestamp = new Date() lastRates[cryptoCoin] = resRates return cb && cb(null, lastRates) }) } /* * Getters | Helpers */ exports.getDeviceRate = function getDeviceRate (cryptoCoin) { if (!lastRates[cryptoCoin]) return null var lastRate = lastRates[cryptoCoin] if (!lastRate) return null return lastRate[deviceCurrency] } exports.getBalance = function getBalance (cryptoCoin) { var lastBalance = lastBalances[cryptoCoin] if (!lastBalance) return null return lastBalance.transferBalance } /* * Trader functions */ function purchase (trade, cb) { traderPlugin.purchase(trade.satoshis, null, function (err) { if (err) return cb(err) pollBalance('BTC') if (typeof cb === 'function') cb() }) } function consolidateTrades () { // NOTE: value in satoshis stays the same no matter the currency var consolidatedTrade = { currency: deviceCurrency, satoshis: tradesQueue.reduce(function (prev, current) { return prev + current.satoshis }, 0) } tradesQueue = [] logger.debug('consolidated: ', JSON.stringify(consolidatedTrade)) return consolidatedTrade } function executeTrades () { if (!traderPlugin) return logger.debug('checking for trades') var trade = consolidateTrades() if (trade.satoshis === 0) { logger.debug('rejecting 0 trade') return } logger.debug('making a trade: %d', trade.satoshis / SATOSHI_FACTOR) purchase(trade, function (err) { if (err) { tradesQueue.push(trade) if (err.name !== 'orderTooSmall') logger.error(err) } }) } /* * ID Verifier functions */ exports.verifyUser = function verifyUser (data, cb) { idVerifierPlugin.verifyUser(data, cb) } exports.verifyTx = function verifyTx (data, cb) { idVerifierPlugin.verifyTransaction(data, cb) } exports.getCryptoCoins = function getCryptoCoins () { return cryptoCoins }