+ Use with caution: This bypasses all dispense status checks and will process the transaction even if dispense_confirmed is false. Only use this for manually settled transactions.
+
+
+
+
+
+
+
+
+
+
+
+ This will:
+
+
Fetch the transaction from Lamassu regardless of dispense status
+
Process it through the normal DCA distribution flow
+
Credit the source wallet and distribute to clients
+
Send commission to the commission wallet (if configured)
+
+
+
+
+
+ Process Transaction
+
+
+ Cancel
+
+
+
+
+
+
{% endblock %}
diff --git a/transaction_processor.py b/transaction_processor.py
index 22c4edd..bd6ae77 100644
--- a/transaction_processor.py
+++ b/transaction_processor.py
@@ -493,14 +493,26 @@ class LamassuTransactionProcessor:
# Convert string values to appropriate types
processed_row = {}
for key, value in row.items():
- if value == '':
- processed_row[key] = None
+ # Handle None/empty values consistently at data ingestion boundary
+ if value == '' or value is None:
+ if key in ['fiat_amount', 'crypto_amount']:
+ processed_row[key] = 0 # Default numeric fields to 0
+ elif key in ['commission_percentage', 'discount']:
+ processed_row[key] = 0.0 # Default percentage fields to 0.0
+ else:
+ processed_row[key] = None # Keep None for non-numeric fields
elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']:
processed_row[key] = str(value)
elif key in ['fiat_amount', 'crypto_amount']:
- processed_row[key] = int(float(value)) if value else 0
+ try:
+ processed_row[key] = int(float(value))
+ except (ValueError, TypeError):
+ processed_row[key] = 0 # Fallback to 0 for invalid values
elif key in ['commission_percentage', 'discount']:
- processed_row[key] = float(value) if value else 0.0
+ try:
+ processed_row[key] = float(value)
+ except (ValueError, TypeError):
+ processed_row[key] = 0.0 # Fallback to 0.0 for invalid values
elif key == 'transaction_time':
from datetime import datetime
# Parse PostgreSQL timestamp format and ensure it's in UTC for consistency
@@ -548,6 +560,44 @@ class LamassuTransactionProcessor:
logger.error(f"Error executing SSH query: {e}")
return []
+ async def fetch_transaction_by_id(self, db_config: Dict[str, Any], transaction_id: str) -> Optional[Dict[str, Any]]:
+ """Fetch a specific transaction by ID from Lamassu database, bypassing all status filters"""
+ try:
+ logger.info(f"Fetching transaction {transaction_id} from Lamassu database (bypass all filters)")
+
+ # Query for specific transaction ID without any status/dispense filters
+ lamassu_query = f"""
+ SELECT
+ co.id as transaction_id,
+ co.fiat as fiat_amount,
+ co.crypto_atoms as crypto_amount,
+ co.confirmed_at as transaction_time,
+ co.device_id,
+ co.status,
+ co.commission_percentage,
+ co.discount,
+ co.crypto_code,
+ co.fiat_code,
+ co.dispense,
+ co.dispense_confirmed
+ FROM cash_out_txs co
+ WHERE co.id = '{transaction_id}'
+ """
+
+ results = await self.execute_ssh_query(db_config, lamassu_query)
+
+ if not results:
+ logger.warning(f"Transaction {transaction_id} not found in Lamassu database")
+ return None
+
+ transaction = results[0]
+ logger.info(f"Found transaction {transaction_id}: status={transaction.get('status')}, dispense={transaction.get('dispense')}, dispense_confirmed={transaction.get('dispense_confirmed')}")
+ return transaction
+
+ except Exception as e:
+ logger.error(f"Error fetching transaction {transaction_id} from Lamassu database: {e}")
+ return None
+
async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Fetch new successful transactions from Lamassu database since last poll"""
try:
@@ -561,13 +611,13 @@ class LamassuTransactionProcessor:
# Fallback to last 24 hours for first run or if no previous poll
time_threshold = datetime.now(timezone.utc) - timedelta(hours=24)
logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}")
-
+
# Convert to UTC if not already timezone-aware
if time_threshold.tzinfo is None:
time_threshold = time_threshold.replace(tzinfo=timezone.utc)
elif time_threshold.tzinfo != timezone.utc:
time_threshold = time_threshold.astimezone(timezone.utc)
-
+
# Format as UTC for database query
time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC')
@@ -628,11 +678,11 @@ class LamassuTransactionProcessor:
logger.info("No Flow Mode clients found - skipping distribution")
return {}
- # Extract transaction details with None-safe defaults
- crypto_atoms = transaction.get("crypto_amount") # Total sats with commission baked in
- fiat_amount = transaction.get("fiat_amount") # Actual fiat dispensed (principal only)
- commission_percentage = transaction.get("commission_percentage") # Already stored as decimal (e.g., 0.045)
- discount = transaction.get("discount") # Discount percentage
+ # Extract transaction details - guaranteed clean from data ingestion
+ crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
+ fiat_amount = transaction.get("fiat_amount", 0) # Actual fiat dispensed (principal only)
+ commission_percentage = transaction.get("commission_percentage", 0.0) # Already stored as decimal (e.g., 0.045)
+ discount = transaction.get("discount", 0.0) # Discount percentage
transaction_time = transaction.get("transaction_time") # ATM transaction timestamp for temporal accuracy
# Normalize transaction_time to UTC if present
@@ -837,10 +887,13 @@ class LamassuTransactionProcessor:
logger.error(f"CRITICAL: Client {client_id[:8]}... has negative balance ({current_balance.remaining_balance:.2f} GTQ) - REFUSING payment of {distribution['sats_amount']} sats")
continue
- # Verify balance is sufficient for this distribution
+ # Verify balance is sufficient for this distribution (round to 2 decimal places to match DECIMAL(10,2) precision)
fiat_equivalent = distribution["fiat_amount"] # Amount in GTQ
- if current_balance.remaining_balance < fiat_equivalent:
- logger.error(f"CRITICAL: Client {client_id[:8]}... insufficient balance ({current_balance.remaining_balance:.2f} < {fiat_equivalent:.2f} GTQ) - REFUSING payment")
+ # Round both values to 2 decimal places to match database precision and avoid floating point comparison issues
+ balance_rounded = round(current_balance.remaining_balance, 2)
+ amount_rounded = round(fiat_equivalent, 2)
+ if balance_rounded < amount_rounded:
+ logger.error(f"CRITICAL: Client {client_id[:8]}... insufficient balance ({balance_rounded:.2f} < {amount_rounded:.2f} GTQ) - REFUSING payment")
continue
logger.info(f"Client {client_id[:8]}... pre-payment balance check: {current_balance.remaining_balance:.2f} GTQ - SUFFICIENT for {fiat_equivalent:.2f} GTQ payment")
@@ -995,11 +1048,11 @@ class LamassuTransactionProcessor:
async def store_lamassu_transaction(self, transaction: Dict[str, Any]) -> Optional[str]:
"""Store the Lamassu transaction in our database for audit and UI"""
try:
- # Extract and validate transaction data
+ # Extract transaction data - guaranteed clean from data ingestion boundary
crypto_atoms = transaction.get("crypto_amount", 0)
fiat_amount = transaction.get("fiat_amount", 0)
- commission_percentage = transaction.get("commission_percentage") or 0.0
- discount = transaction.get("discount") or 0.0
+ commission_percentage = transaction.get("commission_percentage", 0.0)
+ discount = transaction.get("discount", 0.0)
transaction_time = transaction.get("transaction_time")
# Normalize transaction_time to UTC if present
@@ -1145,8 +1198,8 @@ class LamassuTransactionProcessor:
# Calculate commission amount for sending to commission wallet
crypto_atoms = transaction.get("crypto_amount", 0)
- commission_percentage = transaction.get("commission_percentage") or 0.0
- discount = transaction.get("discount") or 0.0
+ commission_percentage = transaction.get("commission_percentage", 0.0)
+ discount = transaction.get("discount", 0.0)
if commission_percentage and commission_percentage > 0:
effective_commission = commission_percentage * (100 - discount) / 100
diff --git a/views_api.py b/views_api.py
index 3700497..a4a8b94 100644
--- a/views_api.py
+++ b/views_api.py
@@ -233,69 +233,139 @@ async def api_manual_poll(
)
-@satmachineadmin_api_router.post("/api/v1/dca/test-transaction")
-async def api_test_transaction(
+@satmachineadmin_api_router.post("/api/v1/dca/process-transaction/{transaction_id}")
+async def api_process_specific_transaction(
+ transaction_id: str,
user: User = Depends(check_super_user),
- crypto_atoms: int = 103,
- commission_percentage: float = 0.03,
- discount: float = 0.0,
-) -> dict:
- """Test transaction processing with simulated Lamassu transaction data"""
+):
+ """
+ Manually process a specific Lamassu transaction by ID, bypassing all status filters.
+
+ This endpoint is useful for processing transactions that were manually settled
+ or had dispense issues but need to be included in DCA distribution.
+ """
try:
from .transaction_processor import transaction_processor
- import uuid
- from datetime import datetime, timezone
+ from .crud import get_payments_by_lamassu_transaction
- # Create a mock transaction that mimics Lamassu database structure
- mock_transaction = {
- "transaction_id": str(uuid.uuid4())[:8], # Short ID for testing
- "crypto_amount": crypto_atoms, # Total sats including commission
- "fiat_amount": 100, # Mock fiat amount (100 centavos = 1 GTQ)
- "commission_percentage": commission_percentage, # Already as decimal
- "discount": discount,
- "transaction_time": datetime.now(timezone.utc),
- "crypto_code": "BTC",
- "fiat_code": "GTQ",
- "device_id": "test_device",
- "status": "confirmed",
- }
+ # Get database configuration
+ db_config = await transaction_processor.connect_to_lamassu_db()
+ if not db_config:
+ raise HTTPException(
+ status_code=HTTPStatus.SERVICE_UNAVAILABLE,
+ detail="Could not get Lamassu database configuration",
+ )
- # Process the mock transaction through the complete DCA flow
- await transaction_processor.process_transaction(mock_transaction)
+ # Check if transaction was already processed
+ existing_payments = await get_payments_by_lamassu_transaction(transaction_id)
+ if existing_payments:
+ return {
+ "success": False,
+ "already_processed": True,
+ "message": f"Transaction {transaction_id} was already processed with {len(existing_payments)} distributions",
+ "payment_count": len(existing_payments),
+ }
- # Calculate commission for response
- if commission_percentage > 0:
- effective_commission = commission_percentage * (100 - discount) / 100
- base_crypto_atoms = int(crypto_atoms / (1 + effective_commission))
- commission_amount_sats = crypto_atoms - base_crypto_atoms
- else:
- base_crypto_atoms = crypto_atoms
- commission_amount_sats = 0
+ # Fetch the specific transaction from Lamassu (bypassing all filters)
+ transaction = await transaction_processor.fetch_transaction_by_id(db_config, transaction_id)
+
+ if not transaction:
+ raise HTTPException(
+ status_code=HTTPStatus.NOT_FOUND,
+ detail=f"Transaction {transaction_id} not found in Lamassu database",
+ )
+
+ # Process the transaction through normal DCA flow
+ await transaction_processor.process_transaction(transaction)
return {
"success": True,
- "message": "Test transaction processed successfully",
+ "message": f"Transaction {transaction_id} processed successfully",
"transaction_details": {
- "transaction_id": mock_transaction["transaction_id"],
- "total_amount_sats": crypto_atoms,
- "base_amount_sats": base_crypto_atoms,
- "commission_amount_sats": commission_amount_sats,
- "commission_percentage": commission_percentage
- * 100, # Show as percentage
- "effective_commission": effective_commission * 100
- if commission_percentage > 0
- else 0,
- "discount": discount,
+ "transaction_id": transaction_id,
+ "status": transaction.get("status"),
+ "dispense": transaction.get("dispense"),
+ "dispense_confirmed": transaction.get("dispense_confirmed"),
+ "crypto_amount": transaction.get("crypto_amount"),
+ "fiat_amount": transaction.get("fiat_amount"),
},
}
+ except HTTPException:
+ raise
except Exception as e:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
- detail=f"Error processing test transaction: {str(e)}",
+ detail=f"Error processing transaction {transaction_id}: {str(e)}",
)
+# COMMENTED OUT FOR PRODUCTION - Test transaction endpoint disabled
+# Uncomment only for development/debugging purposes
+#
+# @satmachineadmin_api_router.post("/api/v1/dca/test-transaction")
+# async def api_test_transaction(
+# user: User = Depends(check_super_user),
+# crypto_atoms: int = 103,
+# commission_percentage: float = 0.03,
+# discount: float = 0.0,
+# ) -> dict:
+# """Test transaction processing with simulated Lamassu transaction data"""
+# try:
+# from .transaction_processor import transaction_processor
+# import uuid
+# from datetime import datetime, timezone
+#
+# # Create a mock transaction that mimics Lamassu database structure
+# mock_transaction = {
+# "transaction_id": str(uuid.uuid4())[:8], # Short ID for testing
+# "crypto_amount": crypto_atoms, # Total sats including commission
+# "fiat_amount": 100, # Mock fiat amount (100 centavos = 1 GTQ)
+# "commission_percentage": commission_percentage, # Already as decimal
+# "discount": discount,
+# "transaction_time": datetime.now(timezone.utc),
+# "crypto_code": "BTC",
+# "fiat_code": "GTQ",
+# "device_id": "test_device",
+# "status": "confirmed",
+# }
+#
+# # Process the mock transaction through the complete DCA flow
+# await transaction_processor.process_transaction(mock_transaction)
+#
+# # Calculate commission for response
+# if commission_percentage > 0:
+# effective_commission = commission_percentage * (100 - discount) / 100
+# base_crypto_atoms = int(crypto_atoms / (1 + effective_commission))
+# commission_amount_sats = crypto_atoms - base_crypto_atoms
+# else:
+# base_crypto_atoms = crypto_atoms
+# commission_amount_sats = 0
+#
+# return {
+# "success": True,
+# "message": "Test transaction processed successfully",
+# "transaction_details": {
+# "transaction_id": mock_transaction["transaction_id"],
+# "total_amount_sats": crypto_atoms,
+# "base_amount_sats": base_crypto_atoms,
+# "commission_amount_sats": commission_amount_sats,
+# "commission_percentage": commission_percentage
+# * 100, # Show as percentage
+# "effective_commission": effective_commission * 100
+# if commission_percentage > 0
+# else 0,
+# "discount": discount,
+# },
+# }
+#
+# except Exception as e:
+# raise HTTPException(
+# status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+# detail=f"Error processing test transaction: {str(e)}",
+# )
+
+
# Lamassu Transaction Endpoints