From 1570e7ba959eeae7a3b81812d882d58106829265 Mon Sep 17 00:00:00 2001 From: padreug Date: Sun, 22 Jun 2025 12:32:10 +0200 Subject: [PATCH] Refactor CRUD operations for client extension: Update database interactions to focus on the admin extension, streamline client dashboard functionalities, and enhance transaction history retrieval. Remove legacy DCA client references and improve code organization for better maintainability. --- crud.py | 709 ++++++++++++++++++++++---------------------------------- 1 file changed, 280 insertions(+), 429 deletions(-) diff --git a/crud.py b/crud.py index 1103794..072ab23 100644 --- a/crud.py +++ b/crud.py @@ -1,453 +1,304 @@ -# Description: This file contains the CRUD operations for talking to the database. +# Description: Client extension CRUD operations - reads from admin extension database -from typing import List, Optional, Union -from datetime import datetime, timezone +from typing import List, Optional +from datetime import datetime, timedelta from lnbits.db import Database -from lnbits.helpers import urlsafe_short_hash -from .models import () +from .models import ( + ClientDashboardSummary, + ClientTransaction, + ClientAnalytics, + UpdateClientSettings, +) -db = Database("ext_satmachineclient") +# Connect to admin extension's database +db = Database("ext_satmachineadmin") -async def get_dca_client(client_id: str) -> Optional[DcaClient]: - return await db.fetchone( - "SELECT * FROM satmachineclient.dca_clients WHERE id = :id", - {"id": client_id}, - DcaClient, - ) +################################################### +############## CLIENT DASHBOARD CRUD ############## +################################################### - -async def get_dca_client_by_user(user_id: str) -> Optional[DcaClient]: - return await db.fetchone( - "SELECT * FROM satmachineclient.dca_clients WHERE user_id = :user_id", - {"user_id": user_id}, - DcaClient, - ) - - -async def update_dca_client(client_id: str, data: UpdateDcaClientData) -> Optional[DcaClient]: - update_data = {k: v for k, v in data.dict().items() if v is not None} - if not update_data: - return await get_dca_client(client_id) +async def get_client_dashboard_summary(user_id: str) -> Optional[ClientDashboardSummary]: + """Get dashboard summary for a specific user""" - update_data["updated_at"] = datetime.now() - set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) - update_data["id"] = client_id + # Get client info + client = await db.fetchone( + "SELECT * FROM satmachineadmin.dca_clients WHERE user_id = :user_id", + {"user_id": user_id} + ) - await db.execute( - f"UPDATE satmachineclient.dca_clients SET {set_clause} WHERE id = :id", - update_data - ) - return await get_dca_client(client_id) - - -async def delete_dca_client(client_id: str) -> None: - await db.execute( - "DELETE FROM satmachineclient.dca_clients WHERE id = :id", - {"id": client_id} - ) - - -# DCA Deposit CRUD Operations -async def create_deposit(data: CreateDepositData) -> DcaDeposit: - deposit_id = urlsafe_short_hash() - await db.execute( + if not client: + return None + + # Get total sats accumulated + sats_result = await db.fetchone( """ - INSERT INTO satmachineclient.dca_deposits - (id, client_id, amount, currency, status, notes, created_at) - VALUES (:id, :client_id, :amount, :currency, :status, :notes, :created_at) + SELECT COALESCE(SUM(amount_sats), 0) as total_sats + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id AND status = 'confirmed' """, + {"client_id": client["id"]} + ) + + # Get total fiat invested + fiat_result = await db.fetchone( + """ + SELECT COALESCE(SUM(amount_fiat), 0) as total_fiat + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id AND status = 'confirmed' + """, + {"client_id": client["id"]} + ) + + # Get current fiat balance (deposits - payments) + balance_result = await db.fetchone( + """ + SELECT + COALESCE(SUM(CASE WHEN d.status = 'confirmed' THEN d.amount ELSE 0 END), 0) as deposits, + COALESCE(SUM(CASE WHEN p.status = 'confirmed' THEN p.amount_fiat ELSE 0 END), 0) as payments + FROM satmachineadmin.dca_clients c + LEFT JOIN satmachineadmin.dca_deposits d ON c.id = d.client_id + LEFT JOIN satmachineadmin.dca_payments p ON c.id = p.client_id + WHERE c.id = :client_id + """, + {"client_id": client["id"]} + ) + + # Get transaction count and last transaction date + tx_stats = await db.fetchone( + """ + SELECT + COUNT(*) as tx_count, + MAX(created_at) as last_tx_date + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id AND status = 'confirmed' + """, + {"client_id": client["id"]} + ) + + total_sats = sats_result["total_sats"] if sats_result else 0 + total_fiat = fiat_result["total_fiat"] if fiat_result else 0 + deposits = balance_result["deposits"] if balance_result else 0 + payments = balance_result["payments"] if balance_result else 0 + + # Calculate average cost basis (sats per fiat unit) + avg_cost_basis = total_sats / total_fiat if total_fiat > 0 else 0 + + return ClientDashboardSummary( + user_id=user_id, + total_sats_accumulated=total_sats, + total_fiat_invested=total_fiat, + average_cost_basis=avg_cost_basis, + current_fiat_balance=deposits - payments, + total_transactions=tx_stats["tx_count"] if tx_stats else 0, + dca_mode=client["dca_mode"], + dca_status=client["status"], + last_transaction_date=tx_stats["last_tx_date"] if tx_stats else None + ) + + +async def get_client_transactions( + user_id: str, + limit: int = 50, + offset: int = 0, + transaction_type: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None +) -> List[ClientTransaction]: + """Get client's transaction history with filtering""" + + # Get client ID first + client = await db.fetchone( + "SELECT id FROM satmachineadmin.dca_clients WHERE user_id = :user_id", + {"user_id": user_id} + ) + + if not client: + return [] + + # Build query with filters + where_conditions = ["client_id = :client_id"] + params = {"client_id": client["id"], "limit": limit, "offset": offset} + + if transaction_type: + where_conditions.append("transaction_type = :transaction_type") + params["transaction_type"] = transaction_type + + if start_date: + where_conditions.append("created_at >= :start_date") + params["start_date"] = start_date + + if end_date: + where_conditions.append("created_at <= :end_date") + params["end_date"] = end_date + + where_clause = " AND ".join(where_conditions) + + transactions = await db.fetchall( + f""" + SELECT id, amount_sats, amount_fiat, exchange_rate, transaction_type, + status, created_at, lamassu_transaction_id + FROM satmachineadmin.dca_payments + WHERE {where_clause} + ORDER BY created_at DESC + LIMIT :limit OFFSET :offset + """, + params + ) + + return [ + ClientTransaction( + id=tx["id"], + amount_sats=tx["amount_sats"], + amount_fiat=tx["amount_fiat"], + exchange_rate=tx["exchange_rate"], + transaction_type=tx["transaction_type"], + status=tx["status"], + created_at=tx["created_at"], + lamassu_transaction_id=tx["lamassu_transaction_id"] + ) + for tx in transactions + ] + + +async def get_client_analytics(user_id: str, time_range: str = "30d") -> Optional[ClientAnalytics]: + """Get client performance analytics""" + + # Get client ID + client = await db.fetchone( + "SELECT id FROM satmachineadmin.dca_clients WHERE user_id = :user_id", + {"user_id": user_id} + ) + + if not client: + return None + + # Calculate date range + if time_range == "7d": + start_date = datetime.now() - timedelta(days=7) + elif time_range == "30d": + start_date = datetime.now() - timedelta(days=30) + elif time_range == "90d": + start_date = datetime.now() - timedelta(days=90) + elif time_range == "1y": + start_date = datetime.now() - timedelta(days=365) + else: # "all" + start_date = datetime(2020, 1, 1) # Arbitrary early date + + # Get cost basis history (running average) + cost_basis_data = await db.fetchall( + """ + SELECT + created_at, + amount_sats, + amount_fiat, + exchange_rate, + SUM(amount_sats) OVER (ORDER BY created_at) as cumulative_sats, + SUM(amount_fiat) OVER (ORDER BY created_at) as cumulative_fiat + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id + AND status = 'confirmed' + AND created_at >= :start_date + ORDER BY created_at + """, + {"client_id": client["id"], "start_date": start_date} + ) + + # Build cost basis history + cost_basis_history = [] + for record in cost_basis_data: + avg_cost_basis = record["cumulative_sats"] / record["cumulative_fiat"] if record["cumulative_fiat"] > 0 else 0 + cost_basis_history.append({ + "date": record["created_at"].isoformat(), + "average_cost_basis": avg_cost_basis, + "cumulative_sats": record["cumulative_sats"], + "cumulative_fiat": record["cumulative_fiat"] + }) + + # Get accumulation timeline (daily/weekly aggregation) + accumulation_data = await db.fetchall( + """ + SELECT + DATE(created_at) as date, + SUM(amount_sats) as daily_sats, + SUM(amount_fiat) as daily_fiat, + COUNT(*) as daily_transactions + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id + AND status = 'confirmed' + AND created_at >= :start_date + GROUP BY DATE(created_at) + ORDER BY date + """, + {"client_id": client["id"], "start_date": start_date} + ) + + accumulation_timeline = [ { - "id": deposit_id, - "client_id": data.client_id, - "amount": data.amount, - "currency": data.currency, - "status": "pending", - "notes": data.notes, - "created_at": datetime.now() + "date": record["date"], + "sats": record["daily_sats"], + "fiat": record["daily_fiat"], + "transactions": record["daily_transactions"] } + for record in accumulation_data + ] + + # Get transaction frequency metrics + frequency_stats = await db.fetchone( + """ + SELECT + COUNT(*) as total_transactions, + AVG(amount_sats) as avg_sats_per_tx, + AVG(amount_fiat) as avg_fiat_per_tx, + MIN(created_at) as first_tx, + MAX(created_at) as last_tx + FROM satmachineadmin.dca_payments + WHERE client_id = :client_id AND status = 'confirmed' + """, + {"client_id": client["id"]} ) - return await get_deposit(deposit_id) - - -async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: - return await db.fetchone( - "SELECT * FROM satmachineclient.dca_deposits WHERE id = :id", - {"id": deposit_id}, - DcaDeposit, - ) - - -async def get_deposits_by_client(client_id: str) -> List[DcaDeposit]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_deposits WHERE client_id = :client_id ORDER BY created_at DESC", - {"client_id": client_id}, - DcaDeposit, - ) - - -async def get_all_deposits() -> List[DcaDeposit]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_deposits ORDER BY created_at DESC", - model=DcaDeposit, - ) - - -async def update_deposit_status(deposit_id: str, data: UpdateDepositStatusData) -> Optional[DcaDeposit]: - update_data = { - "status": data.status, - "notes": data.notes + + transaction_frequency = { + "total_transactions": frequency_stats["total_transactions"] if frequency_stats else 0, + "avg_sats_per_transaction": frequency_stats["avg_sats_per_tx"] if frequency_stats else 0, + "avg_fiat_per_transaction": frequency_stats["avg_fiat_per_tx"] if frequency_stats else 0, + "first_transaction": frequency_stats["first_tx"].isoformat() if frequency_stats and frequency_stats["first_tx"] else None, + "last_transaction": frequency_stats["last_tx"].isoformat() if frequency_stats and frequency_stats["last_tx"] else None } - if data.status == "confirmed": - update_data["confirmed_at"] = datetime.now() - - set_clause = ", ".join([f"{k} = :{k}" for k, v in update_data.items() if v is not None]) - filtered_data = {k: v for k, v in update_data.items() if v is not None} - filtered_data["id"] = deposit_id - - await db.execute( - f"UPDATE satmachineclient.dca_deposits SET {set_clause} WHERE id = :id", - filtered_data + return ClientAnalytics( + user_id=user_id, + cost_basis_history=cost_basis_history, + accumulation_timeline=accumulation_timeline, + transaction_frequency=transaction_frequency ) - return await get_deposit(deposit_id) -# DCA Payment CRUD Operations -async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment: - payment_id = urlsafe_short_hash() - await db.execute( - """ - INSERT INTO satmachineclient.dca_payments - (id, client_id, amount_sats, amount_fiat, exchange_rate, transaction_type, - lamassu_transaction_id, payment_hash, status, created_at) - VALUES (:id, :client_id, :amount_sats, :amount_fiat, :exchange_rate, :transaction_type, - :lamassu_transaction_id, :payment_hash, :status, :created_at) - """, - { - "id": payment_id, - "client_id": data.client_id, - "amount_sats": data.amount_sats, - "amount_fiat": data.amount_fiat, - "exchange_rate": data.exchange_rate, - "transaction_type": data.transaction_type, - "lamassu_transaction_id": data.lamassu_transaction_id, - "payment_hash": data.payment_hash, - "status": "pending", - "created_at": datetime.now() - } - ) - return await get_dca_payment(payment_id) - - -async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: +async def get_client_by_user_id(user_id: str): + """Get client record by user_id""" return await db.fetchone( - "SELECT * FROM satmachineclient.dca_payments WHERE id = :id", - {"id": payment_id}, - DcaPayment, + "SELECT * FROM satmachineadmin.dca_clients WHERE user_id = :user_id", + {"user_id": user_id} ) -async def get_payments_by_client(client_id: str) -> List[DcaPayment]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_payments WHERE client_id = :client_id ORDER BY created_at DESC", - {"client_id": client_id}, - DcaPayment, - ) - - -async def get_all_payments() -> List[DcaPayment]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_payments ORDER BY created_at DESC", - model=DcaPayment, - ) - - -async def update_dca_payment_status(payment_id: str, status: str) -> None: - """Update the status of a DCA payment""" - await db.execute( - "UPDATE satmachineclient.dca_payments SET status = :status WHERE id = :id", - {"status": status, "id": payment_id} - ) - - -async def get_payments_by_lamassu_transaction(lamassu_transaction_id: str) -> List[DcaPayment]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_payments WHERE lamassu_transaction_id = :transaction_id", - {"transaction_id": lamassu_transaction_id}, - DcaPayment, - ) - - -# Balance and Summary Operations -async def get_client_balance_summary(client_id: str) -> ClientBalanceSummary: - # Get total confirmed deposits - total_deposits_result = await db.fetchone( - """ - SELECT COALESCE(SUM(amount), 0) as total, currency - FROM satmachineclient.dca_deposits - WHERE client_id = :client_id AND status = 'confirmed' - GROUP BY currency - """, - {"client_id": client_id} - ) - - # Get total payments made - total_payments_result = await db.fetchone( - """ - SELECT COALESCE(SUM(amount_fiat), 0) as total - FROM satmachineclient.dca_payments - WHERE client_id = :client_id AND status = 'confirmed' - """, - {"client_id": client_id} - ) - - total_deposits = total_deposits_result["total"] if total_deposits_result else 0 - total_payments = total_payments_result["total"] if total_payments_result else 0 - currency = total_deposits_result["currency"] if total_deposits_result else "GTQ" - - return ClientBalanceSummary( - client_id=client_id, - total_deposits=total_deposits, - total_payments=total_payments, - remaining_balance=total_deposits - total_payments, - currency=currency - ) - - -async def get_flow_mode_clients() -> List[DcaClient]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_clients WHERE dca_mode = 'flow' AND status = 'active'", - model=DcaClient, - ) - - -async def get_fixed_mode_clients() -> List[DcaClient]: - return await db.fetchall( - "SELECT * FROM satmachineclient.dca_clients WHERE dca_mode = 'fixed' AND status = 'active'", - model=DcaClient, - ) - - -# Lamassu Configuration CRUD Operations -async def create_lamassu_config(data: CreateLamassuConfigData) -> LamassuConfig: - config_id = urlsafe_short_hash() - - # Deactivate any existing configs first (only one active config allowed) - await db.execute( - "UPDATE satmachineclient.lamassu_config SET is_active = false, updated_at = :updated_at", - {"updated_at": datetime.now()} - ) - - await db.execute( - """ - INSERT INTO satmachineclient.lamassu_config - (id, host, port, database_name, username, password, source_wallet_id, commission_wallet_id, is_active, created_at, updated_at, - use_ssh_tunnel, ssh_host, ssh_port, ssh_username, ssh_password, ssh_private_key) - VALUES (:id, :host, :port, :database_name, :username, :password, :source_wallet_id, :commission_wallet_id, :is_active, :created_at, :updated_at, - :use_ssh_tunnel, :ssh_host, :ssh_port, :ssh_username, :ssh_password, :ssh_private_key) - """, - { - "id": config_id, - "host": data.host, - "port": data.port, - "database_name": data.database_name, - "username": data.username, - "password": data.password, - "source_wallet_id": data.source_wallet_id, - "commission_wallet_id": data.commission_wallet_id, - "is_active": True, - "created_at": datetime.now(), - "updated_at": datetime.now(), - "use_ssh_tunnel": data.use_ssh_tunnel, - "ssh_host": data.ssh_host, - "ssh_port": data.ssh_port, - "ssh_username": data.ssh_username, - "ssh_password": data.ssh_password, - "ssh_private_key": data.ssh_private_key - } - ) - return await get_lamassu_config(config_id) - - -async def get_lamassu_config(config_id: str) -> Optional[LamassuConfig]: - return await db.fetchone( - "SELECT * FROM satmachineclient.lamassu_config WHERE id = :id", - {"id": config_id}, - LamassuConfig, - ) - - -async def get_active_lamassu_config() -> Optional[LamassuConfig]: - return await db.fetchone( - "SELECT * FROM satmachineclient.lamassu_config WHERE is_active = true ORDER BY created_at DESC LIMIT 1", - model=LamassuConfig, - ) - - -async def get_all_lamassu_configs() -> List[LamassuConfig]: - return await db.fetchall( - "SELECT * FROM satmachineclient.lamassu_config ORDER BY created_at DESC", - model=LamassuConfig, - ) - - -async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) -> Optional[LamassuConfig]: - update_data = {k: v for k, v in data.dict().items() if v is not None} - if not update_data: - return await get_lamassu_config(config_id) - - update_data["updated_at"] = datetime.now() - set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) - update_data["id"] = config_id - - await db.execute( - f"UPDATE satmachineclient.lamassu_config SET {set_clause} WHERE id = :id", - update_data - ) - return await get_lamassu_config(config_id) - - -async def update_config_test_result(config_id: str, success: bool) -> None: - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satmachineclient.lamassu_config - SET test_connection_last = :test_time, test_connection_success = :success, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "test_time": utc_now, - "success": success, - "updated_at": utc_now - } - ) - - -async def delete_lamassu_config(config_id: str) -> None: - await db.execute( - "DELETE FROM satmachineclient.lamassu_config WHERE id = :id", - {"id": config_id} - ) - - -async def update_poll_start_time(config_id: str) -> None: - """Update the last poll start time""" - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satmachineclient.lamassu_config - SET last_poll_time = :poll_time, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "poll_time": utc_now, - "updated_at": utc_now - } - ) - - -async def update_poll_success_time(config_id: str) -> None: - """Update the last successful poll time""" - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satmachineclient.lamassu_config - SET last_successful_poll = :poll_time, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "poll_time": utc_now, - "updated_at": utc_now - } - ) - - -# Lamassu Transaction Storage CRUD Operations -async def create_lamassu_transaction(data: CreateLamassuTransactionData) -> StoredLamassuTransaction: - """Store a processed Lamassu transaction""" - transaction_id = urlsafe_short_hash() - await db.execute( - """ - INSERT INTO satmachineclient.lamassu_transactions - (id, lamassu_transaction_id, fiat_amount, crypto_amount, commission_percentage, - discount, effective_commission, commission_amount_sats, base_amount_sats, - exchange_rate, crypto_code, fiat_code, device_id, transaction_time, processed_at, - clients_count, distributions_total_sats) - VALUES (:id, :lamassu_transaction_id, :fiat_amount, :crypto_amount, :commission_percentage, - :discount, :effective_commission, :commission_amount_sats, :base_amount_sats, - :exchange_rate, :crypto_code, :fiat_code, :device_id, :transaction_time, :processed_at, - :clients_count, :distributions_total_sats) - """, - { - "id": transaction_id, - "lamassu_transaction_id": data.lamassu_transaction_id, - "fiat_amount": data.fiat_amount, - "crypto_amount": data.crypto_amount, - "commission_percentage": data.commission_percentage, - "discount": data.discount, - "effective_commission": data.effective_commission, - "commission_amount_sats": data.commission_amount_sats, - "base_amount_sats": data.base_amount_sats, - "exchange_rate": data.exchange_rate, - "crypto_code": data.crypto_code, - "fiat_code": data.fiat_code, - "device_id": data.device_id, - "transaction_time": data.transaction_time, - "processed_at": datetime.now(), - "clients_count": 0, # Will be updated after distributions - "distributions_total_sats": 0 # Will be updated after distributions - } - ) - return await get_lamassu_transaction(transaction_id) - - -async def get_lamassu_transaction(transaction_id: str) -> Optional[StoredLamassuTransaction]: - """Get a stored Lamassu transaction by ID""" - return await db.fetchone( - "SELECT * FROM satmachineclient.lamassu_transactions WHERE id = :id", - {"id": transaction_id}, - StoredLamassuTransaction, - ) - - -async def get_lamassu_transaction_by_lamassu_id(lamassu_transaction_id: str) -> Optional[StoredLamassuTransaction]: - """Get a stored Lamassu transaction by Lamassu transaction ID""" - return await db.fetchone( - "SELECT * FROM satmachineclient.lamassu_transactions WHERE lamassu_transaction_id = :lamassu_id", - {"lamassu_id": lamassu_transaction_id}, - StoredLamassuTransaction, - ) - - -async def get_all_lamassu_transactions() -> List[StoredLamassuTransaction]: - """Get all stored Lamassu transactions""" - return await db.fetchall( - "SELECT * FROM satmachineclient.lamassu_transactions ORDER BY transaction_time DESC", - model=StoredLamassuTransaction, - ) - - -async def update_lamassu_transaction_distribution_stats( - transaction_id: str, - clients_count: int, - distributions_total_sats: int -) -> None: - """Update distribution statistics for a Lamassu transaction""" - await db.execute( - """ - UPDATE satmachineclient.lamassu_transactions - SET clients_count = :clients_count, distributions_total_sats = :distributions_total_sats - WHERE id = :id - """, - { - "clients_count": clients_count, - "distributions_total_sats": distributions_total_sats, - "id": transaction_id - } - ) +async def update_client_dca_settings(client_id: str, settings: UpdateClientSettings) -> bool: + """Update client DCA settings (mode, limits, status)""" + try: + update_data = {k: v for k, v in settings.dict().items() if v is not None} + if not update_data: + return True # Nothing to update + + update_data["updated_at"] = datetime.now() + set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) + update_data["id"] = client_id + + await db.execute( + f"UPDATE satmachineadmin.dca_clients SET {set_clause} WHERE id = :id", + update_data + ) + return True + except Exception: + return False \ No newline at end of file