Lots of development

This commit is contained in:
Josh Harvey 2017-03-31 16:45:14 +03:00
parent 5cbec6bd23
commit 3a244f691e
19 changed files with 594 additions and 837 deletions

View file

@ -2,6 +2,7 @@ const uuid = require('uuid')
const _ = require('lodash/fp')
const argv = require('minimist')(process.argv.slice(2))
const crypto = require('crypto')
const pgp = require('pg-promise')()
const BN = require('./bn')
const dbm = require('./postgresql_interface')
@ -15,10 +16,8 @@ const exchange = require('./exchange')
const sms = require('./sms')
const email = require('./email')
const STALE_INCOMING_TX_AGE = T.week
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
const MAX_NOTIFY_AGE = 2 * T.days
const MIN_NOTIFY_AGE = 5 * T.minutes
const mapValuesWithKey = _.mapValues.convert({cap: false})
const TRADE_TTL = 2 * T.minutes
const STALE_TICKER = 3 * T.minutes
const STALE_BALANCE = 3 * T.minutes
@ -42,13 +41,13 @@ function plugins (settings, deviceId) {
const cryptoConfig = configManager.scoped(cryptoCode, deviceId, settings.config)
const rateRec = tickers[i]
const cashInCommission = BN(1).minus(BN(cryptoConfig.cashInCommission).div(100))
const cashOutCommission = cashOut && BN(1).plus(BN(cryptoConfig.cashOutCommission).div(100))
const cashInCommission = BN(1).add(BN(cryptoConfig.cashInCommission).div(100))
const cashOutCommission = cashOut && BN(1).add(BN(cryptoConfig.cashOutCommission).div(100))
if (Date.now() - rateRec.timestamp > STALE_TICKER) return logger.warn('Stale rate for ' + cryptoCode)
const rate = rateRec.rates
rates[cryptoCode] = {
cashIn: rate.ask.div(cashInCommission),
cashIn: rate.ask.mul(cashInCommission),
cashOut: cashOut ? rate.bid.div(cashOutCommission) : undefined
}
})
@ -160,36 +159,10 @@ function plugins (settings, deviceId) {
})
}
// NOTE: This will fail if we have already sent coins because there will be
// a unique dbm record in the table already.
function sendCoins (tx) {
return wallet.sendCoins(settings, tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)
}
function trade (rawTrade) {
// TODO: move this to dbm, too
// add bill to trader queue (if trader is enabled)
const cryptoCode = rawTrade.cryptoCode
const fiatCode = rawTrade.fiatCode
const cryptoAtoms = rawTrade.cryptoAtoms
return dbm.recordBill(deviceId, rawTrade)
.then(() => {
const market = [fiatCode, cryptoCode].join('')
if (!exchange.active(settings, cryptoCode)) return
logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms)
if (!tradesQueues[market]) tradesQueues[market] = []
tradesQueues[market].push({
fiatCode,
cryptoAtoms,
cryptoCode,
timestamp: Date.now()
})
})
}
function recordPing (deviceTime, rec) {
const event = {
id: uuid.v4(),
@ -201,13 +174,22 @@ function plugins (settings, deviceId) {
return dbm.machineEvent(event)
}
function isHd (tx) {
return wallet.isHd(settings, tx.cryptoCode)
}
function getStatus (tx) {
return wallet.getStatus(settings, tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)
}
function newAddress (tx) {
const cryptoCode = tx.cryptoCode
const info = {
cryptoCode: tx.cryptoCode,
label: 'TX ' + Date.now(),
account: 'deposit'
account: 'deposit',
hdIndex: tx.hdIndex
}
return wallet.newAddress(settings, cryptoCode, info)
return wallet.newAddress(settings, info)
}
function dispenseAck (tx) {
@ -244,11 +226,6 @@ function plugins (settings, deviceId) {
})
}
function processTxStatus (tx) {
return wallet.getStatus(settings, tx.toAddress, tx.cryptoAtoms, tx.cryptoCode)
.then(res => dbm.updateTxStatus(tx, res.status))
}
function notifyConfirmation (tx) {
logger.debug('notifyConfirmation')
@ -256,34 +233,17 @@ function plugins (settings, deviceId) {
const rec = {
sms: {
toNumber: phone,
body: 'Your cash is waiting! Go to the Cryptomat and press Redeem.'
body: 'Your cash is waiting! Go to the Cryptomat and press Redeem within 24 hours.'
}
}
return sms.sendMessage(settings, rec)
.then(() => dbm.updateNotify(tx))
}
.then(() => {
const sql = 'update cash_out_txs set notified=$1 where id=$2'
const values = [true, tx.id]
function monitorLiveIncoming () {
const statuses = ['notSeen', 'published', 'insufficientFunds']
return dbm.fetchOpenTxs(statuses, STALE_LIVE_INCOMING_TX_AGE)
.then(txs => Promise.all(txs.map(processTxStatus)))
.catch(logger.error)
}
function monitorIncoming () {
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
return dbm.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE)
.then(txs => Promise.all(txs.map(processTxStatus)))
.catch(logger.error)
}
function monitorUnnotified () {
dbm.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE)
.then(txs => Promise.all(txs.map(notifyConfirmation)))
.catch(logger.error)
return db.none(sql, values)
})
}
function pong () {
@ -304,6 +264,35 @@ function plugins (settings, deviceId) {
* Trader functions
*/
function buy (rec) {
return buyAndSell(rec, true)
}
function sell (rec) {
return buyAndSell(rec, false)
}
function buyAndSell (rec, doBuy) {
const cryptoCode = rec.cryptoCode
const fiatCode = rec.fiatCode
const cryptoAtoms = doBuy ? rec.cryptoAtoms : rec.cryptoAtoms.neg()
const market = [fiatCode, cryptoCode].join('')
console.log('DEBUG333')
if (!exchange.active(settings, cryptoCode)) return
console.log('DEBUG334')
logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms)
if (!tradesQueues[market]) tradesQueues[market] = []
tradesQueues[market].push({
fiatCode,
cryptoAtoms,
cryptoCode,
timestamp: Date.now()
})
}
function consolidateTrades (cryptoCode, fiatCode) {
const market = [fiatCode, cryptoCode].join('')
@ -371,27 +360,56 @@ function plugins (settings, deviceId) {
if (!exchange.active(settings, cryptoCode)) return
const market = [fiatCode, cryptoCode].join('')
logger.debug('[%s] checking for trades', market)
const tradeEntry = consolidateTrades(cryptoCode, fiatCode)
if (tradeEntry === null) return logger.debug('[%s] no trades', market)
if (tradeEntry.cryptoAtoms.eq(0)) {
logger.debug('[%s] rejecting 0 trade', market)
return
}
if (tradeEntry === null || tradeEntry.cryptoAtoms.eq(0)) return
logger.debug('[%s] making a trade: %d', market, tradeEntry.cryptoAtoms.toString())
return exchange.buy(settings, tradeEntry.cryptoAtoms, tradeEntry.fiatCode, tradeEntry.cryptoCode)
.then(() => logger.debug('[%s] Successful trade.', market))
return executeTradeForType(tradeEntry)
.catch(err => {
tradesQueues[market].push(tradeEntry)
if (err.name === 'NoExchangeError') return logger.debug(err.message)
if (err.name === 'orderTooSmall') return logger.debug(err.message)
logger.error(err)
})
}
function executeTradeForType (_tradeEntry) {
const expand = te => _.assign(te, {
cryptoAtoms: te.cryptoAtoms.abs(),
type: te.cryptoAtoms.gte(0) ? 'buy' : 'sell'
})
const tradeEntry = expand(_tradeEntry)
const execute = tradeEntry.type === 'buy' ? exchange.buy : exchange.sell
return execute(settings, tradeEntry.cryptoAtoms, tradeEntry.fiatCode, tradeEntry.cryptoCode)
.then(() => recordTrade(tradeEntry))
}
function convertBigNumFields (obj) {
const convert = (value, key) => _.includes(key, ['cryptoAtoms', 'fiat'])
? value.toString()
: value
const convertKey = key => _.includes(key, ['cryptoAtoms', 'fiat'])
? key + '#'
: key
return _.mapKeys(convertKey, mapValuesWithKey(convert, obj))
}
function recordTrade (_tradeEntry) {
const massage = _.flow(
_.pick(['cryptoCode', 'cryptoAtoms', 'fiatCode', 'type']),
convertBigNumFields,
_.mapKeys(_.snakeCase)
)
const tradeEntry = massage(_tradeEntry)
const sql = pgp.helpers.insert(tradeEntry, null, 'trades')
return db.none(sql)
}
function sendMessage (rec) {
const config = configManager.unscoped(settings.config)
@ -466,49 +484,51 @@ function plugins (settings, deviceId) {
.then(() => code)
}
function sweepHD (row) {
function sweepHdRow (row) {
const cryptoCode = row.crypto_code
return wallet.sweep(settings, row.hd_serial)
console.log('DEBUG200')
return wallet.sweep(settings, cryptoCode, row.hd_index)
.then(txHash => {
if (txHash) {
logger.debug('[%s] Swept address with tx: %s', cryptoCode, txHash)
return dbm.markSwept(row.tx_id)
const sql = `update cash_out_txs set swept='t'
where id=$1`
return db.none(sql, row.id)
}
})
.catch(err => logger.error('[%s] Sweep error: %s', cryptoCode, err.message))
}
function sweepLiveHD () {
return dbm.fetchLiveHD()
.then(rows => Promise.all(rows.map(sweepHD)))
.catch(err => logger.error(err))
}
function sweepHd () {
const sql = `select id, crypto_code, hd_index from cash_out_txs
where hd_index is not null and not swept and status in ('confirmed', 'instant')`
function sweepOldHD () {
return dbm.fetchOldHD()
.then(rows => Promise.all(rows.map(sweepHD)))
return db.any(sql)
.then(rows => Promise.all(rows.map(sweepHdRow)))
.catch(err => logger.error(err))
}
return {
pollQueries,
trade,
sendCoins,
newAddress,
isHd,
getStatus,
dispenseAck,
getPhoneCode,
executeTrades,
pong,
pongClear,
monitorLiveIncoming,
monitorIncoming,
monitorUnnotified,
sweepLiveHD,
sweepOldHD,
notifyConfirmation,
sweepHd,
sendMessage,
checkBalances,
buildCartridges
buildCartridges,
buy,
sell
}
}