+ Use with caution: This bypasses all dispense status checks and will process the transaction even if dispense_confirmed is false. Only use this for manually settled transactions.
+
+
+
+
+
+
+
+
+
+
+
+ This will:
+
+
Fetch the transaction from Lamassu regardless of dispense status
+
Process it through the normal DCA distribution flow
+
Credit the source wallet and distribute to clients
+
Send commission to the commission wallet (if configured)
+
+
+
+
+
+ Process Transaction
+
+
+ Cancel
+
+
+
+
+
+
{% endblock %}
diff --git a/transaction_processor.py b/transaction_processor.py
index 9a58408..bd6ae77 100644
--- a/transaction_processor.py
+++ b/transaction_processor.py
@@ -560,6 +560,44 @@ class LamassuTransactionProcessor:
logger.error(f"Error executing SSH query: {e}")
return []
+ async def fetch_transaction_by_id(self, db_config: Dict[str, Any], transaction_id: str) -> Optional[Dict[str, Any]]:
+ """Fetch a specific transaction by ID from Lamassu database, bypassing all status filters"""
+ try:
+ logger.info(f"Fetching transaction {transaction_id} from Lamassu database (bypass all filters)")
+
+ # Query for specific transaction ID without any status/dispense filters
+ lamassu_query = f"""
+ SELECT
+ co.id as transaction_id,
+ co.fiat as fiat_amount,
+ co.crypto_atoms as crypto_amount,
+ co.confirmed_at as transaction_time,
+ co.device_id,
+ co.status,
+ co.commission_percentage,
+ co.discount,
+ co.crypto_code,
+ co.fiat_code,
+ co.dispense,
+ co.dispense_confirmed
+ FROM cash_out_txs co
+ WHERE co.id = '{transaction_id}'
+ """
+
+ results = await self.execute_ssh_query(db_config, lamassu_query)
+
+ if not results:
+ logger.warning(f"Transaction {transaction_id} not found in Lamassu database")
+ return None
+
+ transaction = results[0]
+ logger.info(f"Found transaction {transaction_id}: status={transaction.get('status')}, dispense={transaction.get('dispense')}, dispense_confirmed={transaction.get('dispense_confirmed')}")
+ return transaction
+
+ except Exception as e:
+ logger.error(f"Error fetching transaction {transaction_id} from Lamassu database: {e}")
+ return None
+
async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Fetch new successful transactions from Lamassu database since last poll"""
try:
@@ -573,13 +611,13 @@ class LamassuTransactionProcessor:
# Fallback to last 24 hours for first run or if no previous poll
time_threshold = datetime.now(timezone.utc) - timedelta(hours=24)
logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}")
-
+
# Convert to UTC if not already timezone-aware
if time_threshold.tzinfo is None:
time_threshold = time_threshold.replace(tzinfo=timezone.utc)
elif time_threshold.tzinfo != timezone.utc:
time_threshold = time_threshold.astimezone(timezone.utc)
-
+
# Format as UTC for database query
time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC')
diff --git a/views_api.py b/views_api.py
index 3700497..42ae53d 100644
--- a/views_api.py
+++ b/views_api.py
@@ -233,6 +233,73 @@ async def api_manual_poll(
)
+@satmachineadmin_api_router.post("/api/v1/dca/process-transaction/{transaction_id}")
+async def api_process_specific_transaction(
+ transaction_id: str,
+ user: User = Depends(check_super_user),
+):
+ """
+ Manually process a specific Lamassu transaction by ID, bypassing all status filters.
+
+ This endpoint is useful for processing transactions that were manually settled
+ or had dispense issues but need to be included in DCA distribution.
+ """
+ try:
+ from .transaction_processor import transaction_processor
+ from .crud import get_payments_by_lamassu_transaction
+
+ # Get database configuration
+ db_config = await transaction_processor.connect_to_lamassu_db()
+ if not db_config:
+ raise HTTPException(
+ status_code=HTTPStatus.SERVICE_UNAVAILABLE,
+ detail="Could not get Lamassu database configuration",
+ )
+
+ # Check if transaction was already processed
+ existing_payments = await get_payments_by_lamassu_transaction(transaction_id)
+ if existing_payments:
+ return {
+ "success": False,
+ "already_processed": True,
+ "message": f"Transaction {transaction_id} was already processed with {len(existing_payments)} distributions",
+ "payment_count": len(existing_payments),
+ }
+
+ # Fetch the specific transaction from Lamassu (bypassing all filters)
+ transaction = await transaction_processor.fetch_transaction_by_id(db_config, transaction_id)
+
+ if not transaction:
+ raise HTTPException(
+ status_code=HTTPStatus.NOT_FOUND,
+ detail=f"Transaction {transaction_id} not found in Lamassu database",
+ )
+
+ # Process the transaction through normal DCA flow
+ await transaction_processor.process_transaction(transaction)
+
+ return {
+ "success": True,
+ "message": f"Transaction {transaction_id} processed successfully",
+ "transaction_details": {
+ "transaction_id": transaction_id,
+ "status": transaction.get("status"),
+ "dispense": transaction.get("dispense"),
+ "dispense_confirmed": transaction.get("dispense_confirmed"),
+ "crypto_amount": transaction.get("crypto_amount"),
+ "fiat_amount": transaction.get("fiat_amount"),
+ },
+ }
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ detail=f"Error processing transaction {transaction_id}: {str(e)}",
+ )
+
+
@satmachineadmin_api_router.post("/api/v1/dca/test-transaction")
async def api_test_transaction(
user: User = Depends(check_super_user),