refactor(psql): psql interface cleanup

This commit is contained in:
Damian Mee 2014-09-09 06:58:09 +02:00
parent 88c17120ab
commit be6900a7a9
2 changed files with 85 additions and 53 deletions

View file

@ -6,7 +6,7 @@ var express = require('express');
var LamassuConfig = require('lamassu-config'); var LamassuConfig = require('lamassu-config');
var routes = require('./routes'); var routes = require('./routes');
var plugins = require('./plugins'); var plugins = require('./plugins');
var PostgresqlInterface = require('./postgresql_interface'); var db = require('./postgresql_interface');
var logger = require('./logger'); var logger = require('./logger');
module.exports = function (options) { module.exports = function (options) {
@ -14,15 +14,14 @@ module.exports = function (options) {
var connectionString; var connectionString;
var server; var server;
var lamassuConfig; var lamassuConfig;
var db;
connectionString = options.postgres || connectionString = options.postgres ||
'postgres://lamassu:lamassu@localhost/lamassu'; 'postgres://lamassu:lamassu@localhost/lamassu';
lamassuConfig = new LamassuConfig(connectionString); lamassuConfig = new LamassuConfig(connectionString);
db = new PostgresqlInterface(connectionString);
plugins.init(db);
db.init(connectionString);
plugins.init(db);
lamassuConfig.load(function (err, config) { lamassuConfig.load(function (err, config) {
if (err) { if (err) {

View file

@ -7,82 +7,115 @@ var PG_ERRORS = {
23505: 'uniqueViolation' 23505: 'uniqueViolation'
}; };
var PostgresqlInterface = function (conString) { var client = null;
exports.init = function init(conString) {
if (client !== null) return;
if (!conString) { if (!conString) {
throw new Error('Postgres connection string is required'); throw new Error('Postgres connection string is required');
} }
this.client = new pg.Client(conString); client = new pg.Client(conString);
this.client.on('error', function (err) { logger.error(err); }); client.on('error', function (err) { logger.error(err); });
this.client.connect(); client.connect();
}; };
PostgresqlInterface.factory = function factory(conString) { return new PostgresqlInterface(conString); };
module.exports = PostgresqlInterface;
PostgresqlInterface.prototype.recordBill =
function recordBill(deviceFingerprint, rec, cb) {
this.client.query('INSERT INTO bills (device_fingerprint, denomination, currency_code, ' + exports.recordBill = function recordBill(deviceFingerprint, rec, cb) {
client.query('INSERT INTO bills (device_fingerprint, denomination, currency_code, ' +
'satoshis, to_address, transaction_id, device_time) ' + 'satoshis, to_address, transaction_id, device_time) ' +
'VALUES ($1, $2, $3, $4, $5, $6, $7)', 'VALUES ($1, $2, $3, $4, $5, $6, $7)',
[deviceFingerprint, rec.fiat, rec.currency, rec.satoshis, rec.toAddress, rec.txId, rec.deviceTime], [deviceFingerprint, rec.fiat, rec.currency, rec.satoshis, rec.toAddress, rec.txId, rec.deviceTime],
cb); cb);
}; };
PostgresqlInterface.prototype.recordDeviceEvent = exports.recordDeviceEvent = function recordDeviceEvent(deviceFingerprint, event, cb) {
function recordBillValidatorEvent(deviceFingerprint, event, cb) { client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' +
this.client.query('INSERT INTO device_events (device_fingerprint, event_type, note, device_time)' +
'VALUES ($1, $2, $3, $4)', 'VALUES ($1, $2, $3, $4)',
[deviceFingerprint, event.eventType, event.note, event.deviceTime], [deviceFingerprint, event.eventType, event.note, event.deviceTime],
cb); cb);
}; };
PostgresqlInterface.prototype.summonTransaction = // each received "partial transaction" contains sum of all previous bills
function summonTransaction(deviceFingerprint, tx, cb) { // (vel. no need to do any server-side summing)
// First do an INSERT function updatePartialTransaction(values, cb) {
var values2 = [
values[4],
values[6],
values[0],
'partial'
];
client.query('UPDATE transactions SET ' +
'satoshis=$1, ' +
'fiat=$2 ' +
'WHERE id=$3 AND status=$4',
values2,
cb);
}
function fetchTransaction(txId, cb) {
client.query('SELECT status, tx_hash, error FROM transactions WHERE id=$1',
[txId],
function (err, results) {
if (err) return cb(err);
// This should never happen, since we already checked for existence
if (results.rows.length === 0) return cb(new Error('Couldn\'t find transaction.'));
var result = results.rows[0];
cb(null, {txHash: result.tx_hash, err: result.error, status: result.status});
});
}
exports.summonTransaction = function summonTransaction(deviceFingerprint, tx, cb) {
var status = tx.status || 'pending';
var values = [
tx.txId,
status,
deviceFingerprint,
tx.toAddress,
tx.satoshis,
tx.currencyCode,
tx.fiat
];
// First attampt an INSERT
// If it worked, go ahead with transaction // If it worked, go ahead with transaction
// If duplicate, fetch status and return // If duplicate and partial update with new bills
var self = this; // If duplicate, but not partial fetch status and return
this.client.query('INSERT INTO transactions (id, status, device_fingerprint, ' +
'to_address, satoshis, currency_code, fiat) ' + client.query('INSERT INTO transactions ' +
'VALUES ($1, $2, $3, $4, $5, $6, $7)', [tx.txId, 'pending', deviceFingerprint, '(id, status, device_fingerprint, to_address, satoshis, currency_code, fiat) ' +
tx.toAddress, tx.satoshis, tx.currencyCode, tx.fiat], 'VALUES ($1, $2, $3, $4, $5, $6, $7)',
function (err) { values,
if (err && PG_ERRORS[err.code] === 'uniqueViolation') function(err) {
return self._fetchTransaction(tx.txId, cb); if (err) {
if (err) return cb(err); if (PG_ERRORS[err.code] === 'uniqueViolation') {
cb(); if (status === 'partial')
}); return updatePartialTransaction(values, cb);
return fetchTransaction(tx.txId, cb);
}
return cb(err);
}
cb();
});
}; };
PostgresqlInterface.prototype.reportTransactionError =
function reportTransactionError(tx, errString, status) { exports.reportTransactionError = function reportTransactionError(tx, errString, status) {
this.client.query('UPDATE transactions SET status=$1, error=$2 WHERE id=$3', client.query('UPDATE transactions SET status=$1, error=$2 WHERE id=$3',
[status, errString, tx.txId]); [status, errString, tx.txId]);
}; };
PostgresqlInterface.prototype.completeTransaction = exports.completeTransaction = function completeTransaction(tx, txHash) {
function completeTransaction(tx, txHash) {
if (txHash) if (txHash)
this.client.query('UPDATE transactions SET tx_hash=$1, status=$2, completed=now() WHERE id=$3', client.query('UPDATE transactions SET tx_hash=$1, status=$2, completed=now() WHERE id=$3',
[txHash, 'completed', tx.txId]); [txHash, 'completed', tx.txId]);
else else
this.client.query('UPDATE transactions SET status=$1, error=$2 WHERE id=$3', client.query('UPDATE transactions SET status=$1, error=$2 WHERE id=$3',
['failed', 'No txHash received', tx.txId]); ['failed', 'No txHash received', tx.txId]);
}; };
PostgresqlInterface.prototype._fetchTransaction =
function _fetchTransaction(txId, cb) {
this.client.query('SELECT status, tx_hash, error FROM transactions WHERE id=$1',
[txId], function (err, results) {
if (err) return cb(err);
// This should never happen, since we already checked for existence
if (results.rows.length === 0) return cb(new Error('Couldn\'t find transaction.'));
var result = results.rows[0];
cb(null, {txHash: result.tx_hash, err: result.error, status: result.status});
});
};