fix: simplify queueing code
This commit is contained in:
parent
f660d60e2d
commit
1614df27e8
1 changed files with 11 additions and 102 deletions
|
|
@ -4,6 +4,7 @@ const uuid = require('uuid')
|
|||
const fs = require('fs')
|
||||
const _ = require('lodash/fp')
|
||||
const request = require('request-promise')
|
||||
const Queue = require('queue-promise')
|
||||
const { utils: coinUtils } = require('@lamassu/coins')
|
||||
|
||||
const options = require('../../options')
|
||||
|
|
@ -49,40 +50,23 @@ function fetch (account = {}, method, params) {
|
|||
})
|
||||
}
|
||||
|
||||
var fetchDigestRequestQueue = []
|
||||
var fetchDigestResponses = {}
|
||||
const DIGEST_QUEUE = new Queue({
|
||||
concurrent: 1,
|
||||
interval: 100,
|
||||
start: false
|
||||
})
|
||||
|
||||
function createDigestRequest (account = {}, method, params = []) {
|
||||
const requestId = uuid.v4()
|
||||
|
||||
fetchDigestRequestQueue.push({
|
||||
requestId,
|
||||
options: generateDigestOptions(account, method, params, requestId),
|
||||
status: 'sendPending'
|
||||
})
|
||||
fetchDigestResponses[requestId] = { status: 'receivePending' }
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
setTimeout(() => {
|
||||
const request = _.find(it => it.requestId === requestId, fetchDigestRequestQueue)
|
||||
const response = fetchDigestResponses[requestId]
|
||||
|
||||
if (_.isNil(response) || request.status === 'sendError') {
|
||||
fetchDigestResponses[requestId].status = 'receiveError'
|
||||
return reject()
|
||||
}
|
||||
fetchDigestResponses[requestId].status = 'finished'
|
||||
return resolve(response)
|
||||
}, 750)
|
||||
})
|
||||
DIGEST_QUEUE.enqueue(() => fetchDigest(account, method, params))
|
||||
return DIGEST_QUEUE.dequeue()
|
||||
}
|
||||
|
||||
function generateDigestOptions (account = {}, method, params, requestId) {
|
||||
function generateDigestOptions (account = {}, method, params) {
|
||||
const headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
const dataString = `{"jsonrpc":"2.0","id":"${requestId}","method":"${method}","params":${JSON.stringify(params)}}`
|
||||
const dataString = `{"jsonrpc":"2.0","id":"${uuid.v4()}","method":"${method}","params":${JSON.stringify(params)}}`
|
||||
|
||||
const options = {
|
||||
url: `http://localhost:${account.port}/json_rpc`,
|
||||
|
|
@ -100,88 +84,13 @@ function generateDigestOptions (account = {}, method, params, requestId) {
|
|||
return options
|
||||
}
|
||||
|
||||
function executeRequestQueue (queue) {
|
||||
return _.reduce(
|
||||
(acc, value) => acc.then(a => {
|
||||
const originalReq = _.find(it => it.requestId === value.requestId, queue)
|
||||
switch (originalReq.status) {
|
||||
case 'sendPending':
|
||||
return request(value.options)
|
||||
.then(res => {
|
||||
originalReq.status = 'sendComplete'
|
||||
return Promise.resolve(
|
||||
_.merge(a, { [value.requestId]: { status: 'receiveComplete', ...JSON.parse(res).result } })
|
||||
)
|
||||
})
|
||||
.catch(err => {
|
||||
originalReq.status = 'sendError'
|
||||
throw err
|
||||
})
|
||||
case 'sendComplete':
|
||||
return Promise.resolve({ ...a })
|
||||
case 'sendError':
|
||||
return Promise.resolve({ ...a })
|
||||
default:
|
||||
break
|
||||
}
|
||||
}),
|
||||
Promise.resolve(fetchDigestResponses),
|
||||
queue
|
||||
).then(r => {
|
||||
fetchDigestResponses = r
|
||||
return fetchDigestResponses
|
||||
})
|
||||
}
|
||||
|
||||
function trimReqRes () {
|
||||
const requestIds = _.keys(fetchDigestResponses)
|
||||
_.forEach(it => {
|
||||
const request = _.find(ite => ite.requestId === it, fetchDigestRequestQueue)
|
||||
const response = fetchDigestResponses[it]
|
||||
|
||||
if (request && response && response.status === 'finished' && request.status === 'sendComplete') {
|
||||
fetchDigestRequestQueue = _.filter(ite => ite.requestId !== it, fetchDigestRequestQueue)
|
||||
delete fetchDigestResponses[it]
|
||||
}
|
||||
}, requestIds)
|
||||
}
|
||||
|
||||
setInterval(
|
||||
() => {
|
||||
if (_.size(fetchDigestRequestQueue) === 0 && _.size(_.keys(fetchDigestResponses)) === 0) {
|
||||
return
|
||||
}
|
||||
trimReqRes()
|
||||
return executeRequestQueue(fetchDigestRequestQueue)
|
||||
},
|
||||
500
|
||||
)
|
||||
|
||||
function fetchDigest(account = {}, method, params = []) {
|
||||
return Promise.resolve(true)
|
||||
.then(() => {
|
||||
if (_.isNil(account.port))
|
||||
throw new Error('port attribute required for jsonRpc')
|
||||
|
||||
const headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
const dataString = `{"jsonrpc":"2.0","id":"${uuid.v4()}","method":"${method}","params":${JSON.stringify(params)}}`
|
||||
|
||||
const options = {
|
||||
url: `http://localhost:${account.port}/json_rpc`,
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: dataString,
|
||||
forever: true,
|
||||
auth: {
|
||||
user: account.username,
|
||||
pass: account.password,
|
||||
sendImmediately: false
|
||||
}
|
||||
}
|
||||
|
||||
const options = generateDigestOptions(account, method, params)
|
||||
return request(options)
|
||||
})
|
||||
.then((res) => {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue