From a107f825afa4be96852b3fcb84d1dfa5354aa848 Mon Sep 17 00:00:00 2001
From: padreug
Date: Wed, 18 Jun 2025 15:56:55 +0200
Subject: [PATCH] Add last poll tracking to Lamassu configuration: update
database schema to include last_poll_time and last_successful_poll fields,
modify CRUD operations to record poll times, and enhance transaction
processing to utilize these timestamps for improved polling accuracy.
---
crud.py | 41 ++++++++++++++++--
migrations.py | 18 ++++++++
models.py | 3 ++
static/js/index.js | 1 +
templates/myextension/index.html | 3 +-
transaction_processor.py | 72 +++++++++++++++++++++++++-------
views_api.py | 11 ++++-
7 files changed, 129 insertions(+), 20 deletions(-)
diff --git a/crud.py b/crud.py
index a2b1ddd..445526b 100644
--- a/crud.py
+++ b/crud.py
@@ -1,7 +1,7 @@
# Description: This file contains the CRUD operations for talking to the database.
from typing import List, Optional, Union
-from datetime import datetime
+from datetime import datetime, timezone
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
@@ -373,6 +373,7 @@ async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) -
async def update_config_test_result(config_id: str, success: bool) -> None:
+ utc_now = datetime.now(timezone.utc)
await db.execute(
"""
UPDATE myextension.lamassu_config
@@ -381,9 +382,9 @@ async def update_config_test_result(config_id: str, success: bool) -> None:
""",
{
"id": config_id,
- "test_time": datetime.now(),
+ "test_time": utc_now,
"success": success,
- "updated_at": datetime.now()
+ "updated_at": utc_now
}
)
@@ -393,3 +394,37 @@ async def delete_lamassu_config(config_id: str) -> None:
"DELETE FROM myextension.lamassu_config WHERE id = :id",
{"id": config_id}
)
+
+
+async def update_poll_start_time(config_id: str) -> None:
+ """Update the last poll start time"""
+ utc_now = datetime.now(timezone.utc)
+ await db.execute(
+ """
+ UPDATE myextension.lamassu_config
+ SET last_poll_time = :poll_time, updated_at = :updated_at
+ WHERE id = :id
+ """,
+ {
+ "id": config_id,
+ "poll_time": utc_now,
+ "updated_at": utc_now
+ }
+ )
+
+
+async def update_poll_success_time(config_id: str) -> None:
+ """Update the last successful poll time"""
+ utc_now = datetime.now(timezone.utc)
+ await db.execute(
+ """
+ UPDATE myextension.lamassu_config
+ SET last_successful_poll = :poll_time, updated_at = :updated_at
+ WHERE id = :id
+ """,
+ {
+ "id": config_id,
+ "poll_time": utc_now,
+ "updated_at": utc_now
+ }
+ )
diff --git a/migrations.py b/migrations.py
index 7556f97..4080c8b 100644
--- a/migrations.py
+++ b/migrations.py
@@ -158,3 +158,21 @@ async def m007_add_ssh_tunnel_support(db):
ADD COLUMN ssh_private_key TEXT;
"""
)
+
+
+async def m008_add_last_poll_tracking(db):
+ """
+ Add last poll time tracking to Lamassu configuration table.
+ """
+ await db.execute(
+ """
+ ALTER TABLE myextension.lamassu_config
+ ADD COLUMN last_poll_time TIMESTAMP;
+ """
+ )
+ await db.execute(
+ """
+ ALTER TABLE myextension.lamassu_config
+ ADD COLUMN last_successful_poll TIMESTAMP;
+ """
+ )
diff --git a/models.py b/models.py
index c916953..6fefdf0 100644
--- a/models.py
+++ b/models.py
@@ -134,6 +134,9 @@ class LamassuConfig(BaseModel):
ssh_username: Optional[str] = None
ssh_password: Optional[str] = None
ssh_private_key: Optional[str] = None
+ # Poll tracking
+ last_poll_time: Optional[datetime] = None
+ last_successful_poll: Optional[datetime] = None
class UpdateLamassuConfigData(BaseModel):
diff --git a/static/js/index.js b/static/js/index.js
index 027ebb0..f812676 100644
--- a/static/js/index.js
+++ b/static/js/index.js
@@ -474,6 +474,7 @@ window.app = Vue.createApp({
// Refresh data
await this.getDeposits()
+ await this.getLamassuConfig()
} catch (error) {
LNbits.utils.notifyApiError(error)
} finally {
diff --git a/templates/myextension/index.html b/templates/myextension/index.html
index 503dc93..6c5ecdf 100644
--- a/templates/myextension/index.html
+++ b/templates/myextension/index.html
@@ -346,7 +346,8 @@
Failed
Not tested
- Last Poll: ${ lastPollTime || 'Not yet run' }
+ Last Poll: ${ lamassuConfig.last_poll_time ? formatDate(lamassuConfig.last_poll_time) : 'Not yet run' }
+ Last Success: ${ lamassuConfig.last_successful_poll ? formatDate(lamassuConfig.last_successful_poll) : 'Never' }
Status: Not configured
diff --git a/transaction_processor.py b/transaction_processor.py
index 28bf167..dedb7e9 100644
--- a/transaction_processor.py
+++ b/transaction_processor.py
@@ -2,7 +2,7 @@
import asyncio
import asyncpg
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any
from loguru import logger
import socket
@@ -30,7 +30,9 @@ from .crud import (
create_dca_payment,
get_client_balance_summary,
get_active_lamassu_config,
- update_config_test_result
+ update_config_test_result,
+ update_poll_start_time,
+ update_poll_success_time
)
from .models import CreateDcaPaymentData, LamassuTransaction
@@ -261,7 +263,7 @@ class LamassuTransactionProcessor:
result["database_connection_success"] = True
result["steps"].append("✅ Database query test successful")
- # Step 4: Test actual table access
+ # Step 4: Test actual table access and check timezone
result["steps"].append("Testing access to cash_out_txs table...")
table_query = "SELECT COUNT(*) FROM cash_out_txs"
table_results = await self.execute_ssh_query(db_config, table_query)
@@ -274,6 +276,18 @@ class LamassuTransactionProcessor:
count = table_results[0].get('count', 0)
result["steps"].append(f"✅ Table access successful (found {count} transactions)")
+ # Step 5: Check database timezone
+ result["steps"].append("Checking database timezone...")
+ timezone_query = "SELECT NOW() as db_time, EXTRACT(timezone FROM NOW()) as timezone_offset"
+ timezone_results = await self.execute_ssh_query(db_config, timezone_query)
+
+ if timezone_results:
+ db_time = timezone_results[0].get('db_time', 'unknown')
+ timezone_offset = timezone_results[0].get('timezone_offset', 'unknown')
+ result["steps"].append(f"✅ Database time: {db_time} (offset: {timezone_offset})")
+ else:
+ result["steps"].append("⚠️ Could not determine database timezone")
+
result["success"] = True
result["message"] = "All connection tests passed successfully"
@@ -380,7 +394,7 @@ class LamassuTransactionProcessor:
for key, value in row.items():
if value == '':
processed_row[key] = None
- elif key in ['transaction_id', 'session_id', 'machine_id', 'tx_hash']:
+ elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']:
processed_row[key] = str(value)
elif key in ['fiat_amount', 'crypto_amount']:
processed_row[key] = int(value) if value else 0
@@ -407,11 +421,27 @@ class LamassuTransactionProcessor:
return []
async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]:
- """Fetch new successful transactions from Lamassu database"""
+ """Fetch new successful transactions from Lamassu database since last poll"""
try:
- # Set the time window - check for transactions in the last hour + 5 minutes buffer
- time_threshold = datetime.now() - timedelta(hours=1, minutes=5)
- time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S')
+ # Determine the time threshold based on last successful poll
+ config = await get_active_lamassu_config()
+ if config and config.last_successful_poll:
+ # Use last successful poll time
+ time_threshold = config.last_successful_poll
+ logger.info(f"Checking for transactions since last successful poll: {time_threshold}")
+ else:
+ # Fallback to last 24 hours for first run or if no previous poll
+ time_threshold = datetime.now(timezone.utc) - timedelta(hours=24)
+ logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}")
+
+ # Convert to UTC if not already timezone-aware
+ if time_threshold.tzinfo is None:
+ time_threshold = time_threshold.replace(tzinfo=timezone.utc)
+ elif time_threshold.tzinfo != timezone.utc:
+ time_threshold = time_threshold.astimezone(timezone.utc)
+
+ # Format as UTC for database query
+ time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC')
# Query for successful cash-out transactions (people selling BTC for fiat)
# These are the transactions that trigger DCA distributions
@@ -419,15 +449,15 @@ class LamassuTransactionProcessor:
SELECT
co.id as transaction_id,
co.fiat as fiat_amount,
- co.crypto as crypto_amount,
+ co.crypto_atoms as crypto_amount,
co.created as transaction_time,
- co.session_id,
- co.machine_id,
+ co.device_id,
co.status,
co.commission_percentage,
- co.tx_hash
+ co.crypto_code,
+ co.fiat_code
FROM cash_out_txs co
- WHERE co.created >= '{time_threshold_str}'
+ WHERE co.created > '{time_threshold_str}'
AND co.status = 'confirmed'
AND co.id NOT IN (
SELECT DISTINCT lamassu_transaction_id
@@ -439,7 +469,7 @@ class LamassuTransactionProcessor:
transactions = await self.execute_ssh_query(db_config, query)
- logger.info(f"Found {len(transactions)} new transactions to process")
+ logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}")
return transactions
except Exception as e:
@@ -571,6 +601,7 @@ class LamassuTransactionProcessor:
async def poll_and_process(self) -> None:
"""Main polling function - checks for new transactions and processes them"""
+ config_id = None
try:
logger.info("Starting Lamassu transaction polling...")
@@ -580,17 +611,28 @@ class LamassuTransactionProcessor:
logger.error("Could not get Lamassu database configuration - skipping this poll")
return
+ config_id = db_config["config_id"]
+
+ # Record poll start time
+ await update_poll_start_time(config_id)
+ logger.info("Poll start time recorded")
+
# Fetch new transactions via SSH
new_transactions = await self.fetch_new_transactions(db_config)
# Process each transaction
+ transactions_processed = 0
for transaction in new_transactions:
await self.process_transaction(transaction)
+ transactions_processed += 1
- logger.info(f"Completed processing {len(new_transactions)} transactions")
+ # Record successful poll completion
+ await update_poll_success_time(config_id)
+ logger.info(f"Completed processing {transactions_processed} transactions. Poll success time recorded.")
except Exception as e:
logger.error(f"Error in polling cycle: {e}")
+ # Don't update success time on error, but poll start time remains as attempted
# Global processor instance
diff --git a/views_api.py b/views_api.py
index 8d63919..336c59a 100644
--- a/views_api.py
+++ b/views_api.py
@@ -351,6 +351,7 @@ async def api_manual_poll(
"""Manually trigger a poll of the Lamassu database"""
try:
from .transaction_processor import transaction_processor
+ from .crud import update_poll_start_time, update_poll_success_time
# Get database configuration
db_config = await transaction_processor.connect_to_lamassu_db()
@@ -360,6 +361,11 @@ async def api_manual_poll(
detail="Could not get Lamassu database configuration"
)
+ config_id = db_config["config_id"]
+
+ # Record manual poll start time
+ await update_poll_start_time(config_id)
+
# Fetch and process transactions via SSH
new_transactions = await transaction_processor.fetch_new_transactions(db_config)
@@ -368,10 +374,13 @@ async def api_manual_poll(
await transaction_processor.process_transaction(transaction)
transactions_processed += 1
+ # Record successful manual poll completion
+ await update_poll_success_time(config_id)
+
return {
"success": True,
"transactions_processed": transactions_processed,
- "message": f"Processed {transactions_processed} new transactions"
+ "message": f"Processed {transactions_processed} new transactions since last poll"
}
except Exception as e: