From c4ab8c27eff7c703af3305132b71bee8dfbe7be3 Mon Sep 17 00:00:00 2001 From: padreug Date: Wed, 18 Jun 2025 15:33:51 +0200 Subject: [PATCH] Enhance SSH tunnel functionality in transaction processing: implement subprocess-based SSH tunnel as a fallback, improve error handling, and add detailed connection testing with step-by-step reporting. Update API to utilize new testing method and enhance UI to display detailed results. --- static/js/index.js | 27 ++++- transaction_processor.py | 251 ++++++++++++++++++++++++++++++++++----- views_api.py | 24 ++-- 3 files changed, 256 insertions(+), 46 deletions(-) diff --git a/static/js/index.js b/static/js/index.js index 0e02e66..027ebb0 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -419,11 +419,36 @@ window.app = Vue.createApp({ this.g.user.wallets[0].adminkey ) + // Show detailed results in a dialog + const stepsList = data.steps ? data.steps.join('\n') : 'No detailed steps available' + + let dialogContent = `Connection Test Results

` + + if (data.ssh_tunnel_used) { + dialogContent += `SSH Tunnel: ${data.ssh_tunnel_success ? '✅ Success' : '❌ Failed'}
` + } + + dialogContent += `Database: ${data.database_connection_success ? '✅ Success' : '❌ Failed'}

` + dialogContent += `Detailed Steps:
` + dialogContent += stepsList.replace(/\n/g, '
') + + this.$q.dialog({ + title: data.success ? 'Connection Test Passed' : 'Connection Test Failed', + message: dialogContent, + html: true, + ok: { + color: data.success ? 'positive' : 'negative', + label: 'Close' + } + }) + + // Also show a brief notification this.$q.notify({ type: data.success ? 'positive' : 'negative', message: data.message, - timeout: 5000 + timeout: 3000 }) + } catch (error) { LNbits.utils.notifyApiError(error) } finally { diff --git a/transaction_processor.py b/transaction_processor.py index dea8165..2cb1bb2 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -10,12 +10,16 @@ import threading import time try: - import paramiko - from sshtunnel import SSHTunnelForwarder + import asyncssh SSH_AVAILABLE = True except ImportError: - SSH_AVAILABLE = False - logger.warning("SSH tunnel support not available. Install paramiko and sshtunnel: pip install paramiko sshtunnel") + try: + # Fallback to subprocess-based SSH tunnel + import subprocess + SSH_AVAILABLE = True + except ImportError: + SSH_AVAILABLE = False + logger.warning("SSH tunnel support not available") from lnbits.core.services import create_invoice, pay_invoice from lnbits.settings import settings @@ -37,7 +41,8 @@ class LamassuTransactionProcessor: def __init__(self): self.last_check_time = None self.processed_transaction_ids = set() - self.ssh_tunnel = None + self.ssh_process = None + self.ssh_key_path = None async def get_db_config(self) -> Optional[Dict[str, Any]]: """Get database configuration from the database""" @@ -71,35 +76,88 @@ class LamassuTransactionProcessor: return db_config if not SSH_AVAILABLE: - logger.error("SSH tunnel requested but paramiko/sshtunnel not available") + logger.error("SSH tunnel requested but SSH libraries not available") return None try: # Close existing tunnel if any self.close_ssh_tunnel() - ssh_config = { - "ssh_address_or_host": (db_config["ssh_host"], db_config["ssh_port"]), - "remote_bind_address": (db_config["host"], db_config["port"]), - "ssh_username": db_config["ssh_username"], - "local_bind_address": ("127.0.0.1",) # Let sshtunnel choose local port - } + # Use subprocess-based SSH tunnel as fallback + return self._setup_subprocess_ssh_tunnel(db_config) - # Add authentication method - if db_config.get("ssh_private_key"): - # Use private key authentication - ssh_config["ssh_pkey"] = db_config["ssh_private_key"] - elif db_config.get("ssh_password"): - # Use password authentication - ssh_config["ssh_password"] = db_config["ssh_password"] - else: - logger.error("SSH tunnel requires either private key or password") + except Exception as e: + logger.error(f"Failed to setup SSH tunnel: {e}") + self.close_ssh_tunnel() + return None + + def _setup_subprocess_ssh_tunnel(self, db_config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Setup SSH tunnel using subprocess (compatible with all environments)""" + import subprocess + import socket + + # Find an available local port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + local_port = s.getsockname()[1] + + # Build SSH command + ssh_cmd = [ + "ssh", + "-N", # Don't execute remote command + "-L", f"{local_port}:{db_config['host']}:{db_config['port']}", + f"{db_config['ssh_username']}@{db_config['ssh_host']}", + "-p", str(db_config['ssh_port']), + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null", + "-o", "LogLevel=ERROR" + ] + + # Add authentication method + if db_config.get("ssh_password"): + # Check if sshpass is available for password authentication + try: + import subprocess + subprocess.run(["which", "sshpass"], check=True, capture_output=True) + ssh_cmd = ["sshpass", "-p", db_config["ssh_password"]] + ssh_cmd + except subprocess.CalledProcessError: + logger.error("Password authentication requires 'sshpass' tool which is not installed. Please use SSH key authentication instead.") return None + elif db_config.get("ssh_private_key"): + # Write private key to temporary file + import tempfile + import os + key_fd, key_path = tempfile.mkstemp(suffix='.pem') + try: + with os.fdopen(key_fd, 'w') as f: + f.write(db_config["ssh_private_key"]) + os.chmod(key_path, 0o600) + ssh_cmd.extend(["-i", key_path]) + self.ssh_key_path = key_path # Store for cleanup + except Exception as e: + os.unlink(key_path) + raise e + else: + logger.error("SSH tunnel requires either private key or password") + return None + + # Start SSH tunnel process + try: + self.ssh_process = subprocess.Popen( + ssh_cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL + ) - self.ssh_tunnel = SSHTunnelForwarder(**ssh_config) - self.ssh_tunnel.start() + # Wait a moment for tunnel to establish + import time + time.sleep(2) + + # Check if process is still running + if self.ssh_process.poll() is not None: + raise Exception("SSH tunnel process terminated immediately") - local_port = self.ssh_tunnel.local_bind_port logger.info(f"SSH tunnel established: localhost:{local_port} -> {db_config['ssh_host']}:{db_config['ssh_port']} -> {db_config['host']}:{db_config['port']}") # Return modified config to connect through tunnel @@ -109,21 +167,152 @@ class LamassuTransactionProcessor: return tunnel_config + except FileNotFoundError: + logger.error("SSH command not found. SSH tunneling requires ssh (and sshpass for password auth) to be installed on the system.") + return None except Exception as e: - logger.error(f"Failed to setup SSH tunnel: {e}") - self.close_ssh_tunnel() + logger.error(f"Failed to establish SSH tunnel: {e}") return None def close_ssh_tunnel(self): """Close SSH tunnel if active""" - if self.ssh_tunnel: + # Close subprocess-based tunnel + if hasattr(self, 'ssh_process') and self.ssh_process: try: - self.ssh_tunnel.stop() - logger.info("SSH tunnel closed") + self.ssh_process.terminate() + self.ssh_process.wait(timeout=5) + logger.info("SSH tunnel process closed") except Exception as e: - logger.warning(f"Error closing SSH tunnel: {e}") + logger.warning(f"Error closing SSH tunnel process: {e}") + try: + self.ssh_process.kill() + except: + pass finally: - self.ssh_tunnel = None + self.ssh_process = None + + # Clean up temporary key file if exists + if hasattr(self, 'ssh_key_path') and self.ssh_key_path: + try: + import os + os.unlink(self.ssh_key_path) + logger.info("SSH key file cleaned up") + except Exception as e: + logger.warning(f"Error cleaning up SSH key file: {e}") + finally: + self.ssh_key_path = None + + async def test_connection_detailed(self) -> Dict[str, Any]: + """Test connection with detailed step-by-step reporting""" + result = { + "success": False, + "message": "", + "steps": [], + "ssh_tunnel_used": False, + "ssh_tunnel_success": False, + "database_connection_success": False, + "config_id": None + } + + try: + # Step 1: Get configuration + result["steps"].append("Retrieving database configuration...") + db_config = await self.get_db_config() + if not db_config: + result["message"] = "No active Lamassu database configuration found" + result["steps"].append("❌ No configuration found") + return result + + result["config_id"] = db_config["config_id"] + result["steps"].append("✅ Configuration retrieved") + + # Step 2: SSH Tunnel setup (if required) + if db_config.get("use_ssh_tunnel"): + result["ssh_tunnel_used"] = True + result["steps"].append("Setting up SSH tunnel...") + + if not SSH_AVAILABLE: + result["message"] = "SSH tunnel required but SSH support not available" + result["steps"].append("❌ SSH support missing (requires ssh command line tool)") + return result + + connection_config = self.setup_ssh_tunnel(db_config) + if not connection_config: + result["message"] = "Failed to establish SSH tunnel" + result["steps"].append("❌ SSH tunnel failed - check SSH credentials and server accessibility") + return result + + result["ssh_tunnel_success"] = True + result["steps"].append(f"✅ SSH tunnel established to {db_config['ssh_host']}:{db_config['ssh_port']}") + else: + connection_config = db_config + result["steps"].append("ℹ️ Direct database connection (no SSH tunnel)") + + # Step 3: Database connection + result["steps"].append("Connecting to Postgres database...") + connection = await asyncpg.connect( + host=connection_config["host"], + port=connection_config["port"], + database=connection_config["database"], + user=connection_config["user"], + password=connection_config["password"], + timeout=30 + ) + + result["database_connection_success"] = True + result["steps"].append("✅ Database connection successful") + + # Step 4: Test query + result["steps"].append("Testing database query...") + test_query = "SELECT 1 as test" + await connection.fetchval(test_query) + result["steps"].append("✅ Database query test successful") + + # Step 5: Test actual table access + result["steps"].append("Testing access to cash_out_txs table...") + table_query = "SELECT COUNT(*) FROM cash_out_txs LIMIT 1" + count = await connection.fetchval(table_query) + result["steps"].append(f"✅ Table access successful (found {count} transactions)") + + await connection.close() + result["success"] = True + result["message"] = "All connection tests passed successfully" + + except asyncpg.InvalidCatalogNameError: + result["message"] = "Database not found - check database name" + result["steps"].append("❌ Database does not exist") + except asyncpg.InvalidPasswordError: + result["message"] = "Authentication failed - check username/password" + result["steps"].append("❌ Invalid database credentials") + except asyncpg.CannotConnectNowError: + result["message"] = "Database server not accepting connections" + result["steps"].append("❌ Database server unavailable") + except asyncpg.ConnectionDoesNotExistError: + result["message"] = "Cannot connect to database server" + result["steps"].append("❌ Cannot reach database server") + except Exception as e: + error_msg = str(e) + if "cash_out_txs" in error_msg: + result["message"] = "Connected to database but cash_out_txs table not found" + result["steps"].append("❌ Lamassu transaction table missing") + elif "paramiko" in error_msg.lower() or "ssh" in error_msg.lower(): + result["message"] = f"SSH tunnel error: {error_msg}" + result["steps"].append(f"❌ SSH error: {error_msg}") + else: + result["message"] = f"Connection test failed: {error_msg}" + result["steps"].append(f"❌ Unexpected error: {error_msg}") + finally: + # Always cleanup SSH tunnel + self.close_ssh_tunnel() + + # Update test result in database + if result["config_id"]: + try: + await update_config_test_result(result["config_id"], result["success"]) + except Exception as e: + logger.warning(f"Could not update config test result: {e}") + + return result async def connect_to_lamassu_db(self) -> Optional[asyncpg.Connection]: """Establish connection to Lamassu Postgres database""" diff --git a/views_api.py b/views_api.py index a87e5eb..de1b49b 100644 --- a/views_api.py +++ b/views_api.py @@ -325,26 +325,22 @@ async def api_update_deposit_status( async def api_test_database_connection( wallet: WalletTypeInfo = Depends(require_admin_key), ): - """Test connection to Lamassu database""" + """Test connection to Lamassu database with detailed reporting""" try: from .transaction_processor import transaction_processor - connection = await transaction_processor.connect_to_lamassu_db() - if connection: - await connection.close() - return { - "success": True, - "message": "Successfully connected to Lamassu database" - } - else: - return { - "success": False, - "message": "Failed to connect to Lamassu database. Check configuration." - } + # Use the detailed test method + result = await transaction_processor.test_connection_detailed() + return result + except Exception as e: return { "success": False, - "message": f"Database connection error: {str(e)}" + "message": f"Test connection error: {str(e)}", + "steps": [f"❌ Unexpected error: {str(e)}"], + "ssh_tunnel_used": False, + "ssh_tunnel_success": False, + "database_connection_success": False }