diff --git a/lib/plugins.js b/lib/plugins.js index 6fad64a8..f938da63 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -338,9 +338,8 @@ function reapTxs () { // NOTE: No harm in processing old pending tx, we don't need to wait for // removeOldPending to complete. - db.pendingTxs(PENDING_TIMEOUT, function (err, results) { + db.pendingTxs(PENDING_TIMEOUT, function (err, rows) { if (err) return logger.warn(err) - var rows = results.rows var rowCount = rows.length for (var i = 0; i < rowCount; i++) { var row = rows[i] @@ -446,8 +445,11 @@ function processTxStatus (tx) { const cryptoCode = tx.cryptoCode const walletPlugin = walletPlugins[cryptoCode] - walletPlugin.getStatus(tx, function (err, status) { + walletPlugin.getStatus(tx.toAddress, tx.cryptoAtoms, function (err, res) { if (err) return logger.error(err) + console.log('DEBUG5') + console.log(res) + var status = res.status if (tx.status === status) return db.updateTxStatus(tx, status, function (_err) { if (_err) return logger.error(err) @@ -464,7 +466,7 @@ function monitorLiveIncoming () { } function monitorIncoming () { - const statuses = ['notSeen', 'published', 'zeroConf', 'authorized', 'rejected'] + const statuses = ['notSeen', 'published', 'authorized', 'instant', 'rejected'] db.fetchOpenTxs(statuses, STALE_INCOMING_TX_AGE, function (err, txs) { if (err) return txs.forEach(processTxStatus) @@ -758,10 +760,7 @@ exports.getPhoneCode = function getPhoneCode (phone) { } exports.updatePhone = function updatePhone (session, tx) { - db.addIncomingPhone(session, tx, err => { - if (err) return Promise.reject(err) - return Promise.resolve() - }) + return db.addIncomingPhone(session, tx) } exports.fetchPhoneTx = function fetchPhoneTx (phone) { @@ -781,16 +780,8 @@ exports.fetchPhoneTx = function fetchPhoneTx (phone) { }) } -exports.fetchTx = function fetchTx (session, status) { +exports.fetchTx = function fetchTx (session) { return db.fetchTx(session) - .then(txRecs => { - const deposits = txRecs.filter(t => t.stage === 'deposit') - const deposit = R.last(deposits) - const status = deposit ? deposit.authority : 'notSeen' - const tx = txRecs.find(t => t.stage === 'initial_request') - if (!tx) return null - return R.assoc('status', status, tx) - }) } exports.requestDispense = function requestDispense (session, tx) { diff --git a/lib/postgresql_interface.js b/lib/postgresql_interface.js index 6558790c..12d9e249 100644 --- a/lib/postgresql_interface.js +++ b/lib/postgresql_interface.js @@ -83,7 +83,7 @@ exports.recordBill = function recordBill (session, rec, cb) { rec.toAddress, session.id, rec.deviceTime, - rec.cryptoAtoms, + rec.cryptoAtoms.toString(), rec.fiat ] @@ -227,7 +227,7 @@ exports.pendingTxs = function pendingTxs (timeoutMS, cb) { var values = [timeoutS] query(client, sql, values, function (err, results) { done() - cb(err, normalizeTxs(results)) + cb(err, normalizeTxs(results.rows)) }) }) } @@ -299,7 +299,7 @@ function insertTx (client, session, incoming, tx, cryptoAtoms, fiat, stage, incoming, session.fingerprint, tx.toAddress, - cryptoAtoms, + cryptoAtoms.toString(), tx.currencyCode, tx.cryptoCode, fiat, @@ -490,12 +490,16 @@ function insertDispense (client, session, tx, cartridges, transactionId, cb) { exports.addIncomingPhone = function addIncomingPhone (session, tx, cb) { var sql = 'UPDATE transactions SET phone=$1 ' + 'WHERE incoming=$2 AND device_fingerprint=$3 AND session_id=$4' - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - var values = [tx.phone, true, session.fingerprint, session.id] - query(client, sql, values, function (err) { - done(err) - cb(err) + + return new Promise((resolve, reject) => { + connect(function (cerr, client, done) { + if (cerr) return reject(cerr) + var values = [tx.phone, true, session.fingerprint, session.id] + query(client, sql, values, function (err) { + done(err) + if (err) return reject(err) + resolve() + }) }) }) } @@ -518,56 +522,64 @@ function normalizeTxs (txs) { }) } -exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout, cb) { +exports.fetchPhoneTxs = function fetchPhoneTxs (phone, dispenseTimeout) { var sql = 'SELECT * FROM transactions ' + 'WHERE phone=$1 AND dispensed=$2 ' + 'AND (EXTRACT(EPOCH FROM (now() - created))) * 1000 < $1' - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - query(client, sql, [phone, false, dispenseTimeout], function (err, results) { - done() - if (err) return cb(err) - cb(null, normalizeTxs(results.rows)) + return new Promise((resolve, reject) => { + connect(function (cerr, client, done) { + if (cerr) return reject(cerr) + query(client, sql, [phone, false, dispenseTimeout], function (err, results) { + done() + if (err) return reject(err) + resolve(normalizeTxs(results.rows)) + }) }) }) } -exports.fetchTx = function fetchTx (session, cb) { +exports.fetchTx = function fetchTx (session) { var sql = 'SELECT * FROM transactions ' + 'WHERE device_fingerprint=$1 AND session_id=$2 ' + - 'ORDER BY created' + 'AND stage=$3 AND authority=$4' - connect(function (cerr, client, done) { - if (cerr) return cb(cerr) - query(client, sql, [session.fingerprint, session.id], function (err, results) { - done() - if (err) return cb(err) - cb(null, normalizeTxs(results.rows)) + return new Promise((resolve, reject) => { + connect(function (cerr, client, done) { + if (cerr) return reject(cerr) + var values = [session.fingerprint, session.id, 'initial_request', 'pending'] + query(client, sql, values, function (err, results) { + done() + if (err) return reject(err) + resolve(normalizeTxs(results.rows)[0]) + }) }) }) } -exports.addDispenseRequest = function addDispenseRequest (session, tx, cb) { - connect(function (cerr, client, done) { - if (cerr) return +exports.addDispenseRequest = function addDispenseRequest (session, tx) { + return new Promise((resolve, reject) => { + connect(function (cerr, client, done) { + if (cerr) return reject(cerr) - const originalSession = {id: tx.sessionId, fingerprint: session.fingerprint} - async.waterfall([ - async.apply(updateDispense, client, true), - async.apply(insertIncoming, client, originalSession, tx, 0, tx.fiat, - 'dispense', 'pending') - ], function (err) { - done() - if (err) logger.error(err) + const originalSession = {id: tx.sessionId, fingerprint: session.fingerprint} + async.waterfall([ + async.apply(updateDispense, client, originalSession, true), + async.apply(insertIncoming, client, originalSession, tx, 0, tx.fiat, + 'dispense', 'pending') + ], function (err) { + done() + if (err) return reject(err) + resolve() + }) }) }) } function updateDispense (client, session, dispensed, cb) { - var sql = 'UPDATE transactions SET dispense=$1 ' + + var sql = 'UPDATE transactions SET dispensed=$1 ' + 'WHERE stage=$2 AND authority=$3 AND device_fingerprint=$4 AND session_id=$5' - var values = [dispensed, 'initial_request', 'deposit', session.fingerprint, session.id] + var values = [dispensed, 'initial_request', 'pending', session.fingerprint, session.id] query(client, sql, values, function (err) { cb(err) }) @@ -652,18 +664,24 @@ exports.machineEvents = function machineEvents (cb) { }) } +function singleQuotify (item) { return '\'' + item + '\'' } + exports.fetchOpenTxs = function fetchOpenTxs (statuses, age, cb) { - var sql = 'SELECT *, (EXTRACT(EPOCH FROM (now() - created))) * 1000 AS age ' + + var _statuses = '(' + statuses.map(singleQuotify).join(',') + ')' + + var sql = 'SELECT * ' + 'FROM transactions ' + - 'WHERE incoming=$1 age<$2 AND status IN $3' + 'WHERE incoming=$1 AND ((EXTRACT(EPOCH FROM (now() - created))) * 1000)<$2 ' + + 'AND stage=$3 AND authority=$4 ' + + 'AND status IN ' + _statuses connect(function (cerr, client, done) { if (cerr) return cb(cerr) - query(client, sql, [true, age, statuses], function (err, results) { + query(client, sql, [true, age, 'initial_request', 'pending'], function (err, results) { done() if (err) return cb(err) - cb(null, normalizeTxs(results)) + cb(null, normalizeTxs(results.rows)) }) }) } diff --git a/lib/routes.js b/lib/routes.js index 0a854bc3..eabd0e94 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -224,7 +224,7 @@ function phoneCode (req, res) { function updatePhone (req, res) { return plugins.updatePhone(session(req), req.body.phone) - .then(code => res.send(200)) + .then(code => res.json(200)) .catch(err => { logger.error(err) res.send(500) @@ -241,11 +241,12 @@ function fetchPhoneTx (req, res) { } function waitForDispense (req, res) { - return plugins.fetchTx(session(req), req.query.status) - .then(r => { - if (!r) return res.send(404) - if (r.status === req.query.status) return res.send(304) - res.json(r) + return plugins.fetchTx(session(req)) + .then(tx => { + console.log('%j', tx) + if (!tx) return res.send(404) + if (tx.status === req.query.status) return res.send(304) + res.json({tx: tx}) }) .catch(err => { logger.error(err) @@ -256,7 +257,7 @@ function waitForDispense (req, res) { function dispense (req, res) { const tx = req.body.tx return plugins.requestDispense(session(req), tx) - .then(() => res.send(200)) + .then(() => res.json(200)) .catch(err => { logger.error(err) res.send(500) diff --git a/migrations/008-add-two-way.js b/migrations/008-add-two-way.js index f74d8f7c..8da9b9ae 100644 --- a/migrations/008-add-two-way.js +++ b/migrations/008-add-two-way.js @@ -3,7 +3,8 @@ var db = require('./db') function singleQuotify (item) { return '\'' + item + '\'' } exports.up = function (next) { - var statuses = ['notSeen', 'published', 'zeroConf', 'authorized', 'confirmed', 'rejected'] + var statuses = ['notSeen', 'published', 'authorized', 'instant', + 'confirmed', 'rejected', 'insufficientFunds'] .map(singleQuotify).join(',') var sql = [ diff --git a/todo.txt b/todo.txt index b7baf2b2..24401ab7 100644 --- a/todo.txt +++ b/todo.txt @@ -1,3 +1,2 @@ - change satoshis to crypto_atoms in db (ask neal about this) -- test bitgo - start testing