This commit is contained in:
Josh Harvey 2017-06-24 18:52:55 +03:00
parent 45e6e2b82d
commit 2779dcd6f3
5 changed files with 34 additions and 20 deletions

View file

@ -4,6 +4,7 @@ const db = require('./db')
const BN = require('./bn')
const plugins = require('./plugins')
const logger = require('./logger')
const pp = require('./pp')
module.exports = {post, monitorPending}
@ -27,9 +28,11 @@ function atomic (machineTx, pi) {
return preProcess(dbTx, machineTx, pi)
.then(preProcessedTx => upsert(dbTx, preProcessedTx))
.then(vector => {
.then(r => {
pp('DEBUG701.5')(r)
return insertNewBills(billRows, machineTx)
.then(_.constant(_.concat(vector, machineTx.bills)))
.then(_.constant(_.set('bills', machineTx.bills, r)))
.then(pp('DEBUG702'))
})
})
})
@ -41,11 +44,12 @@ function atomic (machineTx, pi) {
}
function post (machineTx, pi) {
console.log('DEBUG700: %j', machineTx)
return db.tx(atomic(machineTx, pi))
.then(txVector => {
const [, updatedTx] = txVector
.then(r => {
const updatedTx = r.tx
return postProcess(txVector, pi)
return postProcess(r, pi)
.then(changes => update(updatedTx, changes))
.then(tx => _.set('bills', machineTx.bills, tx))
})
@ -168,11 +172,11 @@ function insertNewBills (billRows, machineTx) {
function upsert (dbTx, preProcessedTx) {
if (!dbTx) {
return insert(preProcessedTx)
.then(tx => [dbTx, tx])
.then(tx => ({dbTx, tx}))
}
return update(dbTx, diff(dbTx, preProcessedTx))
.then(tx => [dbTx, tx])
.then(tx => ({dbTx, tx}))
}
function insert (tx) {
@ -195,6 +199,7 @@ function update (tx, changes) {
}
function registerTrades (pi, newBills) {
console.log('DEBUG600: %j', newBills)
_.forEach(bill => pi.buy(bill), newBills)
}
@ -218,13 +223,12 @@ function isClearToSend (oldTx, newTx) {
(!oldTx || (!oldTx.sendPending && !oldTx.sendConfirmed))
}
function postProcess (txVector, pi) {
const [dbTx, updatedTx, newBills] = txVector
function postProcess (r, pi) {
console.log('DEBUG701: %j', r)
registerTrades(pi, r.bills)
registerTrades(pi, newBills)
if (isClearToSend(dbTx, updatedTx)) {
return pi.sendCoins(updatedTx)
if (isClearToSend(r.dbTx, r.tx)) {
return pi.sendCoins(r.tx)
.then(txHash => ({
txHash,
sendConfirmed: true,
@ -248,7 +252,7 @@ function postProcess (txVector, pi) {
sendPending
}
})
.then(r => logAction(r, updatedTx))
.then(sendRec => logAction(sendRec, r.tx))
}
return Promise.resolve({})

View file

@ -20,6 +20,7 @@ function fetchExchange (settings, cryptoCode) {
}
function buy (settings, cryptoAtoms, fiatCode, cryptoCode) {
console.log('DEBUG600')
return fetchExchange(settings, cryptoCode)
.then(r => r.exchange.buy(r.account, cryptoAtoms, fiatCode, cryptoCode))
}

View file

@ -335,6 +335,7 @@ function plugins (settings, deviceId) {
const market = [fiatCode, cryptoCode].join('')
console.log('DEBUG505')
if (!exchange.active(settings, cryptoCode)) return
logger.debug('[%s] Pushing trade: %d', market, cryptoAtoms)
@ -351,6 +352,7 @@ function plugins (settings, deviceId) {
const market = [fiatCode, cryptoCode].join('')
const marketTradesQueues = tradesQueues[market]
console.log('DEBUG504: %j', marketTradesQueues)
if (!marketTradesQueues || marketTradesQueues.length === 0) return null
logger.debug('[%s] tradesQueues size: %d', market, marketTradesQueues.length)
@ -391,6 +393,7 @@ function plugins (settings, deviceId) {
}
function executeTrades () {
console.log('DEBUG500')
return machineLoader.getMachines()
.then(devices => {
const deviceIds = devices.map(device => device.deviceId)
@ -411,6 +414,7 @@ function plugins (settings, deviceId) {
}
function executeTradesForMarket (settings, fiatCode, cryptoCode) {
console.log('DEBUG501: %s, %j', cryptoCode, exchange.active(settings, cryptoCode))
if (!exchange.active(settings, cryptoCode)) return
const market = [fiatCode, cryptoCode].join('')
@ -418,6 +422,8 @@ function plugins (settings, deviceId) {
if (tradeEntry === null || tradeEntry.cryptoAtoms.eq(0)) return
console.log('DEBUG502')
return executeTradeForType(tradeEntry)
.catch(err => {
tradesQueues[market].push(tradeEntry)
@ -435,6 +441,8 @@ function plugins (settings, deviceId) {
const tradeEntry = expand(_tradeEntry)
const execute = tradeEntry.type === 'buy' ? exchange.buy : exchange.sell
console.log('DEBUG503')
return execute(settings, tradeEntry.cryptoAtoms, tradeEntry.fiatCode, tradeEntry.cryptoCode)
.then(() => recordTrade(tradeEntry))
}

View file

@ -1,5 +1,5 @@
const Kraken = require('kraken-api')
const common = require('../common/kraken')
const common = require('../../common/kraken')
var PAIRS = common.PAIRS
@ -17,11 +17,11 @@ function trade (account, type, cryptoAtoms, fiatCode, cryptoCode) {
const kraken = new Kraken(account.apiKey, account.privateKey)
const amount = common.toUnit(cryptoAtoms, cryptoCode)
if (amount.lte('0.01')) {
const err = new Error('Order size too small')
err.name = 'orderTooSmall'
return Promise.reject(err)
}
// if (amount.lte('0.01')) {
// const err = new Error('Order size too small')
// err.name = 'orderTooSmall'
// return Promise.reject(err)
// }
const amountStr = amount.toFixed(6)
const pair = PAIRS[cryptoCode][fiatCode]

View file

@ -2,5 +2,6 @@ module.exports = function (label) {
return function (o) {
console.log(label)
console.log(require('util').inspect(o, {depth: null, colors: true}))
return o
}
}