diff --git a/lib/config-manager.js b/lib/config-manager.js index ddefb820..dfdf32d6 100644 --- a/lib/config-manager.js +++ b/lib/config-manager.js @@ -6,7 +6,6 @@ function connect () { console.log(psqlUrl) return pgp(psqlUrl) } -exports.connect = connect function load () { var db = connect() diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index fdd31cf0..80a63bf9 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -3,6 +3,7 @@ const BigNumber = require('bignumber.js') const pgp = require('pg-promise')() +var psqlUrl = require('../lib/options').postgresql const backoff = require('u-promised').backoff const logger = require('./logger') @@ -10,8 +11,6 @@ const logger = require('./logger') const CACHED_SESSION_TTL = 60 * 60 * 1000 const LIVE_SWEEP_TTL = 48 * 60 * 60 * 1000 -let db - function isUniqueViolation (err) { return err.code === '23505' } @@ -30,14 +29,8 @@ function getInsertQuery (tableName, fields) { return query } -exports.init = function init (conString) { - if (!conString) { - throw new Error('Postgres connection string is required') - } - - db = pgp(conString) - - setInterval(pruneCachedResponses, CACHED_SESSION_TTL) +function connect () { + return pgp(psqlUrl) } // logs inputted bill and overall tx status (if available) @@ -69,6 +62,8 @@ exports.recordBill = function recordBill (deviceId, rec) { console.log('DEBUG11: %j', values) + const db = connect() + return db.none(getInsertQuery('bills', fields), values) .catch(err => { if (isUniqueViolation(err)) return logger.warn('Attempt to report bill twice') @@ -81,6 +76,8 @@ exports.recordDeviceEvent = function recordDeviceEvent (deviceId, event) { 'note, device_time) VALUES ($1, $2, $3, $4)' const values = [deviceId, event.eventType, event.note, event.deviceTime] + const db = connect() + return db.none(sql, values) } @@ -106,11 +103,13 @@ exports.addOutgoingTx = function addOutgoingTx (deviceId, tx) { tx.error ] + const db = connect() return db.none(getInsertQuery('cash_in_txs', fields), values) } exports.sentCoins = function sentCoins (tx, toSend, fee, error, txHash) { - const sql = `update cash_in_txs set tx_hash=$1, error=$2 where id=$3` + const sql = 'update cash_in_txs set tx_hash=$1, error=$2 where id=$3' + const db = connect() return db.none(sql, [txHash, error, tx.id]) } @@ -133,6 +132,7 @@ exports.addInitialIncoming = function addInitialIncoming (deviceId, tx) { tx.error ] + const db = connect() return db.none(getInsertQuery('cash_out_txs', fields), values) } @@ -160,6 +160,7 @@ function insertDispense (deviceId, tx, cartridges) { false, tx.error ] + const db = connect() return db.none(sql, values) } @@ -168,6 +169,7 @@ exports.addIncomingPhone = function addIncomingPhone (tx, notified) { WHERE id=$3 AND phone IS NULL` const values = [tx.phone, notified, tx.id] + const db = connect() return db.result(sql, values) .then(results => { @@ -209,6 +211,7 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { 'AND (EXTRACT(EPOCH FROM (COALESCE(confirmation_time, now()) - created))) * 1000 < $3' const values = [phone, false, dispenseTimeout] + const db = connect() return db.any(sql, values) .then(rows => normalizeTxs(rows)) @@ -216,6 +219,7 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { exports.fetchTx = function fetchTx (txId) { const sql = 'SELECT * FROM cash_out_txs WHERE id=$1' + const db = connect() return db.one(sql, [txId]) .then(row => normalizeTx(row)) @@ -224,6 +228,7 @@ exports.fetchTx = function fetchTx (txId) { exports.addDispenseRequest = function addDispenseRequest (tx) { const sql = 'update cash_out_txs set dispensed=$1 where id=$2 and dispensed=$3' const values = [true, tx.id, false] + const db = connect() return db.result(sql, values) .then(results => { @@ -241,6 +246,7 @@ exports.addDispense = function addDispense (deviceId, tx, cartridges) { return insertDispense(deviceId, tx, cartridges) .then(() => { const sql2 = 'insert into cash_out_actions (cash_out_txs_id, action) values ($1, $2)' + const db = connect() return db.none(sql2, [tx.id, 'dispensed']) }) @@ -250,6 +256,8 @@ exports.cartridgeCounts = function cartridgeCounts (deviceId) { const sql = 'SELECT id, count1, count2 FROM dispenses ' + 'WHERE device_id=$1 AND refill=$2 ' + 'ORDER BY id DESC LIMIT 1' + const db = connect() + return db.oneOrNone(sql, [deviceId, true]) .then(row => { const counts = row ? [row.count1, row.count2] : [0, 0] @@ -264,6 +272,7 @@ exports.machineEvent = function machineEvent (rec) { const values = [rec.id, rec.deviceId, rec.eventType, rec.note, rec.deviceTime] const deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1' const deleteValues = [TTL] + const db = connect() return db.none(sql, values) .then(() => db.none(deleteSql, deleteValues)) @@ -271,11 +280,15 @@ exports.machineEvent = function machineEvent (rec) { exports.devices = function devices () { const sql = 'SELECT device_id, name FROM devices WHERE authorized=$1' + const db = connect() + return db.any(sql, [true]) } exports.machineEvents = function machineEvents () { const sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age FROM machine_events' + const db = connect() + return db.any(sql, []) } @@ -288,6 +301,7 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age) { 'FROM cash_out_txs ' + 'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' + 'AND status IN ' + _statuses + const db = connect() return db.any(sql, [age]) .then(rows => normalizeTxs(rows)) @@ -301,6 +315,7 @@ exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod) { AND phone IS NOT NULL AND status IN ('instant', 'confirmed') AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)` + const db = connect() return db.any(sql, [age, false, false, true, waitPeriod]) .then(rows => normalizeTxs(rows)) @@ -321,6 +336,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) { const TransactionMode = pgp.txMode.TransactionMode const isolationLevel = pgp.txMode.isolationLevel const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable}) + const db = connect() function transaction (t) { const sql = 'select status, confirmation_time from cash_out_txs where id=$1' @@ -355,7 +371,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) { return db.none(sql3, [tx.id, r.status]) .then(() => { if (r.status === 'confirmed') { - const sql4 = `update cash_out_hds set confirmed=true where id=$1` + const sql4 = 'update cash_out_hds set confirmed=true where id=$1' return db.none(sql4, [tx.id]) } }) @@ -365,6 +381,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) { exports.updateRedeem = function updateRedeem (txId) { const sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE id=$2' const values = [true, txId] + const db = connect() return db.none(sql, values) .then(() => { @@ -376,6 +393,7 @@ exports.updateRedeem = function updateRedeem (txId) { exports.updateNotify = function updateNotify (tx) { const sql = 'UPDATE cash_out_txs SET notified=$1 WHERE id=$2' const values = [true, tx.id] + const db = connect() return db.none(sql, values) .then(() => { @@ -392,6 +410,7 @@ function insertCachedRequest (deviceId, txId, path, method, body) { 'method', 'body' ] + const db = connect() const sql = getInsertQuery('cached_responses', fields) return db.none(sql, [deviceId, txId, path, method, body]) @@ -405,6 +424,7 @@ exports.cachedResponse = function (deviceId, txId, path, method) { and method=$4` const values = [deviceId, txId, path, method] + const db = connect() return insertCachedRequest(deviceId, txId, path, method, {pendingRequest: true}) .then(() => ({})) @@ -421,6 +441,7 @@ function pruneCachedResponses () { where (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1` const values = [CACHED_SESSION_TTL] + const db = connect() return db.none(sql, values) } @@ -434,6 +455,7 @@ exports.cacheResponse = function (deviceId, txId, path, method, body) { and method=$5` const values = [body, deviceId, txId, path, method] + const db = connect() return db.none(sql, values) } @@ -441,6 +463,8 @@ exports.cacheResponse = function (deviceId, txId, path, method, body) { exports.nextCashOutSerialHD = function nextCashOutSerialHD (txId, cryptoCode) { const sql = `select hd_serial from cash_out_hds where crypto_code=$1 order by hd_serial desc limit 1` + const db = connect() + const attempt = () => db.oneOrNone(sql, [cryptoCode]) .then(row => { const serialNumber = row ? row.hd_serial + 1 : 0 @@ -461,6 +485,7 @@ exports.fetchLiveHD = function fetchLiveHD () { ((extract(epoch from (now() - cash_out_txs.created))) * 1000)<$3` const values = ['confirmed', false, LIVE_SWEEP_TTL] + const db = connect() return db.any(sql, values) } @@ -470,11 +495,16 @@ exports.fetchOldHD = function fetchLiveHD () { where confirmed order by last_checked limit 10` + const db = connect() return db.any(sql) } exports.markSwept = function markSwept (txId) { - const sql = `update cash_out_hds set swept=$1 where id=$2` + const sql = 'update cash_out_hds set swept=$1 where id=$2' + const db = connect() + return db.none(sql, [true, txId]) } + +setInterval(pruneCachedResponses, CACHED_SESSION_TTL)