# Description: This file contains the CRUD operations for talking to the database. from typing import List, Optional, Union from datetime import datetime, timezone from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( CreateDcaClientData, DcaClient, UpdateDcaClientData, CreateDepositData, DcaDeposit, UpdateDepositStatusData, CreateDcaPaymentData, DcaPayment, ClientBalanceSummary, CreateLamassuConfigData, LamassuConfig, UpdateLamassuConfigData, CreateLamassuTransactionData, StoredLamassuTransaction ) db = Database("ext_satoshimachine") # DCA Client CRUD Operations async def create_dca_client(data: CreateDcaClientData) -> DcaClient: client_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.dca_clients (id, user_id, wallet_id, username, dca_mode, fixed_mode_daily_limit, status, created_at, updated_at) VALUES (:id, :user_id, :wallet_id, :username, :dca_mode, :fixed_mode_daily_limit, :status, :created_at, :updated_at) """, { "id": client_id, "user_id": data.user_id, "wallet_id": data.wallet_id, "username": data.username, "dca_mode": data.dca_mode, "fixed_mode_daily_limit": data.fixed_mode_daily_limit, "status": "active", "created_at": datetime.now(), "updated_at": datetime.now() } ) return await get_dca_client(client_id) async def get_dca_client(client_id: str) -> Optional[DcaClient]: return await db.fetchone( "SELECT * FROM satoshimachine.dca_clients WHERE id = :id", {"id": client_id}, DcaClient, ) async def get_dca_clients() -> List[DcaClient]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_clients ORDER BY created_at DESC", model=DcaClient, ) async def get_dca_client_by_user(user_id: str) -> Optional[DcaClient]: return await db.fetchone( "SELECT * FROM satoshimachine.dca_clients WHERE user_id = :user_id", {"user_id": user_id}, DcaClient, ) async def update_dca_client(client_id: str, data: UpdateDcaClientData) -> Optional[DcaClient]: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_dca_client(client_id) update_data["updated_at"] = datetime.now() set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) update_data["id"] = client_id await db.execute( f"UPDATE satoshimachine.dca_clients SET {set_clause} WHERE id = :id", update_data ) return await get_dca_client(client_id) async def delete_dca_client(client_id: str) -> None: await db.execute( "DELETE FROM satoshimachine.dca_clients WHERE id = :id", {"id": client_id} ) # DCA Deposit CRUD Operations async def create_deposit(data: CreateDepositData) -> DcaDeposit: deposit_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.dca_deposits (id, client_id, amount, currency, status, notes, created_at) VALUES (:id, :client_id, :amount, :currency, :status, :notes, :created_at) """, { "id": deposit_id, "client_id": data.client_id, "amount": data.amount, "currency": data.currency, "status": "pending", "notes": data.notes, "created_at": datetime.now() } ) return await get_deposit(deposit_id) async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: return await db.fetchone( "SELECT * FROM satoshimachine.dca_deposits WHERE id = :id", {"id": deposit_id}, DcaDeposit, ) async def get_deposits_by_client(client_id: str) -> List[DcaDeposit]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_deposits WHERE client_id = :client_id ORDER BY created_at DESC", {"client_id": client_id}, DcaDeposit, ) async def get_all_deposits() -> List[DcaDeposit]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_deposits ORDER BY created_at DESC", model=DcaDeposit, ) async def update_deposit_status(deposit_id: str, data: UpdateDepositStatusData) -> Optional[DcaDeposit]: update_data = { "status": data.status, "notes": data.notes } if data.status == "confirmed": update_data["confirmed_at"] = datetime.now() set_clause = ", ".join([f"{k} = :{k}" for k, v in update_data.items() if v is not None]) filtered_data = {k: v for k, v in update_data.items() if v is not None} filtered_data["id"] = deposit_id await db.execute( f"UPDATE satoshimachine.dca_deposits SET {set_clause} WHERE id = :id", filtered_data ) return await get_deposit(deposit_id) # DCA Payment CRUD Operations async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment: payment_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.dca_payments (id, client_id, amount_sats, amount_fiat, exchange_rate, transaction_type, lamassu_transaction_id, payment_hash, status, created_at, transaction_time) VALUES (:id, :client_id, :amount_sats, :amount_fiat, :exchange_rate, :transaction_type, :lamassu_transaction_id, :payment_hash, :status, :created_at, :transaction_time) """, { "id": payment_id, "client_id": data.client_id, "amount_sats": data.amount_sats, "amount_fiat": data.amount_fiat, "exchange_rate": data.exchange_rate, "transaction_type": data.transaction_type, "lamassu_transaction_id": data.lamassu_transaction_id, "payment_hash": data.payment_hash, "status": "pending", "created_at": datetime.now(), "transaction_time": data.transaction_time } ) return await get_dca_payment(payment_id) async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: return await db.fetchone( "SELECT * FROM satoshimachine.dca_payments WHERE id = :id", {"id": payment_id}, DcaPayment, ) async def get_payments_by_client(client_id: str) -> List[DcaPayment]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_payments WHERE client_id = :client_id ORDER BY created_at DESC", {"client_id": client_id}, DcaPayment, ) async def get_all_payments() -> List[DcaPayment]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_payments ORDER BY created_at DESC", model=DcaPayment, ) async def update_dca_payment_status(payment_id: str, status: str) -> None: """Update the status of a DCA payment""" await db.execute( "UPDATE satoshimachine.dca_payments SET status = :status WHERE id = :id", {"status": status, "id": payment_id} ) async def get_payments_by_lamassu_transaction(lamassu_transaction_id: str) -> List[DcaPayment]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_payments WHERE lamassu_transaction_id = :transaction_id", {"transaction_id": lamassu_transaction_id}, DcaPayment, ) # Balance and Summary Operations async def get_client_balance_summary(client_id: str, as_of_time: Optional[datetime] = None) -> ClientBalanceSummary: """Get client balance summary, optionally as of a specific point in time""" # Build time filter for temporal accuracy time_filter = "" params = {"client_id": client_id} if as_of_time is not None: time_filter = "AND confirmed_at <= :as_of_time" params["as_of_time"] = as_of_time # Get total confirmed deposits (only those confirmed before the cutoff time) total_deposits_result = await db.fetchone( f""" SELECT COALESCE(SUM(amount), 0) as total, currency FROM satoshimachine.dca_deposits WHERE client_id = :client_id AND status = 'confirmed' {time_filter} GROUP BY currency """, params ) # Get total payments made (only those with ATM transaction time before the cutoff) # Use transaction_time instead of created_at for temporal accuracy payment_time_filter = "" if as_of_time is not None: payment_time_filter = "AND transaction_time <= :as_of_time" total_payments_result = await db.fetchone( f""" SELECT COALESCE(SUM(amount_fiat), 0) as total FROM satoshimachine.dca_payments WHERE client_id = :client_id AND status = 'confirmed' {payment_time_filter} """, params ) total_deposits = total_deposits_result["total"] if total_deposits_result else 0 total_payments = total_payments_result["total"] if total_payments_result else 0 currency = total_deposits_result["currency"] if total_deposits_result else "GTQ" # Log temporal filtering if as_of_time was used if as_of_time is not None: from loguru import logger # Verify timezone consistency for temporal filtering tz_info = "UTC" if as_of_time.tzinfo == timezone.utc else f"TZ: {as_of_time.tzinfo}" logger.info(f"Client {client_id[:8]}... balance as of {as_of_time} ({tz_info}): deposits.confirmed_at <= cutoff, payments.transaction_time <= cutoff → {total_deposits - total_payments:.2f} GTQ remaining") return ClientBalanceSummary( client_id=client_id, total_deposits=total_deposits, total_payments=total_payments, remaining_balance=total_deposits - total_payments, currency=currency ) async def get_flow_mode_clients() -> List[DcaClient]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_clients WHERE dca_mode = 'flow' AND status = 'active'", model=DcaClient, ) async def get_fixed_mode_clients() -> List[DcaClient]: return await db.fetchall( "SELECT * FROM satoshimachine.dca_clients WHERE dca_mode = 'fixed' AND status = 'active'", model=DcaClient, ) # Lamassu Configuration CRUD Operations async def create_lamassu_config(data: CreateLamassuConfigData) -> LamassuConfig: config_id = urlsafe_short_hash() # Deactivate any existing configs first (only one active config allowed) await db.execute( "UPDATE satoshimachine.lamassu_config SET is_active = false, updated_at = :updated_at", {"updated_at": datetime.now()} ) await db.execute( """ INSERT INTO satoshimachine.lamassu_config (id, host, port, database_name, username, password, source_wallet_id, commission_wallet_id, is_active, created_at, updated_at, use_ssh_tunnel, ssh_host, ssh_port, ssh_username, ssh_password, ssh_private_key, max_daily_limit_gtq) VALUES (:id, :host, :port, :database_name, :username, :password, :source_wallet_id, :commission_wallet_id, :is_active, :created_at, :updated_at, :use_ssh_tunnel, :ssh_host, :ssh_port, :ssh_username, :ssh_password, :ssh_private_key, :max_daily_limit_gtq) """, { "id": config_id, "host": data.host, "port": data.port, "database_name": data.database_name, "username": data.username, "password": data.password, "source_wallet_id": data.source_wallet_id, "commission_wallet_id": data.commission_wallet_id, "is_active": True, "created_at": datetime.now(), "updated_at": datetime.now(), "use_ssh_tunnel": data.use_ssh_tunnel, "ssh_host": data.ssh_host, "ssh_port": data.ssh_port, "ssh_username": data.ssh_username, "ssh_password": data.ssh_password, "ssh_private_key": data.ssh_private_key, "max_daily_limit_gtq": data.max_daily_limit_gtq } ) return await get_lamassu_config(config_id) async def get_lamassu_config(config_id: str) -> Optional[LamassuConfig]: return await db.fetchone( "SELECT * FROM satoshimachine.lamassu_config WHERE id = :id", {"id": config_id}, LamassuConfig, ) async def get_active_lamassu_config() -> Optional[LamassuConfig]: return await db.fetchone( "SELECT * FROM satoshimachine.lamassu_config WHERE is_active = true ORDER BY created_at DESC LIMIT 1", model=LamassuConfig, ) async def get_all_lamassu_configs() -> List[LamassuConfig]: return await db.fetchall( "SELECT * FROM satoshimachine.lamassu_config ORDER BY created_at DESC", model=LamassuConfig, ) async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) -> Optional[LamassuConfig]: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_lamassu_config(config_id) update_data["updated_at"] = datetime.now() set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) update_data["id"] = config_id await db.execute( f"UPDATE satoshimachine.lamassu_config SET {set_clause} WHERE id = :id", update_data ) return await get_lamassu_config(config_id) async def update_config_test_result(config_id: str, success: bool) -> None: utc_now = datetime.now(timezone.utc) await db.execute( """ UPDATE satoshimachine.lamassu_config SET test_connection_last = :test_time, test_connection_success = :success, updated_at = :updated_at WHERE id = :id """, { "id": config_id, "test_time": utc_now, "success": success, "updated_at": utc_now } ) async def delete_lamassu_config(config_id: str) -> None: await db.execute( "DELETE FROM satoshimachine.lamassu_config WHERE id = :id", {"id": config_id} ) async def update_poll_start_time(config_id: str) -> None: """Update the last poll start time""" utc_now = datetime.now(timezone.utc) await db.execute( """ UPDATE satoshimachine.lamassu_config SET last_poll_time = :poll_time, updated_at = :updated_at WHERE id = :id """, { "id": config_id, "poll_time": utc_now, "updated_at": utc_now } ) async def update_poll_success_time(config_id: str) -> None: """Update the last successful poll time""" utc_now = datetime.now(timezone.utc) await db.execute( """ UPDATE satoshimachine.lamassu_config SET last_successful_poll = :poll_time, updated_at = :updated_at WHERE id = :id """, { "id": config_id, "poll_time": utc_now, "updated_at": utc_now } ) # Lamassu Transaction Storage CRUD Operations async def create_lamassu_transaction(data: CreateLamassuTransactionData) -> StoredLamassuTransaction: """Store a processed Lamassu transaction""" transaction_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.lamassu_transactions (id, lamassu_transaction_id, fiat_amount, crypto_amount, commission_percentage, discount, effective_commission, commission_amount_sats, base_amount_sats, exchange_rate, crypto_code, fiat_code, device_id, transaction_time, processed_at, clients_count, distributions_total_sats) VALUES (:id, :lamassu_transaction_id, :fiat_amount, :crypto_amount, :commission_percentage, :discount, :effective_commission, :commission_amount_sats, :base_amount_sats, :exchange_rate, :crypto_code, :fiat_code, :device_id, :transaction_time, :processed_at, :clients_count, :distributions_total_sats) """, { "id": transaction_id, "lamassu_transaction_id": data.lamassu_transaction_id, "fiat_amount": data.fiat_amount, "crypto_amount": data.crypto_amount, "commission_percentage": data.commission_percentage, "discount": data.discount, "effective_commission": data.effective_commission, "commission_amount_sats": data.commission_amount_sats, "base_amount_sats": data.base_amount_sats, "exchange_rate": data.exchange_rate, "crypto_code": data.crypto_code, "fiat_code": data.fiat_code, "device_id": data.device_id, "transaction_time": data.transaction_time, "processed_at": datetime.now(), "clients_count": 0, # Will be updated after distributions "distributions_total_sats": 0 # Will be updated after distributions } ) return await get_lamassu_transaction(transaction_id) async def get_lamassu_transaction(transaction_id: str) -> Optional[StoredLamassuTransaction]: """Get a stored Lamassu transaction by ID""" return await db.fetchone( "SELECT * FROM satoshimachine.lamassu_transactions WHERE id = :id", {"id": transaction_id}, StoredLamassuTransaction, ) async def get_lamassu_transaction_by_lamassu_id(lamassu_transaction_id: str) -> Optional[StoredLamassuTransaction]: """Get a stored Lamassu transaction by Lamassu transaction ID""" return await db.fetchone( "SELECT * FROM satoshimachine.lamassu_transactions WHERE lamassu_transaction_id = :lamassu_id", {"lamassu_id": lamassu_transaction_id}, StoredLamassuTransaction, ) async def get_all_lamassu_transactions() -> List[StoredLamassuTransaction]: """Get all stored Lamassu transactions""" return await db.fetchall( "SELECT * FROM satoshimachine.lamassu_transactions ORDER BY transaction_time DESC", model=StoredLamassuTransaction, ) async def update_lamassu_transaction_distribution_stats( transaction_id: str, clients_count: int, distributions_total_sats: int ) -> None: """Update distribution statistics for a Lamassu transaction""" await db.execute( """ UPDATE satoshimachine.lamassu_transactions SET clients_count = :clients_count, distributions_total_sats = :distributions_total_sats WHERE id = :id """, { "clients_count": clients_count, "distributions_total_sats": distributions_total_sats, "id": transaction_id } )