diff --git a/crud.py b/crud.py index a2b1ddd..445526b 100644 --- a/crud.py +++ b/crud.py @@ -1,7 +1,7 @@ # Description: This file contains the CRUD operations for talking to the database. from typing import List, Optional, Union -from datetime import datetime +from datetime import datetime, timezone from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash @@ -373,6 +373,7 @@ async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) - async def update_config_test_result(config_id: str, success: bool) -> None: + utc_now = datetime.now(timezone.utc) await db.execute( """ UPDATE myextension.lamassu_config @@ -381,9 +382,9 @@ async def update_config_test_result(config_id: str, success: bool) -> None: """, { "id": config_id, - "test_time": datetime.now(), + "test_time": utc_now, "success": success, - "updated_at": datetime.now() + "updated_at": utc_now } ) @@ -393,3 +394,37 @@ async def delete_lamassu_config(config_id: str) -> None: "DELETE FROM myextension.lamassu_config WHERE id = :id", {"id": config_id} ) + + +async def update_poll_start_time(config_id: str) -> None: + """Update the last poll start time""" + utc_now = datetime.now(timezone.utc) + await db.execute( + """ + UPDATE myextension.lamassu_config + SET last_poll_time = :poll_time, updated_at = :updated_at + WHERE id = :id + """, + { + "id": config_id, + "poll_time": utc_now, + "updated_at": utc_now + } + ) + + +async def update_poll_success_time(config_id: str) -> None: + """Update the last successful poll time""" + utc_now = datetime.now(timezone.utc) + await db.execute( + """ + UPDATE myextension.lamassu_config + SET last_successful_poll = :poll_time, updated_at = :updated_at + WHERE id = :id + """, + { + "id": config_id, + "poll_time": utc_now, + "updated_at": utc_now + } + ) diff --git a/migrations.py b/migrations.py index 7556f97..4080c8b 100644 --- a/migrations.py +++ b/migrations.py @@ -158,3 +158,21 @@ async def m007_add_ssh_tunnel_support(db): ADD COLUMN ssh_private_key TEXT; """ ) + + +async def m008_add_last_poll_tracking(db): + """ + Add last poll time tracking to Lamassu configuration table. + """ + await db.execute( + """ + ALTER TABLE myextension.lamassu_config + ADD COLUMN last_poll_time TIMESTAMP; + """ + ) + await db.execute( + """ + ALTER TABLE myextension.lamassu_config + ADD COLUMN last_successful_poll TIMESTAMP; + """ + ) diff --git a/models.py b/models.py index c916953..6fefdf0 100644 --- a/models.py +++ b/models.py @@ -134,6 +134,9 @@ class LamassuConfig(BaseModel): ssh_username: Optional[str] = None ssh_password: Optional[str] = None ssh_private_key: Optional[str] = None + # Poll tracking + last_poll_time: Optional[datetime] = None + last_successful_poll: Optional[datetime] = None class UpdateLamassuConfigData(BaseModel): diff --git a/static/js/index.js b/static/js/index.js index 027ebb0..f812676 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -474,6 +474,7 @@ window.app = Vue.createApp({ // Refresh data await this.getDeposits() + await this.getLamassuConfig() } catch (error) { LNbits.utils.notifyApiError(error) } finally { diff --git a/templates/myextension/index.html b/templates/myextension/index.html index 503dc93..6c5ecdf 100644 --- a/templates/myextension/index.html +++ b/templates/myextension/index.html @@ -346,7 +346,8 @@ Failed Not tested

-

Last Poll: ${ lastPollTime || 'Not yet run' }

+

Last Poll: ${ lamassuConfig.last_poll_time ? formatDate(lamassuConfig.last_poll_time) : 'Not yet run' }

+

Last Success: ${ lamassuConfig.last_successful_poll ? formatDate(lamassuConfig.last_successful_poll) : 'Never' }

Status: Not configured

diff --git a/transaction_processor.py b/transaction_processor.py index 28bf167..dedb7e9 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -2,7 +2,7 @@ import asyncio import asyncpg -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import List, Optional, Dict, Any from loguru import logger import socket @@ -30,7 +30,9 @@ from .crud import ( create_dca_payment, get_client_balance_summary, get_active_lamassu_config, - update_config_test_result + update_config_test_result, + update_poll_start_time, + update_poll_success_time ) from .models import CreateDcaPaymentData, LamassuTransaction @@ -261,7 +263,7 @@ class LamassuTransactionProcessor: result["database_connection_success"] = True result["steps"].append("✅ Database query test successful") - # Step 4: Test actual table access + # Step 4: Test actual table access and check timezone result["steps"].append("Testing access to cash_out_txs table...") table_query = "SELECT COUNT(*) FROM cash_out_txs" table_results = await self.execute_ssh_query(db_config, table_query) @@ -274,6 +276,18 @@ class LamassuTransactionProcessor: count = table_results[0].get('count', 0) result["steps"].append(f"✅ Table access successful (found {count} transactions)") + # Step 5: Check database timezone + result["steps"].append("Checking database timezone...") + timezone_query = "SELECT NOW() as db_time, EXTRACT(timezone FROM NOW()) as timezone_offset" + timezone_results = await self.execute_ssh_query(db_config, timezone_query) + + if timezone_results: + db_time = timezone_results[0].get('db_time', 'unknown') + timezone_offset = timezone_results[0].get('timezone_offset', 'unknown') + result["steps"].append(f"✅ Database time: {db_time} (offset: {timezone_offset})") + else: + result["steps"].append("⚠️ Could not determine database timezone") + result["success"] = True result["message"] = "All connection tests passed successfully" @@ -380,7 +394,7 @@ class LamassuTransactionProcessor: for key, value in row.items(): if value == '': processed_row[key] = None - elif key in ['transaction_id', 'session_id', 'machine_id', 'tx_hash']: + elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']: processed_row[key] = str(value) elif key in ['fiat_amount', 'crypto_amount']: processed_row[key] = int(value) if value else 0 @@ -407,11 +421,27 @@ class LamassuTransactionProcessor: return [] async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]: - """Fetch new successful transactions from Lamassu database""" + """Fetch new successful transactions from Lamassu database since last poll""" try: - # Set the time window - check for transactions in the last hour + 5 minutes buffer - time_threshold = datetime.now() - timedelta(hours=1, minutes=5) - time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S') + # Determine the time threshold based on last successful poll + config = await get_active_lamassu_config() + if config and config.last_successful_poll: + # Use last successful poll time + time_threshold = config.last_successful_poll + logger.info(f"Checking for transactions since last successful poll: {time_threshold}") + else: + # Fallback to last 24 hours for first run or if no previous poll + time_threshold = datetime.now(timezone.utc) - timedelta(hours=24) + logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}") + + # Convert to UTC if not already timezone-aware + if time_threshold.tzinfo is None: + time_threshold = time_threshold.replace(tzinfo=timezone.utc) + elif time_threshold.tzinfo != timezone.utc: + time_threshold = time_threshold.astimezone(timezone.utc) + + # Format as UTC for database query + time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC') # Query for successful cash-out transactions (people selling BTC for fiat) # These are the transactions that trigger DCA distributions @@ -419,15 +449,15 @@ class LamassuTransactionProcessor: SELECT co.id as transaction_id, co.fiat as fiat_amount, - co.crypto as crypto_amount, + co.crypto_atoms as crypto_amount, co.created as transaction_time, - co.session_id, - co.machine_id, + co.device_id, co.status, co.commission_percentage, - co.tx_hash + co.crypto_code, + co.fiat_code FROM cash_out_txs co - WHERE co.created >= '{time_threshold_str}' + WHERE co.created > '{time_threshold_str}' AND co.status = 'confirmed' AND co.id NOT IN ( SELECT DISTINCT lamassu_transaction_id @@ -439,7 +469,7 @@ class LamassuTransactionProcessor: transactions = await self.execute_ssh_query(db_config, query) - logger.info(f"Found {len(transactions)} new transactions to process") + logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}") return transactions except Exception as e: @@ -571,6 +601,7 @@ class LamassuTransactionProcessor: async def poll_and_process(self) -> None: """Main polling function - checks for new transactions and processes them""" + config_id = None try: logger.info("Starting Lamassu transaction polling...") @@ -580,17 +611,28 @@ class LamassuTransactionProcessor: logger.error("Could not get Lamassu database configuration - skipping this poll") return + config_id = db_config["config_id"] + + # Record poll start time + await update_poll_start_time(config_id) + logger.info("Poll start time recorded") + # Fetch new transactions via SSH new_transactions = await self.fetch_new_transactions(db_config) # Process each transaction + transactions_processed = 0 for transaction in new_transactions: await self.process_transaction(transaction) + transactions_processed += 1 - logger.info(f"Completed processing {len(new_transactions)} transactions") + # Record successful poll completion + await update_poll_success_time(config_id) + logger.info(f"Completed processing {transactions_processed} transactions. Poll success time recorded.") except Exception as e: logger.error(f"Error in polling cycle: {e}") + # Don't update success time on error, but poll start time remains as attempted # Global processor instance diff --git a/views_api.py b/views_api.py index 8d63919..336c59a 100644 --- a/views_api.py +++ b/views_api.py @@ -351,6 +351,7 @@ async def api_manual_poll( """Manually trigger a poll of the Lamassu database""" try: from .transaction_processor import transaction_processor + from .crud import update_poll_start_time, update_poll_success_time # Get database configuration db_config = await transaction_processor.connect_to_lamassu_db() @@ -360,6 +361,11 @@ async def api_manual_poll( detail="Could not get Lamassu database configuration" ) + config_id = db_config["config_id"] + + # Record manual poll start time + await update_poll_start_time(config_id) + # Fetch and process transactions via SSH new_transactions = await transaction_processor.fetch_new_transactions(db_config) @@ -368,10 +374,13 @@ async def api_manual_poll( await transaction_processor.process_transaction(transaction) transactions_processed += 1 + # Record successful manual poll completion + await update_poll_success_time(config_id) + return { "success": True, "transactions_processed": transactions_processed, - "message": f"Processed {transactions_processed} new transactions" + "message": f"Processed {transactions_processed} new transactions since last poll" } except Exception as e: