From f660d60e2deb9863decd3647c1447dec13c5edc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Salgado?= Date: Tue, 22 Feb 2022 00:55:01 +0000 Subject: [PATCH] feat: add digest auth request queueing --- lib/plugins/common/json-rpc.js | 110 +++++++++++++++++++++++++- lib/plugins/wallet/monerod/monerod.js | 18 ++--- 2 files changed, 115 insertions(+), 13 deletions(-) diff --git a/lib/plugins/common/json-rpc.js b/lib/plugins/common/json-rpc.js index 33174591..a8e8a656 100644 --- a/lib/plugins/common/json-rpc.js +++ b/lib/plugins/common/json-rpc.js @@ -11,7 +11,7 @@ const options = require('../../options') const blockchainDir = options.blockchainDir module.exports = { - fetch, fetchDigest, parseConf, rpcConfig + fetch, fetchDigest, createDigestRequest, parseConf, rpcConfig } function fetch (account = {}, method, params) { @@ -49,6 +49,114 @@ function fetch (account = {}, method, params) { }) } +var fetchDigestRequestQueue = [] +var fetchDigestResponses = {} + +function createDigestRequest (account = {}, method, params = []) { + const requestId = uuid.v4() + + fetchDigestRequestQueue.push({ + requestId, + options: generateDigestOptions(account, method, params, requestId), + status: 'sendPending' + }) + fetchDigestResponses[requestId] = { status: 'receivePending' } + + return new Promise((resolve, reject) => { + setTimeout(() => { + const request = _.find(it => it.requestId === requestId, fetchDigestRequestQueue) + const response = fetchDigestResponses[requestId] + + if (_.isNil(response) || request.status === 'sendError') { + fetchDigestResponses[requestId].status = 'receiveError' + return reject() + } + fetchDigestResponses[requestId].status = 'finished' + return resolve(response) + }, 750) + }) +} + +function generateDigestOptions (account = {}, method, params, requestId) { + const headers = { + 'Content-Type': 'application/json' + } + + const dataString = `{"jsonrpc":"2.0","id":"${requestId}","method":"${method}","params":${JSON.stringify(params)}}` + + const options = { + url: `http://localhost:${account.port}/json_rpc`, + method: 'POST', + headers, + body: dataString, + forever: true, + auth: { + user: account.username, + pass: account.password, + sendImmediately: false + } + } + + return options +} + +function executeRequestQueue (queue) { + return _.reduce( + (acc, value) => acc.then(a => { + const originalReq = _.find(it => it.requestId === value.requestId, queue) + switch (originalReq.status) { + case 'sendPending': + return request(value.options) + .then(res => { + originalReq.status = 'sendComplete' + return Promise.resolve( + _.merge(a, { [value.requestId]: { status: 'receiveComplete', ...JSON.parse(res).result } }) + ) + }) + .catch(err => { + originalReq.status = 'sendError' + throw err + }) + case 'sendComplete': + return Promise.resolve({ ...a }) + case 'sendError': + return Promise.resolve({ ...a }) + default: + break + } + }), + Promise.resolve(fetchDigestResponses), + queue + ).then(r => { + fetchDigestResponses = r + return fetchDigestResponses + }) +} + +function trimReqRes () { + const requestIds = _.keys(fetchDigestResponses) + _.forEach(it => { + const request = _.find(ite => ite.requestId === it, fetchDigestRequestQueue) + const response = fetchDigestResponses[it] + + if (request && response && response.status === 'finished' && request.status === 'sendComplete') { + fetchDigestRequestQueue = _.filter(ite => ite.requestId !== it, fetchDigestRequestQueue) + delete fetchDigestResponses[it] + } + }, requestIds) +} + +setInterval( + () => { + if (_.size(fetchDigestRequestQueue) === 0 && _.size(_.keys(fetchDigestResponses)) === 0) { + return + } + trimReqRes() + return executeRequestQueue(fetchDigestRequestQueue) + }, + 500 +) + function fetchDigest(account = {}, method, params = []) { return Promise.resolve(true) .then(() => { diff --git a/lib/plugins/wallet/monerod/monerod.js b/lib/plugins/wallet/monerod/monerod.js index 971fda29..0cb315d6 100644 --- a/lib/plugins/wallet/monerod/monerod.js +++ b/lib/plugins/wallet/monerod/monerod.js @@ -15,7 +15,6 @@ const blockchainDir = options.blockchainDir const cryptoRec = utils.getCryptoCurrency(COINS.XMR) const configPath = utils.configPath(cryptoRec, blockchainDir) const walletDir = path.resolve(utils.cryptoDir(cryptoRec, blockchainDir), 'wallets') -const unitScale = cryptoRec.unitScale function rpcConfig () { try { @@ -31,7 +30,7 @@ function rpcConfig () { } function fetch (method, params) { - return jsonRpc.fetchDigest(rpcConfig(), method, params) + return jsonRpc.createDigestRequest(rpcConfig(), method, params) } function handleError (error, method) { @@ -151,16 +150,11 @@ function getStatus (account, tx, requested, settings, operatorId) { function newFunding (account, cryptoCode, settings, operatorId) { return checkCryptoCode(cryptoCode) .then(() => refreshWallet()) - .then(() => fetch('get_balance', { account_index: 0, address_indices: [0] })) - .then(balanceRes => Promise.all([ - balanceRes, - fetch('create_address', { account_index: 0 }) - ])) - .then(([balanceRes, addressRes]) => Promise.all([ - balanceRes, - addressRes, - fetch('get_transfers', { pool: true, account_index: 0 }) - ])) + .then(() => Promise.all([ + fetch('get_balance', { account_index: 0, address_indices: [0] }), + fetch('create_address', { account_index: 0 }), + fetch('get_transfers', { pool: true, account_index: 0 }) + ])) .then(([balanceRes, addressRes, transferRes]) => { const memPoolBalance = _.reduce((acc, value) => acc.plus(value.amount), BN(0), transferRes.pool) return {