generate uptime pongs

This commit is contained in:
Josh Harvey 2016-11-29 17:08:09 +02:00
parent d75d75f30c
commit 33ff407437
2 changed files with 63 additions and 27 deletions

View file

@ -4,7 +4,8 @@ const BigNumber = require('bignumber.js')
const argv = require('minimist')(process.argv.slice(2)) const argv = require('minimist')(process.argv.slice(2))
const crypto = require('crypto') const crypto = require('crypto')
const db = require('./postgresql_interface') const dbm = require('./postgresql_interface')
const db = require('./db')
const logger = require('./logger') const logger = require('./logger')
const notifier = require('./notifier') const notifier = require('./notifier')
const T = require('./time') const T = require('./time')
@ -30,9 +31,11 @@ const SWEEP_LIVE_HD_INTERVAL = T.minute
const SWEEP_OLD_HD_INTERVAL = 2 * T.minutes const SWEEP_OLD_HD_INTERVAL = 2 * T.minutes
const TRADE_INTERVAL = 10 * T.seconds const TRADE_INTERVAL = 10 * T.seconds
const TRADE_TTL = 2 * T.minutes const TRADE_TTL = 2 * T.minutes
const STALE_TICKER = 3 * 60 * 1000 const STALE_TICKER = 3 * T.minutes
const STALE_BALANCE = 3 * 60 * 1000 const STALE_BALANCE = 3 * T.minutes
const PONG_INTERVAL = 10 * T.seconds
const PONG_CLEAR_INTERVAL = 1 * T.day
const PONG_TTL = '1 week'
const tradesQueues = {} const tradesQueues = {}
const coins = { const coins = {
@ -113,7 +116,7 @@ function pollQueries (deviceTime, deviceId, deviceRec) {
const balancePromises = cryptoCodes.map(wallet.balance) const balancePromises = cryptoCodes.map(wallet.balance)
const pingPromise = recordPing(deviceId, deviceTime, deviceRec) const pingPromise = recordPing(deviceId, deviceTime, deviceRec)
const promises = [db.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises) const promises = [dbm.cartridgeCounts(deviceId), pingPromise].concat(tickerPromises, balancePromises)
return Promise.all(promises) return Promise.all(promises)
.then(arr => { .then(arr => {
@ -130,15 +133,15 @@ function pollQueries (deviceTime, deviceId, deviceRec) {
} }
// NOTE: This will fail if we have already sent coins because there will be // NOTE: This will fail if we have already sent coins because there will be
// a db unique db record in the table already. // a dbm unique dbm record in the table already.
function executeTx (deviceId, tx) { function executeTx (deviceId, tx) {
return db.addOutgoingTx(deviceId, tx) return dbm.addOutgoingTx(deviceId, tx)
.then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)) .then(() => wallet.sendCoins(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode))
.then(txHash => { .then(txHash => {
const fee = null // Need to fill this out in plugins const fee = null // Need to fill this out in plugins
const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat} const toSend = {cryptoAtoms: tx.cryptoAtoms, fiat: tx.fiat}
return db.sentCoins(tx, toSend, fee, null, txHash) return dbm.sentCoins(tx, toSend, fee, null, txHash)
.then(() => ({ .then(() => ({
statusCode: 201, // Created statusCode: 201, // Created
txHash, txHash,
@ -148,13 +151,13 @@ function executeTx (deviceId, tx) {
} }
function trade (deviceId, rawTrade) { function trade (deviceId, rawTrade) {
// TODO: move this to DB, too // TODO: move this to dbm, too
// add bill to trader queue (if trader is enabled) // add bill to trader queue (if trader is enabled)
const cryptoCode = rawTrade.cryptoCode const cryptoCode = rawTrade.cryptoCode
const fiatCode = rawTrade.fiatCode const fiatCode = rawTrade.fiatCode
const cryptoAtoms = rawTrade.cryptoAtoms const cryptoAtoms = rawTrade.cryptoAtoms
return db.recordBill(deviceId, rawTrade) return dbm.recordBill(deviceId, rawTrade)
.then(() => { .then(() => {
const market = [fiatCode, cryptoCode].join('') const market = [fiatCode, cryptoCode].join('')
@ -177,7 +180,7 @@ function stateChange (deviceId, deviceTime, rec) {
note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}), note: JSON.stringify({state: rec.state, isIdle: rec.isIdle, txId: rec.txId}),
deviceTime: deviceTime deviceTime: deviceTime
} }
return db.machineEvent(event) return dbm.machineEvent(event)
} }
function recordPing (deviceId, deviceTime, rec) { function recordPing (deviceId, deviceTime, rec) {
@ -188,7 +191,7 @@ function recordPing (deviceId, deviceTime, rec) {
note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}), note: JSON.stringify({state: rec.state, isIdle: rec.idle === 'true', txId: rec.txId}),
deviceTime: deviceTime deviceTime: deviceTime
} }
return db.machineEvent(event) return dbm.machineEvent(event)
} }
function sendCoins (deviceId, rawTx) { function sendCoins (deviceId, rawTx) {
@ -199,7 +202,7 @@ function cashOut (deviceId, tx) {
const cryptoCode = tx.cryptoCode const cryptoCode = tx.cryptoCode
const serialPromise = wallet.supportsHD const serialPromise = wallet.supportsHD
? db.nextCashOutSerialHD(tx.id, cryptoCode) ? dbm.nextCashOutSerialHD(tx.id, cryptoCode)
: Promise.resolve() : Promise.resolve()
return serialPromise return serialPromise
@ -214,7 +217,7 @@ function cashOut (deviceId, tx) {
.then(address => { .then(address => {
const newTx = R.assoc('toAddress', address, tx) const newTx = R.assoc('toAddress', address, tx)
return db.addInitialIncoming(deviceId, newTx, address) return dbm.addInitialIncoming(deviceId, newTx, address)
.then(() => address) .then(() => address)
}) })
}) })
@ -226,7 +229,7 @@ function dispenseAck (deviceId, tx) {
const cartridges = [ config.currencies.topCashOutDenomination, const cartridges = [ config.currencies.topCashOutDenomination,
config.currencies.bottomCashOutDenomination ] config.currencies.bottomCashOutDenomination ]
return db.addDispense(deviceId, tx, cartridges) return dbm.addDispense(deviceId, tx, cartridges)
} }
function fiatBalance (fiatCode, cryptoCode, deviceId) { function fiatBalance (fiatCode, cryptoCode, deviceId) {
@ -257,7 +260,7 @@ function fiatBalance (fiatCode, cryptoCode, deviceId) {
function processTxStatus (tx) { function processTxStatus (tx) {
return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode) return wallet.getStatus(tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)
.then(res => db.updateTxStatus(tx, res.status)) .then(res => dbm.updateTxStatus(tx, res.status))
} }
function notifyConfirmation (tx) { function notifyConfirmation (tx) {
@ -272,13 +275,13 @@ function notifyConfirmation (tx) {
} }
return sms.sendMessage(rec) return sms.sendMessage(rec)
.then(() => db.updateNotify(tx)) .then(() => dbm.updateNotify(tx))
} }
function monitorLiveIncoming () { function monitorLiveIncoming () {
const statuses = ['notSeen', 'published', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'insufficientFunds']
return db.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE) return dbm.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE)
.then(txs => Promise.all(txs.map(processTxStatus))) .then(txs => Promise.all(txs.map(processTxStatus)))
.catch(logger.error) .catch(logger.error)
} }
@ -286,22 +289,37 @@ function monitorLiveIncoming () {
function monitorIncoming () { function monitorIncoming () {
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
return db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) return dbm.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE)
.then(txs => Promise.all(txs.map(processTxStatus))) .then(txs => Promise.all(txs.map(processTxStatus)))
.catch(logger.error) .catch(logger.error)
} }
function monitorUnnotified () { function monitorUnnotified () {
db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) dbm.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE)
.then(txs => Promise.all(txs.map(notifyConfirmation))) .then(txs => Promise.all(txs.map(notifyConfirmation)))
.catch(logger.error) .catch(logger.error)
} }
function pong () {
db.none('insert into server_events (event_type) values ($1)', ['ping'])
.catch(logger.error)
}
function pongClear () {
const sql = `delete from server_events
where event_type=$1
and created < now() - interval $2`
db.none(sql, ['ping', PONG_TTL])
.catch(logger.error)
}
/* /*
* Polling livecycle * Polling livecycle
*/ */
function startPolling () { function startPolling () {
executeTrades() executeTrades()
pongClear()
setInterval(executeTrades, TRADE_INTERVAL) setInterval(executeTrades, TRADE_INTERVAL)
setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL) setInterval(monitorLiveIncoming, LIVE_INCOMING_TX_INTERVAL)
@ -309,7 +327,8 @@ function startPolling () {
setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL) setInterval(monitorUnnotified, UNNOTIFIED_INTERVAL)
setInterval(sweepLiveHD, SWEEP_LIVE_HD_INTERVAL) setInterval(sweepLiveHD, SWEEP_LIVE_HD_INTERVAL)
setInterval(sweepOldHD, SWEEP_OLD_HD_INTERVAL) setInterval(sweepOldHD, SWEEP_OLD_HD_INTERVAL)
setInterval(pong, PONG_INTERVAL)
setInterval(pongClear, PONG_CLEAR_INTERVAL)
monitorLiveIncoming() monitorLiveIncoming()
monitorIncoming() monitorIncoming()
monitorUnnotified() monitorUnnotified()
@ -369,7 +388,7 @@ function executeTrades () {
const settings = settingsLoader.settings() const settings = settingsLoader.settings()
const config = settings.config const config = settings.config
return db.devices() return dbm.devices()
.then(devices => { .then(devices => {
const deviceIds = devices.map(device => device.device_id) const deviceIds = devices.map(device => device.device_id)
const lists = deviceIds.map(deviceId => { const lists = deviceIds.map(deviceId => {
@ -492,7 +511,7 @@ function checkDeviceBalances (deviceId) {
} }
function checkBalances () { function checkBalances () {
return db.devices() return dbm.devices()
.then(devices => { .then(devices => {
const deviceIds = devices.map(r => r.device_id) const deviceIds = devices.map(r => r.device_id)
const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(deviceId)) const deviceBalancePromises = deviceIds.map(deviceId => checkDeviceBalances(deviceId))
@ -533,7 +552,7 @@ function getPhoneCode (phone) {
} }
function fetchPhoneTx (phone) { function fetchPhoneTx (phone) {
return db.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION) return dbm.fetchPhoneTxs(phone, TRANSACTION_EXPIRATION)
.then(txs => { .then(txs => {
const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed'])) const confirmedTxs = txs.filter(tx => R.contains(tx.status, ['instant', 'confirmed']))
if (confirmedTxs.length > 0) { if (confirmedTxs.length > 0) {
@ -556,20 +575,20 @@ function sweepHD (row) {
.then(txHash => { .then(txHash => {
if (txHash) { if (txHash) {
logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash) logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash)
return db.markSwept(row.tx_id) return dbm.markSwept(row.tx_id)
} }
}) })
.catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message)) .catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message))
} }
function sweepLiveHD () { function sweepLiveHD () {
return db.fetchLiveHD() return dbm.fetchLiveHD()
.then(rows => Promise.all(rows.map(sweepHD))) .then(rows => Promise.all(rows.map(sweepHD)))
.catch(err => logger.error(err)) .catch(err => logger.error(err))
} }
function sweepOldHD () { function sweepOldHD () {
return db.fetchOldHD() return dbm.fetchOldHD()
.then(rows => Promise.all(rows.map(sweepHD))) .then(rows => Promise.all(rows.map(sweepHD)))
.catch(err => logger.error(err)) .catch(err => logger.error(err))
} }

View file

@ -0,0 +1,17 @@
var db = require('./db')
exports.up = function (next) {
var sql = [
`create table server_events (
id serial PRIMARY KEY,
event_type text NOT NULL,
created timestamptz NOT NULL default now()
)`,
'CREATE INDEX ON server_events (created)'
]
db.multi(sql, next)
}
exports.down = function (next) {
next()
}