diff --git a/lib/plugins/common/json-rpc.js b/lib/plugins/common/json-rpc.js index a8e8a656..5a5434c2 100644 --- a/lib/plugins/common/json-rpc.js +++ b/lib/plugins/common/json-rpc.js @@ -4,6 +4,7 @@ const uuid = require('uuid') const fs = require('fs') const _ = require('lodash/fp') const request = require('request-promise') +const Queue = require('queue-promise') const { utils: coinUtils } = require('@lamassu/coins') const options = require('../../options') @@ -49,40 +50,23 @@ function fetch (account = {}, method, params) { }) } -var fetchDigestRequestQueue = [] -var fetchDigestResponses = {} +const DIGEST_QUEUE = new Queue({ + concurrent: 1, + interval: 100, + start: false +}) function createDigestRequest (account = {}, method, params = []) { - const requestId = uuid.v4() - - fetchDigestRequestQueue.push({ - requestId, - options: generateDigestOptions(account, method, params, requestId), - status: 'sendPending' - }) - fetchDigestResponses[requestId] = { status: 'receivePending' } - - return new Promise((resolve, reject) => { - setTimeout(() => { - const request = _.find(it => it.requestId === requestId, fetchDigestRequestQueue) - const response = fetchDigestResponses[requestId] - - if (_.isNil(response) || request.status === 'sendError') { - fetchDigestResponses[requestId].status = 'receiveError' - return reject() - } - fetchDigestResponses[requestId].status = 'finished' - return resolve(response) - }, 750) - }) + DIGEST_QUEUE.enqueue(() => fetchDigest(account, method, params)) + return DIGEST_QUEUE.dequeue() } -function generateDigestOptions (account = {}, method, params, requestId) { +function generateDigestOptions (account = {}, method, params) { const headers = { 'Content-Type': 'application/json' } - const dataString = `{"jsonrpc":"2.0","id":"${requestId}","method":"${method}","params":${JSON.stringify(params)}}` + const dataString = `{"jsonrpc":"2.0","id":"${uuid.v4()}","method":"${method}","params":${JSON.stringify(params)}}` const options = { url: `http://localhost:${account.port}/json_rpc`, @@ -100,88 +84,13 @@ function generateDigestOptions (account = {}, method, params, requestId) { return options } -function executeRequestQueue (queue) { - return _.reduce( - (acc, value) => acc.then(a => { - const originalReq = _.find(it => it.requestId === value.requestId, queue) - switch (originalReq.status) { - case 'sendPending': - return request(value.options) - .then(res => { - originalReq.status = 'sendComplete' - return Promise.resolve( - _.merge(a, { [value.requestId]: { status: 'receiveComplete', ...JSON.parse(res).result } }) - ) - }) - .catch(err => { - originalReq.status = 'sendError' - throw err - }) - case 'sendComplete': - return Promise.resolve({ ...a }) - case 'sendError': - return Promise.resolve({ ...a }) - default: - break - } - }), - Promise.resolve(fetchDigestResponses), - queue - ).then(r => { - fetchDigestResponses = r - return fetchDigestResponses - }) -} - -function trimReqRes () { - const requestIds = _.keys(fetchDigestResponses) - _.forEach(it => { - const request = _.find(ite => ite.requestId === it, fetchDigestRequestQueue) - const response = fetchDigestResponses[it] - - if (request && response && response.status === 'finished' && request.status === 'sendComplete') { - fetchDigestRequestQueue = _.filter(ite => ite.requestId !== it, fetchDigestRequestQueue) - delete fetchDigestResponses[it] - } - }, requestIds) -} - -setInterval( - () => { - if (_.size(fetchDigestRequestQueue) === 0 && _.size(_.keys(fetchDigestResponses)) === 0) { - return - } - trimReqRes() - return executeRequestQueue(fetchDigestRequestQueue) - }, - 500 -) - function fetchDigest(account = {}, method, params = []) { return Promise.resolve(true) .then(() => { if (_.isNil(account.port)) throw new Error('port attribute required for jsonRpc') - const headers = { - 'Content-Type': 'application/json' - } - - const dataString = `{"jsonrpc":"2.0","id":"${uuid.v4()}","method":"${method}","params":${JSON.stringify(params)}}` - - const options = { - url: `http://localhost:${account.port}/json_rpc`, - method: 'POST', - headers, - body: dataString, - forever: true, - auth: { - user: account.username, - pass: account.password, - sendImmediately: false - } - } - + const options = generateDigestOptions(account, method, params) return request(options) }) .then((res) => {