update postgressql_interface

This commit is contained in:
Josh Harvey 2016-10-13 18:30:35 +03:00
parent 2c9dd2284a
commit c9b7a80e89
2 changed files with 43 additions and 14 deletions

View file

@ -6,7 +6,6 @@ function connect () {
console.log(psqlUrl)
return pgp(psqlUrl)
}
exports.connect = connect
function load () {
var db = connect()

View file

@ -3,6 +3,7 @@
const BigNumber = require('bignumber.js')
const pgp = require('pg-promise')()
var psqlUrl = require('../lib/options').postgresql
const backoff = require('u-promised').backoff
const logger = require('./logger')
@ -10,8 +11,6 @@ const logger = require('./logger')
const CACHED_SESSION_TTL = 60 * 60 * 1000
const LIVE_SWEEP_TTL = 48 * 60 * 60 * 1000
let db
function isUniqueViolation (err) {
return err.code === '23505'
}
@ -30,14 +29,8 @@ function getInsertQuery (tableName, fields) {
return query
}
exports.init = function init (conString) {
if (!conString) {
throw new Error('Postgres connection string is required')
}
db = pgp(conString)
setInterval(pruneCachedResponses, CACHED_SESSION_TTL)
function connect () {
return pgp(psqlUrl)
}
// logs inputted bill and overall tx status (if available)
@ -69,6 +62,8 @@ exports.recordBill = function recordBill (deviceId, rec) {
console.log('DEBUG11: %j', values)
const db = connect()
return db.none(getInsertQuery('bills', fields), values)
.catch(err => {
if (isUniqueViolation(err)) return logger.warn('Attempt to report bill twice')
@ -81,6 +76,8 @@ exports.recordDeviceEvent = function recordDeviceEvent (deviceId, event) {
'note, device_time) VALUES ($1, $2, $3, $4)'
const values = [deviceId, event.eventType, event.note,
event.deviceTime]
const db = connect()
return db.none(sql, values)
}
@ -106,11 +103,13 @@ exports.addOutgoingTx = function addOutgoingTx (deviceId, tx) {
tx.error
]
const db = connect()
return db.none(getInsertQuery('cash_in_txs', fields), values)
}
exports.sentCoins = function sentCoins (tx, toSend, fee, error, txHash) {
const sql = `update cash_in_txs set tx_hash=$1, error=$2 where id=$3`
const sql = 'update cash_in_txs set tx_hash=$1, error=$2 where id=$3'
const db = connect()
return db.none(sql, [txHash, error, tx.id])
}
@ -133,6 +132,7 @@ exports.addInitialIncoming = function addInitialIncoming (deviceId, tx) {
tx.error
]
const db = connect()
return db.none(getInsertQuery('cash_out_txs', fields), values)
}
@ -160,6 +160,7 @@ function insertDispense (deviceId, tx, cartridges) {
false, tx.error
]
const db = connect()
return db.none(sql, values)
}
@ -168,6 +169,7 @@ exports.addIncomingPhone = function addIncomingPhone (tx, notified) {
WHERE id=$3
AND phone IS NULL`
const values = [tx.phone, notified, tx.id]
const db = connect()
return db.result(sql, values)
.then(results => {
@ -209,6 +211,7 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) {
'AND (EXTRACT(EPOCH FROM (COALESCE(confirmation_time, now()) - created))) * 1000 < $3'
const values = [phone, false, dispenseTimeout]
const db = connect()
return db.any(sql, values)
.then(rows => normalizeTxs(rows))
@ -216,6 +219,7 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) {
exports.fetchTx = function fetchTx (txId) {
const sql = 'SELECT * FROM cash_out_txs WHERE id=$1'
const db = connect()
return db.one(sql, [txId])
.then(row => normalizeTx(row))
@ -224,6 +228,7 @@ exports.fetchTx = function fetchTx (txId) {
exports.addDispenseRequest = function addDispenseRequest (tx) {
const sql = 'update cash_out_txs set dispensed=$1 where id=$2 and dispensed=$3'
const values = [true, tx.id, false]
const db = connect()
return db.result(sql, values)
.then(results => {
@ -241,6 +246,7 @@ exports.addDispense = function addDispense (deviceId, tx, cartridges) {
return insertDispense(deviceId, tx, cartridges)
.then(() => {
const sql2 = 'insert into cash_out_actions (cash_out_txs_id, action) values ($1, $2)'
const db = connect()
return db.none(sql2, [tx.id, 'dispensed'])
})
@ -250,6 +256,8 @@ exports.cartridgeCounts = function cartridgeCounts (deviceId) {
const sql = 'SELECT id, count1, count2 FROM dispenses ' +
'WHERE device_id=$1 AND refill=$2 ' +
'ORDER BY id DESC LIMIT 1'
const db = connect()
return db.oneOrNone(sql, [deviceId, true])
.then(row => {
const counts = row ? [row.count1, row.count2] : [0, 0]
@ -264,6 +272,7 @@ exports.machineEvent = function machineEvent (rec) {
const values = [rec.id, rec.deviceId, rec.eventType, rec.note, rec.deviceTime]
const deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1'
const deleteValues = [TTL]
const db = connect()
return db.none(sql, values)
.then(() => db.none(deleteSql, deleteValues))
@ -271,11 +280,15 @@ exports.machineEvent = function machineEvent (rec) {
exports.devices = function devices () {
const sql = 'SELECT device_id, name FROM devices WHERE authorized=$1'
const db = connect()
return db.any(sql, [true])
}
exports.machineEvents = function machineEvents () {
const sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age FROM machine_events'
const db = connect()
return db.any(sql, [])
}
@ -288,6 +301,7 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age) {
'FROM cash_out_txs ' +
'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' +
'AND status IN ' + _statuses
const db = connect()
return db.any(sql, [age])
.then(rows => normalizeTxs(rows))
@ -301,6 +315,7 @@ exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod) {
AND phone IS NOT NULL
AND status IN ('instant', 'confirmed')
AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)`
const db = connect()
return db.any(sql, [age, false, false, true, waitPeriod])
.then(rows => normalizeTxs(rows))
@ -321,6 +336,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) {
const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel
const tmSRD = new TransactionMode({tiLevel: isolationLevel.serializable})
const db = connect()
function transaction (t) {
const sql = 'select status, confirmation_time from cash_out_txs where id=$1'
@ -355,7 +371,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) {
return db.none(sql3, [tx.id, r.status])
.then(() => {
if (r.status === 'confirmed') {
const sql4 = `update cash_out_hds set confirmed=true where id=$1`
const sql4 = 'update cash_out_hds set confirmed=true where id=$1'
return db.none(sql4, [tx.id])
}
})
@ -365,6 +381,7 @@ exports.updateTxStatus = function updateTxStatus (tx, status) {
exports.updateRedeem = function updateRedeem (txId) {
const sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE id=$2'
const values = [true, txId]
const db = connect()
return db.none(sql, values)
.then(() => {
@ -376,6 +393,7 @@ exports.updateRedeem = function updateRedeem (txId) {
exports.updateNotify = function updateNotify (tx) {
const sql = 'UPDATE cash_out_txs SET notified=$1 WHERE id=$2'
const values = [true, tx.id]
const db = connect()
return db.none(sql, values)
.then(() => {
@ -392,6 +410,7 @@ function insertCachedRequest (deviceId, txId, path, method, body) {
'method',
'body'
]
const db = connect()
const sql = getInsertQuery('cached_responses', fields)
return db.none(sql, [deviceId, txId, path, method, body])
@ -405,6 +424,7 @@ exports.cachedResponse = function (deviceId, txId, path, method) {
and method=$4`
const values = [deviceId, txId, path, method]
const db = connect()
return insertCachedRequest(deviceId, txId, path, method, {pendingRequest: true})
.then(() => ({}))
@ -421,6 +441,7 @@ function pruneCachedResponses () {
where (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1`
const values = [CACHED_SESSION_TTL]
const db = connect()
return db.none(sql, values)
}
@ -434,6 +455,7 @@ exports.cacheResponse = function (deviceId, txId, path, method, body) {
and method=$5`
const values = [body, deviceId, txId, path, method]
const db = connect()
return db.none(sql, values)
}
@ -441,6 +463,8 @@ exports.cacheResponse = function (deviceId, txId, path, method, body) {
exports.nextCashOutSerialHD = function nextCashOutSerialHD (txId, cryptoCode) {
const sql = `select hd_serial from cash_out_hds
where crypto_code=$1 order by hd_serial desc limit 1`
const db = connect()
const attempt = () => db.oneOrNone(sql, [cryptoCode])
.then(row => {
const serialNumber = row ? row.hd_serial + 1 : 0
@ -461,6 +485,7 @@ exports.fetchLiveHD = function fetchLiveHD () {
((extract(epoch from (now() - cash_out_txs.created))) * 1000)<$3`
const values = ['confirmed', false, LIVE_SWEEP_TTL]
const db = connect()
return db.any(sql, values)
}
@ -470,11 +495,16 @@ exports.fetchOldHD = function fetchLiveHD () {
where confirmed
order by last_checked
limit 10`
const db = connect()
return db.any(sql)
}
exports.markSwept = function markSwept (txId) {
const sql = `update cash_out_hds set swept=$1 where id=$2`
const sql = 'update cash_out_hds set swept=$1 where id=$2'
const db = connect()
return db.none(sql, [true, txId])
}
setInterval(pruneCachedResponses, CACHED_SESSION_TTL)