Add last poll tracking to Lamassu configuration: update database schema to include last_poll_time and last_successful_poll fields, modify CRUD operations to record poll times, and enhance transaction processing to utilize these timestamps for improved polling accuracy.

This commit is contained in:
padreug 2025-06-18 15:56:55 +02:00
parent 3c6262b309
commit a107f825af
7 changed files with 129 additions and 20 deletions

View file

@ -2,7 +2,7 @@
import asyncio
import asyncpg
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any
from loguru import logger
import socket
@ -30,7 +30,9 @@ from .crud import (
create_dca_payment,
get_client_balance_summary,
get_active_lamassu_config,
update_config_test_result
update_config_test_result,
update_poll_start_time,
update_poll_success_time
)
from .models import CreateDcaPaymentData, LamassuTransaction
@ -261,7 +263,7 @@ class LamassuTransactionProcessor:
result["database_connection_success"] = True
result["steps"].append("✅ Database query test successful")
# Step 4: Test actual table access
# Step 4: Test actual table access and check timezone
result["steps"].append("Testing access to cash_out_txs table...")
table_query = "SELECT COUNT(*) FROM cash_out_txs"
table_results = await self.execute_ssh_query(db_config, table_query)
@ -274,6 +276,18 @@ class LamassuTransactionProcessor:
count = table_results[0].get('count', 0)
result["steps"].append(f"✅ Table access successful (found {count} transactions)")
# Step 5: Check database timezone
result["steps"].append("Checking database timezone...")
timezone_query = "SELECT NOW() as db_time, EXTRACT(timezone FROM NOW()) as timezone_offset"
timezone_results = await self.execute_ssh_query(db_config, timezone_query)
if timezone_results:
db_time = timezone_results[0].get('db_time', 'unknown')
timezone_offset = timezone_results[0].get('timezone_offset', 'unknown')
result["steps"].append(f"✅ Database time: {db_time} (offset: {timezone_offset})")
else:
result["steps"].append("⚠️ Could not determine database timezone")
result["success"] = True
result["message"] = "All connection tests passed successfully"
@ -380,7 +394,7 @@ class LamassuTransactionProcessor:
for key, value in row.items():
if value == '':
processed_row[key] = None
elif key in ['transaction_id', 'session_id', 'machine_id', 'tx_hash']:
elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']:
processed_row[key] = str(value)
elif key in ['fiat_amount', 'crypto_amount']:
processed_row[key] = int(value) if value else 0
@ -407,11 +421,27 @@ class LamassuTransactionProcessor:
return []
async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Fetch new successful transactions from Lamassu database"""
"""Fetch new successful transactions from Lamassu database since last poll"""
try:
# Set the time window - check for transactions in the last hour + 5 minutes buffer
time_threshold = datetime.now() - timedelta(hours=1, minutes=5)
time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S')
# Determine the time threshold based on last successful poll
config = await get_active_lamassu_config()
if config and config.last_successful_poll:
# Use last successful poll time
time_threshold = config.last_successful_poll
logger.info(f"Checking for transactions since last successful poll: {time_threshold}")
else:
# Fallback to last 24 hours for first run or if no previous poll
time_threshold = datetime.now(timezone.utc) - timedelta(hours=24)
logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}")
# Convert to UTC if not already timezone-aware
if time_threshold.tzinfo is None:
time_threshold = time_threshold.replace(tzinfo=timezone.utc)
elif time_threshold.tzinfo != timezone.utc:
time_threshold = time_threshold.astimezone(timezone.utc)
# Format as UTC for database query
time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC')
# Query for successful cash-out transactions (people selling BTC for fiat)
# These are the transactions that trigger DCA distributions
@ -419,15 +449,15 @@ class LamassuTransactionProcessor:
SELECT
co.id as transaction_id,
co.fiat as fiat_amount,
co.crypto as crypto_amount,
co.crypto_atoms as crypto_amount,
co.created as transaction_time,
co.session_id,
co.machine_id,
co.device_id,
co.status,
co.commission_percentage,
co.tx_hash
co.crypto_code,
co.fiat_code
FROM cash_out_txs co
WHERE co.created >= '{time_threshold_str}'
WHERE co.created > '{time_threshold_str}'
AND co.status = 'confirmed'
AND co.id NOT IN (
SELECT DISTINCT lamassu_transaction_id
@ -439,7 +469,7 @@ class LamassuTransactionProcessor:
transactions = await self.execute_ssh_query(db_config, query)
logger.info(f"Found {len(transactions)} new transactions to process")
logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}")
return transactions
except Exception as e:
@ -571,6 +601,7 @@ class LamassuTransactionProcessor:
async def poll_and_process(self) -> None:
"""Main polling function - checks for new transactions and processes them"""
config_id = None
try:
logger.info("Starting Lamassu transaction polling...")
@ -580,17 +611,28 @@ class LamassuTransactionProcessor:
logger.error("Could not get Lamassu database configuration - skipping this poll")
return
config_id = db_config["config_id"]
# Record poll start time
await update_poll_start_time(config_id)
logger.info("Poll start time recorded")
# Fetch new transactions via SSH
new_transactions = await self.fetch_new_transactions(db_config)
# Process each transaction
transactions_processed = 0
for transaction in new_transactions:
await self.process_transaction(transaction)
transactions_processed += 1
logger.info(f"Completed processing {len(new_transactions)} transactions")
# Record successful poll completion
await update_poll_success_time(config_id)
logger.info(f"Completed processing {transactions_processed} transactions. Poll success time recorded.")
except Exception as e:
logger.error(f"Error in polling cycle: {e}")
# Don't update success time on error, but poll start time remains as attempted
# Global processor instance