changed postgresql_interface.js to standard format
This commit is contained in:
parent
14497d81c6
commit
f357b6080e
1 changed files with 297 additions and 301 deletions
|
|
@ -1,72 +1,69 @@
|
||||||
/* @flow weak */
|
/* @flow weak */
|
||||||
'use strict';
|
'use strict'
|
||||||
|
|
||||||
// TODO: Consider using serializable transactions for true ACID
|
// TODO: Consider using serializable transactions for true ACID
|
||||||
|
|
||||||
var BigNumber = require('bignumber.js')
|
var BigNumber = require('bignumber.js')
|
||||||
var pg = require('pg');
|
var pg = require('pg')
|
||||||
var async = require('async');
|
var async = require('async')
|
||||||
var _ = require('lodash');
|
var _ = require('lodash')
|
||||||
|
|
||||||
var logger = require('./logger');
|
var logger = require('./logger')
|
||||||
|
|
||||||
/*
|
/*
|
||||||
function inspect(rec) {
|
function inspect(rec) {
|
||||||
console.log(require('util').inspect(rec, {depth: null, colors: true}));
|
console.log(require('util').inspect(rec, {depth: null, colors: true}))
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
var SATOSHI_FACTOR = 1e8;
|
function isUniqueViolation (err) {
|
||||||
|
return err.code === '23505'
|
||||||
function isUniqueViolation(err) {
|
|
||||||
return err.code === '23505';
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function isLowSeverity(err) {
|
function isLowSeverity (err) {
|
||||||
return isUniqueViolation(err) || err.severity === 'low';
|
return isUniqueViolation(err) || err.severity === 'low'
|
||||||
}
|
}
|
||||||
|
|
||||||
var conString = null;
|
var conString = null
|
||||||
|
|
||||||
function rollback(client, done) {
|
function rollback (client, done) {
|
||||||
client.query('ROLLBACK', function(err) {
|
client.query('ROLLBACK', function (err) {
|
||||||
return done(err);
|
return done(err)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function getInsertQuery(tableName, fields, hasId) {
|
function getInsertQuery (tableName, fields, hasId) {
|
||||||
|
|
||||||
// outputs string like: '$1, $2, $3...' with proper No of items
|
// outputs string like: '$1, $2, $3...' with proper No of items
|
||||||
var placeholders = fields.map(function(_, i) {
|
var placeholders = fields.map(function (_, i) {
|
||||||
return '$' + (i + 1);
|
return '$' + (i + 1)
|
||||||
}).join(', ');
|
}).join(', ')
|
||||||
|
|
||||||
var query = 'INSERT INTO ' + tableName +
|
var query = 'INSERT INTO ' + tableName +
|
||||||
' (' + fields.join(', ') + ')' +
|
' (' + fields.join(', ') + ')' +
|
||||||
' VALUES' +
|
' VALUES' +
|
||||||
' (' + placeholders + ')';
|
' (' + placeholders + ')'
|
||||||
|
|
||||||
if (hasId) query += ' RETURNING id';
|
if (hasId) query += ' RETURNING id'
|
||||||
|
|
||||||
return query;
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.init = function init(_conString) {
|
exports.init = function init (_conString) {
|
||||||
conString = _conString;
|
conString = _conString
|
||||||
if (!conString) {
|
if (!conString) {
|
||||||
throw new Error('Postgres connection string is required');
|
throw new Error('Postgres connection string is required')
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
function connect(cb) {
|
function connect (cb) {
|
||||||
pg.connect(conString, function(err, client, done) {
|
pg.connect(conString, function (err, client, done) {
|
||||||
if (err) logger.error(err);
|
if (err) logger.error(err)
|
||||||
cb(err, client, done);
|
cb(err, client, done)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// logs inputted bill and overall tx status (if available)
|
// logs inputted bill and overall tx status (if available)
|
||||||
exports.recordBill = function recordBill(session, rec, cb) {
|
exports.recordBill = function recordBill (session, rec, cb) {
|
||||||
var fields = [
|
var fields = [
|
||||||
'id',
|
'id',
|
||||||
'device_fingerprint',
|
'device_fingerprint',
|
||||||
|
|
@ -76,7 +73,7 @@ exports.recordBill = function recordBill(session, rec, cb) {
|
||||||
'device_time',
|
'device_time',
|
||||||
'satoshis',
|
'satoshis',
|
||||||
'denomination'
|
'denomination'
|
||||||
];
|
]
|
||||||
|
|
||||||
var values = [
|
var values = [
|
||||||
rec.uuid,
|
rec.uuid,
|
||||||
|
|
@ -87,198 +84,196 @@ exports.recordBill = function recordBill(session, rec, cb) {
|
||||||
rec.deviceTime,
|
rec.deviceTime,
|
||||||
rec.satoshis,
|
rec.satoshis,
|
||||||
rec.fiat
|
rec.fiat
|
||||||
];
|
]
|
||||||
|
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
query(client, getInsertQuery('bills', fields, false), values, function(err) {
|
query(client, getInsertQuery('bills', fields, false), values, function (err) {
|
||||||
done();
|
done()
|
||||||
if (err && isUniqueViolation(err)) {
|
if (err && isUniqueViolation(err)) {
|
||||||
logger.warn('Attempt to report bill twice');
|
logger.warn('Attempt to report bill twice')
|
||||||
return cb();
|
return cb()
|
||||||
}
|
}
|
||||||
cb(err);
|
cb(err)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
|
||||||
|
|
||||||
exports.recordDeviceEvent = function recordDeviceEvent(session, event) {
|
|
||||||
connect(function(cerr, client, done) {
|
|
||||||
if (cerr) return;
|
|
||||||
var sql = 'INSERT INTO device_events (device_fingerprint, event_type, ' +
|
|
||||||
'note, device_time) VALUES ($1, $2, $3, $4)';
|
|
||||||
var values = [session.fingerprint, event.eventType, event.note,
|
|
||||||
event.deviceTime];
|
|
||||||
client.query(sql, values, done);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
function query(client, queryStr, values, cb) {
|
|
||||||
if (!cb) {
|
|
||||||
cb = values;
|
|
||||||
values = [];
|
|
||||||
}
|
|
||||||
|
|
||||||
client.query(queryStr, values, function(err, results) {
|
|
||||||
if (err) {
|
|
||||||
if (!isLowSeverity(err)) {
|
|
||||||
console.log(queryStr);
|
|
||||||
console.log(values);
|
|
||||||
}
|
|
||||||
return cb(err);
|
|
||||||
}
|
|
||||||
cb(null, results);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function silentQuery(client, queryStr, values, cb) {
|
exports.recordDeviceEvent = function recordDeviceEvent (session, event) {
|
||||||
|
connect(function (cerr, client, done) {
|
||||||
|
if (cerr) return
|
||||||
|
var sql = 'INSERT INTO device_events (device_fingerprint, event_type, ' +
|
||||||
|
'note, device_time) VALUES ($1, $2, $3, $4)'
|
||||||
|
var values = [session.fingerprint, event.eventType, event.note,
|
||||||
|
event.deviceTime]
|
||||||
|
client.query(sql, values, done)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function query (client, queryStr, values, cb) {
|
||||||
if (!cb) {
|
if (!cb) {
|
||||||
cb = values;
|
cb = values
|
||||||
values = [];
|
values = []
|
||||||
}
|
}
|
||||||
|
|
||||||
client.query(queryStr, values, function(err) {
|
client.query(queryStr, values, function (err, results) {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (!isLowSeverity(err)) {
|
if (!isLowSeverity(err)) {
|
||||||
console.log(queryStr);
|
console.log(queryStr)
|
||||||
console.log(values);
|
console.log(values)
|
||||||
}
|
}
|
||||||
cb(err);
|
return cb(err)
|
||||||
}
|
}
|
||||||
cb();
|
cb(null, results)
|
||||||
});
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function silentQuery (client, queryStr, values, cb) {
|
||||||
|
if (!cb) {
|
||||||
|
cb = values
|
||||||
|
values = []
|
||||||
|
}
|
||||||
|
|
||||||
|
client.query(queryStr, values, function (err) {
|
||||||
|
if (err) {
|
||||||
|
if (!isLowSeverity(err)) {
|
||||||
|
console.log(queryStr)
|
||||||
|
console.log(values)
|
||||||
|
}
|
||||||
|
cb(err)
|
||||||
|
}
|
||||||
|
cb()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set
|
// OPTIMIZE: No need to query bills if tx.fiat and tx.satoshis are set
|
||||||
function billsAndTxs(client, session, cb) {
|
function billsAndTxs (client, session, cb) {
|
||||||
var sessionId = session.id;
|
var sessionId = session.id
|
||||||
var fingerprint = session.fingerprint;
|
var fingerprint = session.fingerprint
|
||||||
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
|
var billsQuery = 'SELECT COALESCE(SUM(denomination), 0) as fiat, ' +
|
||||||
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
||||||
'FROM bills ' +
|
'FROM bills ' +
|
||||||
'WHERE device_fingerprint=$1 AND session_id=$2';
|
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||||
var billsValues = [fingerprint, sessionId];
|
var billsValues = [fingerprint, sessionId]
|
||||||
var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
|
var txQuery = 'SELECT COALESCE(SUM(fiat), 0) AS fiat, ' +
|
||||||
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
'COALESCE(SUM(satoshis), 0) AS satoshis ' +
|
||||||
'FROM transactions ' +
|
'FROM transactions ' +
|
||||||
'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3';
|
'WHERE device_fingerprint=$1 AND session_id=$2 AND stage=$3'
|
||||||
var txValues = [fingerprint, sessionId, 'partial_request'];
|
var txValues = [fingerprint, sessionId, 'partial_request']
|
||||||
|
|
||||||
async.parallel([
|
async.parallel([
|
||||||
async.apply(query, client, billsQuery, billsValues),
|
async.apply(query, client, billsQuery, billsValues),
|
||||||
async.apply(query, client, txQuery, txValues)
|
async.apply(query, client, txQuery, txValues)
|
||||||
], function(err, results) {
|
], function (err, results) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err)
|
||||||
|
|
||||||
// Note: PG SUM function returns int8, which is returned as a string, so
|
// Note: PG SUM function returns int8, which is returned as a string, so
|
||||||
// we need to parse, since we know these won't be huge numbers.
|
// we need to parse, since we know these won't be huge numbers.
|
||||||
cb(null, {
|
cb(null, {
|
||||||
billsFiat: parseInt(results[0].rows[0].fiat),
|
billsFiat: parseInt(results[0].rows[0].fiat, 10),
|
||||||
billsSatoshis: new BigNumber(results[0].rows[0].satoshis),
|
billsSatoshis: new BigNumber(results[0].rows[0].satoshis),
|
||||||
txFiat: parseInt(results[1].rows[0].fiat),
|
txFiat: parseInt(results[1].rows[0].fiat, 10),
|
||||||
txSatoshis: new BigNumber(results[1].rows[0].satoshis)
|
txSatoshis: new BigNumber(results[1].rows[0].satoshis)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function computeSendAmount(tx, totals) {
|
function computeSendAmount (tx, totals) {
|
||||||
var fiatRemaining = (tx.fiat || totals.billsFiat) - totals.txFiat;
|
var fiatRemaining = (tx.fiat || totals.billsFiat) - totals.txFiat
|
||||||
|
|
||||||
var satoshisRemaining = tx.satoshis.eq(0)
|
var satoshisRemaining = tx.satoshis.eq(0)
|
||||||
? totals.billsSatoshis.minus(totals.txSatoshis)
|
? totals.billsSatoshis.minus(totals.txSatoshis)
|
||||||
: tx.satoshis.minus(totals.txSatoshis)
|
: tx.satoshis.minus(totals.txSatoshis)
|
||||||
|
|
||||||
var result = {
|
var result = {
|
||||||
fiat: fiatRemaining,
|
fiat: fiatRemaining,
|
||||||
satoshis: satoshisRemaining
|
satoshis: satoshisRemaining
|
||||||
};
|
}
|
||||||
if (result.fiat < 0 || result.satoshis.lt(0)) {
|
if (result.fiat < 0 || result.satoshis.lt(0)) {
|
||||||
logger.warn({tx: tx, totals: totals, result: result},
|
logger.warn({tx: tx, totals: totals, result: result},
|
||||||
'computeSendAmount result < 0, this shouldn\'t happen. ' +
|
"computeSendAmount result < 0, this shouldn't happen. " +
|
||||||
'Maybe timeout arrived after machineSend.');
|
'Maybe timeout arrived after machineSend.')
|
||||||
result.fiat = 0;
|
result.fiat = 0
|
||||||
result.satoshis = new BigNumber(0);
|
result.satoshis = new BigNumber(0)
|
||||||
}
|
}
|
||||||
return result;
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.removeOldPending = function removeOldPending(timeoutMS) {
|
exports.removeOldPending = function removeOldPending (timeoutMS) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return;
|
if (cerr) return
|
||||||
var sql = 'DELETE FROM pending_transactions ' +
|
var sql = 'DELETE FROM pending_transactions ' +
|
||||||
'WHERE incoming AND extract(EPOCH FROM now() - updated) > $1';
|
'WHERE incoming AND extract(EPOCH FROM now() - updated) > $1'
|
||||||
var timeoutS = timeoutMS / 1000;
|
var timeoutS = timeoutMS / 1000
|
||||||
var values = [timeoutS];
|
var values = [timeoutS]
|
||||||
query(client, sql, values, function(err) {
|
query(client, sql, values, function (err) {
|
||||||
done();
|
done()
|
||||||
if (err) logger.error(err);
|
if (err) logger.error(err)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
exports.pendingTxs = function pendingTxs(timeoutMS, cb) {
|
exports.pendingTxs = function pendingTxs (timeoutMS, cb) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
var sql = 'SELECT * ' +
|
var sql = 'SELECT * ' +
|
||||||
'FROM pending_transactions ' +
|
'FROM pending_transactions ' +
|
||||||
'WHERE (incoming OR extract(EPOCH FROM now() - updated) > $1) ' +
|
'WHERE (incoming OR extract(EPOCH FROM now() - updated) > $1) ' +
|
||||||
'ORDER BY updated ASC';
|
'ORDER BY updated ASC'
|
||||||
var timeoutS = timeoutMS / 1000;
|
var timeoutS = timeoutMS / 1000
|
||||||
var values = [timeoutS];
|
var values = [timeoutS]
|
||||||
query(client, sql, values, function(err, results) {
|
query(client, sql, values, function (err, results) {
|
||||||
done();
|
done()
|
||||||
cb(err, results);
|
cb(err, results)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
|
||||||
|
|
||||||
function removePendingTx(client, session, cb) {
|
|
||||||
var sql = 'DELETE FROM pending_transactions ' +
|
|
||||||
'WHERE device_fingerprint=$1 AND session_id=$2';
|
|
||||||
silentQuery(client, sql, [session.fingerprint, session.id], cb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function insertOutgoingTx(client, session, tx, totals, cb) {
|
function removePendingTx (client, session, cb) {
|
||||||
var sendAmount = computeSendAmount(tx, totals);
|
var sql = 'DELETE FROM pending_transactions ' +
|
||||||
var stage = 'partial_request';
|
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||||
var authority = tx.fiat ? 'machine' : 'timeout';
|
silentQuery(client, sql, [session.fingerprint, session.id], cb)
|
||||||
var satoshis = sendAmount.satoshis;
|
}
|
||||||
var fiat = sendAmount.fiat;
|
|
||||||
if (satoshis.eq(0)) return cb(null, {satoshis: new BigNumber(0), fiat: 0});
|
function insertOutgoingTx (client, session, tx, totals, cb) {
|
||||||
|
var sendAmount = computeSendAmount(tx, totals)
|
||||||
|
var stage = 'partial_request'
|
||||||
|
var authority = tx.fiat ? 'machine' : 'timeout'
|
||||||
|
var satoshis = sendAmount.satoshis
|
||||||
|
var fiat = sendAmount.fiat
|
||||||
|
if (satoshis.eq(0)) return cb(null, {satoshis: new BigNumber(0), fiat: 0})
|
||||||
|
|
||||||
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
|
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
|
||||||
function(err) {
|
function (err) {
|
||||||
|
if (err) return cb(err)
|
||||||
if (err) return cb(err);
|
cb(null, {satoshis: satoshis, fiat: fiat})
|
||||||
cb(null, {satoshis: satoshis, fiat: fiat});
|
})
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function insertOutgoingCompleteTx(client, session, tx, cb) {
|
function insertOutgoingCompleteTx (client, session, tx, cb) {
|
||||||
|
|
||||||
// Only relevant for machine source transactions, not timeouts
|
// Only relevant for machine source transactions, not timeouts
|
||||||
if (!tx.fiat) return cb();
|
if (!tx.fiat) return cb()
|
||||||
|
|
||||||
var stage = 'final_request';
|
var stage = 'final_request'
|
||||||
var authority = 'machine';
|
var authority = 'machine'
|
||||||
var satoshis = tx.satoshis;
|
var satoshis = tx.satoshis
|
||||||
var fiat = tx.fiat;
|
var fiat = tx.fiat
|
||||||
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, cb);
|
insertOutgoing(client, session, tx, satoshis, fiat, stage, authority, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
function insertIncoming(client, session, tx, satoshis, fiat, stage, authority,
|
function insertIncoming (client, session, tx, satoshis, fiat, stage, authority,
|
||||||
cb) {
|
cb) {
|
||||||
var realSatoshis = satoshis || new BigNumber(0);
|
var realSatoshis = satoshis || new BigNumber(0)
|
||||||
insertTx(client, session, true, tx, realSatoshis, fiat, stage, authority, cb);
|
insertTx(client, session, true, tx, realSatoshis, fiat, stage, authority, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
function insertOutgoing(client, session, tx, satoshis, fiat, stage, authority,
|
function insertOutgoing (client, session, tx, satoshis, fiat, stage, authority,
|
||||||
cb) {
|
cb) {
|
||||||
insertTx(client, session, false, tx, satoshis, fiat, stage, authority, cb);
|
insertTx(client, session, false, tx, satoshis, fiat, stage, authority, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
function insertTx(client, session, incoming, tx, satoshis, fiat, stage,
|
function insertTx (client, session, incoming, tx, satoshis, fiat, stage,
|
||||||
authority, cb) {
|
authority, cb) {
|
||||||
var fields = [
|
var fields = [
|
||||||
'session_id',
|
'session_id',
|
||||||
'stage',
|
'stage',
|
||||||
|
|
@ -292,7 +287,7 @@ function insertTx(client, session, incoming, tx, satoshis, fiat, stage,
|
||||||
'fiat',
|
'fiat',
|
||||||
'tx_hash',
|
'tx_hash',
|
||||||
'error'
|
'error'
|
||||||
];
|
]
|
||||||
|
|
||||||
var values = [
|
var values = [
|
||||||
session.id,
|
session.id,
|
||||||
|
|
@ -307,226 +302,227 @@ function insertTx(client, session, incoming, tx, satoshis, fiat, stage,
|
||||||
fiat,
|
fiat,
|
||||||
tx.txHash,
|
tx.txHash,
|
||||||
tx.error
|
tx.error
|
||||||
];
|
]
|
||||||
|
|
||||||
query(client, getInsertQuery('transactions', fields, true), values,
|
query(client, getInsertQuery('transactions', fields, true), values,
|
||||||
function(err, results) {
|
function (err, results) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err)
|
||||||
cb(null, results.rows[0].id);
|
cb(null, results.rows[0].id)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function refreshPendingTx(client, session, cb) {
|
function refreshPendingTx (client, session, cb) {
|
||||||
var sql = 'UPDATE pending_transactions SET updated=now() ' +
|
var sql = 'UPDATE pending_transactions SET updated=now() ' +
|
||||||
'WHERE device_fingerprint=$1 AND session_id=$2';
|
'WHERE device_fingerprint=$1 AND session_id=$2'
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
query(client, sql, [session.fingerprint, session.id], function(err) {
|
query(client, sql, [session.fingerprint, session.id], function (err) {
|
||||||
done(err);
|
done(err)
|
||||||
cb(err);
|
cb(err)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function addPendingTx(client, session, incoming, currencyCode, cryptoCode, toAddress,
|
function addPendingTx (client, session, incoming, currencyCode, cryptoCode, toAddress,
|
||||||
satoshis, cb) {
|
satoshis, cb) {
|
||||||
var fields = ['device_fingerprint', 'session_id', 'incoming',
|
var fields = ['device_fingerprint', 'session_id', 'incoming',
|
||||||
'currency_code', 'crypto_code', 'to_address', 'satoshis'];
|
'currency_code', 'crypto_code', 'to_address', 'satoshis']
|
||||||
var sql = getInsertQuery('pending_transactions', fields, false);
|
var sql = getInsertQuery('pending_transactions', fields, false)
|
||||||
var values = [session.fingerprint, session.id, incoming, currencyCode,
|
var values = [session.fingerprint, session.id, incoming, currencyCode,
|
||||||
cryptoCode, toAddress, satoshis];
|
cryptoCode, toAddress, satoshis]
|
||||||
query(client, sql, values, function(err) {
|
query(client, sql, values, function (err) {
|
||||||
cb(err);
|
cb(err)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function buildOutgoingTx(client, session, tx, cb) {
|
function buildOutgoingTx (client, session, tx, cb) {
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
async.apply(billsAndTxs, client, session),
|
async.apply(billsAndTxs, client, session),
|
||||||
async.apply(insertOutgoingTx, client, session, tx)
|
async.apply(insertOutgoingTx, client, session, tx)
|
||||||
], cb);
|
], cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calling function should only send bitcoins if result.satoshisToSend > 0
|
// Calling function should only send bitcoins if result.satoshisToSend > 0
|
||||||
exports.addOutgoingTx = function addOutgoingTx(session, tx, cb) {
|
exports.addOutgoingTx = function addOutgoingTx (session, tx, cb) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
async.series([
|
async.series([
|
||||||
async.apply(silentQuery, client, 'BEGIN'),
|
async.apply(silentQuery, client, 'BEGIN'),
|
||||||
async.apply(silentQuery, client, 'LOCK TABLE transactions'),
|
async.apply(silentQuery, client, 'LOCK TABLE transactions'),
|
||||||
async.apply(insertOutgoingCompleteTx, client, session, tx),
|
async.apply(insertOutgoingCompleteTx, client, session, tx),
|
||||||
async.apply(removePendingTx, client, session),
|
async.apply(removePendingTx, client, session),
|
||||||
async.apply(buildOutgoingTx, client, session, tx)
|
async.apply(buildOutgoingTx, client, session, tx)
|
||||||
], function(err, results) {
|
], function (err, results) {
|
||||||
if (err) {
|
if (err) {
|
||||||
rollback(client, done);
|
rollback(client, done)
|
||||||
return cb(err);
|
return cb(err)
|
||||||
}
|
}
|
||||||
silentQuery(client, 'COMMIT', [], function() {
|
silentQuery(client, 'COMMIT', [], function () {
|
||||||
done();
|
done()
|
||||||
var toSend = results[4];
|
var toSend = results[4]
|
||||||
cb(null, toSend);
|
cb(null, toSend)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
exports.sentCoins = function sentCoins(session, tx, authority, toSend, fee,
|
exports.sentCoins = function sentCoins (session, tx, authority, toSend, fee,
|
||||||
error, txHash) {
|
error, txHash) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return logger.error(cerr);
|
if (cerr) return logger.error(cerr)
|
||||||
|
|
||||||
var newTx = _.clone(tx);
|
var newTx = _.clone(tx)
|
||||||
newTx.txHash = txHash;
|
newTx.txHash = txHash
|
||||||
newTx.error = error;
|
newTx.error = error
|
||||||
insertOutgoing(client, session, newTx, toSend.satoshis, toSend.fiat,
|
insertOutgoing(client, session, newTx, toSend.satoshis, toSend.fiat,
|
||||||
'partial_send', authority, function(err) {
|
'partial_send', authority, function (err) {
|
||||||
done();
|
done()
|
||||||
if (err) logger.error(err);
|
if (err) logger.error(err)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
function ensureNotFinal(client, session, cb) {
|
function ensureNotFinal (client, session, cb) {
|
||||||
var sql = 'SELECT id FROM transactions ' +
|
var sql = 'SELECT id FROM transactions ' +
|
||||||
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
|
'WHERE device_fingerprint=$1 AND session_id=$2 AND incoming=$3 ' +
|
||||||
'AND stage=$4' +
|
'AND stage=$4' +
|
||||||
'LIMIT 1';
|
'LIMIT 1'
|
||||||
var values = [session.fingerprint, session.id, false, 'final_request'];
|
var values = [session.fingerprint, session.id, false, 'final_request']
|
||||||
|
|
||||||
client.query(sql, values, function(err, results) {
|
client.query(sql, values, function (err, results) {
|
||||||
var error;
|
var error
|
||||||
if (err) return cb(err);
|
if (err) return cb(err)
|
||||||
if (results.rows.length > 0) {
|
if (results.rows.length > 0) {
|
||||||
error = new Error('Final request already exists');
|
error = new Error('Final request already exists')
|
||||||
error.name = 'staleBill';
|
error.name = 'staleBill'
|
||||||
error.severity = 'low';
|
error.severity = 'low'
|
||||||
return cb(error);
|
return cb(error)
|
||||||
}
|
}
|
||||||
cb();
|
cb()
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.addOutgoingPending = function addOutgoingPending(session, currencyCode,
|
exports.addOutgoingPending = function addOutgoingPending (session, currencyCode,
|
||||||
cryptoCode, toAddress, cb) {
|
cryptoCode, toAddress, cb) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
|
|
||||||
async.series([
|
async.series([
|
||||||
async.apply(silentQuery, client, 'BEGIN', null),
|
async.apply(silentQuery, client, 'BEGIN', null),
|
||||||
async.apply(ensureNotFinal, client, session),
|
async.apply(ensureNotFinal, client, session),
|
||||||
async.apply(addPendingTx, client, session, false, currencyCode, cryptoCode, toAddress,
|
async.apply(addPendingTx, client, session, false, currencyCode, cryptoCode, toAddress,
|
||||||
0)
|
0)
|
||||||
], function(err) {
|
], function (err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return rollback(client, function(rberr) {
|
return rollback(client, function (rberr) {
|
||||||
done(rberr);
|
done(rberr)
|
||||||
|
|
||||||
if (isUniqueViolation(err)) {
|
if (isUniqueViolation(err)) {
|
||||||
// Pending tx exists, refresh it.
|
// Pending tx exists, refresh it.
|
||||||
return refreshPendingTx(client, session, cb);
|
return refreshPendingTx(client, session, cb)
|
||||||
}
|
}
|
||||||
if (err.name === 'staleBill') {
|
if (err.name === 'staleBill') {
|
||||||
logger.info('Received a bill insert after send coins request');
|
logger.info('Received a bill insert after send coins request')
|
||||||
return cb();
|
return cb()
|
||||||
}
|
}
|
||||||
logger.error(err);
|
logger.error(err)
|
||||||
return cb(err);
|
return cb(err)
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
silentQuery(client, 'COMMIT', null, function() {
|
silentQuery(client, 'COMMIT', null, function () {
|
||||||
done();
|
done()
|
||||||
cb();
|
cb()
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
exports.addInitialIncoming = function addInitialIncoming(session, tx, cb) {
|
exports.addInitialIncoming = function addInitialIncoming (session, tx, cb) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
async.series([
|
async.series([
|
||||||
async.apply(silentQuery, client, 'BEGIN', null),
|
async.apply(silentQuery, client, 'BEGIN', null),
|
||||||
async.apply(addPendingTx, client, session, true, tx.currencyCode,
|
async.apply(addPendingTx, client, session, true, tx.currencyCode,
|
||||||
tx.cryptoCode, tx.toAddress, tx.satoshis),
|
tx.cryptoCode, tx.toAddress, tx.satoshis),
|
||||||
async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat,
|
async.apply(insertIncoming, client, session, tx, tx.satoshis, tx.fiat,
|
||||||
'initial_request', 'pending')
|
'initial_request', 'pending')
|
||||||
], function(err) {
|
], function (err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
rollback(client, done);
|
rollback(client, done)
|
||||||
return cb(err);
|
return cb(err)
|
||||||
}
|
}
|
||||||
silentQuery(client, 'COMMIT', null, function() {
|
silentQuery(client, 'COMMIT', null, function () {
|
||||||
done();
|
done()
|
||||||
cb();
|
cb()
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
function insertDispense(client, session, tx, cartridges, transactionId, cb) {
|
function insertDispense (client, session, tx, cartridges, transactionId, cb) {
|
||||||
var fields = [
|
var fields = [
|
||||||
'device_fingerprint', 'transaction_id',
|
'device_fingerprint', 'transaction_id',
|
||||||
'dispense1', 'reject1', 'count1',
|
'dispense1', 'reject1', 'count1',
|
||||||
'dispense2', 'reject2', 'count2',
|
'dispense2', 'reject2', 'count2',
|
||||||
'refill', 'error'
|
'refill', 'error'
|
||||||
];
|
]
|
||||||
|
|
||||||
var sql = getInsertQuery('dispenses', fields, true);
|
var sql = getInsertQuery('dispenses', fields, true)
|
||||||
|
|
||||||
var dispense1 = tx.bills[0].actualDispense;
|
var dispense1 = tx.bills[0].actualDispense
|
||||||
var dispense2 = tx.bills[1].actualDispense;
|
var dispense2 = tx.bills[1].actualDispense
|
||||||
var reject1 = tx.bills[0].rejected;
|
var reject1 = tx.bills[0].rejected
|
||||||
var reject2 = tx.bills[1].rejected;
|
var reject2 = tx.bills[1].rejected
|
||||||
var count1 = cartridges[0].count;
|
var count1 = cartridges[0].count
|
||||||
var count2 = cartridges[1].count;
|
var count2 = cartridges[1].count
|
||||||
var values = [
|
var values = [
|
||||||
session.fingerprint, transactionId,
|
session.fingerprint, transactionId,
|
||||||
dispense1, reject1, count1, dispense2, reject2, count2,
|
dispense1, reject1, count1, dispense2, reject2, count2,
|
||||||
false, tx.error
|
false, tx.error
|
||||||
];
|
]
|
||||||
client.query(sql, values, cb);
|
client.query(sql, values, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.addDispense = function addDispense(session, tx, cartridges) {
|
exports.addDispense = function addDispense (session, tx, cartridges) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return;
|
if (cerr) return
|
||||||
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
async.apply(insertIncoming, client, session, tx, 0, tx.fiat,
|
async.apply(insertIncoming, client, session, tx, 0, tx.fiat,
|
||||||
'dispense', 'authorized'),
|
'dispense', 'authorized'),
|
||||||
async.apply(insertDispense, client, session, tx, cartridges)
|
async.apply(insertDispense, client, session, tx, cartridges)
|
||||||
], function(err) {
|
], function (err) {
|
||||||
done();
|
done()
|
||||||
if (err) logger.error(err);
|
if (err) logger.error(err)
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
};
|
}
|
||||||
|
|
||||||
exports.cartridgeCounts = function cartridgeCounts(session, cb) {
|
exports.cartridgeCounts = function cartridgeCounts (session, cb) {
|
||||||
connect(function(cerr, client, done) {
|
connect(function (cerr, client, done) {
|
||||||
if (cerr) return cb(cerr);
|
if (cerr) return cb(cerr)
|
||||||
var sql = 'SELECT id, count1, count2 FROM dispenses ' +
|
var sql = 'SELECT id, count1, count2 FROM dispenses ' +
|
||||||
'WHERE device_fingerprint=$1 AND refill=$2 ' +
|
'WHERE device_fingerprint=$1 AND refill=$2 ' +
|
||||||
'ORDER BY id DESC LIMIT 1';
|
'ORDER BY id DESC LIMIT 1'
|
||||||
query(client, sql, [session.fingerprint, true], function(err, results) {
|
query(client, sql, [session.fingerprint, true], function (err, results) {
|
||||||
done();
|
done()
|
||||||
if (err) return cb(err);
|
if (err) return cb(err)
|
||||||
var counts = results.rows.length === 1 ?
|
var counts = results.rows.length === 1
|
||||||
[results.rows[0].count1, results.rows[0].count2] :
|
? [results.rows[0].count1, results.rows[0].count2]
|
||||||
[0, 0];
|
: [0, 0]
|
||||||
cb(null, {id: results.rows[0].id, counts: counts});
|
|
||||||
});
|
cb(null, {id: results.rows[0].id, counts: counts})
|
||||||
});
|
})
|
||||||
};
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
exports.init('postgres://lamassu:lamassu@localhost/lamassu');
|
exports.init('postgres://lamassu:lamassu@localhost/lamassu')
|
||||||
connect(function(err, client, done) {
|
connect(function(err, client, done) {
|
||||||
var sql = 'select * from transactions where id=$1';
|
var sql = 'select * from transactions where id=$1'
|
||||||
query(client, sql, [130], function(_err, results) {
|
query(client, sql, [130], function(_err, results) {
|
||||||
done();
|
done()
|
||||||
console.dir(results.rows[0]);
|
console.dir(results.rows[0])
|
||||||
});
|
})
|
||||||
});
|
})
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue