diff --git a/lib/plugins.js b/lib/plugins.js index 777aa351..98fc6fc8 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -4,7 +4,8 @@ const BigNumber = require('bignumber.js') const argv = require('minimist')(process.argv.slice(2)) const crypto = require('crypto') -const db = require('./postgresql_interface') +const dbm = require('./postgresql_interface') +const db = require('./db') const logger = require('./logger') const notifier = require('./notifier') const T = require('./time') @@ -30,9 +31,11 @@ const SWEEP_LIVE_HD_INTERVAL = T.minute const SWEEP_OLD_HD_INTERVAL = 2 * T.minutes const TRADE_INTERVAL = 10 * T.seconds const TRADE_TTL = 2 * T.minutes -const STALE_TICKER = 3 * 60 * 1000 -const STALE_BALANCE = 3 * 60 * 1000 - +const STALE_TICKER = 3 * T.minutes +const STALE_BALANCE = 3 * T.minutes +const PONG_INTERVAL = 10 * T.seconds +const PONG_CLEAR_INTERVAL = 1 * T.day +const PONG_TTL = '1 week' const tradesQueues = {} const coins = { @@ -113,7 +116,7 @@ function pollQueries (deviceTime, deviceId, deviceRec) { const balancePromises = cryptoCodes.map(wallet.balance) const pingPromise = recordPing(deviceId, deviceTime, deviceRec) - const promises = [db.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) + const promises = [dbm.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) return Promise.all(promises) .then(arr => { @@ -130,15 +133,15 @@ function pollQueries (deviceTime, deviceId, deviceRec) { } // NOTE: This will fail if we have already sent coins because there will be -// a db unique db record in the table already. +// a dbm unique dbm record in the table already. function executeTx (deviceId, tx) { - return db.addOutgoingTx(deviceId, tx) + return dbm.addOutgoingTx(deviceId, tx) .then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) .then(txHash => { const fee = null // Need to fill this out in plugins const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} - return db.sentCoins(tx, toSend, fee, null, txHash) + return dbm.sentCoins(tx, toSend, fee, null, txHash) .then(() => ({ statusCode: 201, // Created txHash, @@ -148,13 +151,13 @@ function executeTx (deviceId, tx) { } function trade (deviceId, rawTrade) { - // TODO: move this to DB, too + // TODO: move this to dbm, too // add bill to trader queue (if trader is enabled) const cryptoCode = rawTrade.cryptoCode const fiatCode = rawTrade.fiatCode const cryptoAtoms = rawTrade.cryptoAtoms - return db.recordBill(deviceId, rawTrade) + return dbm.recordBill(deviceId, rawTrade) .then(() => { const market = [fiatCode, cryptoCode].join('') @@ -177,7 +180,7 @@ function stateChange (deviceId, deviceTime, rec) { note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}), deviceTime: deviceTime } - return db.machineEvent(event) + return dbm.machineEvent(event) } function recordPing (deviceId, deviceTime, rec) { @@ -188,7 +191,7 @@ function recordPing (deviceId, deviceTime, rec) { note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), deviceTime: deviceTime } - return db.machineEvent(event) + return dbm.machineEvent(event) } function sendCoins (deviceId, rawTx) { @@ -199,7 +202,7 @@ function cashOut (deviceId, tx) { const cryptoCode = tx.cryptoCode const serialPromise = wallet.supportsHD - ? db.nextCashOutSerialHD(tx.id, cryptoCode) + ? dbm.nextCashOutSerialHD(tx.id, cryptoCode) : Promise.resolve() return serialPromise @@ -214,7 +217,7 @@ function cashOut (deviceId, tx) { .then(address => { const newTx = R.assoc('toAddress', address, tx) - return db.addInitialIncoming(deviceId, newTx, address) + return dbm.addInitialIncoming(deviceId, newTx, address) .then(() => address) }) }) @@ -226,7 +229,7 @@ function dispenseAck (deviceId, tx) { const cartridges = [ config.currencies.topCashOutDenomination, config.currencies.bottomCashOutDenomination ] - return db.addDispense(deviceId, tx, cartridges) + return dbm.addDispense(deviceId, tx, cartridges) } function fiatBalance (fiatCode, cryptoCode, deviceId) { @@ -257,7 +260,7 @@ function fiatBalance (fiatCode, cryptoCode, deviceId) { function processTxStatus (tx) { return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) - .then(res => db.updateTxStatus(tx, res.status)) + .then(res => dbm.updateTxStatus(tx, res.status)) } function notifyConfirmation (tx) { @@ -272,13 +275,13 @@ function notifyConfirmation (tx) { } return sms.sendMessage(rec) - .then(() => db.updateNotify(tx)) + .then(() => dbm.updateNotify(tx)) } function monitorLiveIncoming () { const statuses = ['notSeen', 'published', 'insufficientFunds'] - return db.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) + return dbm.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(logger.error) } @@ -286,22 +289,37 @@ function monitorLiveIncoming () { function monitorIncoming () { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - return db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) + return dbm.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) .then(txs => Promise.all(txs.map(processTxStatus))) .catch(logger.error) } function monitorUnnotified () { - db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) + dbm.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) .then(txs => Promise.all(txs.map(notifyConfirmation))) .catch(logger.error) } +function pong () { + db.none('insert into server_events (event_type) values ($1)', ['ping']) + .catch(logger.error) +} + +function pongClear () { + const sql = `delete from server_events + where event_type=$1 + and created < now() - interval $2` + + db.none(sql, ['ping', PONG_TTL]) + .catch(logger.error) +} + /* * Polling livecycle */ function startPolling () { executeTrades() + pongClear() setInterval(executeTrades, TRADE_INTERVAL) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) @@ -309,7 +327,8 @@ function startPolling () { setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) setInterval(sweepLiveHD, SWEEP_LIVE_HD_INTERVAL) setInterval(sweepOldHD, SWEEP_OLD_HD_INTERVAL) - + setInterval(pong, PONG_INTERVAL) + setInterval(pongClear, PONG_CLEAR_INTERVAL) monitorLiveIncoming() monitorIncoming() monitorUnnotified() @@ -369,7 +388,7 @@ function executeTrades () { const settings = settingsLoader.settings() const config = settings.config - return db.devices() + return dbm.devices() .then(devices => { const deviceIds = devices.map(device => device.device_id) const lists = deviceIds.map(deviceId => { @@ -492,7 +511,7 @@ function checkDeviceBalances (deviceId) { } function checkBalances () { - return db.devices() + return dbm.devices() .then(devices => { const deviceIds = devices.map(r => r.device_id) const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(deviceId)) @@ -533,7 +552,7 @@ function getPhoneCode (phone) { } function fetchPhoneTx (phone) { - return db.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) + return dbm.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) .then(txs => { const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) if (confirmedTxs.length > 0) { @@ -556,20 +575,20 @@ function sweepHD (row) { .then(txHash => { if (txHash) { logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) - return db.markSwept(row.tx_id) + return dbm.markSwept(row.tx_id) } }) .catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message)) } function sweepLiveHD () { - return db.fetchLiveHD() + return dbm.fetchLiveHD() .then(rows => Promise.all(rows.map(sweepHD))) .catch(err => logger.error(err)) } function sweepOldHD () { - return db.fetchOldHD() + return dbm.fetchOldHD() .then(rows => Promise.all(rows.map(sweepHD))) .catch(err => logger.error(err)) } diff --git a/migrations/020-add-server-events.js b/migrations/020-add-server-events.js new file mode 100644 index 00000000..93d0ce0e --- /dev/null +++ b/migrations/020-add-server-events.js @@ -0,0 +1,17 @@ +var db = require('./db') + +exports.up = function (next) { + var sql = [ + `create table server_events ( + id serial PRIMARY KEY, + event_type text NOT NULL, + created timestamptz NOT NULL default now() + )`, + 'CREATE INDEX ON server_events (created)' + ] + db.multi(sql, next) +} + +exports.down = function (next) { + next() +}