From 3c6262b30913565ed27f6af367e416b9cf855221 Mon Sep 17 00:00:00 2001 From: padreug Date: Wed, 18 Jun 2025 15:37:00 +0200 Subject: [PATCH] Refactor transaction processing to utilize SSH for database queries: implement execute_ssh_query method, enhance error handling, and update connection testing steps. Modify API to fetch transactions via SSH configuration, improving security and reliability. --- transaction_processor.py | 242 ++++++++++++++++++++++----------------- views_api.py | 38 +++--- 2 files changed, 151 insertions(+), 129 deletions(-) diff --git a/transaction_processor.py b/transaction_processor.py index 2cb1bb2..28bf167 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -248,62 +248,49 @@ class LamassuTransactionProcessor: connection_config = db_config result["steps"].append("ℹ️ Direct database connection (no SSH tunnel)") - # Step 3: Database connection - result["steps"].append("Connecting to Postgres database...") - connection = await asyncpg.connect( - host=connection_config["host"], - port=connection_config["port"], - database=connection_config["database"], - user=connection_config["user"], - password=connection_config["password"], - timeout=30 - ) + # Step 3: Test SSH-based database query + result["steps"].append("Testing database query via SSH...") + test_query = "SELECT 1 as test" + test_results = await self.execute_ssh_query(db_config, test_query) + + if not test_results: + result["message"] = "SSH connection succeeded but database query failed" + result["steps"].append("❌ Database query test failed") + return result result["database_connection_success"] = True - result["steps"].append("✅ Database connection successful") - - # Step 4: Test query - result["steps"].append("Testing database query...") - test_query = "SELECT 1 as test" - await connection.fetchval(test_query) result["steps"].append("✅ Database query test successful") - # Step 5: Test actual table access + # Step 4: Test actual table access result["steps"].append("Testing access to cash_out_txs table...") - table_query = "SELECT COUNT(*) FROM cash_out_txs LIMIT 1" - count = await connection.fetchval(table_query) + table_query = "SELECT COUNT(*) FROM cash_out_txs" + table_results = await self.execute_ssh_query(db_config, table_query) + + if not table_results: + result["message"] = "Connected but cash_out_txs table not accessible" + result["steps"].append("❌ Table access failed") + return result + + count = table_results[0].get('count', 0) result["steps"].append(f"✅ Table access successful (found {count} transactions)") - await connection.close() result["success"] = True result["message"] = "All connection tests passed successfully" - except asyncpg.InvalidCatalogNameError: - result["message"] = "Database not found - check database name" - result["steps"].append("❌ Database does not exist") - except asyncpg.InvalidPasswordError: - result["message"] = "Authentication failed - check username/password" - result["steps"].append("❌ Invalid database credentials") - except asyncpg.CannotConnectNowError: - result["message"] = "Database server not accepting connections" - result["steps"].append("❌ Database server unavailable") - except asyncpg.ConnectionDoesNotExistError: - result["message"] = "Cannot connect to database server" - result["steps"].append("❌ Cannot reach database server") except Exception as e: error_msg = str(e) if "cash_out_txs" in error_msg: result["message"] = "Connected to database but cash_out_txs table not found" result["steps"].append("❌ Lamassu transaction table missing") - elif "paramiko" in error_msg.lower() or "ssh" in error_msg.lower(): - result["message"] = f"SSH tunnel error: {error_msg}" + elif "ssh" in error_msg.lower() or "connection" in error_msg.lower(): + result["message"] = f"SSH connection error: {error_msg}" result["steps"].append(f"❌ SSH error: {error_msg}") + elif "permission denied" in error_msg.lower() or "authentication" in error_msg.lower(): + result["message"] = f"SSH authentication failed: {error_msg}" + result["steps"].append(f"❌ SSH authentication error: {error_msg}") else: result["message"] = f"Connection test failed: {error_msg}" result["steps"].append(f"❌ Unexpected error: {error_msg}") - finally: - # Always cleanup SSH tunnel - self.close_ssh_tunnel() # Update test result in database if result["config_id"]: @@ -314,57 +301,121 @@ class LamassuTransactionProcessor: return result - async def connect_to_lamassu_db(self) -> Optional[asyncpg.Connection]: - """Establish connection to Lamassu Postgres database""" + async def connect_to_lamassu_db(self) -> Optional[Dict[str, Any]]: + """Get database configuration (returns config dict instead of connection)""" try: db_config = await self.get_db_config() if not db_config: return None - - # Setup SSH tunnel if required - connection_config = self.setup_ssh_tunnel(db_config) - if not connection_config: - return None - - connection = await asyncpg.connect( - host=connection_config["host"], - port=connection_config["port"], - database=connection_config["database"], - user=connection_config["user"], - password=connection_config["password"], - timeout=30 - ) - logger.info("Successfully connected to Lamassu database") - - # Update test result on successful connection + + # Update test result on successful config retrieval try: await update_config_test_result(db_config["config_id"], True) except Exception as e: logger.warning(f"Could not update config test result: {e}") - return connection + return db_config except Exception as e: - logger.error(f"Failed to connect to Lamassu database: {e}") - - # Update test result on failed connection - try: - db_config = await self.get_db_config() - if db_config: - await update_config_test_result(db_config["config_id"], False) - except Exception as update_error: - logger.warning(f"Could not update config test result: {update_error}") - + logger.error(f"Failed to get database configuration: {e}") return None - async def fetch_new_transactions(self, connection: asyncpg.Connection) -> List[Dict[str, Any]]: + async def execute_ssh_query(self, db_config: Dict[str, Any], query: str) -> List[Dict[str, Any]]: + """Execute a query via SSH connection""" + import subprocess + import json + import asyncio + + try: + # Build SSH command to execute the query + ssh_cmd = [ + "ssh", + f"{db_config['ssh_username']}@{db_config['ssh_host']}", + "-p", str(db_config['ssh_port']), + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null", + "-o", "LogLevel=ERROR" + ] + + # Add key authentication if provided + if db_config.get("ssh_private_key"): + import tempfile + import os + key_fd, key_path = tempfile.mkstemp(suffix='.pem') + try: + with os.fdopen(key_fd, 'w') as f: + f.write(db_config["ssh_private_key"]) + os.chmod(key_path, 0o600) + ssh_cmd.extend(["-i", key_path]) + + # Build the psql command to return JSON + psql_cmd = f"psql {db_config['database']} -t -c \"COPY ({query}) TO STDOUT WITH CSV HEADER\"" + ssh_cmd.append(psql_cmd) + + # Execute the command + process = await asyncio.create_subprocess_exec( + *ssh_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + stdout, stderr = await process.communicate() + + if process.returncode != 0: + logger.error(f"SSH query failed: {stderr.decode()}") + return [] + + # Parse CSV output + import csv + import io + + csv_data = stdout.decode() + if not csv_data.strip(): + return [] + + reader = csv.DictReader(io.StringIO(csv_data)) + results = [] + for row in reader: + # Convert string values to appropriate types + processed_row = {} + for key, value in row.items(): + if value == '': + processed_row[key] = None + elif key in ['transaction_id', 'session_id', 'machine_id', 'tx_hash']: + processed_row[key] = str(value) + elif key in ['fiat_amount', 'crypto_amount']: + processed_row[key] = int(value) if value else 0 + elif key == 'commission_percentage': + processed_row[key] = float(value) if value else 0.0 + elif key == 'transaction_time': + from datetime import datetime + processed_row[key] = datetime.fromisoformat(value.replace('Z', '+00:00')) + else: + processed_row[key] = value + results.append(processed_row) + + return results + + finally: + os.unlink(key_path) + + else: + logger.error("SSH private key required for database queries") + return [] + + except Exception as e: + logger.error(f"Error executing SSH query: {e}") + return [] + + async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]: """Fetch new successful transactions from Lamassu database""" try: # Set the time window - check for transactions in the last hour + 5 minutes buffer time_threshold = datetime.now() - timedelta(hours=1, minutes=5) + time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S') # Query for successful cash-out transactions (people selling BTC for fiat) # These are the transactions that trigger DCA distributions - query = """ + query = f""" SELECT co.id as transaction_id, co.fiat as fiat_amount, @@ -376,10 +427,9 @@ class LamassuTransactionProcessor: co.commission_percentage, co.tx_hash FROM cash_out_txs co - WHERE co.created >= $1 + WHERE co.created >= '{time_threshold_str}' AND co.status = 'confirmed' AND co.id NOT IN ( - -- Exclude already processed transactions SELECT DISTINCT lamassu_transaction_id FROM myextension.dca_payments WHERE lamassu_transaction_id IS NOT NULL @@ -387,23 +437,7 @@ class LamassuTransactionProcessor: ORDER BY co.created DESC """ - rows = await connection.fetch(query, time_threshold) - - transactions = [] - for row in rows: - # Convert asyncpg.Record to dict - transaction = { - "transaction_id": str(row["transaction_id"]), - "fiat_amount": int(row["fiat_amount"]), # Amount in smallest currency unit - "crypto_amount": int(row["crypto_amount"]), # Amount in satoshis - "transaction_time": row["transaction_time"], - "session_id": row["session_id"], - "machine_id": row["machine_id"], - "status": row["status"], - "commission_percentage": float(row["commission_percentage"]) if row["commission_percentage"] else 0.0, - "tx_hash": row["tx_hash"] - } - transactions.append(transaction) + transactions = await self.execute_ssh_query(db_config, query) logger.info(f"Found {len(transactions)} new transactions to process") return transactions @@ -540,31 +574,23 @@ class LamassuTransactionProcessor: try: logger.info("Starting Lamassu transaction polling...") - # Connect to Lamassu database - connection = await self.connect_to_lamassu_db() - if not connection: - logger.error("Could not connect to Lamassu database - skipping this poll") + # Get database configuration + db_config = await self.connect_to_lamassu_db() + if not db_config: + logger.error("Could not get Lamassu database configuration - skipping this poll") return - try: - # Fetch new transactions - new_transactions = await self.fetch_new_transactions(connection) - - # Process each transaction - for transaction in new_transactions: - await self.process_transaction(transaction) - - logger.info(f"Completed processing {len(new_transactions)} transactions") - - finally: - await connection.close() - # Close SSH tunnel if it was used - self.close_ssh_tunnel() + # Fetch new transactions via SSH + new_transactions = await self.fetch_new_transactions(db_config) + + # Process each transaction + for transaction in new_transactions: + await self.process_transaction(transaction) + + logger.info(f"Completed processing {len(new_transactions)} transactions") except Exception as e: logger.error(f"Error in polling cycle: {e}") - # Ensure cleanup on error - self.close_ssh_tunnel() # Global processor instance diff --git a/views_api.py b/views_api.py index de1b49b..8d63919 100644 --- a/views_api.py +++ b/views_api.py @@ -352,31 +352,27 @@ async def api_manual_poll( try: from .transaction_processor import transaction_processor - # Connect to database - connection = await transaction_processor.connect_to_lamassu_db() - if not connection: + # Get database configuration + db_config = await transaction_processor.connect_to_lamassu_db() + if not db_config: raise HTTPException( status_code=HTTPStatus.SERVICE_UNAVAILABLE, - detail="Could not connect to Lamassu database" + detail="Could not get Lamassu database configuration" ) - try: - # Fetch and process transactions - new_transactions = await transaction_processor.fetch_new_transactions(connection) - - transactions_processed = 0 - for transaction in new_transactions: - await transaction_processor.process_transaction(transaction) - transactions_processed += 1 - - return { - "success": True, - "transactions_processed": transactions_processed, - "message": f"Processed {transactions_processed} new transactions" - } - - finally: - await connection.close() + # Fetch and process transactions via SSH + new_transactions = await transaction_processor.fetch_new_transactions(db_config) + + transactions_processed = 0 + for transaction in new_transactions: + await transaction_processor.process_transaction(transaction) + transactions_processed += 1 + + return { + "success": True, + "transactions_processed": transactions_processed, + "message": f"Processed {transactions_processed} new transactions" + } except Exception as e: raise HTTPException(