diff --git a/lib/plugins.js b/lib/plugins.js index e87dd007..1e4c31be 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -373,7 +373,7 @@ exports.cashOut = function cashOut (session, tx, cb) { } exports.dispenseAck = function dispenseAck (session, rec) { - db.addDispense(session, rec.tx, rec.cartridges) + return db.addDispense(session, rec.tx, rec.cartridges) } exports.fiatBalance = function fiatBalance (cryptoCode) { @@ -413,10 +413,7 @@ function processTxStatus (tx) { if (tx.status === status) return resolve() const confirm = (status === 'instant' && tx.status !== 'confirmed') || (status === 'confirmed' && tx.status !== 'instant') - db.updateTxStatus(tx, status, confirm, function (_err) { - if (_err) logger.error(err) - resolve() - }) + db.updateTxStatus(tx, status, confirm).then(resolve).catch(reject) }) }) } @@ -438,25 +435,22 @@ function notifyConfirmation (tx) { function monitorLiveIncoming () { const statuses = ['notSeen', 'published', 'insufficientFunds'] - db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { - if (err) return - txs.forEach(processTxStatus) - }) + db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) + .then(txs => txs.forEach(processTxStatus)) + .catch(err => logger.error(err)) } function monitorIncoming () { const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds'] - db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { - if (err) return - txs.forEach(processTxStatus) - }) + db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE) + .then(txs => txs.forEach(processTxStatus)) + .catch(err => logger.error(err)) } function monitorUnnotified () { - db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE, function (err, txs) { - if (err) return - txs.forEach(notifyConfirmation) - }) + db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE) + .then(txs => txs.forEach(notifyConfirmation)) + .catch(err => logger.error(err)) } /* @@ -776,8 +770,8 @@ exports.fetchTx = function fetchTx (session) { return db.fetchTx(session) } -exports.requestDispense = function requestDispense (session, tx) { - return db.addDispenseRequest(session, tx) +exports.requestDispense = function requestDispense (tx) { + return db.addDispenseRequest(tx) } exports.cachedResponse = function (session, path, method) { diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 5c4676fd..0385dbe1 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -1,10 +1,7 @@ 'use strict' -// TODO: Consider using serializable transactions for true ACID - var BigNumber = require('bignumber.js') var pg = require('pg') -var async = require('async') var logger = require('./logger') @@ -26,7 +23,7 @@ function isLowSeverity (err) { var conString = null -function getInsertQuery (tableName, fields, hasId) { +function getInsertQuery (tableName, fields) { // outputs string like: '$1, $2, $3...' with proper No of items var placeholders = fields.map(function (_, i) { return '$' + (i + 1) @@ -37,8 +34,6 @@ function getInsertQuery (tableName, fields, hasId) { ' VALUES' + ' (' + placeholders + ')' - if (hasId) query += ' RETURNING id' - return query } @@ -86,7 +81,7 @@ exports.recordBill = function recordBill (session, rec, cb) { connect(function (cerr, client, done) { if (cerr) return cb(cerr) - query(client, getInsertQuery('bills', fields, false), values, function (err) { + query(client, getInsertQuery('bills', fields), values, function (err) { done() if (err && isUniqueViolation(err)) { logger.warn('Attempt to report bill twice') @@ -127,29 +122,7 @@ function query (client, queryStr, values, cb) { }) } -function insertIncoming (client, session, tx, cryptoAtoms, fiat, cb) { - var fields = ['session_id', 'device_fingerprint', 'to_address', - 'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash', - 'phone', 'error' - ] - - var values = [ - session.id, - session.fingerprint, - tx.toAddress, - cryptoAtoms.toString(), - tx.currencyCode, - tx.cryptoCode, - fiat, - tx.txHash, - tx.phone, - tx.error - ] - - query(client, getInsertQuery('cash_out_txs', fields), values, cb) -} - -function insertOutgoing (client, session, tx, cryptoAtoms, fiat, cb) { +exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) { var fields = ['session_id', 'device_fingerprint', 'to_address', 'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash', 'fee', 'phone', 'error' @@ -159,28 +132,17 @@ function insertOutgoing (client, session, tx, cryptoAtoms, fiat, cb) { session.id, session.fingerprint, tx.toAddress, - cryptoAtoms.toString(), + tx.cryptoAtoms.toString(), tx.cryptoCode, tx.currencyCode, - fiat, + tx.fiat, tx.txHash, null, tx.phone, tx.error ] - query(client, getInsertQuery('cash_in_txs', fields), values, cb) -} - -// Calling function should only send bitcoins if result.cryptoAtomsToSend > 0 -exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - - var cryptoAtoms = tx.cryptoAtoms - var fiat = tx.fiat - insertOutgoing(client, session, tx, cryptoAtoms, fiat, cb) - }) + return pquery(getInsertQuery('cash_in_txs', fields), values) } exports.sentCoins = function sentCoins (session, tx, toSend, fee, error, txHash) { @@ -189,13 +151,28 @@ exports.sentCoins = function sentCoins (session, tx, toSend, fee, error, txHash) } exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) { - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - insertIncoming(client, session, tx, tx.cryptoAtoms, tx.fiat, cb) - }) + var fields = ['session_id', 'device_fingerprint', 'to_address', + 'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash', + 'phone', 'error' + ] + + var values = [ + session.id, + session.fingerprint, + tx.toAddress, + tx.cryptoAtoms.toString(), + tx.currencyCode, + tx.cryptoCode, + tx.fiat, + tx.txHash, + tx.phone, + tx.error + ] + + return pquery(getInsertQuery('cash_out_txs', fields), values) } -function insertDispense (client, session, tx, cartridges, cb) { +function insertDispense (session, tx, cartridges, cb) { var fields = [ 'device_fingerprint', 'session_id', 'dispense1', 'reject1', 'count1', @@ -212,28 +189,29 @@ function insertDispense (client, session, tx, cartridges, cb) { var count1 = cartridges[0].count var count2 = cartridges[1].count var values = [ - session.fingerprint, session.id, + session.fingerprint, tx.sessionId, dispense1, reject1, count1, dispense2, reject2, count2, false, tx.error ] - client.query(sql, values, cb) + + return pquery(sql, values) } exports.addIncomingPhone = function addIncomingPhone (session, tx, notified, cb) { - var sql = 'UPDATE transactions SET phone=$1, notified=$2 ' + - 'WHERE device_fingerprint=$3 AND session_id=$4 ' + - 'AND phone IS NULL' + var sql = `UPDATE cash_out_txs SET phone=$1, notified=$2 + WHERE session_id=$3 + AND phone IS NULL` + var values = [tx.phone, notified, session.fingerprint, tx.sessionId] - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) - var values = [tx.phone, notified, session.fingerprint, tx.sessionId] - query(client, sql, values, function (err, results) { - done(err) - if (err) return reject(err) - resolve({noPhone: results.rowCount === 0}) - }) - }) + return pquery(sql, values) + .then(results => { + const noPhone = results.rowCount === 0 + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' + + if (noPhone) return {noPhone: noPhone} + + return pquery(sql2, [tx.sessionId, 'addedPhone']) + .then(() => ({noPhone: noPhone})) }) } @@ -269,63 +247,34 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { } exports.fetchTx = function fetchTx (session) { - var sql = 'SELECT * FROM cash_out_txs ' + - 'WHERE device_fingerprint=$1 AND session_id=$2' + const sql = 'SELECT * FROM cash_out_txs WHERE session_id=$1' - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) - var values = [session.fingerprint, session.id] - query(client, sql, values, function (err, results) { - done() - if (err) return reject(err) - resolve(normalizeTxs(results.rows)[0]) - }) - }) - }) + return pquery(sql, [session.id]) + .then(r => normalizeTxs(r.rows)[0]) } -exports.addDispenseRequest = function addDispenseRequest (session, tx) { - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) +exports.addDispenseRequest = function addDispenseRequest (tx) { + const sql = 'update cash_out_txs set dispensed=$1 where session_id=$2 and dispensed=$3' + const values = [true, tx.sessionId, false] - const originalSession = {id: tx.sessionId, fingerprint: session.fingerprint} - async.waterfall([ - async.apply(updateDispense, client, originalSession, true), - async.apply(insertIncoming, client, originalSession, tx, 0, tx.fiat) - ], function (err) { - done() + return pquery(sql, values) + .then(results => { + const alreadyDispensed = results.rowCount === 0 + if (alreadyDispensed) return {dispense: false, reason: 'alreadyDispensed'} - if (err) return reject(err) - resolve() - }) - }) - }) -} + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' -function updateDispense (client, session, dispensed, cb) { - var sql = 'UPDATE transactions SET dispensed=$1 ' + - 'WHERE device_fingerprint=$2 AND session_id=$3' - var values = [dispensed, session.fingerprint, session.id] - query(client, sql, values, function (err, results) { - if (err) return cb(err) - if (results.rowCount === 0) return cb(new Error('No pending tx')) - cb() + return pquery(sql2, [tx.sessionId, 'dispenseRequested']) + .then(() => ({dispense: true})) }) } exports.addDispense = function addDispense (session, tx, cartridges) { - connect(function (cerr, client, done) { - if (cerr) return + return insertDispense(session, tx, cartridges) + .then(() => { + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' - async.waterfall([ - async.apply(insertIncoming, client, session, tx, 0, tx.fiat), - async.apply(insertDispense, client, session, tx, cartridges) - ], function (err) { - done() - if (err) logger.error(err) - }) + return pquery(sql2, [tx.sessionId, 'dispensed']) }) } @@ -351,7 +300,7 @@ exports.machineEvent = function machineEvent (rec, cb) { connect(function (cerr, client, done) { if (cerr) return cb(cerr) var fields = ['id', 'device_fingerprint', 'event_type', 'note', 'device_time'] - var sql = getInsertQuery('machine_events', fields, false) + var sql = getInsertQuery('machine_events', fields) var values = [rec.id, rec.fingerprint, rec.eventType, rec.note, rec.deviceTime] var deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1' @@ -403,36 +352,21 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) { 'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' + 'AND status IN ' + _statuses - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - - query(client, sql, [age], function (err, results) { - done() - if (err) return cb(err) - cb(null, normalizeTxs(results.rows)) - }) - }) + return pquery(sql, [age]) + .then(r => normalizeTxs(r.rows)) } exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) { - var sql = 'SELECT * ' + - 'FROM transactions ' + - 'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' + - 'AND notified=$2 AND dispensed=$3 ' + - 'AND phone IS NOT NULL ' + - "AND status IN ('instant', 'confirmed') " + - 'AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)' + var sql = `SELECT * + FROM cash_out_txs + WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 + AND notified=$2 AND dispensed=$3 + AND phone IS NOT NULL + AND status IN ('instant', 'confirmed') + AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)` - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - - var values = [age, false, false, true, waitPeriod] - query(client, sql, values, function (err, results) { - done() - if (err) return cb(err) - cb(null, normalizeTxs(results.rows)) - }) - }) + return pquery(sql, [age, false, false, true, waitPeriod]) + .then(r => normalizeTxs(r.rows)) } function pquery (sql, values) { @@ -450,45 +384,37 @@ function pquery (sql, values) { exports.updateTxStatus = function updateTxStatus (tx, status, confirm) { var sql = confirm - ? 'UPDATE transactions SET status=$1, confirmation_time=now() WHERE id=$2' - : 'UPDATE transactions SET status=$1 WHERE id=$2' + ? 'UPDATE cash_out_txs SET status=$1, confirmation_time=now() WHERE session_id=$2' + : 'UPDATE cash_out_txs SET status=$1 WHERE session_id=$2' - var values = [status, tx.id] + var values = [status, tx.sessionId] return pquery(sql, values) + .then(() => { + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' + return pquery(sql2, [tx.sessionId, status]) + }) } exports.updateRedeem = function updateRedeem (session, cb) { - var sql = 'UPDATE transactions SET redeem=$1 ' + - 'WHERE device_fingerprint=$2 AND session_id=$3' + var sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE session_id=$2' + var values = [true, session.id] - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) - var values = [true, session.fingerprint, session.id] - query(client, sql, values, function (err) { - done(err) - if (err) return reject(err) - resolve() - }) - }) + return pquery(sql, values) + .then(() => { + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' + return pquery(sql2, [session.id, 'redeem']) }) } exports.updateNotify = function updateNotify (tx) { - var sql = 'UPDATE transactions SET notified=$1 ' + - 'WHERE id=$2' + var sql = 'UPDATE cash_out_txs SET notified=$1 WHERE session_id=$2' + var values = [true, tx.sessionId] - return new Promise((resolve, reject) => { - connect(function (cerr, client, done) { - if (cerr) return reject(cerr) - var values = [true, tx.id] - query(client, sql, values, function (err) { - done(err) - if (err) return reject(err) - resolve() - }) - }) + return pquery(sql, values) + .then(() => { + const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)' + return pquery(sql2, [tx.sessionId, 'notified']) }) } @@ -544,14 +470,3 @@ exports.cacheResponse = function (session, path, method, body) { return pquery(sql, values) } - -/* -exports.init('postgres://lamassu:lamassu@localhost/lamassu') -connect(function(err, client, done) { - var sql = 'select * from transactions where id=$1' - query(client, sql, [130], function(_err, results) { - done() - console.dir(results.rows[0]) - }) -}) -*/ diff --git a/lib/routes.js b/lib/routes.js index 5b74160c..de32186d 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -269,11 +269,12 @@ function waitForDispense (req, res) { function dispense (req, res) { const tx = req.body.tx - const body = {dispense: true} - return plugins.requestDispense(session(req), tx) - .then(() => cacheResponse(req, body)) - .then(() => res.json(body)) + return plugins.requestDispense(tx) + .then(r => { + return cacheResponse(req, r) + .then(() => res.json(r)) + }) .catch(err => { logger.error(err) res.sendStatus(500) diff --git a/todo.txt b/todo.txt index 2844c1a6..6eff1e50 100644 --- a/todo.txt +++ b/todo.txt @@ -1,2 +1,2 @@ - test cash out -- l-m shouldn't keep polling l-s when not on pending screen +- l-m shouldn't keep polling l-s when not on pending screen (low priority)