diff --git a/transaction_processor.py b/transaction_processor.py index dedb7e9..239f484 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -443,9 +443,8 @@ class LamassuTransactionProcessor: # Format as UTC for database query time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC') - # Query for successful cash-out transactions (people selling BTC for fiat) - # These are the transactions that trigger DCA distributions - query = f""" + # First, get all transactions since the threshold from Lamassu database + lamassu_query = f""" SELECT co.id as transaction_id, co.fiat as fiat_amount, @@ -459,18 +458,28 @@ class LamassuTransactionProcessor: FROM cash_out_txs co WHERE co.created > '{time_threshold_str}' AND co.status = 'confirmed' - AND co.id NOT IN ( - SELECT DISTINCT lamassu_transaction_id - FROM myextension.dca_payments - WHERE lamassu_transaction_id IS NOT NULL - ) ORDER BY co.created DESC """ - transactions = await self.execute_ssh_query(db_config, query) + all_transactions = await self.execute_ssh_query(db_config, lamassu_query) - logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}") - return transactions + # Then filter out already processed transactions using our local database + from .crud import get_all_payments + processed_payments = await get_all_payments() + processed_transaction_ids = { + payment.lamassu_transaction_id + for payment in processed_payments + if payment.lamassu_transaction_id + } + + # Filter out already processed transactions + new_transactions = [ + tx for tx in all_transactions + if tx['transaction_id'] not in processed_transaction_ids + ] + + logger.info(f"Found {len(all_transactions)} total transactions since {time_threshold}, {len(new_transactions)} are new") + return new_transactions except Exception as e: logger.error(f"Error fetching transactions from Lamassu database: {e}")