WIP
This commit is contained in:
parent
7055104218
commit
98e863d31d
4 changed files with 109 additions and 199 deletions
|
|
@ -373,7 +373,7 @@ exports.cashOut = function cashOut (session, tx, cb) {
|
|||
}
|
||||
|
||||
exports.dispenseAck = function dispenseAck (session, rec) {
|
||||
db.addDispense(session, rec.tx, rec.cartridges)
|
||||
return db.addDispense(session, rec.tx, rec.cartridges)
|
||||
}
|
||||
|
||||
exports.fiatBalance = function fiatBalance (cryptoCode) {
|
||||
|
|
@ -413,10 +413,7 @@ function processTxStatus (tx) {
|
|||
if (tx.status === status) return resolve()
|
||||
const confirm = (status === 'instant' && tx.status !== 'confirmed') ||
|
||||
(status === 'confirmed' && tx.status !== 'instant')
|
||||
db.updateTxStatus(tx, status, confirm, function (_err) {
|
||||
if (_err) logger.error(err)
|
||||
resolve()
|
||||
})
|
||||
db.updateTxStatus(tx, status, confirm).then(resolve).catch(reject)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
@ -438,25 +435,22 @@ function notifyConfirmation (tx) {
|
|||
|
||||
function monitorLiveIncoming () {
|
||||
const statuses = ['notSeen', 'published', 'insufficientFunds']
|
||||
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
|
||||
if (err) return
|
||||
txs.forEach(processTxStatus)
|
||||
})
|
||||
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE)
|
||||
.then(txs => txs.forEach(processTxStatus))
|
||||
.catch(err => logger.error(err))
|
||||
}
|
||||
|
||||
function monitorIncoming () {
|
||||
const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected', 'insufficientFunds']
|
||||
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) {
|
||||
if (err) return
|
||||
txs.forEach(processTxStatus)
|
||||
})
|
||||
db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE)
|
||||
.then(txs => txs.forEach(processTxStatus))
|
||||
.catch(err => logger.error(err))
|
||||
}
|
||||
|
||||
function monitorUnnotified () {
|
||||
db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE, function (err, txs) {
|
||||
if (err) return
|
||||
txs.forEach(notifyConfirmation)
|
||||
})
|
||||
db.fetchUnnotifiedTxs(MAX_NOTIFY_AGE, MIN_NOTIFY_AGE)
|
||||
.then(txs => txs.forEach(notifyConfirmation))
|
||||
.catch(err => logger.error(err))
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -776,8 +770,8 @@ exports.fetchTx = function fetchTx (session) {
|
|||
return db.fetchTx(session)
|
||||
}
|
||||
|
||||
exports.requestDispense = function requestDispense (session, tx) {
|
||||
return db.addDispenseRequest(session, tx)
|
||||
exports.requestDispense = function requestDispense (tx) {
|
||||
return db.addDispenseRequest(tx)
|
||||
}
|
||||
|
||||
exports.cachedResponse = function (session, path, method) {
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
'use strict'
|
||||
|
||||
// TODO: Consider using serializable transactions for true ACID
|
||||
|
||||
var BigNumber = require('bignumber.js')
|
||||
var pg = require('pg')
|
||||
var async = require('async')
|
||||
|
||||
var logger = require('./logger')
|
||||
|
||||
|
|
@ -26,7 +23,7 @@ function isLowSeverity (err) {
|
|||
|
||||
var conString = null
|
||||
|
||||
function getInsertQuery (tableName, fields, hasId) {
|
||||
function getInsertQuery (tableName, fields) {
|
||||
// outputs string like: '$1, $2, $3...' with proper No of items
|
||||
var placeholders = fields.map(function (_, i) {
|
||||
return '$' + (i + 1)
|
||||
|
|
@ -37,8 +34,6 @@ function getInsertQuery (tableName, fields, hasId) {
|
|||
' VALUES' +
|
||||
' (' + placeholders + ')'
|
||||
|
||||
if (hasId) query += ' RETURNING id'
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
|
|
@ -86,7 +81,7 @@ exports.recordBill = function recordBill (session, rec, cb) {
|
|||
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
query(client, getInsertQuery('bills', fields, false), values, function (err) {
|
||||
query(client, getInsertQuery('bills', fields), values, function (err) {
|
||||
done()
|
||||
if (err && isUniqueViolation(err)) {
|
||||
logger.warn('Attempt to report bill twice')
|
||||
|
|
@ -127,29 +122,7 @@ function query (client, queryStr, values, cb) {
|
|||
})
|
||||
}
|
||||
|
||||
function insertIncoming (client, session, tx, cryptoAtoms, fiat, cb) {
|
||||
var fields = ['session_id', 'device_fingerprint', 'to_address',
|
||||
'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash',
|
||||
'phone', 'error'
|
||||
]
|
||||
|
||||
var values = [
|
||||
session.id,
|
||||
session.fingerprint,
|
||||
tx.toAddress,
|
||||
cryptoAtoms.toString(),
|
||||
tx.currencyCode,
|
||||
tx.cryptoCode,
|
||||
fiat,
|
||||
tx.txHash,
|
||||
tx.phone,
|
||||
tx.error
|
||||
]
|
||||
|
||||
query(client, getInsertQuery('cash_out_txs', fields), values, cb)
|
||||
}
|
||||
|
||||
function insertOutgoing (client, session, tx, cryptoAtoms, fiat, cb) {
|
||||
exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) {
|
||||
var fields = ['session_id', 'device_fingerprint', 'to_address',
|
||||
'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash',
|
||||
'fee', 'phone', 'error'
|
||||
|
|
@ -159,28 +132,17 @@ function insertOutgoing (client, session, tx, cryptoAtoms, fiat, cb) {
|
|||
session.id,
|
||||
session.fingerprint,
|
||||
tx.toAddress,
|
||||
cryptoAtoms.toString(),
|
||||
tx.cryptoAtoms.toString(),
|
||||
tx.cryptoCode,
|
||||
tx.currencyCode,
|
||||
fiat,
|
||||
tx.fiat,
|
||||
tx.txHash,
|
||||
null,
|
||||
tx.phone,
|
||||
tx.error
|
||||
]
|
||||
|
||||
query(client, getInsertQuery('cash_in_txs', fields), values, cb)
|
||||
}
|
||||
|
||||
// Calling function should only send bitcoins if result.cryptoAtomsToSend > 0
|
||||
exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
|
||||
var cryptoAtoms = tx.cryptoAtoms
|
||||
var fiat = tx.fiat
|
||||
insertOutgoing(client, session, tx, cryptoAtoms, fiat, cb)
|
||||
})
|
||||
return pquery(getInsertQuery('cash_in_txs', fields), values)
|
||||
}
|
||||
|
||||
exports.sentCoins = function sentCoins (session, tx, toSend, fee, error, txHash) {
|
||||
|
|
@ -189,13 +151,28 @@ exports.sentCoins = function sentCoins (session, tx, toSend, fee, error, txHash)
|
|||
}
|
||||
|
||||
exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
insertIncoming(client, session, tx, tx.cryptoAtoms, tx.fiat, cb)
|
||||
})
|
||||
var fields = ['session_id', 'device_fingerprint', 'to_address',
|
||||
'crypto_atoms', 'crypto_code', 'currency_code', 'fiat', 'tx_hash',
|
||||
'phone', 'error'
|
||||
]
|
||||
|
||||
var values = [
|
||||
session.id,
|
||||
session.fingerprint,
|
||||
tx.toAddress,
|
||||
tx.cryptoAtoms.toString(),
|
||||
tx.currencyCode,
|
||||
tx.cryptoCode,
|
||||
tx.fiat,
|
||||
tx.txHash,
|
||||
tx.phone,
|
||||
tx.error
|
||||
]
|
||||
|
||||
return pquery(getInsertQuery('cash_out_txs', fields), values)
|
||||
}
|
||||
|
||||
function insertDispense (client, session, tx, cartridges, cb) {
|
||||
function insertDispense (session, tx, cartridges, cb) {
|
||||
var fields = [
|
||||
'device_fingerprint', 'session_id',
|
||||
'dispense1', 'reject1', 'count1',
|
||||
|
|
@ -212,28 +189,29 @@ function insertDispense (client, session, tx, cartridges, cb) {
|
|||
var count1 = cartridges[0].count
|
||||
var count2 = cartridges[1].count
|
||||
var values = [
|
||||
session.fingerprint, session.id,
|
||||
session.fingerprint, tx.sessionId,
|
||||
dispense1, reject1, count1, dispense2, reject2, count2,
|
||||
false, tx.error
|
||||
]
|
||||
client.query(sql, values, cb)
|
||||
|
||||
return pquery(sql, values)
|
||||
}
|
||||
|
||||
exports.addIncomingPhone = function addIncomingPhone (session, tx, notified, cb) {
|
||||
var sql = 'UPDATE transactions SET phone=$1, notified=$2 ' +
|
||||
'WHERE device_fingerprint=$3 AND session_id=$4 ' +
|
||||
'AND phone IS NULL'
|
||||
var sql = `UPDATE cash_out_txs SET phone=$1, notified=$2
|
||||
WHERE session_id=$3
|
||||
AND phone IS NULL`
|
||||
var values = [tx.phone, notified, session.fingerprint, tx.sessionId]
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return reject(cerr)
|
||||
var values = [tx.phone, notified, session.fingerprint, tx.sessionId]
|
||||
query(client, sql, values, function (err, results) {
|
||||
done(err)
|
||||
if (err) return reject(err)
|
||||
resolve({noPhone: results.rowCount === 0})
|
||||
})
|
||||
})
|
||||
return pquery(sql, values)
|
||||
.then(results => {
|
||||
const noPhone = results.rowCount === 0
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
|
||||
if (noPhone) return {noPhone: noPhone}
|
||||
|
||||
return pquery(sql2, [tx.sessionId, 'addedPhone'])
|
||||
.then(() => ({noPhone: noPhone}))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -269,63 +247,34 @@ exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) {
|
|||
}
|
||||
|
||||
exports.fetchTx = function fetchTx (session) {
|
||||
var sql = 'SELECT * FROM cash_out_txs ' +
|
||||
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||
const sql = 'SELECT * FROM cash_out_txs WHERE session_id=$1'
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return reject(cerr)
|
||||
var values = [session.fingerprint, session.id]
|
||||
query(client, sql, values, function (err, results) {
|
||||
done()
|
||||
if (err) return reject(err)
|
||||
resolve(normalizeTxs(results.rows)[0])
|
||||
})
|
||||
})
|
||||
})
|
||||
return pquery(sql, [session.id])
|
||||
.then(r => normalizeTxs(r.rows)[0])
|
||||
}
|
||||
|
||||
exports.addDispenseRequest = function addDispenseRequest (session, tx) {
|
||||
return new Promise((resolve, reject) => {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return reject(cerr)
|
||||
exports.addDispenseRequest = function addDispenseRequest (tx) {
|
||||
const sql = 'update cash_out_txs set dispensed=$1 where session_id=$2 and dispensed=$3'
|
||||
const values = [true, tx.sessionId, false]
|
||||
|
||||
const originalSession = {id: tx.sessionId, fingerprint: session.fingerprint}
|
||||
async.waterfall([
|
||||
async.apply(updateDispense, client, originalSession, true),
|
||||
async.apply(insertIncoming, client, originalSession, tx, 0, tx.fiat)
|
||||
], function (err) {
|
||||
done()
|
||||
return pquery(sql, values)
|
||||
.then(results => {
|
||||
const alreadyDispensed = results.rowCount === 0
|
||||
if (alreadyDispensed) return {dispense: false, reason: 'alreadyDispensed'}
|
||||
|
||||
if (err) return reject(err)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
|
||||
function updateDispense (client, session, dispensed, cb) {
|
||||
var sql = 'UPDATE transactions SET dispensed=$1 ' +
|
||||
'WHERE device_fingerprint=$2 AND session_id=$3'
|
||||
var values = [dispensed, session.fingerprint, session.id]
|
||||
query(client, sql, values, function (err, results) {
|
||||
if (err) return cb(err)
|
||||
if (results.rowCount === 0) return cb(new Error('No pending tx'))
|
||||
cb()
|
||||
return pquery(sql2, [tx.sessionId, 'dispenseRequested'])
|
||||
.then(() => ({dispense: true}))
|
||||
})
|
||||
}
|
||||
|
||||
exports.addDispense = function addDispense (session, tx, cartridges) {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return
|
||||
return insertDispense(session, tx, cartridges)
|
||||
.then(() => {
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
|
||||
async.waterfall([
|
||||
async.apply(insertIncoming, client, session, tx, 0, tx.fiat),
|
||||
async.apply(insertDispense, client, session, tx, cartridges)
|
||||
], function (err) {
|
||||
done()
|
||||
if (err) logger.error(err)
|
||||
})
|
||||
return pquery(sql2, [tx.sessionId, 'dispensed'])
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -351,7 +300,7 @@ exports.machineEvent = function machineEvent (rec, cb) {
|
|||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
var fields = ['id', 'device_fingerprint', 'event_type', 'note', 'device_time']
|
||||
var sql = getInsertQuery('machine_events', fields, false)
|
||||
var sql = getInsertQuery('machine_events', fields)
|
||||
var values = [rec.id, rec.fingerprint, rec.eventType, rec.note, rec.deviceTime]
|
||||
|
||||
var deleteSql = 'DELETE FROM machine_events WHERE (EXTRACT(EPOCH FROM (now() - created))) * 1000 > $1'
|
||||
|
|
@ -403,36 +352,21 @@ exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) {
|
|||
'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' +
|
||||
'AND status IN ' + _statuses
|
||||
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
|
||||
query(client, sql, [age], function (err, results) {
|
||||
done()
|
||||
if (err) return cb(err)
|
||||
cb(null, normalizeTxs(results.rows))
|
||||
})
|
||||
})
|
||||
return pquery(sql, [age])
|
||||
.then(r => normalizeTxs(r.rows))
|
||||
}
|
||||
|
||||
exports.fetchUnnotifiedTxs = function fetchUnnotifiedTxs (age, waitPeriod, cb) {
|
||||
var sql = 'SELECT * ' +
|
||||
'FROM transactions ' +
|
||||
'WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1 ' +
|
||||
'AND notified=$2 AND dispensed=$3 ' +
|
||||
'AND phone IS NOT NULL ' +
|
||||
"AND status IN ('instant', 'confirmed') " +
|
||||
'AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)'
|
||||
var sql = `SELECT *
|
||||
FROM cash_out_txs
|
||||
WHERE ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$1
|
||||
AND notified=$2 AND dispensed=$3
|
||||
AND phone IS NOT NULL
|
||||
AND status IN ('instant', 'confirmed')
|
||||
AND (redeem=$4 OR ((EXTRACT(EPOCH FROM (now() - created))) * 1000)>$5)`
|
||||
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return cb(cerr)
|
||||
|
||||
var values = [age, false, false, true, waitPeriod]
|
||||
query(client, sql, values, function (err, results) {
|
||||
done()
|
||||
if (err) return cb(err)
|
||||
cb(null, normalizeTxs(results.rows))
|
||||
})
|
||||
})
|
||||
return pquery(sql, [age, false, false, true, waitPeriod])
|
||||
.then(r => normalizeTxs(r.rows))
|
||||
}
|
||||
|
||||
function pquery (sql, values) {
|
||||
|
|
@ -450,45 +384,37 @@ function pquery (sql, values) {
|
|||
|
||||
exports.updateTxStatus = function updateTxStatus (tx, status, confirm) {
|
||||
var sql = confirm
|
||||
? 'UPDATE transactions SET status=$1, confirmation_time=now() WHERE id=$2'
|
||||
: 'UPDATE transactions SET status=$1 WHERE id=$2'
|
||||
? 'UPDATE cash_out_txs SET status=$1, confirmation_time=now() WHERE session_id=$2'
|
||||
: 'UPDATE cash_out_txs SET status=$1 WHERE session_id=$2'
|
||||
|
||||
var values = [status, tx.id]
|
||||
var values = [status, tx.sessionId]
|
||||
|
||||
return pquery(sql, values)
|
||||
.then(() => {
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
return pquery(sql2, [tx.sessionId, status])
|
||||
})
|
||||
}
|
||||
|
||||
exports.updateRedeem = function updateRedeem (session, cb) {
|
||||
var sql = 'UPDATE transactions SET redeem=$1 ' +
|
||||
'WHERE device_fingerprint=$2 AND session_id=$3'
|
||||
var sql = 'UPDATE cash_out_txs SET redeem=$1 WHERE session_id=$2'
|
||||
var values = [true, session.id]
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return reject(cerr)
|
||||
var values = [true, session.fingerprint, session.id]
|
||||
query(client, sql, values, function (err) {
|
||||
done(err)
|
||||
if (err) return reject(err)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
return pquery(sql, values)
|
||||
.then(() => {
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
return pquery(sql2, [session.id, 'redeem'])
|
||||
})
|
||||
}
|
||||
|
||||
exports.updateNotify = function updateNotify (tx) {
|
||||
var sql = 'UPDATE transactions SET notified=$1 ' +
|
||||
'WHERE id=$2'
|
||||
var sql = 'UPDATE cash_out_txs SET notified=$1 WHERE session_id=$2'
|
||||
var values = [true, tx.sessionId]
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
connect(function (cerr, client, done) {
|
||||
if (cerr) return reject(cerr)
|
||||
var values = [true, tx.id]
|
||||
query(client, sql, values, function (err) {
|
||||
done(err)
|
||||
if (err) return reject(err)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
return pquery(sql, values)
|
||||
.then(() => {
|
||||
const sql2 = 'insert into cash_out_actions (session_id, action) values ($1, $2)'
|
||||
return pquery(sql2, [tx.sessionId, 'notified'])
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -544,14 +470,3 @@ exports.cacheResponse = function (session, path, method, body) {
|
|||
|
||||
return pquery(sql, values)
|
||||
}
|
||||
|
||||
/*
|
||||
exports.init('postgres://lamassu:lamassu@localhost/lamassu')
|
||||
connect(function(err, client, done) {
|
||||
var sql = 'select * from transactions where id=$1'
|
||||
query(client, sql, [130], function(_err, results) {
|
||||
done()
|
||||
console.dir(results.rows[0])
|
||||
})
|
||||
})
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -269,11 +269,12 @@ function waitForDispense (req, res) {
|
|||
|
||||
function dispense (req, res) {
|
||||
const tx = req.body.tx
|
||||
const body = {dispense: true}
|
||||
|
||||
return plugins.requestDispense(session(req), tx)
|
||||
.then(() => cacheResponse(req, body))
|
||||
.then(() => res.json(body))
|
||||
return plugins.requestDispense(tx)
|
||||
.then(r => {
|
||||
return cacheResponse(req, r)
|
||||
.then(() => res.json(r))
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error(err)
|
||||
res.sendStatus(500)
|
||||
|
|
|
|||
2
todo.txt
2
todo.txt
|
|
@ -1,2 +1,2 @@
|
|||
- test cash out
|
||||
- l-m shouldn't keep polling l-s when not on pending screen
|
||||
- l-m shouldn't keep polling l-s when not on pending screen (low priority)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue