from datetime import datetime from decimal import Decimal from http import HTTPStatus from fastapi import APIRouter, Depends, HTTPException from loguru import logger from lnbits.core.models import User, WalletTypeInfo from lnbits.decorators import ( check_super_user, check_user_exists, require_admin_key, require_invoice_key, ) from lnbits.utils.exchange_rates import allowed_currencies, fiat_amount_as_satoshis from .crud import ( approve_manual_payment_request, check_balance_assertion, create_account, create_account_permission, create_balance_assertion, create_manual_payment_request, db, delete_account_permission, delete_balance_assertion, get_account, get_account_by_name, get_account_permission, get_account_permissions, get_all_accounts, get_all_manual_payment_requests, get_all_user_wallet_settings, get_balance_assertion, get_balance_assertions, get_manual_payment_request, get_or_create_user_account, get_user_manual_payment_requests, get_user_permissions, get_user_permissions_with_inheritance, reject_manual_payment_request, ) from .models import ( Account, AccountPermission, AccountType, AccountWithPermissions, AssertionStatus, AssignUserRole, BalanceAssertion, BulkGrantPermission, BulkGrantResult, CastleSettings, CreateAccount, CreateAccountPermission, CreateBalanceAssertion, CreateEntryLine, CreateJournalEntry, CreateManualPaymentRequest, CreateRole, CreateRolePermission, CreateUserEquityStatus, ExpenseEntry, GeneratePaymentInvoice, JournalEntry, JournalEntryFlag, ManualPaymentRequest, PayUser, PermissionType, ReceivableEntry, RecordPayment, RevenueEntry, Role, RolePermission, RoleWithPermissions, SettleReceivable, UpdateRole, UserBalance, UserEquityStatus, UserInfo, UserRole, UserWalletSettings, UserWithRoles, ) from .services import get_settings, get_user_wallet, update_settings, update_user_wallet castle_api_router = APIRouter() # ===== HELPER FUNCTIONS ===== async def check_castle_wallet_configured() -> str: """Ensure castle wallet is configured, return wallet_id""" settings = await get_settings("admin") if not settings or not settings.castle_wallet_id: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Castle wallet not configured. Please contact the super user to configure the Castle wallet in settings.", ) return settings.castle_wallet_id async def check_user_wallet_configured(user_id: str) -> str: """Ensure user has configured their wallet, return wallet_id""" from lnbits.settings import settings as lnbits_settings # If user is super user, use the castle wallet if user_id == lnbits_settings.super_user: castle_settings = await get_settings("admin") if castle_settings and castle_settings.castle_wallet_id: return castle_settings.castle_wallet_id raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Castle wallet not configured. Please configure the Castle wallet in settings.", ) # For regular users, check their personal wallet user_wallet = await get_user_wallet(user_id) if not user_wallet or not user_wallet.user_wallet_id: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="You must configure your wallet in settings before using this feature.", ) return user_wallet.user_wallet_id # ===== UTILITY ENDPOINTS ===== @castle_api_router.get("/api/v1/currencies") async def api_get_currencies() -> list[str]: """Get list of allowed currencies for fiat conversion""" return allowed_currencies() # ===== ACCOUNT ENDPOINTS ===== @castle_api_router.get("/api/v1/accounts") async def api_get_accounts( filter_by_user: bool = False, exclude_virtual: bool = True, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> list[Account] | list[AccountWithPermissions]: """ Get all accounts in the chart of accounts. - filter_by_user: If true, only return accounts the user has permissions for - exclude_virtual: If true, exclude virtual parent accounts (default True) - Returns AccountWithPermissions objects when filter_by_user=true, otherwise Account objects """ from lnbits.settings import settings as lnbits_settings from . import crud all_accounts = await get_all_accounts() user_id = wallet.wallet.user is_super_user = user_id == lnbits_settings.super_user # Auto-assign default role if user has no roles (only for non-super users) if not is_super_user: assigned_role = await crud.auto_assign_default_role(user_id, "system") if assigned_role: logger.info(f"[ACCOUNTS] Auto-assigned role to user {user_id}") # Super users bypass permission filtering - they see everything if not filter_by_user or is_super_user: # Filter out virtual accounts if requested (default behavior for user views) if exclude_virtual: all_accounts = [acc for acc in all_accounts if not acc.is_virtual] # Return all accounts without filtering by permissions return all_accounts # Filter by user permissions # NOTE: Do NOT filter out virtual accounts yet - they're needed for inheritance logic # Get direct user permissions user_permissions = await get_user_permissions(user_id) # Get role-based permissions role_permissions_list = await crud.get_user_permissions_from_roles(user_id) # Flatten role permissions into a single list role_perms = [] for role, perms in role_permissions_list: role_perms.extend(perms) # Combine direct and role-based permissions all_permissions = list(user_permissions) + role_perms logger.info(f"[ACCOUNTS] User {user_id} has {len(user_permissions)} direct permissions and {len(role_perms)} role permissions (total: {len(all_permissions)})") if role_perms: logger.info(f"[ACCOUNTS] Role permissions: {[(p.account_id, p.permission_type) for p in role_perms]}") logger.info(f"[ACCOUNTS] Total accounts in system: {len(all_accounts)}") if len(all_accounts) > 0: logger.info(f"[ACCOUNTS] Sample account IDs: {[acc.id for acc in all_accounts[:5]]}") # Get set of account IDs the user has any permission on permitted_account_ids = {perm.account_id for perm in all_permissions} # Build list of accounts with permission metadata accounts_with_permissions = [] for account in all_accounts: # Check if user has permission on this account (direct or from role) account_perms = [ perm for perm in all_permissions if perm.account_id == account.id ] # Check if user has inherited permission from parent account (using combined permissions) # Check both direct and role-based permissions for parent accounts inherited_perms = [] for perm in all_permissions: # Get the account for this permission perm_account = await get_account(perm.account_id) if not perm_account: continue # Check if this permission's account is a parent of the current account # e.g., "Expenses:Supplies" is parent of "Expenses:Supplies:Food" if account.name.startswith(perm_account.name + ":"): # Inherited permission from parent account inherited_perms.append((perm, perm_account.name)) # Determine if account should be included has_access = bool(account_perms) or bool(inherited_perms) if has_access: # Parse hierarchical account name to get parent and level parts = account.name.split(":") level = len(parts) - 1 parent_account = ":".join(parts[:-1]) if level > 0 else None # Determine inherited_from (which parent account gave access) inherited_from = None if inherited_perms and not account_perms: # Permission is inherited, use the parent account name _, parent_name = inherited_perms[0] inherited_from = parent_name # Collect permission types for this account permission_types = [perm.permission_type for perm in account_perms] # Check if account has children has_children = any( a.name.startswith(account.name + ":") for a in all_accounts ) accounts_with_permissions.append( AccountWithPermissions( id=account.id, name=account.name, account_type=account.account_type, description=account.description, user_id=account.user_id, created_at=account.created_at, is_active=account.is_active, is_virtual=account.is_virtual, user_permissions=permission_types if permission_types else None, inherited_from=inherited_from, parent_account=parent_account, level=level, has_children=has_children, ) ) # Filter out virtual accounts if requested (after permission inheritance logic) if exclude_virtual: accounts_with_permissions = [ acc for acc in accounts_with_permissions if not acc.is_virtual ] logger.info(f"[ACCOUNTS] Returning {len(accounts_with_permissions)} accounts for user {user_id}") return accounts_with_permissions @castle_api_router.post("/api/v1/accounts", status_code=HTTPStatus.CREATED) async def api_create_account( data: CreateAccount, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> Account: """Create a new account (admin only)""" return await create_account(data) @castle_api_router.get("/api/v1/accounts/{account_id}") async def api_get_account(account_id: str) -> Account: """Get a specific account""" account = await get_account(account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Account not found" ) return account @castle_api_router.get("/api/v1/accounts/{account_id}/balance") async def api_get_account_balance(account_id: str) -> dict: """Get account balance from Fava/Beancount""" from .fava_client import get_fava_client # Get account to retrieve its name account = await get_account(account_id) if not account: raise HTTPException(status_code=404, detail="Account not found") # Query Fava for balance fava = get_fava_client() balance_data = await fava.get_account_balance(account.name) return { "account_id": account_id, "balance": balance_data["sats"], # Balance in satoshis "positions": balance_data["positions"] # Full Beancount positions with cost basis } @castle_api_router.get("/api/v1/accounts/{account_id}/transactions") async def api_get_account_transactions(account_id: str, limit: int = 100) -> list[dict]: """ Get all transactions for an account from Fava/Beancount. Returns transactions affecting this account in reverse chronological order. """ from .fava_client import get_fava_client # Get account details account = await get_account(account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account {account_id} not found" ) # Query Fava for transactions fava = get_fava_client() transactions = await fava.get_account_transactions(account.name, limit) return transactions # ===== JOURNAL ENTRY ENDPOINTS ===== @castle_api_router.get("/api/v1/entries") async def api_get_journal_entries(limit: int = 100) -> list[dict]: """ Get all journal entries from Fava/Beancount. Returns all transactions in reverse chronological order with username enrichment. """ from lnbits.core.crud.users import get_user from .fava_client import get_fava_client fava = get_fava_client() all_entries = await fava.get_journal_entries() # Filter to transactions only and enrich with username enriched_entries = [] for e in all_entries: if e.get("t") != "Transaction": continue # Extract user ID from metadata or account names user_id = None entry_meta = e.get("meta", {}) if "user-id" in entry_meta: user_id = entry_meta["user-id"] else: # Try to extract from account names in postings for posting in e.get("postings", []): account = posting.get("account", "") if "User-" in account: parts = account.split("User-") if len(parts) > 1: user_id = parts[1] break # Look up username username = None if user_id: user = await get_user(user_id) username = user.username if user and user.username else f"User-{user_id[:8]}" # Add username to entry enriched_entry = dict(e) enriched_entry["user_id"] = user_id enriched_entry["username"] = username enriched_entries.append(enriched_entry) if len(enriched_entries) >= limit: break return enriched_entries @castle_api_router.get("/api/v1/entries/user") async def api_get_user_entries( wallet: WalletTypeInfo = Depends(require_invoice_key), limit: int = 20, offset: int = 0, filter_user_id: str = None, filter_account_type: str = None, # 'asset' for receivable, 'liability' for payable days: int = 15, # Default 15 days, options: 15, 30, 60 start_date: str = None, # ISO format: YYYY-MM-DD end_date: str = None, # ISO format: YYYY-MM-DD ) -> dict: """ Get journal entries that affect the current user's accounts from Fava/Beancount. Returns transactions in reverse chronological order with optional filtering. Args: days: Number of days to fetch (default: 15, options: 15, 30, 60) start_date: Start date for custom range (YYYY-MM-DD). Requires end_date. end_date: End date for custom range (YYYY-MM-DD). Requires start_date. Note: If both days and start_date/end_date are provided, start_date/end_date takes precedence. """ from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client fava = get_fava_client() # Determine which user's entries to fetch if wallet.wallet.user == lnbits_settings.super_user: # Super user can view all or filter by user_id target_user_id = filter_user_id else: # Regular user can only see their own entries target_user_id = wallet.wallet.user # Get journal entries from Fava # Priority: custom date range > days > default (5 days) all_entries = await fava.get_journal_entries( days=days, start_date=start_date, end_date=end_date ) # Filter and transform entries filtered_entries = [] for e in all_entries: if e.get("t") != "Transaction": continue # Skip voided transactions if "voided" in e.get("tags", []): continue # Extract user ID from metadata or account names user_id_match = None entry_meta = e.get("meta", {}) if "user-id" in entry_meta: user_id_match = entry_meta["user-id"] else: # Try to extract from account names in postings for posting in e.get("postings", []): account = posting.get("account", "") if "User-" in account: # Extract user ID from account name (e.g., "Liabilities:Payable:User-abc123") parts = account.split("User-") if len(parts) > 1: user_id_match = parts[1] # Just the short ID after User- break # Filter by target user if specified if target_user_id and user_id_match: if not user_id_match.startswith(target_user_id[:8]): continue # Filter by account type if specified if filter_account_type and user_id_match: postings = e.get("postings", []) has_matching_account = False for posting in postings: account = posting.get("account", "") if filter_account_type.lower() == "asset" and "Receivable" in account: has_matching_account = True break elif filter_account_type.lower() == "liability" and "Payable" in account: has_matching_account = True break if not has_matching_account: continue # Extract data for frontend # Extract entry ID from links entry_id = None links = e.get("links", []) if isinstance(links, (list, set)): for link in links: if isinstance(link, str): link_clean = link.lstrip('^') if "castle-" in link_clean: parts = link_clean.split("castle-") if len(parts) > 1: entry_id = parts[-1] break # Extract amount from postings amount_sats = 0 fiat_amount = None fiat_currency = None postings = e.get("postings", []) if postings: first_posting = postings[0] if isinstance(first_posting, dict): amount_str = first_posting.get("amount", "") # Parse amount string: can be EUR/USD directly (new format) or "SATS {EUR}" (old format) if isinstance(amount_str, str) and amount_str: import re # Try EUR/USD format first (new format: "37.22 EUR") fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): # Direct fiat amount (new approach) fiat_amount = abs(float(fiat_match.group(1))) fiat_currency = fiat_match.group(2) # Get SATS from metadata posting_meta = first_posting.get("meta", {}) sats_equiv = posting_meta.get("sats-equivalent") if sats_equiv: amount_sats = abs(int(sats_equiv)) else: # Old format: "36791 SATS {33.33 EUR, 2025-11-09}" or "36791 SATS" sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str) if sats_match: amount_sats = abs(int(sats_match.group(1))) # Extract fiat from cost syntax: {33.33 EUR, ...} cost_match = re.search(r'\{([\d.]+)\s+([A-Z]+)', amount_str) if cost_match: fiat_amount = float(cost_match.group(1)) fiat_currency = cost_match.group(2) # Extract reference from links (first non-castle link) reference = None if isinstance(links, (list, set)): for link in links: if isinstance(link, str): link_clean = link.lstrip('^') if not link_clean.startswith("castle-") and not link_clean.startswith("ln-"): reference = link_clean break # Look up actual username using helper function username = await _get_username_from_user_id(user_id_match) if user_id_match else None entry_data = { "id": entry_id or e.get("entry_hash", "unknown"), "date": e.get("date", ""), "entry_date": e.get("date", ""), "flag": e.get("flag"), "description": e.get("narration", ""), "payee": e.get("payee"), "tags": e.get("tags", []), "links": links, "amount": amount_sats, "user_id": user_id_match, "username": username, "reference": reference, "meta": entry_meta, # Include metadata for frontend } if fiat_amount and fiat_currency: entry_data["fiat_amount"] = fiat_amount entry_data["fiat_currency"] = fiat_currency filtered_entries.append(entry_data) # Sort by date descending filtered_entries.sort(key=lambda x: x.get("date", ""), reverse=True) # Apply pagination total = len(filtered_entries) paginated_entries = filtered_entries[offset:offset + limit] return { "entries": paginated_entries, "total": total, "limit": limit, "offset": offset, "has_next": (offset + limit) < total, "has_prev": offset > 0, } async def _get_username_from_user_id(user_id: str) -> str: """ Helper function to get username from user_id, handling various formats. Supports: - Full UUID with dashes (36 chars): "375ec158-686c-4a21-b44d-a51cc90ef07d" - Dashless UUID (32 chars): "375ec158686c4a21b44da51cc90ef07d" - Partial ID (8 chars from account names): "375ec158" Returns username or formatted fallback. """ from lnbits.core.crud.users import get_user logger.debug(f"[USERNAME] Called with: '{user_id}' (len={len(user_id) if user_id else 0})") if not user_id: return None # Case 1: Already in standard UUID format (36 chars with dashes) if len(user_id) == 36 and user_id.count('-') == 4: logger.debug(f"[USERNAME] Case 1: Full UUID format") user = await get_user(user_id) result = user.username if user and user.username else f"User-{user_id[:8]}" logger.debug(f"[USERNAME] Case 1 result: '{result}'") return result # Case 2: Dashless 32-char UUID - lookup via Castle user settings, fallback to LNbits elif len(user_id) == 32 and '-' not in user_id: logger.debug(f"[USERNAME] Case 2: Dashless UUID format - looking up in Castle user settings") try: # Convert dashless to dashed format user_id_with_dashes = f"{user_id[0:8]}-{user_id[8:12]}-{user_id[12:16]}-{user_id[16:20]}-{user_id[20:32]}" logger.debug(f"[USERNAME] Converted to dashed format: {user_id_with_dashes}") # Try Castle settings first user_settings = await get_all_user_wallet_settings() for setting in user_settings: if setting.id == user_id_with_dashes: logger.debug(f"[USERNAME] Found matching user in Castle settings") user = await get_user(setting.id) result = user.username if user and user.username else f"User-{user_id[:8]}" logger.debug(f"[USERNAME] Case 2 result (from Castle): '{result}'") return result # Not in Castle settings - try LNbits database directly logger.debug(f"[USERNAME] Not in Castle settings, querying LNbits database directly") from lnbits.db import Database db = Database("database") async with db.connect() as conn: row = await conn.fetchone( "SELECT id, username FROM accounts WHERE id = :user_id LIMIT 1", {"user_id": user_id_with_dashes} ) logger.debug(f"[USERNAME] Database query result: {row}") if row and row["username"]: result = row["username"] logger.debug(f"[USERNAME] Case 2 result (from LNbits DB): '{result}'") return result # User doesn't exist anywhere logger.debug(f"[USERNAME] User not found in LNbits database either") result = f"User-{user_id[:8]}" logger.debug(f"[USERNAME] Case 2 result (not found): '{result}'") return result except Exception as e: logger.error(f"Error looking up user by dashless UUID {user_id}: {e}") result = f"User-{user_id[:8]}" return result # Case 3: Partial ID (8 chars from account name) - lookup via Castle user settings elif len(user_id) == 8: logger.debug(f"[USERNAME] Case 3: Partial ID format - looking up in Castle user settings") try: # Get all Castle users (which have full user_ids) user_settings = await get_all_user_wallet_settings() # Find matching user by first 8 chars for setting in user_settings: if setting.id.startswith(user_id): logger.debug(f"[USERNAME] Found full user_id: {setting.id}") # Now get username from LNbits with full ID user = await get_user(setting.id) result = user.username if user and user.username else f"User-{user_id}" logger.debug(f"[USERNAME] Case 3 result (found): '{result}'") return result # No matching user found in Castle settings logger.debug(f"[USERNAME] No matching user found in Castle settings") result = f"User-{user_id}" logger.debug(f"[USERNAME] Case 3 result (not found): '{result}'") return result except Exception as e: logger.error(f"Error looking up user by partial ID {user_id}: {e}") result = f"User-{user_id}" return result # Case 4: Unknown format - try as-is and fall back else: logger.debug(f"[USERNAME] Case 4: Unknown format - trying as-is") try: user = await get_user(user_id) result = user.username if user and user.username else f"User-{user_id[:8]}" logger.debug(f"[USERNAME] Case 4 result: '{result}'") return result except Exception as e: logger.debug(f"[USERNAME] Case 4 exception: {e}") result = f"User-{user_id[:8]}" logger.debug(f"[USERNAME] Case 4 fallback result: '{result}'") return result @castle_api_router.get("/api/v1/entries/pending") async def api_get_pending_entries( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[dict]: """ Get all pending expense entries that need approval (admin only). Returns transactions with flag='!' from Fava/Beancount. """ from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access this endpoint", ) # Query Fava for all journal entries (includes links, tags, full metadata) fava = get_fava_client() all_entries = await fava.get_journal_entries() # Filter for pending transactions and extract info pending_entries = [] for e in all_entries: # Only include pending transactions that are NOT voided if e.get("t") == "Transaction" and e.get("flag") == "!" and "voided" not in e.get("tags", []): # Extract entry ID from links field entry_id = None links = e.get("links", []) if isinstance(links, (list, set)): for link in links: if isinstance(link, str): # Strip ^ prefix if present (Beancount link syntax) link_clean = link.lstrip('^') if "castle-" in link_clean: parts = link_clean.split("castle-") if len(parts) > 1: entry_id = parts[-1] break # Extract user ID from metadata or account names user_id = None entry_meta = e.get("meta", {}) logger.info(f"[EXTRACT] Entry metadata keys: {list(entry_meta.keys())}") logger.info(f"[EXTRACT] Entry metadata: {entry_meta}") if "user-id" in entry_meta: user_id = entry_meta["user-id"] logger.info(f"[EXTRACT] Found user-id in metadata: {user_id}") else: logger.info(f"[EXTRACT] No user-id in metadata, checking account names") # Try to extract from account names in postings for posting in e.get("postings", []): account = posting.get("account", "") if "User-" in account: # Extract user ID from account name (e.g., "Liabilities:Payable:User-abc123") parts = account.split("User-") if len(parts) > 1: user_id = parts[1] # Short ID after User- logger.info(f"[EXTRACT] Extracted user_id from account name: {user_id}") break # Look up username using helper function username = await _get_username_from_user_id(user_id) if user_id else None # Extract amount from postings (sum of absolute values / 2) amount_sats = 0 fiat_amount = None fiat_currency = None postings = e.get("postings", []) if postings: first_posting = postings[0] if isinstance(first_posting, dict): amount_str = first_posting.get("amount", "") # Parse amount string format if isinstance(amount_str, str) and amount_str: import re # Try EUR/USD format first (new architecture): "50.00 EUR" fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): fiat_amount = abs(float(fiat_match.group(1))) fiat_currency = fiat_match.group(2) # Extract sats equivalent from metadata posting_meta = first_posting.get("meta", {}) sats_equiv = posting_meta.get("sats-equivalent") if sats_equiv: amount_sats = abs(int(sats_equiv)) else: # Legacy SATS format: "36791 SATS {33.33 EUR, 2025-11-09}" or "36791 SATS" sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str) if sats_match: amount_sats = abs(int(sats_match.group(1))) # Extract fiat from cost syntax: {33.33 EUR, ...} cost_match = re.search(r'\{([\d.]+)\s+([A-Z]+)', amount_str) if cost_match: fiat_amount = float(cost_match.group(1)) fiat_currency = cost_match.group(2) entry_data = { "id": entry_id or "unknown", "date": e.get("date", ""), "entry_date": e.get("date", ""), "flag": e.get("flag"), "description": e.get("narration", ""), "payee": e.get("payee"), "tags": e.get("tags", []), "links": links, "amount": amount_sats, "user_id": user_id, "username": username, } # Add fiat info if available if fiat_amount and fiat_currency: entry_data["fiat_amount"] = fiat_amount entry_data["fiat_currency"] = fiat_currency pending_entries.append(entry_data) return pending_entries @castle_api_router.post("/api/v1/entries", status_code=HTTPStatus.CREATED) async def api_create_journal_entry( data: CreateJournalEntry, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> JournalEntry: """ Create a new generic journal entry. Submits entry to Fava/Beancount. """ from .fava_client import get_fava_client from .beancount_format import format_transaction, format_posting_with_cost # Validate that entry balances to zero total = sum(line.amount for line in data.lines) if total != 0: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Entry does not balance (total: {total}, expected: 0)" ) # Get all accounts and validate they exist account_map = {} for line in data.lines: account = await get_account(line.account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account '{line.account_id}' not found" ) account_map[line.account_id] = account # Format postings postings = [] for line in data.lines: account = account_map[line.account_id] # Extract fiat info from metadata if present fiat_currency = line.metadata.get("fiat_currency") fiat_amount_str = line.metadata.get("fiat_amount") fiat_amount = Decimal(fiat_amount_str) if fiat_amount_str else None # Create posting metadata (excluding fiat fields that are used for primary amount) posting_metadata = {k: v for k, v in line.metadata.items() if k not in ["fiat_currency", "fiat_amount"]} # If fiat currency is provided, use EUR-based format (primary amount in EUR, sats in metadata) # Otherwise, use SATS-based format if fiat_currency and fiat_amount: # EUR-based posting (current architecture) posting_metadata["sats-equivalent"] = str(abs(line.amount)) # Apply the sign from line.amount to fiat_amount # line.amount is positive for debits, negative for credits signed_fiat_amount = fiat_amount if line.amount >= 0 else -fiat_amount posting = { "account": account.name, "amount": f"{signed_fiat_amount:.2f} {fiat_currency}", "meta": posting_metadata if posting_metadata else None } else: # SATS-based posting (legacy/fallback) if line.description: posting_metadata["description"] = line.description posting = format_posting_with_cost( account=account.name, amount_sats=line.amount, fiat_currency=None, fiat_amount=None, metadata=posting_metadata if posting_metadata else None ) postings.append(posting) # Extract tags and links from meta tags = data.meta.get("tags", []) links = data.meta.get("links", []) if data.reference: links.append(data.reference) # Entry metadata (excluding tags and links which go at transaction level) entry_meta = {k: v for k, v in data.meta.items() if k not in ["tags", "links"]} entry_meta["source"] = "castle-api" entry_meta["created-by"] = wallet.wallet.id # Format as Beancount entry fava = get_fava_client() entry = format_transaction( date_val=data.entry_date.date() if data.entry_date else datetime.now().date(), flag=data.flag.value if data.flag else "*", narration=data.description, postings=postings, tags=tags if tags else None, links=links if links else None, meta=entry_meta ) # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Journal entry submitted to Fava: {result.get('data', 'Unknown')}") # Return simplified JournalEntry for API compatibility # Note: Castle no longer stores entries in DB, Fava is the source of truth timestamp = datetime.now().timestamp() return JournalEntry( id=f"fava-{timestamp}", description=data.description, entry_date=data.entry_date if data.entry_date else datetime.now(), created_by=wallet.wallet.id, created_at=datetime.now(), reference=data.reference, flag=data.flag if data.flag else JournalEntryFlag.CLEARED, lines=[], # Empty - entry is stored in Fava, not Castle DB meta={"source": "fava", "fava_response": result.get('data', 'Unknown')} ) # ===== SIMPLIFIED ENTRY ENDPOINTS ===== @castle_api_router.post("/api/v1/entries/expense", status_code=HTTPStatus.CREATED) async def api_create_expense_entry( data: ExpenseEntry, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> JournalEntry: """ Create an expense entry for a user. If is_equity=True, records as equity contribution. If is_equity=False, records as liability (castle owes user). If currency is provided, amount is converted from fiat to satoshis. """ # Check that castle wallet is configured await check_castle_wallet_configured() # Check that user has configured their wallet await check_user_wallet_configured(wallet.wallet.user) # Handle currency conversion amount_sats = int(data.amount) metadata = {} if data.currency: # Validate currency if data.currency.upper() not in allowed_currencies(): raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Currency '{data.currency}' not allowed. Use one of: {', '.join(allowed_currencies())}", ) # Convert fiat to satoshis amount_sats = await fiat_amount_as_satoshis(float(data.amount), data.currency) # Store currency metadata (store fiat_amount as string to preserve Decimal precision) metadata = { "fiat_currency": data.currency.upper(), "fiat_amount": str(data.amount.quantize(Decimal("0.001"))), # Store as string with 3 decimal places "fiat_rate": float(amount_sats) / float(data.amount) if data.amount > 0 else 0, "btc_rate": float(data.amount) / float(amount_sats) * 100_000_000 if amount_sats > 0 else 0, } # Get or create expense account expense_account = await get_account_by_name(data.expense_account) if not expense_account: # Try to get it by ID expense_account = await get_account(data.expense_account) if not expense_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Expense account '{data.expense_account}' not found", ) # Validate user has permission to submit expenses to this account from .crud import get_user_permissions_with_inheritance submit_perms = await get_user_permissions_with_inheritance( wallet.wallet.user, expense_account.name, PermissionType.SUBMIT_EXPENSE ) if not submit_perms: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail=f"You do not have permission to submit expenses to account '{expense_account.name}'. Please contact an administrator to request access.", ) # Get or create user-specific account if data.is_equity: # Validate equity eligibility from .crud import get_user_equity_status equity_status = await get_user_equity_status(wallet.wallet.user) if not equity_status or not equity_status.is_equity_eligible: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="User is not eligible to contribute expenses to equity. Please submit for cash reimbursement.", ) if not equity_status.equity_account_name: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="User equity account not configured. Contact administrator.", ) # Equity contribution - use user's specific equity account user_account = await get_account_by_name(equity_status.equity_account_name) if not user_account: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Equity account '{equity_status.equity_account_name}' not found. Contact administrator.", ) else: # Liability (castle owes user) user_account = await get_or_create_user_account( wallet.wallet.user, AccountType.LIABILITY, "Accounts Payable" ) # Create journal entry # DR Expense, CR User Account (Liability or Equity) description_suffix = f" ({metadata['fiat_amount']} {metadata['fiat_currency']})" if metadata else "" # Add meta information for audit trail entry_meta = { "source": "api", "created_via": "expense_entry", "user_id": wallet.wallet.user, "is_equity": data.is_equity, } # Format as Beancount entry and submit to Fava from .fava_client import get_fava_client from .beancount_format import format_expense_entry, sanitize_link fava = get_fava_client() # Extract fiat info from metadata fiat_currency = metadata.get("fiat_currency") if metadata else None fiat_amount = Decimal(metadata.get("fiat_amount")) if metadata and metadata.get("fiat_amount") else None # Generate unique entry ID for tracking import uuid entry_id = str(uuid.uuid4()).replace("-", "")[:16] # Add castle ID as reference/link (sanitized for Beancount) castle_reference = f"castle-{entry_id}" if data.reference: castle_reference = f"{sanitize_link(data.reference)}-{entry_id}" # Format Beancount entry entry = format_expense_entry( user_id=wallet.wallet.user, expense_account=expense_account.name, user_account=user_account.name, amount_sats=amount_sats, description=data.description, entry_date=data.entry_date.date() if data.entry_date else datetime.now().date(), is_equity=data.is_equity, fiat_currency=fiat_currency, fiat_amount=fiat_amount, reference=castle_reference # Add castle ID as link ) # Submit to Fava result = await fava.add_entry(entry) # Return a JournalEntry-like response for compatibility from .models import EntryLine return JournalEntry( id=entry_id, # Use the generated castle entry ID description=data.description + description_suffix, entry_date=data.entry_date if data.entry_date else datetime.now(), created_by=wallet.wallet.id, created_at=datetime.now(), reference=castle_reference, flag=JournalEntryFlag.PENDING, meta=entry_meta, lines=[ EntryLine( id=f"line-1-{entry_id}", journal_entry_id=entry_id, account_id=expense_account.id, amount=amount_sats, description=f"Expense paid by user {wallet.wallet.user[:8]}", metadata=metadata or {} ), EntryLine( id=f"line-2-{entry_id}", journal_entry_id=entry_id, account_id=user_account.id, amount=-amount_sats, description=f"{'Equity contribution' if data.is_equity else 'Amount owed to user'}", metadata=metadata or {} ), ] ) @castle_api_router.post("/api/v1/entries/receivable", status_code=HTTPStatus.CREATED) async def api_create_receivable_entry( data: ReceivableEntry, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> JournalEntry: """ Create an accounts receivable entry (user owes castle). Admin only to prevent abuse. If currency is provided, amount is converted from fiat to satoshis. """ # Handle currency conversion amount_sats = int(data.amount) metadata = {} if data.currency: # Validate currency if data.currency.upper() not in allowed_currencies(): raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Currency '{data.currency}' not allowed. Use one of: {', '.join(allowed_currencies())}", ) # Convert fiat to satoshis amount_sats = await fiat_amount_as_satoshis(float(data.amount), data.currency) # Store currency metadata (store fiat_amount as string to preserve Decimal precision) metadata = { "fiat_currency": data.currency.upper(), "fiat_amount": str(data.amount.quantize(Decimal("0.001"))), # Store as string with 3 decimal places "fiat_rate": float(amount_sats) / float(data.amount) if data.amount > 0 else 0, "btc_rate": float(data.amount) / float(amount_sats) * 100_000_000 if amount_sats > 0 else 0, } # Get or create revenue account revenue_account = await get_account_by_name(data.revenue_account) if not revenue_account: revenue_account = await get_account(data.revenue_account) if not revenue_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Revenue account '{data.revenue_account}' not found", ) # Get or create user-specific receivable account user_receivable = await get_or_create_user_account( data.user_id, AccountType.ASSET, "Accounts Receivable" ) # Create journal entry # DR Accounts Receivable (User), CR Revenue description_suffix = f" ({metadata['fiat_amount']} {metadata['fiat_currency']})" if metadata else "" # Add meta information for audit trail entry_meta = { "source": "api", "created_via": "receivable_entry", "debtor_user_id": data.user_id, } # Format as Beancount entry and submit to Fava from .fava_client import get_fava_client from .beancount_format import format_receivable_entry, sanitize_link fava = get_fava_client() # Extract fiat info from metadata fiat_currency = metadata.get("fiat_currency") if metadata else None fiat_amount = Decimal(metadata.get("fiat_amount")) if metadata and metadata.get("fiat_amount") else None # Generate unique entry ID for tracking import uuid entry_id = str(uuid.uuid4()).replace("-", "")[:16] # Add castle ID as reference/link (sanitized for Beancount) castle_reference = f"castle-{entry_id}" if data.reference: castle_reference = f"{sanitize_link(data.reference)}-{entry_id}" # Format Beancount entry entry = format_receivable_entry( user_id=data.user_id, revenue_account=revenue_account.name, receivable_account=user_receivable.name, amount_sats=amount_sats, description=data.description, entry_date=datetime.now().date(), fiat_currency=fiat_currency, fiat_amount=fiat_amount, reference=castle_reference # Use castle reference with unique ID ) # Submit to Fava result = await fava.add_entry(entry) # Return a JournalEntry-like response for compatibility from .models import EntryLine return JournalEntry( id=entry_id, # Use the generated castle entry ID description=data.description + description_suffix, entry_date=datetime.now(), created_by=wallet.wallet.id, created_at=datetime.now(), reference=castle_reference, # Use castle reference with unique ID flag=JournalEntryFlag.PENDING, meta=entry_meta, lines=[ EntryLine( id=f"line-1-{entry_id}", journal_entry_id=entry_id, account_id=user_receivable.id, amount=amount_sats, description=f"Amount owed by user {data.user_id[:8]}", metadata=metadata or {} ), EntryLine( id=f"line-2-{entry_id}", journal_entry_id=entry_id, account_id=revenue_account.id, amount=-amount_sats, description="Revenue earned", metadata=metadata or {} ), ] ) @castle_api_router.post("/api/v1/entries/revenue", status_code=HTTPStatus.CREATED) async def api_create_revenue_entry( data: RevenueEntry, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> JournalEntry: """ Create a revenue entry (castle receives payment). Admin only. Submits entry to Fava/Beancount. """ from .fava_client import get_fava_client from .beancount_format import format_revenue_entry, sanitize_link # Get revenue account revenue_account = await get_account_by_name(data.revenue_account) if not revenue_account: revenue_account = await get_account(data.revenue_account) if not revenue_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Revenue account '{data.revenue_account}' not found", ) # Get payment method account payment_account = await get_account_by_name(data.payment_method_account) if not payment_account: payment_account = await get_account(data.payment_method_account) if not payment_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Payment account '{data.payment_method_account}' not found", ) # Handle currency conversion if provided amount_sats = int(data.amount) fiat_currency = None fiat_amount = None if data.currency: # Validate currency if data.currency.upper() not in allowed_currencies(): raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Currency '{data.currency}' not supported. Allowed: {', '.join(allowed_currencies())}", ) # Store fiat info for cost basis fiat_currency = data.currency.upper() fiat_amount = data.amount # Original fiat amount # In this case, data.amount should be the satoshi amount # This is a bit confusing - the API accepts amount as Decimal which could be either sats or fiat # For now, assume if currency is provided, amount is fiat and needs conversion # TODO: Consider updating the API model to be clearer about this # Format as Beancount entry and submit to Fava fava = get_fava_client() # Generate unique entry ID for tracking import uuid entry_id = str(uuid.uuid4()).replace("-", "")[:16] # Add castle ID as reference/link (sanitized for Beancount) castle_reference = f"castle-{entry_id}" if data.reference: castle_reference = f"{sanitize_link(data.reference)}-{entry_id}" entry = format_revenue_entry( payment_account=payment_account.name, revenue_account=revenue_account.name, amount_sats=amount_sats, description=data.description, entry_date=datetime.now().date(), fiat_currency=fiat_currency, fiat_amount=fiat_amount, reference=castle_reference # Use castle reference with unique ID ) # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Revenue entry submitted to Fava: {result.get('data', 'Unknown')}") # Return simplified JournalEntry for API compatibility # Note: Castle no longer stores entries in DB, Fava is the source of truth return JournalEntry( id=entry_id, description=data.description, entry_date=datetime.now(), created_by=wallet.wallet.id, created_at=datetime.now(), reference=castle_reference, flag=JournalEntryFlag.CLEARED, lines=[], # Empty - entry is stored in Fava, not Castle DB meta={"source": "fava", "fava_response": result.get('data', 'Unknown')} ) # ===== USER BALANCE ENDPOINTS ===== @castle_api_router.get("/api/v1/balance") async def api_get_my_balance( wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> UserBalance: """Get current user's balance with the Castle (from Fava/Beancount)""" from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client fava = get_fava_client() # If super user, show total castle position if wallet.wallet.user == lnbits_settings.super_user: all_balances = await fava.get_all_user_balances() # Calculate total: # From get_user_balance(): positive = user owes castle, negative = castle owes user # Positive balances = Users owe Castle (receivables for Castle) # Negative balances = Castle owes users (liabilities for Castle) # Net: positive means castle is owed money, negative means castle owes money total_receivables = sum(b["balance"] for b in all_balances if b["balance"] > 0) total_liabilities = sum(abs(b["balance"]) for b in all_balances if b["balance"] < 0) net_balance = total_receivables - total_liabilities # Aggregate fiat balances from all users total_fiat_balances = {} for user_balance in all_balances: for currency, amount in user_balance["fiat_balances"].items(): if currency not in total_fiat_balances: total_fiat_balances[currency] = Decimal("0") # Add all balances (positive and negative) total_fiat_balances[currency] += amount # Return net position return UserBalance( user_id=wallet.wallet.user, balance=net_balance, accounts=[], fiat_balances=total_fiat_balances, ) # For regular users, show their individual balance from Fava balance_data = await fava.get_user_balance(wallet.wallet.user) return UserBalance( user_id=wallet.wallet.user, balance=balance_data["balance"], accounts=[], # Could populate from balance_data["accounts"] if needed fiat_balances=balance_data["fiat_balances"], ) @castle_api_router.get("/api/v1/balance/{user_id}") async def api_get_user_balance(user_id: str) -> UserBalance: """Get a specific user's balance with the Castle (from Fava/Beancount)""" from .fava_client import get_fava_client fava = get_fava_client() balance_data = await fava.get_user_balance(user_id) return UserBalance( user_id=user_id, balance=balance_data["balance"], accounts=[], fiat_balances=balance_data["fiat_balances"], ) @castle_api_router.get("/api/v1/balances/all") async def api_get_all_balances( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[dict]: """Get all user balances (admin/super user only) from Fava/Beancount""" from .fava_client import get_fava_client fava = get_fava_client() balances = await fava.get_all_user_balances() # Enrich with username information using helper function result = [] for balance in balances: username = await _get_username_from_user_id(balance["user_id"]) result.append({ "user_id": balance["user_id"], "username": username, "balance": balance["balance"], "fiat_balances": balance["fiat_balances"], "accounts": balance["accounts"], }) return result # ===== PAYMENT ENDPOINTS ===== @castle_api_router.post("/api/v1/generate-payment-invoice") async def api_generate_payment_invoice( data: GeneratePaymentInvoice, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> dict: """ Generate an invoice on the Castle wallet for user to pay their balance. User can then pay this invoice to settle their debt. If user_id is provided (admin only), the invoice is generated for that specific user. """ from lnbits.core.crud.wallets import get_wallet from lnbits.core.models import CreateInvoice from lnbits.core.services import create_payment_request from lnbits.settings import settings as lnbits_settings # Determine which user this invoice is for if data.user_id: # Admin generating invoice for a specific user if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can generate invoices for other users", ) target_user_id = data.user_id else: # User generating invoice for themselves target_user_id = wallet.wallet.user # Get castle wallet ID castle_wallet_id = await check_castle_wallet_configured() # Get user's balance from Fava to calculate fiat metadata from .fava_client import get_fava_client fava = get_fava_client() balance_data = await fava.get_user_balance(target_user_id) # Build UserBalance object for compatibility user_balance = UserBalance( user_id=target_user_id, balance=balance_data["balance"], accounts=[], fiat_balances=balance_data["fiat_balances"] ) # Calculate proportional fiat amount for this invoice invoice_extra = {"tag": "castle", "user_id": target_user_id} logger.info(f"User balance for invoice generation - sats: {user_balance.balance}, fiat_balances: {user_balance.fiat_balances}") if user_balance.fiat_balances: # Simple single-currency solution: use the first (and should be only) currency currencies = list(user_balance.fiat_balances.keys()) if len(currencies) > 1: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"User has multiple currencies ({', '.join(currencies)}). Please settle to a single currency first.", ) if len(currencies) == 1: fiat_currency = currencies[0] total_fiat_balance = user_balance.fiat_balances[fiat_currency] total_sat_balance = abs(user_balance.balance) # Use absolute value if total_sat_balance > 0: # Calculate proportional fiat amount for this invoice # fiat_amount = (invoice_amount / total_sats) * total_fiat from decimal import Decimal proportion = Decimal(data.amount) / Decimal(total_sat_balance) invoice_fiat_amount = abs(total_fiat_balance) * proportion # Calculate fiat rate (sats per fiat unit) fiat_rate = float(data.amount) / float(invoice_fiat_amount) if invoice_fiat_amount > 0 else 0 btc_rate = float(invoice_fiat_amount) / float(data.amount) * 100_000_000 if data.amount > 0 else 0 invoice_extra.update({ "fiat_currency": fiat_currency, "fiat_amount": str(invoice_fiat_amount.quantize(Decimal("0.001"))), "fiat_rate": fiat_rate, "btc_rate": btc_rate, }) logger.info(f"Invoice extra metadata: {invoice_extra}") # Create invoice on castle wallet invoice_data = CreateInvoice( out=False, amount=data.amount, memo=f"Payment from user {target_user_id[:8]} to Castle", unit="sat", extra=invoice_extra, ) payment = await create_payment_request(castle_wallet_id, invoice_data) # Get castle wallet to return its inkey for payment checking castle_wallet = await get_wallet(castle_wallet_id) if not castle_wallet: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Castle wallet not found" ) return { "payment_hash": payment.payment_hash, "payment_request": payment.bolt11, "amount": data.amount, "memo": invoice_data.memo, "check_wallet_key": castle_wallet.inkey, # Key to check payment status } @castle_api_router.post("/api/v1/record-payment") async def api_record_payment( data: RecordPayment, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> dict: """ Record a lightning payment in accounting after invoice is paid. This reduces what the user owes to the castle. The user_id is extracted from the payment metadata (set during invoice generation). """ from lnbits.core.crud.payments import get_standalone_payment # Get the payment details (incoming=True to get the invoice, not the payment) payment = await get_standalone_payment(data.payment_hash, incoming=True) if not payment: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Payment not found" ) if payment.pending: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Payment not yet settled" ) # Get user_id from payment metadata (set during invoice generation) target_user_id = None if payment.extra and isinstance(payment.extra, dict): target_user_id = payment.extra.get("user_id") if not target_user_id: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Payment metadata missing user_id. Cannot determine which user to credit.", ) # Check if payment already recorded in Fava (idempotency) from .fava_client import get_fava_client from .beancount_format import format_payment_entry import httpx fava = get_fava_client() # Check if payment already recorded by fetching recent entries # Note: We can't use BQL query with `links ~ 'pattern'` because links is a set type # and BQL doesn't support regex matching on sets. Instead, fetch entries and filter in Python. link_to_find = f"ln-{data.payment_hash[:16]}" try: async with httpx.AsyncClient(timeout=5.0) as client: # Get recent entries from Fava's journal endpoint response = await client.get( f"{fava.base_url}/api/journal", params={"time": ""} # Get all entries ) if response.status_code == 200: response_data = response.json() entries = response_data.get('entries', []) # Check if any entry has our payment link for entry in entries: entry_links = entry.get('links', []) if link_to_find in entry_links: # Payment already recorded, return existing entry balance_data = await fava.get_user_balance(target_user_id) return { "journal_entry_id": f"fava-exists-{data.payment_hash[:16]}", "new_balance": balance_data["balance"], "message": "Payment already recorded", } except Exception as e: logger.warning(f"Could not check Fava for duplicate payment: {e}") # Continue anyway - Fava/Beancount will catch duplicate if it exists # Convert amount from millisatoshis to satoshis amount_sats = payment.amount // 1000 # Extract fiat metadata from invoice (if present) fiat_currency = None fiat_amount = None if payment.extra and isinstance(payment.extra, dict): logger.info(f"Payment.extra contents: {payment.extra}") fiat_currency = payment.extra.get("fiat_currency") fiat_amount_str = payment.extra.get("fiat_amount") if fiat_amount_str: from decimal import Decimal fiat_amount = Decimal(str(fiat_amount_str)) logger.info(f"Extracted fiat metadata - currency: {fiat_currency}, amount: {fiat_amount}") # Get user's receivable account (what user owes) user_receivable = await get_or_create_user_account( target_user_id, AccountType.ASSET, "Accounts Receivable" ) # Get lightning account lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning") if not lightning_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Lightning account not found" ) # Format payment entry and submit to Fava entry = format_payment_entry( user_id=target_user_id, payment_account=lightning_account.name, payable_or_receivable_account=user_receivable.name, amount_sats=amount_sats, description=f"Lightning payment from user {target_user_id[:8]}", entry_date=datetime.now().date(), is_payable=False, # User paying castle (receivable settlement) fiat_currency=fiat_currency, fiat_amount=fiat_amount, payment_hash=data.payment_hash, reference=data.payment_hash ) logger.info(f"Formatted payment entry: {entry}") # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Payment entry submitted to Fava: {result.get('data', 'Unknown')}") # Get updated balance from Fava balance_data = await fava.get_user_balance(target_user_id) return { "journal_entry_id": f"fava-{datetime.now().timestamp()}", "new_balance": balance_data["balance"], "message": "Payment recorded successfully", } @castle_api_router.post("/api/v1/pay-user") async def api_pay_user( user_id: str, amount: int, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Record a payment from castle to user (reduces what castle owes user). Admin only. """ # Get user's payable account (what castle owes) user_payable = await get_or_create_user_account( user_id, AccountType.LIABILITY, "Accounts Payable" ) # Get lightning account lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning") if not lightning_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Lightning account not found" ) # Format payment entry and submit to Fava # DR Liabilities:Payable (User), CR Assets:Bitcoin:Lightning from .fava_client import get_fava_client from .beancount_format import format_payment_entry fava = get_fava_client() entry = format_payment_entry( user_id=user_id, payment_account=lightning_account.name, payable_or_receivable_account=user_payable.name, amount_sats=amount, description=f"Payment to user {user_id[:8]}", entry_date=datetime.now().date(), is_payable=True, # Castle paying user reference=f"PAY-{user_id[:8]}" ) # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Payment submitted to Fava: {result.get('data', 'Unknown')}") # Get updated balance from Fava balance_data = await fava.get_user_balance(user_id) return { "journal_entry_id": f"fava-{datetime.now().timestamp()}", "new_balance": balance_data["balance"], "message": "Payment recorded successfully", } @castle_api_router.post("/api/v1/receivables/settle") async def api_settle_receivable( data: SettleReceivable, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Manually settle a receivable (record when user pays castle in person). This endpoint is for non-lightning payments like: - Cash payments - Bank transfers - Other manual settlements Admin only. """ from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can settle receivables", ) # Validate payment method valid_methods = ["cash", "bank_transfer", "check", "lightning", "btc_onchain", "other"] if data.payment_method.lower() not in valid_methods: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Invalid payment method. Must be one of: {', '.join(valid_methods)}", ) # Get user's receivable account (what user owes) user_receivable = await get_or_create_user_account( data.user_id, AccountType.ASSET, "Accounts Receivable" ) # Get the appropriate asset account based on payment method payment_account_map = { "cash": "Assets:Cash", "bank_transfer": "Assets:Bank", "check": "Assets:Bank", "lightning": "Assets:Bitcoin:Lightning", "btc_onchain": "Assets:Bitcoin:OnChain", "other": "Assets:Cash" } account_name = payment_account_map.get(data.payment_method.lower(), "Assets:Cash") payment_account = await get_account_by_name(account_name) # If account doesn't exist, try to find or create a generic one if not payment_account: # Try to find any asset account that's not receivable all_accounts = await get_all_accounts() for acc in all_accounts: if acc.account_type == AccountType.ASSET and "receivable" not in acc.name.lower(): payment_account = acc break if not payment_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Payment account '{account_name}' not found. Please create it first.", ) # Format settlement entry and submit to Fava # DR Cash/Bank (asset increased), CR Accounts Receivable (asset decreased) # This records that user paid their debt from .fava_client import get_fava_client from .beancount_format import format_payment_entry, format_fiat_settlement_entry from decimal import Decimal fava = get_fava_client() # Determine if this is a fiat or lightning payment is_fiat_payment = data.currency and data.payment_method.lower() in [ "cash", "bank_transfer", "check", "other" ] if is_fiat_payment: # Fiat currency payment (cash, bank transfer, etc.) # Record in fiat currency with sats as metadata if not data.amount_sats: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="amount_sats is required when settling with fiat currency" ) entry = format_fiat_settlement_entry( user_id=data.user_id, payment_account=payment_account.name, payable_or_receivable_account=user_receivable.name, fiat_amount=Decimal(str(data.amount)), fiat_currency=data.currency.upper(), amount_sats=data.amount_sats, description=data.description, entry_date=datetime.now().date(), is_payable=False, # User paying castle (receivable settlement) payment_method=data.payment_method, reference=data.reference or f"MANUAL-{data.user_id[:8]}" ) else: # Lightning or BTC onchain payment # Record in SATS with optional fiat metadata amount_in_sats = data.amount_sats if data.amount_sats else int(data.amount) fiat_currency = data.currency.upper() if data.currency else None fiat_amount = Decimal(str(data.amount)) if data.currency else None entry = format_payment_entry( user_id=data.user_id, payment_account=payment_account.name, payable_or_receivable_account=user_receivable.name, amount_sats=amount_in_sats, description=data.description, entry_date=datetime.now().date(), is_payable=False, # User paying castle (receivable settlement) fiat_currency=fiat_currency, fiat_amount=fiat_amount, payment_hash=data.payment_hash, reference=data.reference or f"MANUAL-{data.user_id[:8]}" ) # Add additional metadata to entry if "meta" not in entry: entry["meta"] = {} entry["meta"]["payment-method"] = data.payment_method entry["meta"]["settled-by"] = wallet.wallet.user if data.txid: entry["meta"]["txid"] = data.txid # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Receivable settlement submitted to Fava: {result.get('data', 'Unknown')}") # Get updated balance from Fava balance_data = await fava.get_user_balance(data.user_id) return { "journal_entry_id": f"fava-{datetime.now().timestamp()}", "user_id": data.user_id, "amount_settled": float(data.amount), "currency": data.currency, "payment_method": data.payment_method, "new_balance": balance_data["balance"], "message": f"Receivable settled successfully via {data.payment_method}", } @castle_api_router.post("/api/v1/payables/pay") async def api_pay_user( data: PayUser, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Pay a user (castle pays user for expense/liability). This endpoint is for both lightning and manual payments: - Lightning payments: already executed, just record the payment - Cash/Bank/Check: record manual payment that was made Admin only. """ from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can pay users", ) # Validate payment method valid_methods = ["cash", "bank_transfer", "check", "lightning", "btc_onchain", "other"] if data.payment_method.lower() not in valid_methods: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Invalid payment method. Must be one of: {', '.join(valid_methods)}", ) # Get user's payable account (what castle owes) user_payable = await get_or_create_user_account( data.user_id, AccountType.LIABILITY, "Accounts Payable" ) # Get the appropriate asset account based on payment method payment_account_map = { "cash": "Assets:Cash", "bank_transfer": "Assets:Bank", "check": "Assets:Bank", "lightning": "Assets:Bitcoin:Lightning", "btc_onchain": "Assets:Bitcoin:OnChain", "other": "Assets:Cash" } account_name = payment_account_map.get(data.payment_method.lower(), "Assets:Cash") payment_account = await get_account_by_name(account_name) if not payment_account: # Try to find any asset account that's not receivable all_accounts = await get_all_accounts() for acc in all_accounts: if acc.account_type == AccountType.ASSET and "receivable" not in acc.name.lower(): payment_account = acc break if not payment_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Payment account '{account_name}' not found. Please create it first.", ) # Format payment entry and submit to Fava # DR Accounts Payable (liability decreased), CR Cash/Lightning/Bank (asset decreased) # This records that castle paid its debt from .fava_client import get_fava_client from .beancount_format import format_payment_entry from decimal import Decimal fava = get_fava_client() # Determine amount and currency if data.currency: # Fiat currency payment (e.g., EUR, USD) # Use the sats equivalent for the journal entry to match the payable if not data.amount_sats: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="amount_sats is required when paying with fiat currency" ) amount_in_sats = data.amount_sats fiat_currency = data.currency.upper() fiat_amount = data.amount else: # Satoshi payment amount_in_sats = int(data.amount) fiat_currency = None fiat_amount = None # Format payment entry entry = format_payment_entry( user_id=data.user_id, payment_account=payment_account.name, payable_or_receivable_account=user_payable.name, amount_sats=amount_in_sats, description=data.description or f"Payment to user via {data.payment_method}", entry_date=datetime.now().date(), is_payable=True, # Castle paying user (payable settlement) fiat_currency=fiat_currency, fiat_amount=fiat_amount, payment_hash=data.payment_hash, reference=data.reference or f"PAY-{data.user_id[:8]}" ) # Add additional metadata to entry if "meta" not in entry: entry["meta"] = {} entry["meta"]["payment-method"] = data.payment_method entry["meta"]["paid-by"] = wallet.wallet.user if data.txid: entry["meta"]["txid"] = data.txid # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Payable payment submitted to Fava: {result.get('data', 'Unknown')}") # Get updated balance from Fava balance_data = await fava.get_user_balance(data.user_id) return { "journal_entry_id": f"fava-{datetime.now().timestamp()}", "user_id": data.user_id, "amount_paid": float(data.amount), "currency": data.currency, "payment_method": data.payment_method, "new_balance": balance_data["balance"], "message": f"User paid successfully via {data.payment_method}", } # ===== SETTINGS ENDPOINTS ===== @castle_api_router.get("/api/v1/settings") async def api_get_settings( user: User = Depends(check_user_exists), ) -> CastleSettings: """Get Castle settings""" user_id = "admin" settings = await get_settings(user_id) # Return empty settings if not configured (so UI can show setup screen) if not settings: return CastleSettings() return settings @castle_api_router.put("/api/v1/settings") async def api_update_settings( data: CastleSettings, user: User = Depends(check_super_user), ) -> CastleSettings: """Update Castle settings (super user only)""" if not data.castle_wallet_id: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Castle wallet ID is required", ) user_id = "admin" return await update_settings(user_id, data) # ===== USER WALLET ENDPOINTS ===== @castle_api_router.get("/api/v1/user-wallet/{user_id}") async def api_get_user_wallet( user_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Get user's wallet settings (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access user wallet info", ) user_wallet = await get_user_wallet(user_id) if not user_wallet: return {"user_id": user_id, "user_wallet_id": None} # Get invoice key for the user's wallet (needed to generate invoices) from lnbits.core.crud import get_wallet wallet_obj = await get_wallet(user_wallet.user_wallet_id) if not wallet_obj: return {"user_id": user_id, "user_wallet_id": user_wallet.user_wallet_id} return { "user_id": user_id, "user_wallet_id": user_wallet.user_wallet_id, "user_wallet_id_invoice_key": wallet_obj.inkey, } @castle_api_router.get("/api/v1/users") async def api_get_all_users( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[dict]: """Get all users who have configured their wallet (admin only)""" from lnbits.core.crud.users import get_user user_settings = await get_all_user_wallet_settings() users = [] for setting in user_settings: # Get user details from core user = await get_user(setting.id) # Use username if available, otherwise truncate user_id username = user.username if user and user.username else setting.id[:16] + "..." users.append({ "user_id": setting.id, "user_wallet_id": setting.user_wallet_id, "username": username, }) return users @castle_api_router.get("/api/v1/admin/castle-users") async def api_get_castle_users( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[dict]: """ Get all users who have configured their wallet in Castle. These are users who can interact with Castle (submit expenses, receive permissions, etc.). Admin only. """ from lnbits.core.crud.users import get_user # Get all users who have configured their wallet user_settings = await get_all_user_wallet_settings() users = [] for setting in user_settings: # Get user details from core user = await get_user(setting.id) # Use username if available, otherwise use user_id username = user.username if user and user.username else None users.append({ "id": setting.id, "user_id": setting.id, # Compatibility with existing code "username": username, "user_wallet_id": setting.user_wallet_id, }) # Sort by username (None values last) users.sort(key=lambda x: (x["username"] is None, x["username"] or "", x["user_id"])) return users @castle_api_router.get("/api/v1/user/wallet") async def api_get_user_wallet( user: User = Depends(check_user_exists), ) -> UserWalletSettings: """Get current user's wallet settings""" from lnbits.settings import settings as lnbits_settings # If user is super user, return the castle wallet if user.id == lnbits_settings.super_user: castle_settings = await get_settings("admin") if castle_settings and castle_settings.castle_wallet_id: return UserWalletSettings(user_wallet_id=castle_settings.castle_wallet_id) return UserWalletSettings() # For regular users, get their personal wallet settings = await get_user_wallet(user.id) # Return empty settings if not configured (so UI can show setup screen) if not settings: return UserWalletSettings() return settings @castle_api_router.put("/api/v1/user/wallet") async def api_update_user_wallet( data: UserWalletSettings, user: User = Depends(check_user_exists), ) -> UserWalletSettings: """Update current user's wallet settings""" from lnbits.settings import settings as lnbits_settings # Super user cannot set their wallet separately - it's always the castle wallet if user.id == lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Super user wallet is automatically set to the Castle wallet. Update Castle settings instead.", ) if not data.user_wallet_id: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="User wallet ID is required", ) return await update_user_wallet(user.id, data) # ===== MANUAL PAYMENT REQUESTS ===== @castle_api_router.post("/api/v1/manual-payment-request") async def api_create_manual_payment_request( data: CreateManualPaymentRequest, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> ManualPaymentRequest: """Create a manual payment request for the Castle to review""" return await create_manual_payment_request( wallet.wallet.user, data.amount, data.description ) @castle_api_router.get("/api/v1/manual-payment-requests") async def api_get_manual_payment_requests( wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> list[ManualPaymentRequest]: """Get manual payment requests for the current user""" return await get_user_manual_payment_requests(wallet.wallet.user) @castle_api_router.get("/api/v1/manual-payment-requests/all") async def api_get_all_manual_payment_requests( status: str = None, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[ManualPaymentRequest]: """Get all manual payment requests (Castle admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access this endpoint", ) return await get_all_manual_payment_requests(status) @castle_api_router.post("/api/v1/manual-payment-requests/{request_id}/approve") async def api_approve_manual_payment_request( request_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> ManualPaymentRequest: """Approve a manual payment request and create accounting entry (Castle admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access this endpoint", ) # Get the request request = await get_manual_payment_request(request_id) if not request: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Manual payment request not found", ) if request.status != "pending": raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Request already {request.status}", ) # Get castle wallet from settings castle_wallet_id = await check_castle_wallet_configured() # Get or create liability account for user (castle owes the user) liability_account = await get_or_create_user_account( request.user_id, AccountType.LIABILITY, "Accounts Payable" ) # Get the Lightning asset account lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning") if not lightning_account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Lightning account not found", ) # Format payment entry and submit to Fava from .fava_client import get_fava_client from .beancount_format import format_payment_entry fava = get_fava_client() entry = format_payment_entry( user_id=request.user_id, payment_account=lightning_account.name, payable_or_receivable_account=liability_account.name, amount_sats=request.amount, description=f"Manual payment to user: {request.description}", entry_date=datetime.now().date(), is_payable=True, # Castle paying user reference=f"MPR-{request.id}" ) # Submit to Fava result = await fava.add_entry(entry) logger.info(f"Manual payment entry submitted to Fava: {result.get('data', 'Unknown')}") # Approve the request with Fava entry reference entry_id = f"fava-{datetime.now().timestamp()}" return await approve_manual_payment_request( request_id, wallet.wallet.user, entry_id ) @castle_api_router.post("/api/v1/manual-payment-requests/{request_id}/reject") async def api_reject_manual_payment_request( request_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> ManualPaymentRequest: """Reject a manual payment request (Castle admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access this endpoint", ) # Get the request request = await get_manual_payment_request(request_id) if not request: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Manual payment request not found", ) if request.status != "pending": raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Request already {request.status}", ) return await reject_manual_payment_request(request_id, wallet.wallet.user) # ===== EXPENSE APPROVAL ENDPOINTS ===== @castle_api_router.post("/api/v1/entries/{entry_id}/approve") async def api_approve_expense_entry( entry_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Approve a pending expense entry by changing flag from '!' to '*' (admin only). This updates the transaction in the Beancount file via Fava API. """ from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can approve expenses", ) fava = get_fava_client() # 1. Get all journal entries from Fava all_entries = await fava.get_journal_entries() # 2. Find the entry with matching castle ID in links target_entry_hash = None target_entry = None for entry in all_entries: # Only look at transactions with pending flag if entry.get("t") == "Transaction" and entry.get("flag") == "!": links = entry.get("links", []) for link in links: # Strip ^ prefix if present (Beancount link syntax) link_clean = link.lstrip('^') # Check if this entry has our castle ID if link_clean == f"castle-{entry_id}" or link_clean.endswith(f"-{entry_id}"): target_entry_hash = entry.get("entry_hash") target_entry = entry break if target_entry_hash: break if not target_entry_hash: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Pending entry {entry_id} not found in Beancount ledger" ) # 3. Get the entry context (source text + sha256sum) context = await fava.get_entry_context(target_entry_hash) source = context.get("slice", "") sha256sum = context.get("sha256sum", "") if not source: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Could not retrieve entry source from Fava" ) # 4. Change flag from ! to * # Replace the first occurrence of the date + ! pattern import re date_str = target_entry.get("date", "") old_pattern = f"{date_str} !" new_pattern = f"{date_str} *" if old_pattern not in source: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Could not find pending flag pattern '{old_pattern}' in entry source" ) new_source = source.replace(old_pattern, new_pattern, 1) # 5. Update the entry via Fava API await fava.update_entry_source(target_entry_hash, new_source, sha256sum) return { "message": f"Entry {entry_id} approved successfully", "entry_id": entry_id, "entry_hash": target_entry_hash, "date": date_str, "description": target_entry.get("narration", "") } @castle_api_router.post("/api/v1/entries/{entry_id}/reject") async def api_reject_expense_entry( entry_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Reject a pending expense entry by marking it as voided (admin only). Adds #voided tag for audit trail while keeping the '!' flag. Voided transactions are excluded from balances but preserved in the ledger. """ from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can reject expenses", ) fava = get_fava_client() # 1. Get all journal entries from Fava all_entries = await fava.get_journal_entries() # 2. Find the entry with matching castle ID in links target_entry_hash = None target_entry = None for entry in all_entries: # Only look at transactions with pending flag if entry.get("t") == "Transaction" and entry.get("flag") == "!": links = entry.get("links", []) for link in links: # Strip ^ prefix if present (Beancount link syntax) link_clean = link.lstrip('^') # Check if this entry has our castle ID if link_clean == f"castle-{entry_id}" or link_clean.endswith(f"-{entry_id}"): target_entry_hash = entry.get("entry_hash") target_entry = entry break if target_entry_hash: break if not target_entry_hash: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Pending entry {entry_id} not found in Beancount ledger" ) # 3. Get the entry context (source text + sha256sum) context = await fava.get_entry_context(target_entry_hash) source = context.get("slice", "") sha256sum = context.get("sha256sum", "") if not source: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Could not retrieve entry source from Fava" ) # 4. Add #voided tag (keep ! flag as per convention) date_str = target_entry.get("date", "") # Add #voided tag if not already present if "#voided" not in source: # Find the transaction line and add #voided to the tags # Pattern: date ! "narration" #existing-tags lines = source.split('\n') for i, line in enumerate(lines): if date_str in line and '"' in line and '!' in line: # Add #voided tag to the transaction line if '#' in line: # Already has tags, append voided lines[i] = line.rstrip() + ' #voided' else: # No tags yet, add after narration lines[i] = line.rstrip() + ' #voided' break new_source = '\n'.join(lines) else: new_source = source # 5. Update the entry via Fava API await fava.update_entry_source(target_entry_hash, new_source, sha256sum) return { "message": f"Entry {entry_id} rejected (marked as voided)", "entry_id": entry_id, "entry_hash": target_entry_hash, "date": date_str, "description": target_entry.get("narration", "") } # ===== BALANCE ASSERTION ENDPOINTS ===== @castle_api_router.post("/api/v1/assertions") async def api_create_balance_assertion( data: CreateBalanceAssertion, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> BalanceAssertion: """ Create a balance assertion for reconciliation (admin only). Uses hybrid approach: 1. Writes balance assertion to Beancount (via Fava) - source of truth 2. Stores metadata in Castle DB for UI convenience (created_by, notes, tolerance) 3. Lets Beancount validate the assertion automatically The assertion will be checked immediately upon creation. """ from lnbits.settings import settings as lnbits_settings from .fava_client import get_fava_client from .beancount_format import format_balance if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can create balance assertions", ) # Verify account exists account = await get_account(data.account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account {data.account_id} not found", ) assertion_date = data.date or datetime.now() # HYBRID APPROACH: Write to Beancount first (source of truth) balance_directive = format_balance( date_val=assertion_date.date() if isinstance(assertion_date, datetime) else assertion_date, account=account.name, amount=data.expected_balance_sats, currency="SATS" ) # Submit to Fava/Beancount try: fava = get_fava_client() result = await fava.add_entry(balance_directive) logger.info(f"Balance assertion submitted to Fava: {result}") except Exception as e: logger.error(f"Failed to write balance assertion to Fava: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Failed to write balance assertion to Beancount: {str(e)}" ) # Store metadata in Castle DB for UI convenience assertion = await create_balance_assertion(data, wallet.wallet.user) # Check it immediately (queries Fava for actual balance) try: assertion = await check_balance_assertion(assertion.id) except ValueError as e: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=str(e), ) # If assertion failed, return 409 Conflict with details if assertion.status == AssertionStatus.FAILED: raise HTTPException( status_code=HTTPStatus.CONFLICT, detail={ "message": "Balance assertion failed (validated by Beancount)", "expected_sats": assertion.expected_balance_sats, "actual_sats": assertion.checked_balance_sats, "difference_sats": assertion.difference_sats, "expected_fiat": float(assertion.expected_balance_fiat) if assertion.expected_balance_fiat else None, "actual_fiat": float(assertion.checked_balance_fiat) if assertion.checked_balance_fiat else None, "difference_fiat": float(assertion.difference_fiat) if assertion.difference_fiat else None, "fiat_currency": assertion.fiat_currency, }, ) return assertion @castle_api_router.get("/api/v1/assertions") async def api_get_balance_assertions( account_id: str = None, status: str = None, limit: int = 100, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[BalanceAssertion]: """Get balance assertions with optional filters (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can view balance assertions", ) # Parse status enum if provided status_enum = None if status: try: status_enum = AssertionStatus(status) except ValueError: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Invalid status: {status}. Must be one of: pending, passed, failed", ) return await get_balance_assertions( account_id=account_id, status=status_enum, limit=limit, ) @castle_api_router.get("/api/v1/assertions/{assertion_id}") async def api_get_balance_assertion( assertion_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> BalanceAssertion: """Get a specific balance assertion (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can view balance assertions", ) assertion = await get_balance_assertion(assertion_id) if not assertion: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Balance assertion not found", ) return assertion @castle_api_router.post("/api/v1/assertions/{assertion_id}/check") async def api_check_balance_assertion( assertion_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> BalanceAssertion: """Re-check a balance assertion (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can check balance assertions", ) try: assertion = await check_balance_assertion(assertion_id) except ValueError as e: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=str(e), ) return assertion @castle_api_router.delete("/api/v1/assertions/{assertion_id}") async def api_delete_balance_assertion( assertion_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Delete a balance assertion (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can delete balance assertions", ) # Verify it exists assertion = await get_balance_assertion(assertion_id) if not assertion: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail="Balance assertion not found", ) await delete_balance_assertion(assertion_id) return {"success": True, "message": "Balance assertion deleted"} # ===== RECONCILIATION ENDPOINTS ===== @castle_api_router.get("/api/v1/reconciliation/summary") async def api_get_reconciliation_summary( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Get reconciliation summary (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can access reconciliation", ) # Get all assertions all_assertions = await get_balance_assertions(limit=1000) # Count by status passed = len([a for a in all_assertions if a.status == AssertionStatus.PASSED]) failed = len([a for a in all_assertions if a.status == AssertionStatus.FAILED]) pending = len([a for a in all_assertions if a.status == AssertionStatus.PENDING]) # Get all journal entries from Fava from .fava_client import get_fava_client fava = get_fava_client() all_entries = await fava.query_transactions(limit=1000, include_pending=True) # Count entries by flag (Beancount only supports * and !) cleared = len([e for e in all_entries if e.get("flag") == "*"]) pending_entries = len([e for e in all_entries if e.get("flag") == "!"]) # Count entries with special tags voided = len([e for e in all_entries if "voided" in e.get("tags", [])]) flagged = len([e for e in all_entries if "review" in e.get("tags", []) or "flagged" in e.get("tags", [])]) # Get all accounts accounts = await get_all_accounts() return { "assertions": { "total": len(all_assertions), "passed": passed, "failed": failed, "pending": pending, }, "entries": { "total": len(all_entries), "cleared": cleared, "pending": pending_entries, "flagged": flagged, "voided": voided, }, "accounts": { "total": len(accounts), }, "last_checked": datetime.now().isoformat(), } @castle_api_router.post("/api/v1/reconciliation/check-all") async def api_check_all_assertions( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Re-check all balance assertions (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can run reconciliation checks", ) # Get all assertions all_assertions = await get_balance_assertions(limit=1000) results = { "total": len(all_assertions), "checked": 0, "passed": 0, "failed": 0, "errors": 0, } for assertion in all_assertions: try: checked = await check_balance_assertion(assertion.id) results["checked"] += 1 if checked.status == AssertionStatus.PASSED: results["passed"] += 1 elif checked.status == AssertionStatus.FAILED: results["failed"] += 1 except Exception as e: results["errors"] += 1 return results @castle_api_router.get("/api/v1/reconciliation/discrepancies") async def api_get_discrepancies( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Get all discrepancies (failed assertions, flagged entries) (admin only)""" from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can view discrepancies", ) # Get failed assertions failed_assertions = await get_balance_assertions( status=AssertionStatus.FAILED, limit=1000, ) # Get flagged entries from Fava from .fava_client import get_fava_client fava = get_fava_client() all_entries = await fava.query_transactions(limit=1000, include_pending=True) flagged_entries = [e for e in all_entries if e.get("flag") == "#"] pending_entries = [e for e in all_entries if e.get("flag") == "!"] return { "failed_assertions": failed_assertions, "flagged_entries": flagged_entries, "pending_entries": pending_entries, "total_discrepancies": len(failed_assertions) + len(flagged_entries), } # ===== AUTOMATED TASKS ENDPOINTS ===== @castle_api_router.post("/api/v1/tasks/daily-reconciliation") async def api_run_daily_reconciliation( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Manually trigger the daily reconciliation check (admin only). This endpoint can also be called via cron job. Returns a summary of the reconciliation check results. """ from lnbits.settings import settings as lnbits_settings if wallet.wallet.user != lnbits_settings.super_user: raise HTTPException( status_code=HTTPStatus.FORBIDDEN, detail="Only super user can run daily reconciliation", ) from .tasks import check_all_balance_assertions try: results = await check_all_balance_assertions() return results except Exception as e: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Error running daily reconciliation: {str(e)}", ) # ===== USER EQUITY ELIGIBILITY ENDPOINTS ===== @castle_api_router.get("/api/v1/user/info") async def api_get_user_info( wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> UserInfo: """Get current user's information including equity eligibility""" from .crud import get_user_equity_status from .models import UserInfo equity_status = await get_user_equity_status(wallet.wallet.user) return UserInfo( user_id=wallet.wallet.user, is_equity_eligible=equity_status.is_equity_eligible if equity_status else False, equity_account_name=equity_status.equity_account_name if equity_status else None, ) @castle_api_router.post("/api/v1/admin/equity-eligibility", status_code=HTTPStatus.CREATED) async def api_grant_equity_eligibility( data: CreateUserEquityStatus, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> UserEquityStatus: """Grant equity contribution eligibility to a user (admin only)""" from .crud import create_or_update_user_equity_status return await create_or_update_user_equity_status(data, wallet.wallet.user) @castle_api_router.delete("/api/v1/admin/equity-eligibility/{user_id}") async def api_revoke_equity_eligibility( user_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> UserEquityStatus: """Revoke equity contribution eligibility from a user (admin only)""" from .crud import revoke_user_equity_eligibility result = await revoke_user_equity_eligibility(user_id) if not result: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"User {user_id} not found in equity status records", ) return result @castle_api_router.get("/api/v1/admin/equity-eligibility") async def api_list_equity_eligible_users( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[UserEquityStatus]: """List all equity-eligible users (admin only)""" from .crud import get_all_equity_eligible_users return await get_all_equity_eligible_users() # ===== ACCOUNT PERMISSION ADMIN ENDPOINTS ===== @castle_api_router.post("/api/v1/admin/permissions", status_code=HTTPStatus.CREATED) async def api_grant_permission( data: CreateAccountPermission, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> AccountPermission: """Grant account permission to a user (admin only)""" # Validate that account exists account = await get_account(data.account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account with ID '{data.account_id}' not found", ) return await create_account_permission(data, wallet.wallet.user) @castle_api_router.get("/api/v1/admin/permissions") async def api_list_permissions( user_id: str | None = None, account_id: str | None = None, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[AccountPermission]: """ List account permissions (admin only). Can filter by user_id or account_id. """ if user_id: return await get_user_permissions(user_id) elif account_id: return await get_account_permissions(account_id) else: # Get all permissions (get all users' permissions) # This is a bit inefficient but works for admin overview all_accounts = await get_all_accounts() all_permissions = [] for account in all_accounts: account_perms = await get_account_permissions(account.id) all_permissions.extend(account_perms) # Deduplicate by permission ID seen_ids = set() unique_permissions = [] for perm in all_permissions: if perm.id not in seen_ids: seen_ids.add(perm.id) unique_permissions.append(perm) return unique_permissions @castle_api_router.delete("/api/v1/admin/permissions/{permission_id}") async def api_revoke_permission( permission_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """Revoke (delete) an account permission (admin only)""" # Verify permission exists permission = await get_account_permission(permission_id) if not permission: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Permission with ID '{permission_id}' not found", ) await delete_account_permission(permission_id) return { "success": True, "message": f"Permission {permission_id} revoked successfully", } @castle_api_router.post("/api/v1/admin/permissions/bulk", status_code=HTTPStatus.CREATED) async def api_bulk_grant_permissions( permissions: list[CreateAccountPermission], wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list[AccountPermission]: """Grant multiple account permissions at once (admin only)""" created_permissions = [] for perm_data in permissions: # Validate that account exists account = await get_account(perm_data.account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account with ID '{perm_data.account_id}' not found", ) perm = await create_account_permission(perm_data, wallet.wallet.user) created_permissions.append(perm) return created_permissions @castle_api_router.post("/api/v1/admin/permissions/bulk-grant", status_code=HTTPStatus.CREATED) async def api_bulk_grant_permission_to_users( data: "BulkGrantPermission", wallet: WalletTypeInfo = Depends(require_admin_key), ) -> "BulkGrantResult": """ Grant the same permission to multiple users at once (admin only). This is a convenience endpoint that grants the same account permission to multiple users in one operation. Useful for onboarding teams or granting access to a shared expense account. Returns detailed results including successes and failures. """ from .models import BulkGrantResult granted = [] failed = [] # Validate account exists and is active account = await get_account(data.account_id) if not account: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Account with ID '{data.account_id}' not found", ) # Grant permission to each user for user_id in data.user_ids: try: perm_data = CreateAccountPermission( user_id=user_id, account_id=data.account_id, permission_type=data.permission_type, expires_at=data.expires_at, notes=data.notes, ) perm = await create_account_permission(perm_data, wallet.wallet.user) granted.append(perm) except Exception as e: failed.append({ "user_id": user_id, "error": str(e), }) return BulkGrantResult( granted=granted, failed=failed, total=len(data.user_ids), success_count=len(granted), failure_count=len(failed), ) # ===== USER PERMISSION ENDPOINTS ===== @castle_api_router.get("/api/v1/users/me/permissions") async def api_get_user_permissions( wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> list[AccountPermission]: """Get current user's account permissions""" return await get_user_permissions(wallet.wallet.user) # ===== ACCOUNT HIERARCHY ENDPOINT ===== @castle_api_router.get("/api/v1/accounts/hierarchy") async def api_get_account_hierarchy( root_account: str | None = None, wallet: WalletTypeInfo = Depends(require_invoice_key), ) -> list[AccountWithPermissions]: """ Get hierarchical account structure with user permissions. Optionally filter by root account (e.g., "Expenses" to get all expense sub-accounts). """ all_accounts = await get_all_accounts() user_id = wallet.wallet.user user_permissions = await get_user_permissions(user_id) # Filter by root account if specified if root_account: all_accounts = [ acc for acc in all_accounts if acc.name == root_account or acc.name.startswith(root_account + ":") ] # Build hierarchy with permission metadata accounts_with_hierarchy = [] for account in all_accounts: # Check if user has direct permission on this account account_perms = [ perm for perm in user_permissions if perm.account_id == account.id ] # Check if user has inherited permission from parent account inherited_perms = await get_user_permissions_with_inheritance( user_id, account.name, PermissionType.READ ) # Parse hierarchical account name to get parent and level parts = account.name.split(":") level = len(parts) - 1 parent_account = ":".join(parts[:-1]) if level > 0 else None # Determine inherited_from (which parent account gave access) inherited_from = None if inherited_perms and not account_perms: # Permission is inherited, use the parent account name _, parent_name = inherited_perms[0] inherited_from = parent_name # Collect permission types for this account permission_types = [perm.permission_type for perm in account_perms] # Check if account has children has_children = any( a.name.startswith(account.name + ":") for a in all_accounts ) accounts_with_hierarchy.append( AccountWithPermissions( id=account.id, name=account.name, account_type=account.account_type, description=account.description, user_id=account.user_id, created_at=account.created_at, user_permissions=permission_types if permission_types else None, inherited_from=inherited_from, parent_account=parent_account, level=level, has_children=has_children, ) ) # Sort by hierarchical name for natural ordering accounts_with_hierarchy.sort(key=lambda a: a.name) return accounts_with_hierarchy # ===== ACCOUNT SYNC ENDPOINTS ===== @castle_api_router.post("/api/v1/admin/accounts/sync") async def api_sync_all_accounts( force_full_sync: bool = False, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Sync all accounts from Beancount to Castle DB (admin only). This ensures Castle DB has metadata entries for all accounts that exist in Beancount, enabling permissions and user associations to work properly. Args: force_full_sync: If True, re-check all accounts. If False, only add new ones. Returns: Sync statistics: {total_beancount_accounts, accounts_added, accounts_skipped, errors} """ from .account_sync import sync_accounts_from_beancount logger.info(f"Admin {wallet.wallet.user[:8]} triggered account sync (force={force_full_sync})") try: stats = await sync_accounts_from_beancount(force_full_sync=force_full_sync) logger.info(f"Account sync complete: {stats['accounts_added']} added, {stats['accounts_skipped']} skipped") return stats except Exception as e: logger.error(f"Account sync failed: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Account sync failed: {str(e)}" ) @castle_api_router.post("/api/v1/admin/accounts/sync/{account_name:path}") async def api_sync_single_account( account_name: str, wallet: WalletTypeInfo = Depends(require_admin_key), ) -> dict: """ Sync a single account from Beancount to Castle DB (admin only). Useful for ensuring a specific account exists in Castle DB before granting permissions on it. Args: account_name: Hierarchical account name (e.g., "Expenses:Food:Groceries") Returns: {success: bool, account_name: str, message: str} """ from .account_sync import sync_single_account_from_beancount logger.info(f"Admin {wallet.wallet.user[:8]} triggered sync for account: {account_name}") try: created = await sync_single_account_from_beancount(account_name) if created: return { "success": True, "account_name": account_name, "message": f"Account '{account_name}' synced successfully" } else: return { "success": False, "account_name": account_name, "message": f"Account '{account_name}' already exists or not found in Beancount" } except Exception as e: logger.error(f"Single account sync failed for {account_name}: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Account sync failed: {str(e)}" ) # ===== RBAC (ROLE-BASED ACCESS CONTROL) ENDPOINTS ===== @castle_api_router.get("/api/v1/admin/roles") async def api_get_all_roles( wallet: WalletTypeInfo = Depends(require_admin_key), ) -> list: """Get all roles (admin only)""" from . import crud roles = await crud.get_all_roles() # Enrich each role with user count and permission count enriched_roles = [] for role in roles: user_count = await crud.get_user_count_for_role(role.id) permissions = await crud.get_role_permissions(role.id) enriched_roles.append({ "id": role.id, "name": role.name, "description": role.description, "is_default": role.is_default, "created_by": role.created_by, "created_at": role.created_at.isoformat(), "user_count": user_count, "permission_count": len(permissions), }) return enriched_roles @castle_api_router.post("/api/v1/admin/roles", status_code=HTTPStatus.CREATED) async def api_create_role( data: CreateRole, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Create a new role (admin only)""" from . import crud try: role = await crud.create_role(data, created_by=wallet.wallet.user) return { "id": role.id, "name": role.name, "description": role.description, "is_default": role.is_default, "created_by": role.created_by, "created_at": role.created_at.isoformat(), } except Exception as e: logger.error(f"Failed to create role: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Failed to create role: {str(e)}" ) @castle_api_router.get("/api/v1/admin/roles/{role_id}") async def api_get_role( role_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Get a specific role with its permissions and users (admin only)""" from . import crud role = await crud.get_role(role_id) if not role: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Role {role_id} not found" ) permissions = await crud.get_role_permissions(role.id) user_roles = await crud.get_role_users(role.id) return { "id": role.id, "name": role.name, "description": role.description, "is_default": role.is_default, "created_by": role.created_by, "created_at": role.created_at.isoformat(), "permissions": [ { "id": p.id, "account_id": p.account_id, "permission_type": p.permission_type.value, "notes": p.notes, "created_at": p.created_at.isoformat(), } for p in permissions ], "users": [ { "id": ur.id, "user_id": ur.user_id, "granted_by": ur.granted_by, "granted_at": ur.granted_at.isoformat(), "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, "notes": ur.notes, } for ur in user_roles ], } @castle_api_router.put("/api/v1/admin/roles/{role_id}") async def api_update_role( role_id: str, data: UpdateRole, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Update a role (admin only)""" from . import crud role = await crud.update_role(role_id, data) if not role: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Role {role_id} not found" ) return { "id": role.id, "name": role.name, "description": role.description, "is_default": role.is_default, "created_by": role.created_by, "created_at": role.created_at.isoformat(), } @castle_api_router.delete("/api/v1/admin/roles/{role_id}") async def api_delete_role( role_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Delete a role (admin only) - cascades to role_permissions and user_roles""" from . import crud role = await crud.get_role(role_id) if not role: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Role {role_id} not found" ) await crud.delete_role(role_id) return {"success": True, "message": f"Role '{role.name}' deleted successfully"} # ===== ROLE PERMISSION ENDPOINTS ===== @castle_api_router.post("/api/v1/admin/roles/{role_id}/permissions", status_code=HTTPStatus.CREATED) async def api_add_role_permission( role_id: str, data: CreateRolePermission, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Add a permission to a role (admin only)""" from . import crud # Verify role exists role = await crud.get_role(role_id) if not role: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Role {role_id} not found" ) # Ensure data has correct role_id data.role_id = role_id try: permission = await crud.create_role_permission(data) return { "id": permission.id, "role_id": permission.role_id, "account_id": permission.account_id, "permission_type": permission.permission_type.value, "notes": permission.notes, "created_at": permission.created_at.isoformat(), } except Exception as e: logger.error(f"Failed to add role permission: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Failed to add permission: {str(e)}" ) @castle_api_router.delete("/api/v1/admin/roles/{role_id}/permissions/{permission_id}") async def api_delete_role_permission( role_id: str, permission_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Remove a permission from a role (admin only)""" from . import crud await crud.delete_role_permission(permission_id) return {"success": True, "message": "Permission removed from role"} # ===== USER ROLE ASSIGNMENT ENDPOINTS ===== @castle_api_router.post("/api/v1/admin/user-roles", status_code=HTTPStatus.CREATED) async def api_assign_user_role( data: AssignUserRole, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Assign a user to a role (admin only)""" from . import crud # Verify role exists role = await crud.get_role(data.role_id) if not role: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Role {data.role_id} not found" ) try: user_role = await crud.assign_user_role(data, granted_by=wallet.wallet.user) return { "id": user_role.id, "user_id": user_role.user_id, "role_id": user_role.role_id, "granted_by": user_role.granted_by, "granted_at": user_role.granted_at.isoformat(), "expires_at": user_role.expires_at.isoformat() if user_role.expires_at else None, "notes": user_role.notes, } except Exception as e: logger.error(f"Failed to assign user role: {e}") raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Failed to assign role: {str(e)}" ) @castle_api_router.get("/api/v1/admin/user-roles/{user_id}") async def api_get_user_roles( user_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Get all roles assigned to a user (admin only)""" from . import crud user_roles = await crud.get_user_roles(user_id) # Enrich with role details enriched = [] for ur in user_roles: role = await crud.get_role(ur.role_id) if role: enriched.append({ "user_role_id": ur.id, "user_id": ur.user_id, "role": { "id": role.id, "name": role.name, "description": role.description, "is_default": role.is_default, }, "granted_by": ur.granted_by, "granted_at": ur.granted_at.isoformat(), "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, "notes": ur.notes, }) return enriched @castle_api_router.delete("/api/v1/admin/user-roles/{user_role_id}") async def api_revoke_user_role( user_role_id: str, wallet: WalletTypeInfo = Depends(require_admin_key), ): """Revoke a user's role assignment (admin only)""" from . import crud await crud.revoke_user_role(user_role_id) return {"success": True, "message": "Role assignment revoked"} @castle_api_router.get("/api/v1/admin/users/roles") async def api_get_all_user_roles( wallet: WalletTypeInfo = Depends(require_admin_key), ): """Get all user role assignments (admin only)""" from . import crud user_roles = await crud.get_all_user_roles() return [ { "id": ur.id, "user_id": ur.user_id, "role_id": ur.role_id, "granted_by": ur.granted_by, "granted_at": ur.granted_at.isoformat(), "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, "notes": ur.notes, } for ur in user_roles ] @castle_api_router.get("/api/v1/users/me/roles") async def api_get_my_roles( wallet: WalletTypeInfo = Depends(require_invoice_key), ): """Get current user's roles and effective permissions""" from . import crud user_id = wallet.wallet.user # Get user's roles user_roles = await crud.get_user_roles(user_id) # Get permissions from roles role_permissions_list = await crud.get_user_permissions_from_roles(user_id) # Get direct permissions direct_permissions = await crud.get_user_permissions(user_id) # Build response roles_data = [] for ur in user_roles: role = await crud.get_role(ur.role_id) if role: permissions = await crud.get_role_permissions(role.id) roles_data.append({ "role": { "id": role.id, "name": role.name, "description": role.description, }, "permissions": [ { "account_id": p.account_id, "permission_type": p.permission_type.value, } for p in permissions ], "granted_at": ur.granted_at.isoformat(), "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, }) return { "roles": roles_data, "direct_permissions": [ { "id": p.id, "account_id": p.account_id, "permission_type": p.permission_type.value, "granted_at": p.granted_at.isoformat(), "expires_at": p.expires_at.isoformat() if p.expires_at else None, "notes": p.notes, } for p in direct_permissions ], }