'use strict'; var _ = require('lodash'); var async = require('async'); var logger = require('./logger'); var argv = require('minimist')(process.argv.slice(2)); var SATOSHI_FACTOR = 1e8; var POLLING_RATE = 60 * 1000; // poll each minute var REAP_RATE = 2 * 1000; var PENDING_TIMEOUT = 70 * 1000; if (argv.timeout) PENDING_TIMEOUT = argv.timeout / 1000; // TODO: might have to update this if user is allowed to extend monitoring time var DEPOSIT_TIMEOUT = 130 * 1000; var db = null; var tickerPlugins = {}; var traderPlugin = null; var walletPlugins = {}; var idVerifierPlugin = null; var infoPlugin = null; var currentlyUsedPlugins = {}; var cachedConfig = null; var deviceCurrency = 'USD'; var lastBalances = {}; var lastRates = {}; var tradesQueue = []; // that's basically a constructor exports.init = function init(databaseHandle) { if (!databaseHandle) { throw new Error('\'db\' is required'); } db = databaseHandle; }; function loadPlugin(name, config) { // plugins definitions var moduleMethods = { ticker: ['ticker'], trader: ['balance', 'purchase', 'sell'], wallet: ['balance', 'sendBitcoins', 'newAddress'], idVerifier: ['verifyUser', 'verifyTransaction'], info: ['checkAddress'] }; var plugin = null; // each used plugin MUST be installed try { plugin = require('lamassu-' + name); } catch (_) { throw new Error(name + ' module is not installed. ' + 'Try running \'npm install --save lamassu-' + name + '\' first'); } // each plugin MUST implement those if (typeof plugin.SUPPORTED_MODULES !== 'undefined') { if (plugin.SUPPORTED_MODULES === 'string') plugin.SUPPORTED_MODULES = [plugin.SUPPORTED_MODULES]; } if (!(plugin.SUPPORTED_MODULES instanceof Array)) throw new Error('\'' + name + '\' fails to implement *required* ' + '\'SUPPORTED_MODULES\' constant'); plugin.SUPPORTED_MODULES.forEach(function(moduleName) { moduleMethods[moduleName].forEach(function(methodName) { if (typeof plugin[methodName] !== 'function') { throw new Error('\'' + name + '\' declares \'' + moduleName + '\', but fails to implement \'' + methodName + '\' method'); } }); }); // each plugin SHOULD implement those if (typeof plugin.NAME === 'undefined') logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'NAME\' field')); if (typeof plugin.config !== 'function') { logger.warn(new Error('\'' + name + '\' fails to implement *recommended* \'config\' method')); plugin.config = function() {}; } else if (config !== null) { plugin.config(config); // only when plugin supports it, and config is passed } return plugin; } function loadOrConfigPlugin(pluginHandle, pluginType, cryptoCoin, currency, onChangeCallback) { if (!cryptoCoin) cryptoCoin = 'any' var currentName = cryptoCoin === 'any' || cryptoCoin === 'BTC' ? cachedConfig.exchanges.plugins.current[pluginType] : cachedConfig.exchanges.plugins.current[cryptoCoin][pluginType] var pluginChanged = currentlyUsedPlugins[cryptoCoin][pluginType] !== currentName; if (!currentName) pluginHandle = null; else { // some plugins may be disabled var pluginConfig = cachedConfig.exchanges.plugins.settings[currentName] || {}; if (currency) pluginConfig.currency = currency; if (pluginHandle && !pluginChanged) pluginHandle.config(pluginConfig); else { pluginHandle = loadPlugin(currentName, pluginConfig); currentlyUsedPlugins[cryptoCoin] ||= {} currentlyUsedPlugins[cryptoCoin][pluginType] = currentName logger.debug('[%s] plugin(%s) loaded: %s', cryptoCoin, pluginType, pluginHandle.NAME || currentName); } } if (typeof onChangeCallback === 'function') onChangeCallback(pluginHandle, currency); return pluginHandle; } exports.configure = function configure(config) { if (config.exchanges.settings.lowBalanceMargin < 1) { throw new Error('\'settings.lowBalanceMargin\' has to be >= 1'); } cachedConfig = config; deviceCurrency = config.exchanges.settings.currency; cryptoCoins = config.exchanges.settings.coins || ['BTC']; cryptoCoins.forEach(function (cryptoCoin) { // TICKER [required] configure (or load) loadOrConfigPlugin( tickerPlugins[cryptoCoin], 'ticker', cryptoCoin, deviceCurrency, // device currency function onTickerChange(newTicker) { tickerPlugins[cryptoCoin] = newTicker; pollRate(cryptoCoin); } ); // WALLET [required] configure (or load) loadOrConfigPlugin( walletPlugins[cryptoCoin], 'transfer', cryptoCoin, null, function onWalletChange(newWallet) { walletPlugins[cryptoCoin] = newWallet; pollBalance(cryptoCoin); } ); }) // TRADER [optional] configure (or load) traderPlugin = loadOrConfigPlugin( traderPlugin, 'trade', null, null, function onTraderChange(newTrader) { traderPlugin = newTrader; if (newTrader === null) stopTrader(); else startTrader(); } ); // ID VERIFIER [optional] configure (or load) idVerifierPlugin = loadOrConfigPlugin( idVerifierPlugin, 'idVerifier' ); infoPlugin = loadOrConfigPlugin( infoPlugin, 'info' ); }; exports.getConfig = function getConfig() { return cachedConfig; }; exports.logEvent = function event(session, rawEvent) { db.recordDeviceEvent(session, rawEvent); }; function buildCartridges(cartridges, virtualCartridges, rec) { return { cartridges: [ { denomination: parseInt(cartridges[0], 10), count: parseInt(rec.counts[0], 10) }, { denomination: parseInt(cartridges[1], 10), count: parseInt(rec.counts[1], 10) } ], virtualCartridges: virtualCartridges, id: rec.id }; } exports.pollQueries = function pollQueries(session, cb) { var cartridges = cachedConfig.exchanges.settings.cartridges; if (!cartridges) return cb(null, {}); var virtualCartridges = cachedConfig.exchanges.settings.virtualCartridges; db.cartridgeCounts(session, function(err, result) { if (err) return cb(err); return cb(null, { cartridges: buildCartridges(cartridges, virtualCartridges, result) }); }); }; function _sendCoins(toAddress, cryptoUnits, cryptoCoin, cb) { var walletPlugin = walletPlugins[cryptoCoin] var transactionFee = cachedConfig.exchanges.settings.transactionFee; if (cryptoCoin === 'BTC') walletPlugin.sendBitcoins(toAddress, cryptoUnits, transactionFee, cb); else walletPlugin.sendCoins(toAddress, cryptoUnits, cryptoCoin, transactionFee, cb); } function executeTx(session, tx, authority, cb) { db.addOutgoingTx(session, tx, function(err, toSend) { if (err) return cb(err); var cryptoUnitsToSend = toSend.cryptoUnits; if (cryptoUnitsToSend === 0) return cb(null, {statusCode: 204, txId: tx.txId, txHash: null}); _sendCoins(tx.toAddress, cryptoUnitsToSend, function(_err, txHash) { var fee = null; // Need to fill this out in plugins if (_err) toSend = {cryptoUnits: new BigNumber(0), fiat: 0}; db.sentCoins(session, tx, authority, toSend, fee, _err, txHash); if (_err) return cb(_err); var cryptoCoin = tx.coin ? tx.coin.unitCode : 'BTC' pollBalance('BTC'); cb(null, { statusCode: 201, // Created txHash: txHash, txId: tx.txId }); }); }); } function reapOutgoingTx(session, tx) { executeTx(session, tx, 'timeout', function(err) { if (err) logger.error(err); }); } function reapTx(row) { var session = {fingerprint: row.device_fingerprint, id: row.session_id}; var tx = { fiat: 0, satoshis: row.satoshis, toAddress: row.to_address, currencyCode: row.currency_code, incoming: row.incoming }; if (!row.incoming) reapOutgoingTx(session, tx); } function reapTxs() { db.removeOldPending(DEPOSIT_TIMEOUT); // NOTE: No harm in processing old pending tx, we don't need to wait for // removeOldPending to complete. db.pendingTxs(PENDING_TIMEOUT, function(err, results) { if (err) return logger.warn(err); var rows = results.rows; var rowCount = rows.length; for (var i = 0; i < rowCount; i++) { var row = rows[i]; reapTx(row); } }); } // TODO: Run these in parallel and return success exports.trade = function trade(session, rawTrade, cb) { // TODO: move this to DB, too // add bill to trader queue (if trader is enabled) if (traderPlugin) { tradesQueue.push({ currency: rawTrade.currency, satoshis: rawTrade.satoshis }); } if (!rawTrade.toAddress) { var newRawTrade = _.cloneDeep(rawTrade); newRawTrade.toAddress = 'remit'; return db.recordBill(session, newRawTrade, cb); } var tx = { txId: rawTrade.txId, fiat: 0, satoshis: 0, toAddress: rawTrade.toAddress, currencyCode: rawTrade.currency }; async.parallel([ async.apply(db.addOutgoingPending, session, tx.currencyCode, tx.toAddress), async.apply(db.recordBill, session, rawTrade) ], cb); }; exports.sendBitcoins = function sendBitcoins(session, rawTx, cb) { executeTx(session, rawTx, 'machine', cb); }; exports.cashOut = function cashOut(session, tx, cb) { var tmpInfo = { label: 'TX ' + Date.now(), account: 'deposit' }; var cryptoCoin = tx.coin ? tx.coin.unitCode : 'BTC' var walletPlugin = walletPlugins[cryptoCoin] walletPlugin.newAddress(tmpInfo, function(err, address) { if (err) return cb(err); var newTx = _.clone(tx); newTx.toAddress = address; db.addInitialIncoming(session, newTx, function(_err) { cb(_err, address); }); }); }; exports.dispenseAck = function dispenseAck(session, rec) { db.addDispense(session, rec.tx, rec.cartridges); }; exports.fiatBalance = function fiatBalance(cryptoCoin) { var rawRate = exports.getDeviceRate(cryptoCoin).rates.ask; var commission = cachedConfig.exchanges.settings.commission; var lastBalance = lastBalances[cryptoCoin] if (!rawRate || !lastBalance) return null; // The rate is actually our commission times real rate. var rate = commission * rawRate; // `lowBalanceMargin` is our safety net. It's a number > 1, and we divide // all our balances by it to provide a safety margin. var lowBalanceMargin = cachedConfig.exchanges.settings.lowBalanceMargin; // `balance.transferBalance` is the balance of our transfer account (the one // we use to send Bitcoins to clients) in satoshis. var transferBalance = lastBalances.transferBalance; var fiatTransferBalance = (transferBalance * rate) / lowBalanceMargin; return fiatTransferBalance; }; /* * Polling livecycle */ exports.startPolling = function startPolling() { executeTrades(); cryptoCoins.forEach(function (coin) { setInterval(async.apply(pollBalance, coin), POLLING_RATE); setInterval(async.apply(pollRate, coin), POLLING_RATE); }); setInterval(reapTxs, REAP_RATE); startTrader(); }; function startTrader() { // Always start trading, even if we don't have a trade exchange configured, // since configuration can always change in `Trader#configure`. // `Trader#executeTrades` returns early if we don't have a trade exchange // configured at the moment. if (traderPlugin && !tradeInterval) { tradeInterval = setInterval( executeTrades, cachedConfig.exchanges.settings.tradeInterval ); } } function stopTrader() { if (tradeInterval) { clearInterval(tradeInterval); tradeInterval = null; tradesQueue = []; } } function pollBalance(cryptoCoin, cb) { logger.debug('[%s] collecting balance', cryptoCoin); var walletPlugin = walletPlugins[cryptoCoin] var jobs = { transferBalance: walletPlugin.balance }; walletPlugin.balance(function(err, balance) { if (err) { logger.error(err); return cb && cb(err); } logger.debug('[%s] Balance update:', cryptoCoin, balance); balance.timestamp = Date.now(); lastBalances[cryptoCoin] = balance; return cb && cb(null, lastBalances); }); } function pollRates (cb) { var polls = cryptoCoins.map(function (cryptoCoin) { async.apply(pollRate, cryptoCoin) }); async.parallel(polls, cb); } function pollRate(cryptoCoin, cb) { logger.debug('[%s] polling for rates (%s)', cryptoCoin, tickerPlugin.NAME); var tickerPlugin = tickerPlugins[cryptoCoin]; tickerPlugin.ticker(deviceCurrency, function(err, resRates) { if (err) { logger.error(err); return cb && cb(err); } logger.debug('got rates: %j', resRates); resRates.timestamp = new Date(); lastRates[cryptoCoin] = resRates; return cb && cb(null, lastRates); }); } /* * Getters | Helpers */ exports.getDeviceRate = function getDeviceRate(cryptoCoin) { if (!lastRates[cryptoCoin]) return null; var lastRate = lastRates[cryptoCoin] if (!lastRate) return null; return lastRate[deviceCurrency]; }; exports.getBalance = function getBalance() { if (!lastBalances) return null; return lastBalances.transferBalance; }; /* * Trader functions */ function purchase(trade, cb) { traderPlugin.purchase(trade.satoshis, null, function(err) { if (err) return cb(err); pollBalance('BTC'); if (typeof cb === 'function') cb(); }); } function consolidateTrades() { // NOTE: value in satoshis stays the same no matter the currency var consolidatedTrade = { currency: deviceCurrency, satoshis: tradesQueue.reduce(function(prev, current) { return prev + current.satoshis; }, 0) }; tradesQueue = []; logger.debug('consolidated: ', JSON.stringify(consolidatedTrade)); return consolidatedTrade; } function executeTrades() { if (!traderPlugin) return; logger.debug('checking for trades'); var trade = consolidateTrades(); if (trade.satoshis === 0) { logger.debug('rejecting 0 trade'); return; } logger.debug('making a trade: %d', trade.satoshis / SATOSHI_FACTOR); purchase(trade, function(err) { if (err) { tradesQueue.push(trade); if (err.name !== 'orderTooSmall') logger.error(err); } }); } /* * ID Verifier functions */ exports.verifyUser = function verifyUser(data, cb) { idVerifierPlugin.verifyUser(data, cb); }; exports.verifyTx = function verifyTx(data, cb) { idVerifierPlugin.verifyTransaction(data, cb); };