From 9ea6d86d6f015c2c9a7ad81b970a6f8bb725d3cf Mon Sep 17 00:00:00 2001 From: Josh Harvey Date: Sat, 28 May 2016 19:47:05 +0300 Subject: [PATCH] moved to pg-promise --- lib/postgresql_interface.js | 308 +++++++++++++----------------------- todo.txt | 1 + 2 files changed, 115 insertions(+), 194 deletions(-) diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 0385dbe1..e4c4f4fe 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -1,12 +1,14 @@ 'use strict' -var BigNumber = require('bignumber.js') -var pg = require('pg') +const BigNumber = require('bignumber.js') +const pgp = require('pg-promise')() -var logger = require('./logger') +const logger = require('./logger') const CACHED_SESSION_TTL = 60 * 60 * 1000 +let db + /* function inspect(rec) { console.log(require('util').inspect(rec, {depth: null, colors: true})) @@ -17,19 +19,13 @@ function isUniqueViolation (err) { return err.code === '23505' } -function isLowSeverity (err) { - return isUniqueViolation(err) || err.severity === 'low' -} - -var conString = null - function getInsertQuery (tableName, fields) { // outputs string like: '$1, $2, $3...' with proper No of items - var placeholders = fields.map(function (_, i) { + const placeholders = fields.map(function (_, i) { return '$' + (i + 1) }).join(', ') - var query = 'INSERT INTO ' + tableName + + const query = 'INSERT INTO ' + tableName + ' (' + fields.join(', ') + ')' + ' VALUES' + ' (' + placeholders + ')' @@ -37,25 +33,19 @@ function getInsertQuery (tableName, fields) { return query } -exports.init = function init (_conString) { - conString = _conString +exports.init = function init (conString) { if (!conString) { throw new Error('Postgres connection string is required') } - setInterval(pruneCachedResponses, CACHED_SESSION_TTL) -} + db = pgp(conString) -function connect (cb) { - pg.connect(conString, function (err, client, done) { - if (err) logger.error(err) - cb(err, client, done) - }) + setInterval(pruneCachedResponses, CACHED_SESSION_TTL) } // logs inputted bill and overall tx status (if available) exports.recordBill = function recordBill (session, rec, cb) { - var fields = [ + const fields = [ 'id', 'device_fingerprint', 'currency_code', @@ -67,7 +57,7 @@ exports.recordBill = function recordBill (session, rec, cb) { 'denomination' ] - var values = [ + const values = [ rec.uuid, session.fingerprint, rec.currency, @@ -79,56 +69,28 @@ exports.recordBill = function recordBill (session, rec, cb) { rec.fiat ] - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - query(client, getInsertQuery('bills', fields), values, function (err) { - done() - if (err && isUniqueViolation(err)) { - logger.warn('Attempt to report bill twice') - return cb() - } - cb(err) - }) + return db.none(getInsertQuery('bills', fields), values) + .catch(err => { + if (isUniqueViolation(err)) return logger.warn('Attempt to report bill twice') + throw err }) } exports.recordDeviceEvent = function recordDeviceEvent (session, event) { - connect(function (cerr, client, done) { - if (cerr) return - var sql = 'INSERT INTO device_events (device_fingerprint, event_type, ' + - 'note, device_time) VALUES ($1, $2, $3, $4)' - var values = [session.fingerprint, event.eventType, event.note, - event.deviceTime] - client.query(sql, values, done) - }) -} - -function query (client, queryStr, values, cb) { - if (!cb) { - cb = values - values = [] - } - - client.query(queryStr, values, function (err, results) { - if (err) { - if (!isLowSeverity(err)) { - console.error(err) - console.log(queryStr) - console.log(values) - } - return cb(err) - } - cb(null, results) - }) + const sql = 'INSERT INTO device_events (device_fingerprint, event_type, ' + + 'note, device_time) VALUES ($1, $2, $3, $4)' + const values = [session.fingerprint, event.eventType, event.note, + event.deviceTime] + return db.none(sql, values) } exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) { - var fields = ['session_id', 'device_fingerprint', 'to_address', + const fields = ['session_id', 'device_fingerprint', 'to_address', 'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash', 'fee', 'phone', 'error' ] - var values = [ + const values = [ session.id, session.fingerprint, tx.toAddress, @@ -142,21 +104,21 @@ exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) { tx.error ] - return pquery(getInsertQuery('cash_in_txs', fields), values) + return db.none(getInsertQuery('cash_in_txs', fields), values) } exports.sentCoins = function sentCoins (session, tx, toSend, fee, error, txHash) { - var sql = `update cash_in_txs set tx_hash=$1, error=$2 where session_id=$3` - return pquery(sql, [txHash, error, session.id]) + const sql = `update cash_in_txs set tx_hash=$1, error=$2 where session_id=$3` + return db.none(sql, [txHash, error, session.id]) } exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) { - var fields = ['session_id', 'device_fingerprint', 'to_address', + const fields = ['session_id', 'device_fingerprint', 'to_address', 'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash', 'phone', 'error' ] - var values = [ + const values = [ session.id, session.fingerprint, tx.toAddress, @@ -169,102 +131,104 @@ exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) { tx.error ] - return pquery(getInsertQuery('cash_out_txs', fields), values) + return db.none(getInsertQuery('cash_out_txs', fields), values) } function insertDispense (session, tx, cartridges, cb) { - var fields = [ + const fields = [ 'device_fingerprint', 'session_id', 'dispense1', 'reject1', 'count1', 'dispense2', 'reject2', 'count2', 'refill', 'error' ] - var sql = getInsertQuery('dispenses', fields) + const sql = getInsertQuery('dispenses', fields) - var dispense1 = tx.bills[0].actualDispense - var dispense2 = tx.bills[1].actualDispense - var reject1 = tx.bills[0].rejected - var reject2 = tx.bills[1].rejected - var count1 = cartridges[0].count - var count2 = cartridges[1].count - var values = [ + const dispense1 = tx.bills[0].actualDispense + const dispense2 = tx.bills[1].actualDispense + const reject1 = tx.bills[0].rejected + const reject2 = tx.bills[1].rejected + const count1 = cartridges[0].count + const count2 = cartridges[1].count + const values = [ session.fingerprint, tx.sessionId, dispense1, reject1, count1, dispense2, reject2, count2, false, tx.error ] - return pquery(sql, values) + return db.none(sql, values) } exports.addIncomingPhone = function addIncomingPhone (session, tx, notified, cb) { - var sql = `UPDATE cash_out_txs SET phone=$1, notified=$2 + const sql = `UPDATE cash_out_txs SET phone=$1, notified=$2 WHERE session_id=$3 AND phone IS NULL` - var values = [tx.phone, notified, session.fingerprint, tx.sessionId] + const values = [tx.phone, notified, session.fingerprint, tx.sessionId] - return pquery(sql, values) + return db.result(sql, values) .then(results => { const noPhone = results.rowCount === 0 const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' if (noPhone) return {noPhone: noPhone} - return pquery(sql2, [tx.sessionId, 'addedPhone']) + return db.none(sql2, [tx.sessionId, 'addedPhone']) .then(() => ({noPhone: noPhone})) }) } +function normalizeTx (tx) { + tx.toAddress = tx.to_address + tx.currencyCode = tx.currency_code + tx.txHash = tx.tx_hash + tx.cryptoCode = tx.crypto_code + tx.cryptoAtoms = new BigNumber(tx.crypto_atoms) + tx.sessionId = tx.session_id + + tx.to_address = undefined + tx.currency_code = undefined + tx.tx_hash = undefined + tx.crypto_code = undefined + tx.satoshis = undefined + tx.session_id = undefined + + return tx +} + function normalizeTxs (txs) { - return txs.map(function (tx) { - tx.toAddress = tx.to_address - tx.currencyCode = tx.currency_code - tx.txHash = tx.tx_hash - tx.cryptoCode = tx.crypto_code - tx.cryptoAtoms = new BigNumber(tx.crypto_atoms) - tx.sessionId = tx.session_id - - tx.to_address = undefined - tx.currency_code = undefined - tx.tx_hash = undefined - tx.crypto_code = undefined - tx.satoshis = undefined - tx.session_id = undefined - - return tx - }) + return txs.map(normalizeTx) } exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { - var sql = 'SELECT * FROM cash_out_txs ' + + const sql = 'SELECT * FROM cash_out_txs ' + 'WHERE phone=$1 AND dispensed=$2 ' + 'AND (EXTRACT(EPOCH FROM (COALESCE(confirmation_time, now()) - created))) * 1000 < $3' - var values = [phone, false, dispenseTimeout] + const values = [phone, false, dispenseTimeout] - return pquery(sql, values) - .then(r => normalizeTxs(r.rows)) + return db.manyOrNone(sql, values) + .then(rows => normalizeTxs(rows)) } exports.fetchTx = function fetchTx (session) { const sql = 'SELECT * FROM cash_out_txs WHERE session_id=$1' - return pquery(sql, [session.id]) - .then(r => normalizeTxs(r.rows)[0]) + return db.one(sql, [session.id]) + .then(row => normalizeTx(row)) } exports.addDispenseRequest = function addDispenseRequest (tx) { const sql = 'update cash_out_txs set dispensed=$1 where session_id=$2 and dispensed=$3' const values = [true, tx.sessionId, false] - return pquery(sql, values) + return db.result(sql, values) .then(results => { const alreadyDispensed = results.rowCount === 0 if (alreadyDispensed) return {dispense: false, reason: 'alreadyDispensed'} const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - return pquery(sql2, [tx.sessionId, 'dispenseRequested']) + return db.none(sql2, [tx.sessionId, 'dispenseRequested']) .then(() => ({dispense: true})) }) } @@ -274,90 +238,59 @@ exports.addDispense = function addDispense (session, tx, cartridges) { .then(() => { const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - return pquery(sql2, [tx.sessionId, 'dispensed']) + return db.none(sql2, [tx.sessionId, 'dispensed']) }) } exports.cartridgeCounts = function cartridgeCounts (session, cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var sql = 'SELECT id, count1, count2 FROM dispenses ' + - 'WHERE device_fingerprint=$1 AND refill=$2 ' + - 'ORDER BY id DESC LIMIT 1' - query(client, sql, [session.fingerprint, true], function (err, results) { - done() - if (err) return cb(err) - var counts = results.rows.length === 1 - ? [results.rows[0].count1, results.rows[0].count2] - : [0, 0] - cb(null, {id: results.rows[0].id, counts: counts}) - }) + const sql = 'SELECT id, count1, count2 FROM dispenses ' + + 'WHERE device_fingerprint=$1 AND refill=$2 ' + + 'ORDER BY id DESC LIMIT 1' + return db.oneOrNone(sql, [session.fingerprint, true]) + .then(row => { + const counts = row ? [row.count1, row.count2] : [0, 0] + return {id: row.id, counts: counts} }) } exports.machineEvent = function machineEvent (rec, cb) { - var TTL = 2 * 60 * 60 * 1000 - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var fields = ['id', 'device_fingerprint', 'event_type', 'note', 'device_time'] - var sql = getInsertQuery('machine_events', fields) - var values = [rec.id, rec.fingerprint, rec.eventType, rec.note, rec.deviceTime] + const TTL = 2 * 60 * 60 * 1000 + const fields = ['id', 'device_fingerprint', 'event_type', 'note', 'device_time'] + const sql = getInsertQuery('machine_events', fields) + const values = [rec.id, rec.fingerprint, rec.eventType, rec.note, rec.deviceTime] + const deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1' + const deleteValues = [TTL] - var deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1' - var deleteValues = [TTL] - - query(client, deleteSql, deleteValues, function (err) { - if (err) console.error(err) - }) - - query(client, sql, values, function (err, results) { - done() - return cb(err, results) - }) - }) + return db.none(sql, values) + .then(() => db.none(deleteSql, deleteValues)) } exports.devices = function devices (cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var sql = 'SELECT fingerprint, name FROM devices ' + - 'WHERE authorized=$1' - query(client, sql, [true], function (err, results) { - done() - if (err) return cb(err) - cb(null, results) - }) - }) + const sql = 'SELECT fingerprint, name FROM devices WHERE authorized=$1' + return db.manyOrNone(sql, [true]) } exports.machineEvents = function machineEvents (cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age FROM machine_events' - query(client, sql, [], function (err, results) { - done() - if (err) return cb(err) - cb(null, results) - }) - }) + const sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age FROM machine_events' + return db.manyOrNone(sql, []) } function singleQuotify (item) { return '\'' + item + '\'' } exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) { - var _statuses = '(' + statuses.map(singleQuotify).join(',') + ')' + const _statuses = '(' + statuses.map(singleQuotify).join(',') + ')' - var sql = 'SELECT * ' + + const sql = 'SELECT * ' + 'FROM cash_out_txs ' + 'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' + 'AND status IN ' + _statuses - return pquery(sql, [age]) - .then(r => normalizeTxs(r.rows)) + return db.manyOrNone(sql, [age]) + .then(rows => normalizeTxs(rows)) } exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) { - var sql = `SELECT * + const sql = `SELECT * FROM cash_out_txs WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 AND notified=$2 AND dispensed=$3 @@ -365,56 +298,43 @@ exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) { AND status IN ('instant', 'confirmed') AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)` - return pquery(sql, [age, false, false, true, waitPeriod]) - .then(r => normalizeTxs(r.rows)) -} - -function pquery (sql, values) { - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) - query(client, sql, values, function (err, results) { - done(err) - if (err) return reject(err) - resolve(results) - }) - }) - }) + return db.manyOrNone(sql, [age, false, false, true, waitPeriod]) + .then(rows => normalizeTxs(rows)) } exports.updateTxStatus = function updateTxStatus (tx, status, confirm) { - var sql = confirm + const sql = confirm ? 'UPDATE cash_out_txs SET status=$1, confirmation_time=now() WHERE session_id=$2' : 'UPDATE cash_out_txs SET status=$1 WHERE session_id=$2' - var values = [status, tx.sessionId] + const values = [status, tx.sessionId] - return pquery(sql, values) + return db.none(sql, values) .then(() => { const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - return pquery(sql2, [tx.sessionId, status]) + return db.none(sql2, [tx.sessionId, status]) }) } exports.updateRedeem = function updateRedeem (session, cb) { - var sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE session_id=$2' - var values = [true, session.id] + const sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE session_id=$2' + const values = [true, session.id] - return pquery(sql, values) + return db.none(sql, values) .then(() => { const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - return pquery(sql2, [session.id, 'redeem']) + return db.none(sql2, [session.id, 'redeem']) }) } exports.updateNotify = function updateNotify (tx) { - var sql = 'UPDATE cash_out_txs SET notified=$1 WHERE session_id=$2' - var values = [true, tx.sessionId] + const sql = 'UPDATE cash_out_txs SET notified=$1 WHERE session_id=$2' + const values = [true, tx.sessionId] - return pquery(sql, values) + return db.none(sql, values) .then(() => { const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - return pquery(sql2, [tx.sessionId, 'notified']) + return db.none(sql2, [tx.sessionId, 'notified']) }) } @@ -428,7 +348,7 @@ function insertCachedRequest (session, path, method, body) { ] const sql = getInsertQuery('cached_responses', fields) - return pquery(sql, [session.fingerprint, session.id, path, method, body]) + return db.none(sql, [session.fingerprint, session.id, path, method, body]) } exports.cachedResponse = function (session, path, method) { @@ -444,8 +364,8 @@ exports.cachedResponse = function (session, path, method) { .then(() => ({})) .catch(err => { if (!isUniqueViolation(err)) throw err - return pquery(sql, values) - .then(r => ({body: r.rows[0].body})) + return db.one(sql, values) + .then(row => ({body: row.body})) }) } @@ -455,7 +375,7 @@ function pruneCachedResponses () { const values = [CACHED_SESSION_TTL] - return pquery(sql, values) + return db.none(sql, values) } exports.cacheResponse = function (session, path, method, body) { @@ -468,5 +388,5 @@ exports.cacheResponse = function (session, path, method, body) { const values = [body, session.fingerprint, session.id, path, method] - return pquery(sql, values) + return db.none(sql, values) } diff --git a/todo.txt b/todo.txt index 96d61b98..b8972e15 100644 --- a/todo.txt +++ b/todo.txt @@ -2,3 +2,4 @@ - l-m shouldn't keep polling l-s when not on pending screen (low priority) - convert postgresql_interface to pg-promise +- check what happens in pgp when double-inserting