From a4c5d11d99ba05d039d259ee013dbae75321cd3f Mon Sep 17 00:00:00 2001 From: padreug Date: Wed, 18 Jun 2025 16:07:33 +0200 Subject: [PATCH] Refactor transaction fetching logic in LamassuTransactionProcessor: update query to retrieve all transactions since the threshold, filter out already processed transactions using local database, and enhance logging to provide detailed insights on new transactions. --- transaction_processor.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/transaction_processor.py b/transaction_processor.py index dedb7e9..239f484 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -443,9 +443,8 @@ class LamassuTransactionProcessor: # Format as UTC for database query time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC') - # Query for successful cash-out transactions (people selling BTC for fiat) - # These are the transactions that trigger DCA distributions - query = f""" + # First, get all transactions since the threshold from Lamassu database + lamassu_query = f""" SELECT co.id as transaction_id, co.fiat as fiat_amount, @@ -459,18 +458,28 @@ class LamassuTransactionProcessor: FROM cash_out_txs co WHERE co.created > '{time_threshold_str}' AND co.status = 'confirmed' - AND co.id NOT IN ( - SELECT DISTINCT lamassu_transaction_id - FROM myextension.dca_payments - WHERE lamassu_transaction_id IS NOT NULL - ) ORDER BY co.created DESC """ - transactions = await self.execute_ssh_query(db_config, query) + all_transactions = await self.execute_ssh_query(db_config, lamassu_query) - logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}") - return transactions + # Then filter out already processed transactions using our local database + from .crud import get_all_payments + processed_payments = await get_all_payments() + processed_transaction_ids = { + payment.lamassu_transaction_id + for payment in processed_payments + if payment.lamassu_transaction_id + } + + # Filter out already processed transactions + new_transactions = [ + tx for tx in all_transactions + if tx['transaction_id'] not in processed_transaction_ids + ] + + logger.info(f"Found {len(all_transactions)} total transactions since {time_threshold}, {len(new_transactions)} are new") + return new_transactions except Exception as e: logger.error(f"Error fetching transactions from Lamassu database: {e}")