Refactor transaction fetching logic in LamassuTransactionProcessor: update query to retrieve all transactions since the threshold, filter out already processed transactions using local database, and enhance logging to provide detailed insights on new transactions.
This commit is contained in:
parent
a107f825af
commit
a4c5d11d99
1 changed files with 20 additions and 11 deletions
|
|
@ -443,9 +443,8 @@ class LamassuTransactionProcessor:
|
|||
# Format as UTC for database query
|
||||
time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC')
|
||||
|
||||
# Query for successful cash-out transactions (people selling BTC for fiat)
|
||||
# These are the transactions that trigger DCA distributions
|
||||
query = f"""
|
||||
# First, get all transactions since the threshold from Lamassu database
|
||||
lamassu_query = f"""
|
||||
SELECT
|
||||
co.id as transaction_id,
|
||||
co.fiat as fiat_amount,
|
||||
|
|
@ -459,18 +458,28 @@ class LamassuTransactionProcessor:
|
|||
FROM cash_out_txs co
|
||||
WHERE co.created > '{time_threshold_str}'
|
||||
AND co.status = 'confirmed'
|
||||
AND co.id NOT IN (
|
||||
SELECT DISTINCT lamassu_transaction_id
|
||||
FROM myextension.dca_payments
|
||||
WHERE lamassu_transaction_id IS NOT NULL
|
||||
)
|
||||
ORDER BY co.created DESC
|
||||
"""
|
||||
|
||||
transactions = await self.execute_ssh_query(db_config, query)
|
||||
all_transactions = await self.execute_ssh_query(db_config, lamassu_query)
|
||||
|
||||
logger.info(f"Found {len(transactions)} new transactions to process since {time_threshold}")
|
||||
return transactions
|
||||
# Then filter out already processed transactions using our local database
|
||||
from .crud import get_all_payments
|
||||
processed_payments = await get_all_payments()
|
||||
processed_transaction_ids = {
|
||||
payment.lamassu_transaction_id
|
||||
for payment in processed_payments
|
||||
if payment.lamassu_transaction_id
|
||||
}
|
||||
|
||||
# Filter out already processed transactions
|
||||
new_transactions = [
|
||||
tx for tx in all_transactions
|
||||
if tx['transaction_id'] not in processed_transaction_ids
|
||||
]
|
||||
|
||||
logger.info(f"Found {len(all_transactions)} total transactions since {time_threshold}, {len(new_transactions)} are new")
|
||||
return new_transactions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching transactions from Lamassu database: {e}")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue