castle/views_api.py
Padreug 1d2eb05c36 Adds custom date range filtering to transactions
Enables users to filter transactions by a custom date range, providing more flexibility in viewing transaction history.

Prioritizes custom date range over preset days for filtering.

Displays a warning if a user attempts to apply a custom date range without selecting both start and end dates.
2025-12-14 12:47:23 +01:00

3642 lines
129 KiB
Python

from datetime import datetime
from decimal import Decimal
from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException
from loguru import logger
from lnbits.core.models import User, WalletTypeInfo
from lnbits.decorators import (
check_super_user,
check_user_exists,
require_admin_key,
require_invoice_key,
)
from lnbits.utils.exchange_rates import allowed_currencies, fiat_amount_as_satoshis
from .crud import (
approve_manual_payment_request,
check_balance_assertion,
create_account,
create_account_permission,
create_balance_assertion,
create_manual_payment_request,
db,
delete_account_permission,
delete_balance_assertion,
get_account,
get_account_by_name,
get_account_permission,
get_account_permissions,
get_all_accounts,
get_all_manual_payment_requests,
get_all_user_wallet_settings,
get_balance_assertion,
get_balance_assertions,
get_manual_payment_request,
get_or_create_user_account,
get_user_manual_payment_requests,
get_user_permissions,
get_user_permissions_with_inheritance,
reject_manual_payment_request,
)
from .models import (
Account,
AccountPermission,
AccountType,
AccountWithPermissions,
AssertionStatus,
AssignUserRole,
BalanceAssertion,
BulkGrantPermission,
BulkGrantResult,
CastleSettings,
CreateAccount,
CreateAccountPermission,
CreateBalanceAssertion,
CreateEntryLine,
CreateJournalEntry,
CreateManualPaymentRequest,
CreateRole,
CreateRolePermission,
CreateUserEquityStatus,
ExpenseEntry,
GeneratePaymentInvoice,
JournalEntry,
JournalEntryFlag,
ManualPaymentRequest,
PayUser,
PermissionType,
ReceivableEntry,
RecordPayment,
RevenueEntry,
Role,
RolePermission,
RoleWithPermissions,
SettleReceivable,
UpdateRole,
UserBalance,
UserEquityStatus,
UserInfo,
UserRole,
UserWalletSettings,
UserWithRoles,
)
from .services import get_settings, get_user_wallet, update_settings, update_user_wallet
castle_api_router = APIRouter()
# ===== HELPER FUNCTIONS =====
async def check_castle_wallet_configured() -> str:
    """Return the Castle wallet id stored in the "admin" settings.

    Raises:
        HTTPException: 400 Bad Request when there are no settings for
            "admin" or they carry no ``castle_wallet_id``.
    """
    admin_settings = await get_settings("admin")
    wallet_id = admin_settings.castle_wallet_id if admin_settings else None
    if wallet_id:
        return wallet_id
    # Nothing usable configured: the caller cannot proceed.
    raise HTTPException(
        status_code=HTTPStatus.BAD_REQUEST,
        detail="Castle wallet not configured. Please contact the super user to configure the Castle wallet in settings.",
    )
async def check_user_wallet_configured(user_id: str) -> str:
    """Return the wallet id that should be used on behalf of ``user_id``.

    The LNbits super user is mapped to the Castle wallet from the "admin"
    settings; any other user must have a personal wallet stored in their
    Castle wallet settings.

    Raises:
        HTTPException: 400 Bad Request when the applicable wallet is not
            configured.
    """
    from lnbits.settings import settings as lnbits_settings

    if user_id != lnbits_settings.super_user:
        # Regular user: rely on the personal wallet they configured.
        personal = await get_user_wallet(user_id)
        personal_wallet_id = personal.user_wallet_id if personal else None
        if personal_wallet_id:
            return personal_wallet_id
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="You must configure your wallet in settings before using this feature.",
        )

    # Super user: acts through the Castle wallet.
    castle_settings = await get_settings("admin")
    castle_wallet_id = castle_settings.castle_wallet_id if castle_settings else None
    if castle_wallet_id:
        return castle_wallet_id
    raise HTTPException(
        status_code=HTTPStatus.BAD_REQUEST,
        detail="Castle wallet not configured. Please configure the Castle wallet in settings.",
    )
# ===== UTILITY ENDPOINTS =====
@castle_api_router.get("/api/v1/currencies")
async def api_get_currencies() -> list[str]:
    """Return the currency codes that may be used for fiat conversion."""
    currencies = allowed_currencies()
    return currencies
# ===== ACCOUNT ENDPOINTS =====
@castle_api_router.get("/api/v1/accounts")
async def api_get_accounts(
    filter_by_user: bool = False,
    exclude_virtual: bool = True,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> list[Account] | list[AccountWithPermissions]:
    """
    Get all accounts in the chart of accounts.

    Args:
        filter_by_user: If true, only return accounts the user has permissions
            for (direct, role-based, or inherited from a parent account).
            Has no effect for the super user, who always sees everything.
        exclude_virtual: If true, exclude virtual parent accounts (default True).
        wallet: Invoice-key wallet info; its owner is the requesting user.

    Returns:
        AccountWithPermissions objects when filter_by_user is true and the
        caller is not the super user, otherwise Account objects.

    Note:
        For non-super users this endpoint also calls
        ``crud.auto_assign_default_role`` on every request.
    """
    from lnbits.settings import settings as lnbits_settings

    from . import crud

    all_accounts = await get_all_accounts()
    user_id = wallet.wallet.user
    is_super_user = user_id == lnbits_settings.super_user

    # Auto-assign default role if user has no roles (only for non-super users)
    if not is_super_user:
        assigned_role = await crud.auto_assign_default_role(user_id, "system")
        if assigned_role:
            logger.info(f"[ACCOUNTS] Auto-assigned role to user {user_id}")

    # Super users bypass permission filtering - they see everything
    if not filter_by_user or is_super_user:
        if exclude_virtual:
            all_accounts = [acc for acc in all_accounts if not acc.is_virtual]
        return all_accounts

    # Filter by user permissions.
    # NOTE: virtual accounts are NOT filtered out yet - they are needed for
    # the inheritance logic below.
    user_permissions = await get_user_permissions(user_id)
    role_permissions_list = await crud.get_user_permissions_from_roles(user_id)

    # Flatten the (role, permissions) pairs into a single list
    role_perms = []
    for _role, perms in role_permissions_list:
        role_perms.extend(perms)

    # Combine direct and role-based permissions
    all_permissions = list(user_permissions) + role_perms

    logger.info(f"[ACCOUNTS] User {user_id} has {len(user_permissions)} direct permissions and {len(role_perms)} role permissions (total: {len(all_permissions)})")
    if role_perms:
        logger.info(f"[ACCOUNTS] Role permissions: {[(p.account_id, p.permission_type) for p in role_perms]}")
    logger.info(f"[ACCOUNTS] Total accounts in system: {len(all_accounts)}")
    if len(all_accounts) > 0:
        logger.info(f"[ACCOUNTS] Sample account IDs: {[acc.id for acc in all_accounts[:5]]}")

    # Resolve the account behind every permission ONCE. Doing this inside the
    # per-account loop costs (accounts x permissions) database lookups.
    perm_account_names = {}
    unresolved_ids = set()
    for perm in all_permissions:
        perm_account_id = perm.account_id
        if perm_account_id in perm_account_names or perm_account_id in unresolved_ids:
            continue
        perm_account = await get_account(perm_account_id)
        if perm_account:
            perm_account_names[perm_account_id] = perm_account.name
        else:
            unresolved_ids.add(perm_account_id)

    # Names of the permitted accounts, kept in permission order so that the
    # first matching parent is the same one the per-permission scan would find.
    permitted_names = [
        perm_account_names[perm.account_id]
        for perm in all_permissions
        if perm.account_id in perm_account_names
    ]

    # Build list of accounts with permission metadata
    accounts_with_permissions = []
    for account in all_accounts:
        # Permissions held directly on this account (direct or from a role)
        permission_types = [
            perm.permission_type
            for perm in all_permissions
            if perm.account_id == account.id
        ]

        # Permitted accounts that are ancestors of this one,
        # e.g. "Expenses:Supplies" is a parent of "Expenses:Supplies:Food"
        inherited_parents = [
            parent_name
            for parent_name in permitted_names
            if account.name.startswith(parent_name + ":")
        ]

        if not permission_types and not inherited_parents:
            continue

        # Parse hierarchical account name to get parent and level
        parts = account.name.split(":")
        level = len(parts) - 1
        parent_account = ":".join(parts[:-1]) if level > 0 else None

        # Only report inherited_from when access is purely inherited
        inherited_from = None
        if inherited_parents and not permission_types:
            inherited_from = inherited_parents[0]

        has_children = any(
            a.name.startswith(account.name + ":") for a in all_accounts
        )

        accounts_with_permissions.append(
            AccountWithPermissions(
                id=account.id,
                name=account.name,
                account_type=account.account_type,
                description=account.description,
                user_id=account.user_id,
                created_at=account.created_at,
                is_active=account.is_active,
                is_virtual=account.is_virtual,
                user_permissions=permission_types if permission_types else None,
                inherited_from=inherited_from,
                parent_account=parent_account,
                level=level,
                has_children=has_children,
            )
        )

    # Filter out virtual accounts if requested (after permission inheritance logic)
    if exclude_virtual:
        accounts_with_permissions = [
            acc for acc in accounts_with_permissions if not acc.is_virtual
        ]

    logger.info(f"[ACCOUNTS] Returning {len(accounts_with_permissions)} accounts for user {user_id}")
    return accounts_with_permissions
@castle_api_router.post("/api/v1/accounts", status_code=HTTPStatus.CREATED)
async def api_create_account(
    data: CreateAccount,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> Account:
    """Create a new account from ``data`` and return it.

    Access is gated by the ``require_admin_key`` dependency; ``wallet`` is
    not otherwise used here.
    """
    new_account = await create_account(data)
    return new_account
@castle_api_router.get("/api/v1/accounts/{account_id}")
async def api_get_account(account_id: str) -> Account:
    """Look up a single account by id.

    Raises:
        HTTPException: 404 Not Found when no such account exists.
    """
    found = await get_account(account_id)
    if found:
        return found
    raise HTTPException(
        status_code=HTTPStatus.NOT_FOUND, detail="Account not found"
    )
@castle_api_router.get("/api/v1/accounts/{account_id}/balance")
async def api_get_account_balance(account_id: str) -> dict:
    """Return the balance of an account as reported by the Fava client.

    The account is looked up first because Fava is queried by account name,
    not by id.

    Returns:
        A dict with ``account_id``, ``balance`` (the ``"sats"`` value from the
        Fava response) and ``positions`` (the ``"positions"`` value).

    Raises:
        HTTPException: 404 when the account id is unknown.
    """
    from .fava_client import get_fava_client

    ledger_account = await get_account(account_id)
    if not ledger_account:
        raise HTTPException(status_code=404, detail="Account not found")

    client = get_fava_client()
    fava_balance = await client.get_account_balance(ledger_account.name)

    return dict(
        account_id=account_id,
        balance=fava_balance["sats"],  # Balance in satoshis
        positions=fava_balance["positions"],  # Full Beancount positions with cost basis
    )
@castle_api_router.get("/api/v1/accounts/{account_id}/transactions")
async def api_get_account_transactions(account_id: str, limit: int = 100) -> list[dict]:
    """
    Get the transactions of one account from Fava/Beancount.

    The account name is resolved from ``account_id`` and passed, together with
    ``limit``, to the Fava client; its result is returned unchanged.

    Raises:
        HTTPException: 404 Not Found when the account id is unknown.
    """
    from .fava_client import get_fava_client

    ledger_account = await get_account(account_id)
    if not ledger_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Account {account_id} not found",
        )

    return await get_fava_client().get_account_transactions(
        ledger_account.name, limit
    )
# ===== JOURNAL ENTRY ENDPOINTS =====
@castle_api_router.get("/api/v1/entries")
async def api_get_journal_entries(limit: int = 100) -> list[dict]:
    """
    Get journal entries from Fava/Beancount, enriched with user information.

    Only entries whose ``t`` field is "Transaction" are returned, in the order
    the Fava client yields them. Each returned dict is a copy of the Fava
    entry with two extra keys:

    - ``user_id``: taken from the ``user-id`` metadata key, or else from the
      text following "User-" in the first posting account that contains it.
    - ``username``: the LNbits username, or ``User-<first 8 chars of id>``
      when the user is unknown or has no username; ``None`` without a user id.

    Args:
        limit: Maximum number of entries to return. Zero or a negative value
            yields an empty list.
    """
    from lnbits.core.crud.users import get_user

    from .fava_client import get_fava_client

    # Checking the limit only after appending would leak one entry for
    # limit <= 0, so bail out up front.
    if limit <= 0:
        return []

    fava = get_fava_client()
    all_entries = await fava.get_journal_entries()

    # user_id -> display name; many entries share a user, look each up once.
    username_cache = {}

    enriched_entries = []
    for e in all_entries:
        if e.get("t") != "Transaction":
            continue

        # Extract user ID from metadata or account names
        user_id = None
        entry_meta = e.get("meta", {})
        if "user-id" in entry_meta:
            user_id = entry_meta["user-id"]
        else:
            for posting in e.get("postings", []):
                account = posting.get("account", "")
                if "User-" in account:
                    user_id = account.split("User-")[1]
                    break

        # Look up username
        username = None
        if user_id:
            if user_id not in username_cache:
                user = await get_user(user_id)
                username_cache[user_id] = (
                    user.username if user and user.username else f"User-{user_id[:8]}"
                )
            username = username_cache[user_id]

        enriched_entry = dict(e)
        enriched_entry["user_id"] = user_id
        enriched_entry["username"] = username
        enriched_entries.append(enriched_entry)

        if len(enriched_entries) >= limit:
            break

    return enriched_entries
@castle_api_router.get("/api/v1/entries/user")
async def api_get_user_entries(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
    limit: int = 20,
    offset: int = 0,
    filter_user_id: str | None = None,
    filter_account_type: str | None = None,  # 'asset' for receivable, 'liability' for payable
    days: int = 15,  # Default 15 days, options: 15, 30, 60
    start_date: str | None = None,  # ISO format: YYYY-MM-DD
    end_date: str | None = None,  # ISO format: YYYY-MM-DD
) -> dict:
    """
    Get journal entries that affect the current user's accounts from Fava/Beancount.

    Returns transactions sorted by date, newest first, with optional filtering
    and offset/limit pagination. Voided transactions are skipped.

    Args:
        wallet: Invoice-key wallet info. The super user may view all entries
            or narrow them with filter_user_id; any other user only ever sees
            entries attributed to themselves.
        limit: Page size.
        offset: Index of the first entry of the page.
        filter_user_id: Super user only - restrict to this user's entries.
        filter_account_type: 'asset' keeps entries with a posting on a
            "Receivable" account, 'liability' those with a "Payable" posting.
            Only applied to entries that carry a user id.
        days: Number of days to fetch (default: 15, options: 15, 30, 60)
        start_date: Start date for custom range (YYYY-MM-DD). Requires end_date.
        end_date: End date for custom range (YYYY-MM-DD). Requires start_date.

    Returns:
        Dict with ``entries`` (the page), ``total``, ``limit``, ``offset``,
        ``has_next`` and ``has_prev``.

    Note:
        If both days and start_date/end_date are provided, start_date/end_date takes precedence.
    """
    import re

    from lnbits.settings import settings as lnbits_settings

    from .fava_client import get_fava_client

    # Amount formats, compiled once per request rather than once per entry.
    # New format: "37.22 EUR"
    fiat_amount_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    # Old format: "36791 SATS {33.33 EUR, 2025-11-09}" or "36791 SATS"
    sats_amount_re = re.compile(r'^(-?\d+)\s+SATS')
    # Cost syntax inside the old format: {33.33 EUR, ...}
    cost_re = re.compile(r'\{([\d.]+)\s+([A-Z]+)')

    # Account-name marker selected by filter_account_type (None = unknown type,
    # which matches no posting).
    account_marker = None
    if filter_account_type:
        account_marker = {"asset": "Receivable", "liability": "Payable"}.get(
            filter_account_type.lower()
        )

    fava = get_fava_client()

    # Determine which user's entries to fetch
    if wallet.wallet.user == lnbits_settings.super_user:
        # Super user can view all or filter by user_id
        target_user_id = filter_user_id
    else:
        # Regular user can only see their own entries
        target_user_id = wallet.wallet.user

    # Both the preset window and the custom range are handed to the Fava
    # client; the custom range is documented to take precedence over days.
    all_entries = await fava.get_journal_entries(
        days=days,
        start_date=start_date,
        end_date=end_date,
    )

    # user id -> display name, so each user is resolved once per request.
    username_cache = {}

    filtered_entries = []
    for e in all_entries:
        if e.get("t") != "Transaction":
            continue

        # Skip voided transactions
        if "voided" in e.get("tags", []):
            continue

        # Extract user ID from metadata or account names
        user_id_match = None
        entry_meta = e.get("meta", {})
        if "user-id" in entry_meta:
            user_id_match = entry_meta["user-id"]
        else:
            for posting in e.get("postings", []):
                account = posting.get("account", "")
                if "User-" in account:
                    # e.g. "Liabilities:Payable:User-abc123" -> "abc123"
                    user_id_match = account.split("User-")[1]
                    break

        # Filter by target user if specified. Entries with no user attribution
        # are dropped too: letting them through would show a regular user
        # transactions that are not theirs.
        if target_user_id:
            if not user_id_match or not user_id_match.startswith(target_user_id[:8]):
                continue

        # Filter by account type if specified
        if filter_account_type and user_id_match:
            has_matching_account = account_marker is not None and any(
                account_marker in posting.get("account", "")
                for posting in e.get("postings", [])
            )
            if not has_matching_account:
                continue

        # Extract entry ID from links (text after the last "castle-")
        entry_id = None
        links = e.get("links", [])
        if isinstance(links, (list, set)):
            for link in links:
                if isinstance(link, str):
                    link_clean = link.lstrip('^')
                    if "castle-" in link_clean:
                        entry_id = link_clean.split("castle-")[-1]
                        break

        # Extract amount from the first posting
        amount_sats = 0
        fiat_amount = None
        fiat_currency = None
        postings = e.get("postings", [])
        if postings:
            first_posting = postings[0]
            if isinstance(first_posting, dict):
                amount_str = first_posting.get("amount", "")
                if isinstance(amount_str, str) and amount_str:
                    fiat_match = fiat_amount_re.match(amount_str)
                    if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                        # Direct fiat amount (new approach)
                        fiat_amount = abs(float(fiat_match.group(1)))
                        fiat_currency = fiat_match.group(2)
                        # Get SATS from metadata
                        posting_meta = first_posting.get("meta", {})
                        sats_equiv = posting_meta.get("sats-equivalent")
                        if sats_equiv:
                            amount_sats = abs(int(sats_equiv))
                    else:
                        sats_match = sats_amount_re.match(amount_str)
                        if sats_match:
                            amount_sats = abs(int(sats_match.group(1)))
                            cost_match = cost_re.search(amount_str)
                            if cost_match:
                                fiat_amount = float(cost_match.group(1))
                                fiat_currency = cost_match.group(2)

        # Extract reference from links (first non-castle, non-ln link)
        reference = None
        if isinstance(links, (list, set)):
            for link in links:
                if isinstance(link, str):
                    link_clean = link.lstrip('^')
                    if not link_clean.startswith("castle-") and not link_clean.startswith("ln-"):
                        reference = link_clean
                        break

        # Look up actual username using helper function
        username = None
        if user_id_match:
            if user_id_match not in username_cache:
                username_cache[user_id_match] = await _get_username_from_user_id(
                    user_id_match
                )
            username = username_cache[user_id_match]

        entry_data = {
            "id": entry_id or e.get("entry_hash", "unknown"),
            "date": e.get("date", ""),
            "entry_date": e.get("date", ""),
            "flag": e.get("flag"),
            "description": e.get("narration", ""),
            "payee": e.get("payee"),
            "tags": e.get("tags", []),
            "links": links,
            "amount": amount_sats,
            "user_id": user_id_match,
            "username": username,
            "reference": reference,
            "meta": entry_meta,  # Include metadata for frontend
        }
        if fiat_amount and fiat_currency:
            entry_data["fiat_amount"] = fiat_amount
            entry_data["fiat_currency"] = fiat_currency

        filtered_entries.append(entry_data)

    # Sort by date descending
    filtered_entries.sort(key=lambda x: x.get("date", ""), reverse=True)

    # Apply pagination
    total = len(filtered_entries)
    paginated_entries = filtered_entries[offset:offset + limit]

    return {
        "entries": paginated_entries,
        "total": total,
        "limit": limit,
        "offset": offset,
        "has_next": (offset + limit) < total,
        "has_prev": offset > 0,
    }
async def _get_username_from_user_id(user_id: str | None) -> str | None:
    """
    Helper function to get username from user_id, handling various formats.

    Supports:
    - Full UUID with dashes (36 chars): "375ec158-686c-4a21-b44d-a51cc90ef07d"
    - Dashless UUID (32 chars): "375ec158686c4a21b44da51cc90ef07d"
    - Partial ID (8 chars from account names): "375ec158"
    - Anything else is tried as-is against LNbits.

    Returns:
        The username when one is found, otherwise the fallback
        ``User-<first 8 chars of user_id>``. ``None`` when ``user_id`` is
        empty. Lookup errors in the dashless, partial and unknown-format
        cases are logged and turned into the fallback rather than raised.
    """
    from lnbits.core.crud.users import get_user

    logger.debug(f"[USERNAME] Called with: '{user_id}' (len={len(user_id) if user_id else 0})")

    if not user_id:
        return None

    # Case 1: Already in standard UUID format (36 chars with dashes)
    if len(user_id) == 36 and user_id.count('-') == 4:
        logger.debug("[USERNAME] Case 1: Full UUID format")
        user = await get_user(user_id)
        result = user.username if user and user.username else f"User-{user_id[:8]}"
        logger.debug(f"[USERNAME] Case 1 result: '{result}'")
        return result

    # Case 2: Dashless 32-char UUID - lookup via Castle user settings, fallback to LNbits
    elif len(user_id) == 32 and '-' not in user_id:
        logger.debug("[USERNAME] Case 2: Dashless UUID format - looking up in Castle user settings")
        try:
            # Convert dashless to dashed format
            user_id_with_dashes = f"{user_id[0:8]}-{user_id[8:12]}-{user_id[12:16]}-{user_id[16:20]}-{user_id[20:32]}"
            logger.debug(f"[USERNAME] Converted to dashed format: {user_id_with_dashes}")

            # Try Castle settings first
            user_settings = await get_all_user_wallet_settings()
            for setting in user_settings:
                if setting.id == user_id_with_dashes:
                    logger.debug("[USERNAME] Found matching user in Castle settings")
                    user = await get_user(setting.id)
                    result = user.username if user and user.username else f"User-{user_id[:8]}"
                    logger.debug(f"[USERNAME] Case 2 result (from Castle): '{result}'")
                    return result

            # Not in Castle settings - try LNbits database directly
            logger.debug("[USERNAME] Not in Castle settings, querying LNbits database directly")
            from lnbits.db import Database

            # Named lnbits_db so it does not shadow the module-level `db`
            # imported from .crud.
            lnbits_db = Database("database")
            async with lnbits_db.connect() as conn:
                row = await conn.fetchone(
                    "SELECT id, username FROM accounts WHERE id = :user_id LIMIT 1",
                    {"user_id": user_id_with_dashes},
                )
                logger.debug(f"[USERNAME] Database query result: {row}")
                if row and row["username"]:
                    result = row["username"]
                    logger.debug(f"[USERNAME] Case 2 result (from LNbits DB): '{result}'")
                    return result

            # User doesn't exist anywhere
            logger.debug("[USERNAME] User not found in LNbits database either")
            result = f"User-{user_id[:8]}"
            logger.debug(f"[USERNAME] Case 2 result (not found): '{result}'")
            return result
        except Exception as e:
            # Best effort: a failed lookup must not break the calling endpoint.
            logger.error(f"Error looking up user by dashless UUID {user_id}: {e}")
            result = f"User-{user_id[:8]}"
            return result

    # Case 3: Partial ID (8 chars from account name) - lookup via Castle user settings
    elif len(user_id) == 8:
        logger.debug("[USERNAME] Case 3: Partial ID format - looking up in Castle user settings")
        try:
            # Get all Castle users (which have full user_ids)
            user_settings = await get_all_user_wallet_settings()

            # Find matching user by first 8 chars
            for setting in user_settings:
                if setting.id.startswith(user_id):
                    logger.debug(f"[USERNAME] Found full user_id: {setting.id}")
                    # Now get username from LNbits with full ID
                    user = await get_user(setting.id)
                    result = user.username if user and user.username else f"User-{user_id}"
                    logger.debug(f"[USERNAME] Case 3 result (found): '{result}'")
                    return result

            # No matching user found in Castle settings
            logger.debug("[USERNAME] No matching user found in Castle settings")
            result = f"User-{user_id}"
            logger.debug(f"[USERNAME] Case 3 result (not found): '{result}'")
            return result
        except Exception as e:
            # Best effort: fall back to the synthetic name.
            logger.error(f"Error looking up user by partial ID {user_id}: {e}")
            result = f"User-{user_id}"
            return result

    # Case 4: Unknown format - try as-is and fall back
    else:
        logger.debug("[USERNAME] Case 4: Unknown format - trying as-is")
        try:
            user = await get_user(user_id)
            result = user.username if user and user.username else f"User-{user_id[:8]}"
            logger.debug(f"[USERNAME] Case 4 result: '{result}'")
            return result
        except Exception as e:
            logger.debug(f"[USERNAME] Case 4 exception: {e}")
            result = f"User-{user_id[:8]}"
            logger.debug(f"[USERNAME] Case 4 fallback result: '{result}'")
            return result
@castle_api_router.get("/api/v1/entries/pending")
async def api_get_pending_entries(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[dict]:
    """
    Get all pending entries that need approval (super user only).

    Returns the transactions from Fava/Beancount whose flag is '!' and that
    are not tagged "voided". Each item carries:

    - ``id``: text after the last "castle-" in the first matching link, or
      "unknown".
    - ``amount``: satoshi amount parsed from the first posting (0 when it
      cannot be parsed).
    - ``fiat_amount`` / ``fiat_currency``: only present when a fiat value
      could be parsed.
    - ``user_id`` / ``username``: from the ``user-id`` metadata key or a
      "User-" posting account, resolved through
      ``_get_username_from_user_id``.

    Raises:
        HTTPException: 403 Forbidden when the admin key does not belong to
            the LNbits super user.
    """
    import re

    from lnbits.settings import settings as lnbits_settings

    from .fava_client import get_fava_client

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access this endpoint",
        )

    # Amount formats, compiled once per request rather than once per entry.
    # New architecture: "50.00 EUR"
    fiat_amount_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    # Legacy: "36791 SATS {33.33 EUR, 2025-11-09}" or "36791 SATS"
    sats_amount_re = re.compile(r'^(-?\d+)\s+SATS')
    # Cost syntax inside the legacy format: {33.33 EUR, ...}
    cost_re = re.compile(r'\{([\d.]+)\s+([A-Z]+)')

    # Query Fava for all journal entries (includes links, tags, full metadata)
    fava = get_fava_client()
    all_entries = await fava.get_journal_entries()

    # user id -> display name, so each user is resolved once per request.
    username_cache = {}

    pending_entries = []
    for e in all_entries:
        # Only include pending transactions that are NOT voided
        if (
            e.get("t") != "Transaction"
            or e.get("flag") != "!"
            or "voided" in e.get("tags", [])
        ):
            continue

        # Extract entry ID from links field
        entry_id = None
        links = e.get("links", [])
        if isinstance(links, (list, set)):
            for link in links:
                if isinstance(link, str):
                    # Strip ^ prefix if present (Beancount link syntax)
                    link_clean = link.lstrip('^')
                    if "castle-" in link_clean:
                        entry_id = link_clean.split("castle-")[-1]
                        break

        # Extract user ID from metadata or account names
        user_id = None
        entry_meta = e.get("meta", {})
        logger.info(f"[EXTRACT] Entry metadata keys: {list(entry_meta.keys())}")
        logger.info(f"[EXTRACT] Entry metadata: {entry_meta}")
        if "user-id" in entry_meta:
            user_id = entry_meta["user-id"]
            logger.info(f"[EXTRACT] Found user-id in metadata: {user_id}")
        else:
            logger.info("[EXTRACT] No user-id in metadata, checking account names")
            for posting in e.get("postings", []):
                account = posting.get("account", "")
                if "User-" in account:
                    # e.g. "Liabilities:Payable:User-abc123" -> "abc123"
                    user_id = account.split("User-")[1]
                    logger.info(f"[EXTRACT] Extracted user_id from account name: {user_id}")
                    break

        # Look up username using helper function
        username = None
        if user_id:
            if user_id not in username_cache:
                username_cache[user_id] = await _get_username_from_user_id(user_id)
            username = username_cache[user_id]

        # Extract amount from the first posting
        amount_sats = 0
        fiat_amount = None
        fiat_currency = None
        postings = e.get("postings", [])
        if postings:
            first_posting = postings[0]
            if isinstance(first_posting, dict):
                amount_str = first_posting.get("amount", "")
                if isinstance(amount_str, str) and amount_str:
                    fiat_match = fiat_amount_re.match(amount_str)
                    if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                        fiat_amount = abs(float(fiat_match.group(1)))
                        fiat_currency = fiat_match.group(2)
                        # Extract sats equivalent from metadata
                        posting_meta = first_posting.get("meta", {})
                        sats_equiv = posting_meta.get("sats-equivalent")
                        if sats_equiv:
                            amount_sats = abs(int(sats_equiv))
                    else:
                        sats_match = sats_amount_re.match(amount_str)
                        if sats_match:
                            amount_sats = abs(int(sats_match.group(1)))
                            cost_match = cost_re.search(amount_str)
                            if cost_match:
                                fiat_amount = float(cost_match.group(1))
                                fiat_currency = cost_match.group(2)

        entry_data = {
            "id": entry_id or "unknown",
            "date": e.get("date", ""),
            "entry_date": e.get("date", ""),
            "flag": e.get("flag"),
            "description": e.get("narration", ""),
            "payee": e.get("payee"),
            "tags": e.get("tags", []),
            "links": links,
            "amount": amount_sats,
            "user_id": user_id,
            "username": username,
        }
        # Add fiat info if available
        if fiat_amount and fiat_currency:
            entry_data["fiat_amount"] = fiat_amount
            entry_data["fiat_currency"] = fiat_currency

        pending_entries.append(entry_data)

    return pending_entries
@castle_api_router.post("/api/v1/entries", status_code=HTTPStatus.CREATED)
async def api_create_journal_entry(
    data: CreateJournalEntry,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> JournalEntry:
    """
    Create a new generic journal entry and submit it to Fava/Beancount.

    Each line becomes one posting. A line whose metadata carries both
    ``fiat_currency`` and ``fiat_amount`` is posted in that fiat currency with
    the satoshi amount stored as ``sats-equivalent`` posting metadata; any
    other line is posted in SATS via ``format_posting_with_cost``.

    Args:
        data: The entry to create; the line amounts must sum to zero.
        wallet: Invoice-key wallet info; its wallet id is recorded as the
            creator.

    Returns:
        A JournalEntry describing the submission. Its ``lines`` are empty and
        its id is synthetic ("fava-<timestamp>"); the entry itself is only
        sent to Fava here.

    Raises:
        HTTPException: 400 when the lines do not balance or a line has an
            unparseable ``fiat_amount``; 404 when a referenced account does
            not exist.
    """
    from decimal import InvalidOperation

    from .fava_client import get_fava_client
    from .beancount_format import format_transaction, format_posting_with_cost

    # Validate that entry balances to zero
    total = sum(line.amount for line in data.lines)
    if total != 0:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=f"Entry does not balance (total: {total}, expected: 0)"
        )

    # Resolve every referenced account once and validate it exists
    account_map = {}
    for line in data.lines:
        if line.account_id in account_map:
            continue
        account = await get_account(line.account_id)
        if not account:
            raise HTTPException(
                status_code=HTTPStatus.NOT_FOUND,
                detail=f"Account '{line.account_id}' not found"
            )
        account_map[line.account_id] = account

    # Format postings
    postings = []
    for line in data.lines:
        account = account_map[line.account_id]

        # Extract fiat info from metadata if present
        fiat_currency = line.metadata.get("fiat_currency")
        fiat_amount_str = line.metadata.get("fiat_amount")
        try:
            fiat_amount = Decimal(fiat_amount_str) if fiat_amount_str else None
        except (InvalidOperation, TypeError, ValueError) as exc:
            # Client-supplied value: report it as a bad request rather than
            # letting it surface as an internal server error.
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail=f"Invalid fiat_amount '{fiat_amount_str}' for account '{line.account_id}'",
            ) from exc

        # Posting metadata, excluding the fiat fields used for the primary amount
        posting_metadata = {
            k: v for k, v in line.metadata.items()
            if k not in ["fiat_currency", "fiat_amount"]
        }

        if fiat_currency and fiat_amount:
            # Fiat-based posting: primary amount in fiat, sats in metadata
            posting_metadata["sats-equivalent"] = str(abs(line.amount))
            # line.amount is positive for debits, negative for credits;
            # carry that sign over to the fiat amount.
            signed_fiat_amount = fiat_amount if line.amount >= 0 else -fiat_amount
            posting = {
                "account": account.name,
                "amount": f"{signed_fiat_amount:.2f} {fiat_currency}",
                "meta": posting_metadata if posting_metadata else None
            }
        else:
            # SATS-based posting (legacy/fallback)
            if line.description:
                posting_metadata["description"] = line.description
            posting = format_posting_with_cost(
                account=account.name,
                amount_sats=line.amount,
                fiat_currency=None,
                fiat_amount=None,
                metadata=posting_metadata if posting_metadata else None
            )
        postings.append(posting)

    # Extract tags and links from meta
    tags = data.meta.get("tags", [])
    links = data.meta.get("links", [])
    if data.reference:
        # Build a new list: appending in place would modify the list that
        # lives inside the request's meta.
        links = list(links) + [data.reference]

    # Entry metadata (excluding tags and links which go at transaction level)
    entry_meta = {k: v for k, v in data.meta.items() if k not in ["tags", "links"]}
    entry_meta["source"] = "castle-api"
    entry_meta["created-by"] = wallet.wallet.id

    # One clock reading for every default below, so the date sent to Fava and
    # the timestamps in the response cannot disagree.
    now = datetime.now()
    entry_date = data.entry_date if data.entry_date else now

    # Format as Beancount entry
    fava = get_fava_client()
    entry = format_transaction(
        date_val=entry_date.date(),
        flag=data.flag.value if data.flag else "*",
        narration=data.description,
        postings=postings,
        tags=tags if tags else None,
        links=links if links else None,
        meta=entry_meta
    )

    # Submit to Fava
    result = await fava.add_entry(entry)
    logger.info(f"Journal entry submitted to Fava: {result.get('data', 'Unknown')}")

    # Return simplified JournalEntry for API compatibility
    # Note: Castle no longer stores entries in DB, Fava is the source of truth
    return JournalEntry(
        id=f"fava-{now.timestamp()}",
        description=data.description,
        entry_date=entry_date,
        created_by=wallet.wallet.id,
        created_at=now,
        reference=data.reference,
        flag=data.flag if data.flag else JournalEntryFlag.CLEARED,
        lines=[],  # Empty - entry is stored in Fava, not Castle DB
        meta={"source": "fava", "fava_response": result.get('data', 'Unknown')}
    )
# ===== SIMPLIFIED ENTRY ENDPOINTS =====
@castle_api_router.post("/api/v1/entries/expense", status_code=HTTPStatus.CREATED)
async def api_create_expense_entry(
data: ExpenseEntry,
wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> JournalEntry:
"""
Create an expense entry for a user.
If is_equity=True, records as equity contribution.
If is_equity=False, records as liability (castle owes user).
If currency is provided, amount is converted from fiat to satoshis.
"""
# Check that castle wallet is configured
await check_castle_wallet_configured()
# Check that user has configured their wallet
await check_user_wallet_configured(wallet.wallet.user)
# Handle currency conversion
amount_sats = int(data.amount)
metadata = {}
if data.currency:
# Validate currency
if data.currency.upper() not in allowed_currencies():
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=f"Currency '{data.currency}' not allowed. Use one of: {', '.join(allowed_currencies())}",
)
# Convert fiat to satoshis
amount_sats = await fiat_amount_as_satoshis(float(data.amount), data.currency)
# Store currency metadata (store fiat_amount as string to preserve Decimal precision)
metadata = {
"fiat_currency": data.currency.upper(),
"fiat_amount": str(data.amount.quantize(Decimal("0.001"))), # Store as string with 3 decimal places
"fiat_rate": float(amount_sats) / float(data.amount) if data.amount > 0 else 0,
"btc_rate": float(data.amount) / float(amount_sats) * 100_000_000 if amount_sats > 0 else 0,
}
# Get or create expense account
expense_account = await get_account_by_name(data.expense_account)
if not expense_account:
# Try to get it by ID
expense_account = await get_account(data.expense_account)
if not expense_account:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Expense account '{data.expense_account}' not found",
)
# Validate user has permission to submit expenses to this account
from .crud import get_user_permissions_with_inheritance
submit_perms = await get_user_permissions_with_inheritance(
wallet.wallet.user, expense_account.name, PermissionType.SUBMIT_EXPENSE
)
if not submit_perms:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"You do not have permission to submit expenses to account '{expense_account.name}'. Please contact an administrator to request access.",
)
# Get or create user-specific account
if data.is_equity:
# Validate equity eligibility
from .crud import get_user_equity_status
equity_status = await get_user_equity_status(wallet.wallet.user)
if not equity_status or not equity_status.is_equity_eligible:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="User is not eligible to contribute expenses to equity. Please submit for cash reimbursement.",
)
if not equity_status.equity_account_name:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="User equity account not configured. Contact administrator.",
)
# Equity contribution - use user's specific equity account
user_account = await get_account_by_name(equity_status.equity_account_name)
if not user_account:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Equity account '{equity_status.equity_account_name}' not found. Contact administrator.",
)
else:
# Liability (castle owes user)
user_account = await get_or_create_user_account(
wallet.wallet.user, AccountType.LIABILITY, "Accounts Payable"
)
# Create journal entry
# DR Expense, CR User Account (Liability or Equity)
description_suffix = f" ({metadata['fiat_amount']} {metadata['fiat_currency']})" if metadata else ""
# Add meta information for audit trail
entry_meta = {
"source": "api",
"created_via": "expense_entry",
"user_id": wallet.wallet.user,
"is_equity": data.is_equity,
}
# Format as Beancount entry and submit to Fava
from .fava_client import get_fava_client
from .beancount_format import format_expense_entry, sanitize_link
fava = get_fava_client()
# Extract fiat info from metadata
fiat_currency = metadata.get("fiat_currency") if metadata else None
fiat_amount = Decimal(metadata.get("fiat_amount")) if metadata and metadata.get("fiat_amount") else None
# Generate unique entry ID for tracking
import uuid
entry_id = str(uuid.uuid4()).replace("-", "")[:16]
# Add castle ID as reference/link (sanitized for Beancount)
castle_reference = f"castle-{entry_id}"
if data.reference:
castle_reference = f"{sanitize_link(data.reference)}-{entry_id}"
# Format Beancount entry
entry = format_expense_entry(
user_id=wallet.wallet.user,
expense_account=expense_account.name,
user_account=user_account.name,
amount_sats=amount_sats,
description=data.description,
entry_date=data.entry_date.date() if data.entry_date else datetime.now().date(),
is_equity=data.is_equity,
fiat_currency=fiat_currency,
fiat_amount=fiat_amount,
reference=castle_reference # Add castle ID as link
)
# Submit to Fava
result = await fava.add_entry(entry)
# Return a JournalEntry-like response for compatibility
from .models import EntryLine
return JournalEntry(
id=entry_id, # Use the generated castle entry ID
description=data.description + description_suffix,
entry_date=data.entry_date if data.entry_date else datetime.now(),
created_by=wallet.wallet.id,
created_at=datetime.now(),
reference=castle_reference,
flag=JournalEntryFlag.PENDING,
meta=entry_meta,
lines=[
EntryLine(
id=f"line-1-{entry_id}",
journal_entry_id=entry_id,
account_id=expense_account.id,
amount=amount_sats,
description=f"Expense paid by user {wallet.wallet.user[:8]}",
metadata=metadata or {}
),
EntryLine(
id=f"line-2-{entry_id}",
journal_entry_id=entry_id,
account_id=user_account.id,
amount=-amount_sats,
description=f"{'Equity contribution' if data.is_equity else 'Amount owed to user'}",
metadata=metadata or {}
),
]
)
@castle_api_router.post("/api/v1/entries/receivable", status_code=HTTPStatus.CREATED)
async def api_create_receivable_entry(
    data: ReceivableEntry,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> JournalEntry:
    """
    Record an accounts-receivable entry (a user owes the castle).

    Requires an admin key. When ``data.currency`` is set, ``data.amount`` is
    treated as a fiat amount and converted to satoshis; otherwise it is used
    as a satoshi amount directly. The entry is formatted for Beancount,
    submitted to Fava, and a ``JournalEntry``-shaped object is returned for
    API compatibility.

    Raises:
        HTTPException: 400 when the currency is not in ``allowed_currencies()``;
            404 when the revenue account is found neither by name nor by ID.
    """
    import uuid

    from .beancount_format import format_receivable_entry, sanitize_link
    from .fava_client import get_fava_client
    from .models import EntryLine

    # Default interpretation: the amount is already in satoshis.
    amount_sats = int(data.amount)
    metadata = {}
    fiat_currency = None
    fiat_amount = None
    description_suffix = ""

    if data.currency:
        currency_code = data.currency.upper()
        if currency_code not in allowed_currencies():
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail=f"Currency '{data.currency}' not allowed. Use one of: {', '.join(allowed_currencies())}",
            )

        amount_sats = await fiat_amount_as_satoshis(float(data.amount), data.currency)

        # The fiat amount is kept as a 3-decimal string so Decimal precision
        # survives serialisation of the metadata.
        fiat_amount_text = str(data.amount.quantize(Decimal("0.001")))
        metadata = {
            "fiat_currency": currency_code,
            "fiat_amount": fiat_amount_text,
            "fiat_rate": float(amount_sats) / float(data.amount) if data.amount > 0 else 0,
            "btc_rate": float(data.amount) / float(amount_sats) * 100_000_000 if amount_sats > 0 else 0,
        }
        fiat_currency = currency_code
        fiat_amount = Decimal(fiat_amount_text)
        description_suffix = f" ({metadata['fiat_amount']} {metadata['fiat_currency']})"

    # The revenue account may be given by name or by ID.
    revenue_account = (await get_account_by_name(data.revenue_account)) or (
        await get_account(data.revenue_account)
    )
    if not revenue_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Revenue account '{data.revenue_account}' not found",
        )

    user_receivable = await get_or_create_user_account(
        data.user_id, AccountType.ASSET, "Accounts Receivable"
    )

    # Audit-trail information returned with the entry.
    entry_meta = {
        "source": "api",
        "created_via": "receivable_entry",
        "debtor_user_id": data.user_id,
    }

    fava = get_fava_client()

    # 16 hex characters of a random UUID identify the entry; the reference
    # (used as a Beancount link) always ends with that ID.
    entry_id = uuid.uuid4().hex[:16]
    if data.reference:
        castle_reference = f"{sanitize_link(data.reference)}-{entry_id}"
    else:
        castle_reference = f"castle-{entry_id}"

    # DR Accounts Receivable (user), CR Revenue
    entry = format_receivable_entry(
        user_id=data.user_id,
        revenue_account=revenue_account.name,
        receivable_account=user_receivable.name,
        amount_sats=amount_sats,
        description=data.description,
        entry_date=datetime.now().date(),
        fiat_currency=fiat_currency,
        fiat_amount=fiat_amount,
        reference=castle_reference,
    )
    await fava.add_entry(entry)

    receivable_line = EntryLine(
        id=f"line-1-{entry_id}",
        journal_entry_id=entry_id,
        account_id=user_receivable.id,
        amount=amount_sats,
        description=f"Amount owed by user {data.user_id[:8]}",
        metadata=metadata or {},
    )
    revenue_line = EntryLine(
        id=f"line-2-{entry_id}",
        journal_entry_id=entry_id,
        account_id=revenue_account.id,
        amount=-amount_sats,
        description="Revenue earned",
        metadata=metadata or {},
    )

    return JournalEntry(
        id=entry_id,
        description=data.description + description_suffix,
        entry_date=datetime.now(),
        created_by=wallet.wallet.id,
        created_at=datetime.now(),
        reference=castle_reference,
        flag=JournalEntryFlag.PENDING,
        meta=entry_meta,
        lines=[receivable_line, revenue_line],
    )
@castle_api_router.post("/api/v1/entries/revenue", status_code=HTTPStatus.CREATED)
async def api_create_revenue_entry(
    data: RevenueEntry,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> JournalEntry:
    """
    Create a revenue entry (castle receives payment) and submit it to Fava.

    Requires an admin key. Without ``data.currency`` the amount is taken as
    satoshis. With ``data.currency`` the amount is a fiat amount: it is kept
    as the fiat cost basis and converted to satoshis for the posting, the same
    way the receivable endpoint in this module does. Previously the fiat
    number itself was booked as the satoshi amount.

    Returns:
        A simplified ``JournalEntry`` with no lines; the entry itself lives in
        Fava and the response only echoes its ID, reference and Fava's reply.

    Raises:
        HTTPException: 404 when the revenue or payment account is found
            neither by name nor by ID; 400 when the currency is not in
            ``allowed_currencies()``.
    """
    import uuid

    from .beancount_format import format_revenue_entry, sanitize_link
    from .fava_client import get_fava_client

    # Both accounts may be referenced by name or by ID.
    revenue_account = await get_account_by_name(data.revenue_account)
    if not revenue_account:
        revenue_account = await get_account(data.revenue_account)
    if not revenue_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Revenue account '{data.revenue_account}' not found",
        )

    payment_account = await get_account_by_name(data.payment_method_account)
    if not payment_account:
        payment_account = await get_account(data.payment_method_account)
    if not payment_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Payment account '{data.payment_method_account}' not found",
        )

    # Default interpretation: the amount is already in satoshis.
    amount_sats = int(data.amount)
    fiat_currency = None
    fiat_amount = None
    if data.currency:
        fiat_currency = data.currency.upper()
        if fiat_currency not in allowed_currencies():
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail=f"Currency '{data.currency}' not supported. Allowed: {', '.join(allowed_currencies())}",
            )
        # Keep the original fiat amount for the cost basis and convert it to
        # satoshis for the posting instead of reusing the fiat number as sats.
        fiat_amount = data.amount
        amount_sats = await fiat_amount_as_satoshis(float(data.amount), fiat_currency)

    fava = get_fava_client()

    # 16 hex characters of a random UUID identify the entry; the reference
    # (used as a Beancount link) always ends with that ID.
    entry_id = uuid.uuid4().hex[:16]
    castle_reference = f"castle-{entry_id}"
    if data.reference:
        castle_reference = f"{sanitize_link(data.reference)}-{entry_id}"

    entry = format_revenue_entry(
        payment_account=payment_account.name,
        revenue_account=revenue_account.name,
        amount_sats=amount_sats,
        description=data.description,
        entry_date=datetime.now().date(),
        fiat_currency=fiat_currency,
        fiat_amount=fiat_amount,
        reference=castle_reference,
    )

    result = await fava.add_entry(entry)
    logger.info(f"Revenue entry submitted to Fava: {result.get('data', 'Unknown')}")

    # Castle does not store the entry itself; Fava is the source of truth, so
    # the response carries no lines.
    return JournalEntry(
        id=entry_id,
        description=data.description,
        entry_date=datetime.now(),
        created_by=wallet.wallet.id,
        created_at=datetime.now(),
        reference=castle_reference,
        flag=JournalEntryFlag.CLEARED,
        lines=[],
        meta={"source": "fava", "fava_response": result.get('data', 'Unknown')},
    )
# ===== USER BALANCE ENDPOINTS =====
@castle_api_router.get("/api/v1/balance")
async def api_get_my_balance(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> UserBalance:
    """
    Return the caller's balance with the Castle, read from Fava/Beancount.

    For the LNbits super user the response is the castle's net position over
    all users instead: positive balances (users owing the castle) minus the
    absolute value of negative balances (castle owing users), plus per-currency
    fiat totals summed across all users.
    """
    from lnbits.settings import settings as lnbits_settings

    from .fava_client import get_fava_client

    fava = get_fava_client()
    caller_id = wallet.wallet.user

    # Regular users only ever see their own balance.
    if caller_id != lnbits_settings.super_user:
        own = await fava.get_user_balance(caller_id)
        return UserBalance(
            user_id=caller_id,
            balance=own["balance"],
            accounts=[],
            fiat_balances=own["fiat_balances"],
        )

    every_balance = await fava.get_all_user_balances()

    # Sign convention: positive = user owes castle, negative = castle owes user.
    owed_to_castle = sum(row["balance"] for row in every_balance if row["balance"] > 0)
    owed_by_castle = sum(abs(row["balance"]) for row in every_balance if row["balance"] < 0)

    # Fiat totals keep their sign; positive and negative amounts are summed.
    fiat_totals = {}
    for row in every_balance:
        for currency, amount in row["fiat_balances"].items():
            fiat_totals[currency] = fiat_totals.get(currency, Decimal("0")) + amount

    return UserBalance(
        user_id=caller_id,
        balance=owed_to_castle - owed_by_castle,
        accounts=[],
        fiat_balances=fiat_totals,
    )
@castle_api_router.get("/api/v1/balance/{user_id}")
async def api_get_user_balance(user_id: str) -> UserBalance:
    """Return the Castle balance of the given user, read from Fava/Beancount."""
    from .fava_client import get_fava_client

    # NOTE(review): unlike the other balance routes, this handler declares no
    # key/user dependency, so the lookup is not tied to the caller's identity.
    # Confirm that this is intended.
    user_data = await get_fava_client().get_user_balance(user_id)
    return UserBalance(
        user_id=user_id,
        balance=user_data["balance"],
        accounts=[],
        fiat_balances=user_data["fiat_balances"],
    )
@castle_api_router.get("/api/v1/balances/all")
async def api_get_all_balances(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[dict]:
    """
    Return every user's balance from Fava/Beancount, each with a username.

    Requires an admin key. Each item carries ``user_id``, ``username``,
    ``balance``, ``fiat_balances`` and ``accounts``.
    """
    from .fava_client import get_fava_client

    all_balances = await get_fava_client().get_all_user_balances()

    # Usernames are resolved one at a time, in the order Fava returned the rows.
    return [
        {
            "user_id": row["user_id"],
            "username": await _get_username_from_user_id(row["user_id"]),
            "balance": row["balance"],
            "fiat_balances": row["fiat_balances"],
            "accounts": row["accounts"],
        }
        for row in all_balances
    ]
# ===== PAYMENT ENDPOINTS =====
@castle_api_router.post("/api/v1/generate-payment-invoice")
async def api_generate_payment_invoice(
    data: GeneratePaymentInvoice,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> dict:
    """
    Create an invoice on the Castle wallet so a user can pay down their balance.

    The invoice is for the caller unless ``data.user_id`` is set, which only
    the super user may do. If the target user has a fiat balance in exactly
    one currency, the invoice's ``extra`` is enriched with the share of that
    fiat balance proportional to the invoice amount, plus the implied rates.

    Returns:
        A dict with the payment hash, the bolt11 request, the amount, the memo
        and the castle wallet's invoice key (for checking payment status).

    Raises:
        HTTPException: 403 when a non-super user passes ``user_id``; 400 when
            the user holds balances in more than one fiat currency; 404 when
            the castle wallet cannot be loaded.
    """
    from lnbits.core.crud.wallets import get_wallet
    from lnbits.core.models import CreateInvoice
    from lnbits.core.services import create_payment_request
    from lnbits.settings import settings as lnbits_settings

    from .fava_client import get_fava_client

    # Invoicing on behalf of someone else is reserved for the super user.
    if data.user_id and wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can generate invoices for other users",
        )
    target_user_id = data.user_id or wallet.wallet.user

    castle_wallet_id = await check_castle_wallet_configured()

    fava_balance = await get_fava_client().get_user_balance(target_user_id)
    user_balance = UserBalance(
        user_id=target_user_id,
        balance=fava_balance["balance"],
        accounts=[],
        fiat_balances=fava_balance["fiat_balances"],
    )

    invoice_extra = {"tag": "castle", "user_id": target_user_id}
    logger.info(f"User balance for invoice generation - sats: {user_balance.balance}, fiat_balances: {user_balance.fiat_balances}")

    fiat_balances = user_balance.fiat_balances
    if fiat_balances:
        currencies = list(fiat_balances.keys())
        if len(currencies) > 1:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail=f"User has multiple currencies ({', '.join(currencies)}). Please settle to a single currency first.",
            )

        # A non-empty mapping that passed the check above has exactly one key.
        fiat_currency = currencies[0]
        total_fiat_balance = fiat_balances[fiat_currency]
        total_sat_balance = abs(user_balance.balance)

        if total_sat_balance > 0:
            # fiat share = (invoice sats / total sats) * total fiat
            share = Decimal(data.amount) / Decimal(total_sat_balance)
            invoice_fiat_amount = abs(total_fiat_balance) * share

            # fiat_rate: sats per fiat unit; btc_rate: fiat units per BTC.
            if invoice_fiat_amount > 0:
                fiat_rate = float(data.amount) / float(invoice_fiat_amount)
            else:
                fiat_rate = 0
            if data.amount > 0:
                btc_rate = float(invoice_fiat_amount) / float(data.amount) * 100_000_000
            else:
                btc_rate = 0

            invoice_extra["fiat_currency"] = fiat_currency
            invoice_extra["fiat_amount"] = str(invoice_fiat_amount.quantize(Decimal("0.001")))
            invoice_extra["fiat_rate"] = fiat_rate
            invoice_extra["btc_rate"] = btc_rate

    logger.info(f"Invoice extra metadata: {invoice_extra}")

    invoice_data = CreateInvoice(
        out=False,
        amount=data.amount,
        memo=f"Payment from user {target_user_id[:8]} to Castle",
        unit="sat",
        extra=invoice_extra,
    )
    payment = await create_payment_request(castle_wallet_id, invoice_data)

    # The castle wallet's invoice key lets the client poll payment status.
    castle_wallet = await get_wallet(castle_wallet_id)
    if not castle_wallet:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Castle wallet not found"
        )

    return {
        "payment_hash": payment.payment_hash,
        "payment_request": payment.bolt11,
        "amount": data.amount,
        "memo": invoice_data.memo,
        "check_wallet_key": castle_wallet.inkey,
    }
@castle_api_router.post("/api/v1/record-payment")
async def api_record_payment(
    data: RecordPayment,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> dict:
    """
    Record a settled lightning payment in accounting, reducing what the user
    owes the castle.

    The credited user is read from the payment's ``extra["user_id"]``, which
    is set when the invoice is generated. Only payments received by the
    configured castle wallet are accepted; otherwise any settled invoice
    carrying a ``user_id`` in its metadata could be used to reduce a debt.

    The call is idempotent on a best-effort basis: Fava's journal is scanned
    for the payment's link first and, if found, the current balance is
    returned without adding a new entry.

    Returns:
        A dict with ``journal_entry_id``, ``new_balance`` and ``message``.

    Raises:
        HTTPException: 404 when the payment or the lightning account is not
            found; 403 when the payment was not received by the castle wallet;
            400 when the payment is still pending or lacks ``user_id``.
    """
    import httpx
    from lnbits.core.crud.payments import get_standalone_payment

    from .beancount_format import format_payment_entry
    from .fava_client import get_fava_client

    # incoming=True selects the invoice side of the payment, not the outgoing one.
    payment = await get_standalone_payment(data.payment_hash, incoming=True)
    if not payment:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Payment not found"
        )

    # The lookup above is by hash only, so make sure the invoice really
    # belongs to the castle wallet before trusting its metadata.
    castle_wallet_id = await check_castle_wallet_configured()
    if payment.wallet_id != castle_wallet_id:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Payment was not received by the Castle wallet",
        )

    if payment.pending:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST, detail="Payment not yet settled"
        )

    extra = payment.extra if payment.extra and isinstance(payment.extra, dict) else None

    target_user_id = extra.get("user_id") if extra else None
    if not target_user_id:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="Payment metadata missing user_id. Cannot determine which user to credit.",
        )

    fava = get_fava_client()

    # Idempotency check. Entries are fetched and filtered here because links
    # are a set type in BQL and cannot be matched with a pattern there.
    link_to_find = f"ln-{data.payment_hash[:16]}"
    already_recorded = False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(
                f"{fava.base_url}/api/journal",
                params={"time": ""},  # no time filter: all entries
            )
        if response.status_code == 200:
            entries = response.json().get('entries', [])
            already_recorded = any(
                link_to_find in entry.get('links', []) for entry in entries
            )
        else:
            logger.warning(
                f"Could not check Fava for duplicate payment: HTTP {response.status_code}"
            )
    except Exception as e:
        # Best effort only: a failed check must not block recording.
        logger.warning(f"Could not check Fava for duplicate payment: {e}")

    if already_recorded:
        # Kept outside the try block so a failing balance lookup surfaces as
        # an error instead of falling through and recording the payment twice.
        balance_data = await fava.get_user_balance(target_user_id)
        return {
            "journal_entry_id": f"fava-exists-{data.payment_hash[:16]}",
            "new_balance": balance_data["balance"],
            "message": "Payment already recorded",
        }

    # payment.amount is in millisatoshis.
    amount_sats = payment.amount // 1000

    # Fiat metadata attached to the invoice, if any.
    fiat_currency = None
    fiat_amount = None
    if extra:
        logger.info(f"Payment.extra contents: {payment.extra}")
        fiat_currency = extra.get("fiat_currency")
        fiat_amount_str = extra.get("fiat_amount")
        if fiat_amount_str:
            fiat_amount = Decimal(str(fiat_amount_str))
        logger.info(f"Extracted fiat metadata - currency: {fiat_currency}, amount: {fiat_amount}")

    user_receivable = await get_or_create_user_account(
        target_user_id, AccountType.ASSET, "Accounts Receivable"
    )

    lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
    if not lightning_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Lightning account not found"
        )

    entry = format_payment_entry(
        user_id=target_user_id,
        payment_account=lightning_account.name,
        payable_or_receivable_account=user_receivable.name,
        amount_sats=amount_sats,
        description=f"Lightning payment from user {target_user_id[:8]}",
        entry_date=datetime.now().date(),
        is_payable=False,  # user paying castle (receivable settlement)
        fiat_currency=fiat_currency,
        fiat_amount=fiat_amount,
        payment_hash=data.payment_hash,
        reference=data.payment_hash,
    )
    logger.info(f"Formatted payment entry: {entry}")

    result = await fava.add_entry(entry)
    logger.info(f"Payment entry submitted to Fava: {result.get('data', 'Unknown')}")

    balance_data = await fava.get_user_balance(target_user_id)
    return {
        "journal_entry_id": f"fava-{datetime.now().timestamp()}",
        "new_balance": balance_data["balance"],
        "message": "Payment recorded successfully",
    }
@castle_api_router.post("/api/v1/pay-user")
async def api_pay_user(
    user_id: str,
    amount: int,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Record a lightning payment from the castle to a user, reducing what the
    castle owes that user.

    Requires an admin key. ``amount`` is passed to the entry formatter as
    satoshis.

    Returns:
        A dict with ``journal_entry_id``, ``new_balance`` and ``message``.

    Raises:
        HTTPException: 404 when the lightning account does not exist.
    """
    # NOTE(review): a later handler in this module (for /api/v1/payables/pay)
    # is also named api_pay_user and rebinds the module-level name.
    from .beancount_format import format_payment_entry
    from .fava_client import get_fava_client

    # The liability account tracks what the castle owes this user.
    payable_account = await get_or_create_user_account(
        user_id, AccountType.LIABILITY, "Accounts Payable"
    )

    lightning = await get_account_by_name("Assets:Bitcoin:Lightning")
    if not lightning:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Lightning account not found"
        )

    fava = get_fava_client()

    # DR Liabilities:Payable (user), CR Assets:Bitcoin:Lightning
    short_id = user_id[:8]
    payment_entry = format_payment_entry(
        user_id=user_id,
        payment_account=lightning.name,
        payable_or_receivable_account=payable_account.name,
        amount_sats=amount,
        description=f"Payment to user {short_id}",
        entry_date=datetime.now().date(),
        is_payable=True,  # castle paying user
        reference=f"PAY-{short_id}",
    )

    fava_reply = await fava.add_entry(payment_entry)
    logger.info(f"Payment submitted to Fava: {fava_reply.get('data', 'Unknown')}")

    updated = await fava.get_user_balance(user_id)
    return {
        "journal_entry_id": f"fava-{datetime.now().timestamp()}",
        "new_balance": updated["balance"],
        "message": "Payment recorded successfully",
    }
@castle_api_router.post("/api/v1/receivables/settle")
async def api_settle_receivable(
    data: SettleReceivable,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Manually settle a receivable (record that a user paid the castle).

    Intended for payments made outside the invoice flow: cash, bank transfer,
    check, or an already-completed lightning / on-chain payment. Requires an
    admin key belonging to the super user.

    When ``data.currency`` is set, ``data.amount`` is a fiat amount and
    ``data.amount_sats`` must carry the satoshi equivalent. This applies to
    every payment method; previously a lightning / on-chain settlement with a
    currency but no ``amount_sats`` booked the fiat number as satoshis.

    Returns:
        A dict describing the settlement and the user's new balance.

    Raises:
        HTTPException: 403 when the caller is not the super user; 400 for an
            unknown payment method or a currency without ``amount_sats``;
            404 when no suitable payment account exists.
    """
    from lnbits.settings import settings as lnbits_settings

    from .beancount_format import format_fiat_settlement_entry, format_payment_entry
    from .fava_client import get_fava_client

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can settle receivables",
        )

    # Asset account credited for each accepted payment method.
    payment_account_map = {
        "cash": "Assets:Cash",
        "bank_transfer": "Assets:Bank",
        "check": "Assets:Bank",
        "lightning": "Assets:Bitcoin:Lightning",
        "btc_onchain": "Assets:Bitcoin:OnChain",
        "other": "Assets:Cash",
    }
    method = data.payment_method.lower()
    if method not in payment_account_map:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=f"Invalid payment method. Must be one of: {', '.join(payment_account_map)}",
        )

    # The receivable account tracks what this user owes the castle.
    user_receivable = await get_or_create_user_account(
        data.user_id, AccountType.ASSET, "Accounts Receivable"
    )

    account_name = payment_account_map[method]
    payment_account = await get_account_by_name(account_name)
    if not payment_account:
        # Fall back to the first asset account that is not a receivable.
        all_accounts = await get_all_accounts()
        payment_account = next(
            (
                acc
                for acc in all_accounts
                if acc.account_type == AccountType.ASSET
                and "receivable" not in acc.name.lower()
            ),
            None,
        )
    if not payment_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Payment account '{account_name}' not found. Please create it first.",
        )

    fava = get_fava_client()

    # With a currency, data.amount is fiat, so the satoshi value must be given
    # explicitly rather than derived from the fiat number.
    if data.currency and not data.amount_sats:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="amount_sats is required when settling with fiat currency",
        )

    reference = data.reference or f"MANUAL-{data.user_id[:8]}"
    fiat_methods = ("cash", "bank_transfer", "check", "other")

    # DR Cash/Bank/Lightning (asset up), CR Accounts Receivable (asset down)
    if data.currency and method in fiat_methods:
        # Fiat payment: recorded in the fiat currency with sats as metadata.
        entry = format_fiat_settlement_entry(
            user_id=data.user_id,
            payment_account=payment_account.name,
            payable_or_receivable_account=user_receivable.name,
            fiat_amount=Decimal(str(data.amount)),
            fiat_currency=data.currency.upper(),
            amount_sats=data.amount_sats,
            description=data.description,
            entry_date=datetime.now().date(),
            is_payable=False,  # user paying castle (receivable settlement)
            payment_method=data.payment_method,
            reference=reference,
        )
    else:
        # Lightning / on-chain payment: recorded in sats, fiat is optional
        # metadata. Without a currency, data.amount itself is the sat amount.
        amount_in_sats = data.amount_sats if data.amount_sats else int(data.amount)
        entry = format_payment_entry(
            user_id=data.user_id,
            payment_account=payment_account.name,
            payable_or_receivable_account=user_receivable.name,
            amount_sats=amount_in_sats,
            description=data.description,
            entry_date=datetime.now().date(),
            is_payable=False,  # user paying castle (receivable settlement)
            fiat_currency=data.currency.upper() if data.currency else None,
            fiat_amount=Decimal(str(data.amount)) if data.currency else None,
            payment_hash=data.payment_hash,
            reference=reference,
        )

    # Extra audit metadata on the entry.
    meta = entry.setdefault("meta", {})
    meta["payment-method"] = data.payment_method
    meta["settled-by"] = wallet.wallet.user
    if data.txid:
        meta["txid"] = data.txid

    result = await fava.add_entry(entry)
    logger.info(f"Receivable settlement submitted to Fava: {result.get('data', 'Unknown')}")

    balance_data = await fava.get_user_balance(data.user_id)
    return {
        "journal_entry_id": f"fava-{datetime.now().timestamp()}",
        "user_id": data.user_id,
        "amount_settled": float(data.amount),
        "currency": data.currency,
        "payment_method": data.payment_method,
        "new_balance": balance_data["balance"],
        "message": f"Receivable settled successfully via {data.payment_method}",
    }
@castle_api_router.post("/api/v1/payables/pay")
async def api_pay_user(
    data: PayUser,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Record that the castle paid a user (settles an expense / liability).

    Covers lightning payments that were already executed as well as manual
    cash, bank or check payments. Requires an admin key belonging to the
    super user. When ``data.currency`` is set, ``data.amount`` is the fiat
    amount and ``data.amount_sats`` must carry the satoshi equivalent;
    otherwise ``data.amount`` is taken as satoshis.

    Returns:
        A dict describing the payment and the user's new balance.

    Raises:
        HTTPException: 403 when the caller is not the super user; 400 for an
            unknown payment method or a currency without ``amount_sats``;
            404 when no suitable payment account exists.
    """
    # NOTE(review): an earlier handler in this module (for /api/v1/pay-user)
    # is also named api_pay_user; this definition rebinds the module-level name.
    from lnbits.settings import settings as lnbits_settings

    from .beancount_format import format_payment_entry
    from .fava_client import get_fava_client

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can pay users",
        )

    # Asset account debited for each accepted payment method.
    payment_account_map = {
        "cash": "Assets:Cash",
        "bank_transfer": "Assets:Bank",
        "check": "Assets:Bank",
        "lightning": "Assets:Bitcoin:Lightning",
        "btc_onchain": "Assets:Bitcoin:OnChain",
        "other": "Assets:Cash",
    }
    method = data.payment_method.lower()
    if method not in payment_account_map:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=f"Invalid payment method. Must be one of: {', '.join(payment_account_map)}",
        )

    # The liability account tracks what the castle owes this user.
    user_payable = await get_or_create_user_account(
        data.user_id, AccountType.LIABILITY, "Accounts Payable"
    )

    account_name = payment_account_map[method]
    payment_account = await get_account_by_name(account_name)
    if not payment_account:
        # Fall back to the first asset account that is not a receivable.
        candidates = await get_all_accounts()
        payment_account = next(
            (
                acc
                for acc in candidates
                if acc.account_type == AccountType.ASSET
                and "receivable" not in acc.name.lower()
            ),
            None,
        )
    if not payment_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Payment account '{account_name}' not found. Please create it first.",
        )

    fava = get_fava_client()

    # Work out the sat amount and the optional fiat annotation.
    fiat_currency = None
    fiat_amount = None
    if not data.currency:
        amount_in_sats = int(data.amount)
    elif not data.amount_sats:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="amount_sats is required when paying with fiat currency",
        )
    else:
        # The journal entry uses the sats equivalent so it matches the payable.
        amount_in_sats = data.amount_sats
        fiat_currency = data.currency.upper()
        fiat_amount = data.amount

    # DR Accounts Payable (liability down), CR Cash/Lightning/Bank (asset down)
    entry = format_payment_entry(
        user_id=data.user_id,
        payment_account=payment_account.name,
        payable_or_receivable_account=user_payable.name,
        amount_sats=amount_in_sats,
        description=data.description or f"Payment to user via {data.payment_method}",
        entry_date=datetime.now().date(),
        is_payable=True,  # castle paying user (payable settlement)
        fiat_currency=fiat_currency,
        fiat_amount=fiat_amount,
        payment_hash=data.payment_hash,
        reference=data.reference or f"PAY-{data.user_id[:8]}",
    )

    # Extra audit metadata on the entry.
    meta = entry.setdefault("meta", {})
    meta["payment-method"] = data.payment_method
    meta["paid-by"] = wallet.wallet.user
    if data.txid:
        meta["txid"] = data.txid

    fava_reply = await fava.add_entry(entry)
    logger.info(f"Payable payment submitted to Fava: {fava_reply.get('data', 'Unknown')}")

    updated = await fava.get_user_balance(data.user_id)
    return {
        "journal_entry_id": f"fava-{datetime.now().timestamp()}",
        "user_id": data.user_id,
        "amount_paid": float(data.amount),
        "currency": data.currency,
        "payment_method": data.payment_method,
        "new_balance": updated["balance"],
        "message": f"User paid successfully via {data.payment_method}",
    }
# ===== SETTINGS ENDPOINTS =====
@castle_api_router.get("/api/v1/settings")
async def api_get_settings(
    user: User = Depends(check_user_exists),
) -> CastleSettings:
    """
    Return the Castle settings stored under the ``"admin"`` key.

    When nothing has been stored yet, a default ``CastleSettings()`` is
    returned so the UI can show its setup screen.
    """
    stored = await get_settings("admin")
    return stored if stored else CastleSettings()
@castle_api_router.put("/api/v1/settings")
async def api_update_settings(
    data: CastleSettings,
    user: User = Depends(check_super_user),
) -> CastleSettings:
    """
    Store new Castle settings under the ``"admin"`` key (super user only).

    Raises:
        HTTPException: 400 when ``data.castle_wallet_id`` is empty.
    """
    if data.castle_wallet_id:
        return await update_settings("admin", data)

    raise HTTPException(
        status_code=HTTPStatus.BAD_REQUEST,
        detail="Castle wallet ID is required",
    )
# ===== USER WALLET ENDPOINTS =====
@castle_api_router.get("/api/v1/user-wallet/{user_id}")
async def api_get_user_wallet(
    user_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Look up the wallet a given user has configured (super user only).

    Returns:
        A dict that always carries ``user_id`` and ``user_wallet_id``
        (``None`` when the user has no wallet settings). The wallet's invoice
        key is added as ``user_wallet_id_invoice_key`` only when the wallet
        can be loaded from LNbits core.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.settings import settings as lnbits_settings

    caller_is_super_user = wallet.wallet.user == lnbits_settings.super_user
    if not caller_is_super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access user wallet info",
        )

    wallet_settings = await get_user_wallet(user_id)
    if not wallet_settings:
        return {"user_id": user_id, "user_wallet_id": None}

    # The invoice key is exposed so the caller can generate invoices for the
    # user's wallet.
    from lnbits.core.crud import get_wallet

    response = {
        "user_id": user_id,
        "user_wallet_id": wallet_settings.user_wallet_id,
    }
    core_wallet = await get_wallet(wallet_settings.user_wallet_id)
    if core_wallet:
        response["user_wallet_id_invoice_key"] = core_wallet.inkey
    return response
@castle_api_router.get("/api/v1/users")
async def api_get_all_users(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[dict]:
    """
    List every user who has configured a wallet in Castle (super user only).

    Returns:
        One dict per user with ``user_id``, ``user_wallet_id`` and a display
        ``username``. When LNbits core has no username for the user, the
        first 16 characters of the user ID followed by "..." are used.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.core.crud.users import get_user
    from lnbits.settings import settings as lnbits_settings

    # The admin-key dependency alone does not identify the Castle
    # administrator; the other admin endpoints in this module additionally
    # compare the wallet owner with the super user, so do the same here
    # before exposing every user's ID.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can list users",
        )

    users = []
    for setting in await get_all_user_wallet_settings():
        # Get user details from core
        core_user = await get_user(setting.id)
        # Use username if available, otherwise truncate user_id
        if core_user and core_user.username:
            username = core_user.username
        else:
            username = setting.id[:16] + "..."
        users.append(
            {
                "user_id": setting.id,
                "user_wallet_id": setting.user_wallet_id,
                "username": username,
            }
        )
    return users
@castle_api_router.get("/api/v1/admin/castle-users")
async def api_get_castle_users(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[dict]:
    """
    Get all users who have configured their wallet in Castle (super user only).

    These are users who can interact with Castle (submit expenses, receive
    permissions, etc.).

    Returns:
        One dict per user with ``id``, ``user_id`` (same value, kept for
        compatibility), ``username`` (``None`` when LNbits core has none)
        and ``user_wallet_id``. The list is sorted by username with
        unnamed users last, ties broken by user ID.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.core.crud.users import get_user
    from lnbits.settings import settings as lnbits_settings

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can list Castle users",
        )

    # Get all users who have configured their wallet
    users = []
    for setting in await get_all_user_wallet_settings():
        # Get user details from core
        core_user = await get_user(setting.id)
        username = core_user.username if core_user and core_user.username else None
        users.append(
            {
                "id": setting.id,
                "user_id": setting.id,  # Compatibility with existing code
                "username": username,
                "user_wallet_id": setting.user_wallet_id,
            }
        )

    # Sort by username (None values last)
    users.sort(key=lambda x: (x["username"] is None, x["username"] or "", x["user_id"]))
    return users
@castle_api_router.get("/api/v1/user/wallet")
async def api_get_user_wallet(
    user: User = Depends(check_user_exists),
) -> UserWalletSettings:
    """
    Return the wallet settings of the calling user.

    The super user has no personal wallet setting: the Castle wallet from the
    Castle settings is reported instead. Whenever nothing is configured, an
    empty ``UserWalletSettings`` is returned so the UI can show its setup
    screen.
    """
    # NOTE(review): this handler reuses the function name of the
    # /api/v1/user-wallet/{user_id} handler defined earlier in this module,
    # so the module-level name ends up bound to this later definition.
    from lnbits.settings import settings as lnbits_settings

    if user.id != lnbits_settings.super_user:
        # Regular users: their own stored wallet, or an empty placeholder.
        personal = await get_user_wallet(user.id)
        return personal if personal else UserWalletSettings()

    # Super user: always mirror the Castle wallet.
    castle_settings = await get_settings("admin")
    castle_wallet_id = castle_settings.castle_wallet_id if castle_settings else None
    if castle_wallet_id:
        return UserWalletSettings(user_wallet_id=castle_wallet_id)
    return UserWalletSettings()
@castle_api_router.put("/api/v1/user/wallet")
async def api_update_user_wallet(
    data: UserWalletSettings,
    user: User = Depends(check_user_exists),
) -> UserWalletSettings:
    """
    Store the calling user's wallet settings.

    Raises:
        HTTPException: 403 when the caller is the super user (whose wallet is
            always the Castle wallet); 400 when ``data.user_wallet_id`` is
            empty.
    """
    from lnbits.settings import settings as lnbits_settings

    is_super_user = user.id == lnbits_settings.super_user
    if is_super_user:
        # The super user's wallet cannot be set separately.
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Super user wallet is automatically set to the Castle wallet. Update Castle settings instead.",
        )

    if not data.user_wallet_id:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="User wallet ID is required",
        )

    saved = await update_user_wallet(user.id, data)
    return saved
# ===== MANUAL PAYMENT REQUESTS =====
@castle_api_router.post("/api/v1/manual-payment-request")
async def api_create_manual_payment_request(
    data: CreateManualPaymentRequest,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> ManualPaymentRequest:
    """
    Create a manual payment request for the Castle to review.

    The request is recorded for the owner of the wallet whose key
    authenticated the call, using ``data.amount`` and ``data.description``.
    """
    requester_id = wallet.wallet.user
    created = await create_manual_payment_request(
        requester_id, data.amount, data.description
    )
    return created
@castle_api_router.get("/api/v1/manual-payment-requests")
async def api_get_manual_payment_requests(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> list[ManualPaymentRequest]:
    """Return the manual payment requests belonging to the calling user."""
    owner_id = wallet.wallet.user
    own_requests = await get_user_manual_payment_requests(owner_id)
    return own_requests
@castle_api_router.get("/api/v1/manual-payment-requests/all")
async def api_get_all_manual_payment_requests(
    status: str | None = None,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[ManualPaymentRequest]:
    """
    Get all manual payment requests (Castle admin only).

    Args:
        status: Optional status filter, passed through unchanged to
            ``get_all_manual_payment_requests``; ``None`` applies no filter
            at this layer.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.settings import settings as lnbits_settings

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access this endpoint",
        )

    return await get_all_manual_payment_requests(status)
@castle_api_router.post("/api/v1/manual-payment-requests/{request_id}/approve")
async def api_approve_manual_payment_request(
    request_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> ManualPaymentRequest:
    """
    Approve a manual payment request and record the payment in the ledger
    (Castle admin only).

    A payment entry debiting the user's "Accounts Payable" liability account
    against "Assets:Bitcoin:Lightning" is submitted to Fava, after which the
    request is marked approved with a generated ``fava-<timestamp>`` entry
    reference.

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            the request or the Lightning account does not exist; 400 when
            the request is no longer pending.
    """
    from lnbits.settings import settings as lnbits_settings

    approver_id = wallet.wallet.user
    if approver_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access this endpoint",
        )

    payment_request = await get_manual_payment_request(request_id)
    if not payment_request:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Manual payment request not found",
        )

    if payment_request.status != "pending":
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=f"Request already {payment_request.status}",
        )

    # Called for its check only; the returned wallet ID is not needed here.
    # NOTE(review): presumably raises when no Castle wallet is configured.
    await check_castle_wallet_configured()

    # Liability account for the user (castle owes the user).
    payable_account = await get_or_create_user_account(
        payment_request.user_id, AccountType.LIABILITY, "Accounts Payable"
    )

    asset_account = await get_account_by_name("Assets:Bitcoin:Lightning")
    if not asset_account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Lightning account not found",
        )

    from .fava_client import get_fava_client
    from .beancount_format import format_payment_entry

    fava = get_fava_client()
    payment_entry = format_payment_entry(
        user_id=payment_request.user_id,
        payment_account=asset_account.name,
        payable_or_receivable_account=payable_account.name,
        amount_sats=payment_request.amount,
        description=f"Manual payment to user: {payment_request.description}",
        entry_date=datetime.now().date(),
        is_payable=True,  # Castle paying user
        reference=f"MPR-{payment_request.id}",
    )

    submission = await fava.add_entry(payment_entry)
    logger.info(f"Manual payment entry submitted to Fava: {submission.get('data', 'Unknown')}")

    # The reference stored with the approval is timestamp-based.
    fava_reference = f"fava-{datetime.now().timestamp()}"
    return await approve_manual_payment_request(
        request_id, approver_id, fava_reference
    )
@castle_api_router.post("/api/v1/manual-payment-requests/{request_id}/reject")
async def api_reject_manual_payment_request(
    request_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> ManualPaymentRequest:
    """
    Reject a pending manual payment request (Castle admin only).

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            the request does not exist; 400 when it is no longer pending.
    """
    from lnbits.settings import settings as lnbits_settings

    reviewer_id = wallet.wallet.user
    if reviewer_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access this endpoint",
        )

    payment_request = await get_manual_payment_request(request_id)
    if not payment_request:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Manual payment request not found",
        )

    # Only requests still awaiting a decision can be rejected.
    if payment_request.status != "pending":
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=f"Request already {payment_request.status}",
        )

    return await reject_manual_payment_request(request_id, reviewer_id)
# ===== EXPENSE APPROVAL ENDPOINTS =====
@castle_api_router.post("/api/v1/entries/{entry_id}/approve")
async def api_approve_expense_entry(
    entry_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Approve a pending expense entry by changing flag from '!' to '*' (admin only).

    The transaction is located among Fava's journal entries through its
    Castle link, and its source text is rewritten in the Beancount file via
    the Fava API.

    Returns:
        A dict with ``message``, ``entry_id``, ``entry_hash``, ``date`` and
        ``description`` (the entry's narration).

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when no
            pending transaction carries a matching link; 500 when Fava
            returns no source text or the pending flag cannot be located on
            the transaction line.
    """
    import re

    from lnbits.settings import settings as lnbits_settings
    from .fava_client import get_fava_client

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can approve expenses",
        )

    fava = get_fava_client()

    # 1. Get all journal entries from Fava
    all_entries = await fava.get_journal_entries()

    # 2. Find the first pending transaction whose links reference this ID
    exact_link = f"castle-{entry_id}"
    link_suffix = f"-{entry_id}"
    target_entry_hash = None
    target_entry = None
    for entry in all_entries:
        if entry.get("t") != "Transaction" or entry.get("flag") != "!":
            continue
        # Links may carry Beancount's '^' prefix; compare without it.
        # `or []` tolerates an explicit null "links" value.
        link_names = (link.lstrip("^") for link in entry.get("links") or [])
        if any(
            name == exact_link or name.endswith(link_suffix) for name in link_names
        ):
            candidate_hash = entry.get("entry_hash")
            if candidate_hash:
                target_entry_hash = candidate_hash
                target_entry = entry
                break

    if not target_entry_hash:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Pending entry {entry_id} not found in Beancount ledger"
        )

    # 3. Get the entry context (source text + sha256sum)
    context = await fava.get_entry_context(target_entry_hash)
    source = context.get("slice", "")
    sha256sum = context.get("sha256sum", "")

    if not source:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail="Could not retrieve entry source from Fava"
        )

    # 4. Change flag from ! to *
    # Only the '!' that directly follows the date at the start of a line is
    # touched, so text inside narrations or metadata is never rewritten, and
    # any run of spaces/tabs between date and flag is accepted.
    date_str = target_entry.get("date", "")
    old_pattern = f"{date_str} !"
    new_source = source
    replaced = 0
    if date_str:
        flag_re = re.compile(
            rf"^([ \t]*{re.escape(str(date_str))}[ \t]+)!", re.MULTILINE
        )
        new_source, replaced = flag_re.subn(
            lambda m: f"{m.group(1)}*", source, count=1
        )

    if not replaced:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Could not find pending flag pattern '{old_pattern}' in entry source"
        )

    # 5. Update the entry via Fava API
    await fava.update_entry_source(target_entry_hash, new_source, sha256sum)

    return {
        "message": f"Entry {entry_id} approved successfully",
        "entry_id": entry_id,
        "entry_hash": target_entry_hash,
        "date": date_str,
        "description": target_entry.get("narration", "")
    }
@castle_api_router.post("/api/v1/entries/{entry_id}/reject")
async def api_reject_expense_entry(
    entry_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Reject a pending expense entry by marking it as voided (admin only).

    Adds #voided tag for audit trail while keeping the '!' flag.
    Voided transactions are excluded from balances but preserved in the ledger.

    Returns:
        A dict with ``message``, ``entry_id``, ``entry_hash``, ``date`` and
        ``description`` (the entry's narration).

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when no
            pending transaction carries a matching link; 500 when Fava
            returns no source text or the transaction line to tag cannot be
            found in it.
    """
    from lnbits.settings import settings as lnbits_settings
    from .fava_client import get_fava_client

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can reject expenses",
        )

    fava = get_fava_client()

    # 1. Get all journal entries from Fava
    all_entries = await fava.get_journal_entries()

    # 2. Find the first pending transaction whose links reference this ID
    exact_link = f"castle-{entry_id}"
    link_suffix = f"-{entry_id}"
    target_entry_hash = None
    target_entry = None
    for entry in all_entries:
        if entry.get("t") != "Transaction" or entry.get("flag") != "!":
            continue
        # Links may carry Beancount's '^' prefix; compare without it.
        # `or []` tolerates an explicit null "links" value.
        link_names = (link.lstrip("^") for link in entry.get("links") or [])
        if any(
            name == exact_link or name.endswith(link_suffix) for name in link_names
        ):
            candidate_hash = entry.get("entry_hash")
            if candidate_hash:
                target_entry_hash = candidate_hash
                target_entry = entry
                break

    if not target_entry_hash:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Pending entry {entry_id} not found in Beancount ledger"
        )

    # 3. Get the entry context (source text + sha256sum)
    context = await fava.get_entry_context(target_entry_hash)
    source = context.get("slice", "")
    sha256sum = context.get("sha256sum", "")

    if not source:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail="Could not retrieve entry source from Fava"
        )

    # 4. Add #voided tag (keep ! flag as per convention)
    date_str = target_entry.get("date", "")

    if "#voided" in source:
        # Already tagged: resubmit the source unchanged.
        new_source = source
    else:
        # The transaction line is the first one holding the date, a quoted
        # narration and the '!' flag; the tag goes at its end whether or not
        # other tags are already present.
        lines = source.split('\n')
        for i, line in enumerate(lines):
            if date_str in line and '"' in line and '!' in line:
                lines[i] = line.rstrip() + ' #voided'
                break
        else:
            # Without this guard the untouched source would be resubmitted
            # and the rejection reported as successful.
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail="Could not find pending transaction line in entry source"
            )
        new_source = '\n'.join(lines)

    # 5. Update the entry via Fava API
    await fava.update_entry_source(target_entry_hash, new_source, sha256sum)

    return {
        "message": f"Entry {entry_id} rejected (marked as voided)",
        "entry_id": entry_id,
        "entry_hash": target_entry_hash,
        "date": date_str,
        "description": target_entry.get("narration", "")
    }
# ===== BALANCE ASSERTION ENDPOINTS =====
@castle_api_router.post("/api/v1/assertions")
async def api_create_balance_assertion(
    data: CreateBalanceAssertion,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> BalanceAssertion:
    """
    Create a balance assertion for reconciliation (admin only).

    Uses hybrid approach:
    1. Writes balance assertion to Beancount (via Fava) - source of truth
    2. Stores metadata in Castle DB for UI convenience (created_by, notes, tolerance)
    3. Lets Beancount validate the assertion automatically

    The assertion will be checked immediately upon creation.

    Returns:
        The stored assertion after its immediate check.

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            the account does not exist; 500 when writing to Fava fails; 400
            when the immediate check raises ``ValueError``; 409 (with a
            detail dict of expected/actual/difference values) when the
            assertion fails.
    """
    from lnbits.settings import settings as lnbits_settings
    from .fava_client import get_fava_client
    from .beancount_format import format_balance

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can create balance assertions",
        )

    # Verify account exists
    account = await get_account(data.account_id)
    if not account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Account {data.account_id} not found",
        )

    assertion_date = data.date or datetime.now()

    # HYBRID APPROACH: Write to Beancount first (source of truth)
    balance_directive = format_balance(
        date_val=assertion_date.date() if isinstance(assertion_date, datetime) else assertion_date,
        account=account.name,
        amount=data.expected_balance_sats,
        currency="SATS"
    )

    # Submit to Fava/Beancount
    try:
        fava = get_fava_client()
        result = await fava.add_entry(balance_directive)
        logger.info(f"Balance assertion submitted to Fava: {result}")
    except Exception as e:
        logger.error(f"Failed to write balance assertion to Fava: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Failed to write balance assertion to Beancount: {str(e)}"
        ) from e

    # Store metadata in Castle DB for UI convenience
    assertion = await create_balance_assertion(data, wallet.wallet.user)

    # Check it immediately (queries Fava for actual balance)
    try:
        assertion = await check_balance_assertion(assertion.id)
    except ValueError as e:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail=str(e),
        ) from e

    # If assertion failed, return 409 Conflict with details
    if assertion.status == AssertionStatus.FAILED:

        def _fiat_or_none(value):
            # Compare against None explicitly: a fiat amount of exactly zero
            # is a real value and must not be reported as missing.
            return float(value) if value is not None else None

        raise HTTPException(
            status_code=HTTPStatus.CONFLICT,
            detail={
                "message": "Balance assertion failed (validated by Beancount)",
                "expected_sats": assertion.expected_balance_sats,
                "actual_sats": assertion.checked_balance_sats,
                "difference_sats": assertion.difference_sats,
                "expected_fiat": _fiat_or_none(assertion.expected_balance_fiat),
                "actual_fiat": _fiat_or_none(assertion.checked_balance_fiat),
                "difference_fiat": _fiat_or_none(assertion.difference_fiat),
                "fiat_currency": assertion.fiat_currency,
            },
        )

    return assertion
@castle_api_router.get("/api/v1/assertions")
async def api_get_balance_assertions(
    account_id: str | None = None,
    status: str | None = None,
    limit: int = 100,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[BalanceAssertion]:
    """
    Get balance assertions with optional filters (admin only).

    Args:
        account_id: Optional account filter, passed through unchanged.
        status: Optional status name; it is converted to ``AssertionStatus``
            before querying.
        limit: Maximum number of assertions requested from the query.

    Raises:
        HTTPException: 403 when the caller is not the super user; 400 when
            ``status`` is not a valid ``AssertionStatus`` value.
    """
    from lnbits.settings import settings as lnbits_settings

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can view balance assertions",
        )

    # Parse status enum if provided
    status_enum = None
    if status:
        try:
            status_enum = AssertionStatus(status)
        except ValueError as exc:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail=f"Invalid status: {status}. Must be one of: pending, passed, failed",
            ) from exc

    return await get_balance_assertions(
        account_id=account_id,
        status=status_enum,
        limit=limit,
    )
@castle_api_router.get("/api/v1/assertions/{assertion_id}")
async def api_get_balance_assertion(
    assertion_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> BalanceAssertion:
    """
    Fetch a single balance assertion by ID (admin only).

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            no assertion has the given ID.
    """
    from lnbits.settings import settings as lnbits_settings

    caller_id = wallet.wallet.user
    if caller_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can view balance assertions",
        )

    found = await get_balance_assertion(assertion_id)
    if found:
        return found

    raise HTTPException(
        status_code=HTTPStatus.NOT_FOUND,
        detail="Balance assertion not found",
    )
@castle_api_router.post("/api/v1/assertions/{assertion_id}/check")
async def api_check_balance_assertion(
    assertion_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> BalanceAssertion:
    """
    Run the check for an existing balance assertion again (admin only).

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            the check raises ``ValueError`` (its message becomes the detail).
    """
    from lnbits.settings import settings as lnbits_settings

    caller_id = wallet.wallet.user
    if caller_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can check balance assertions",
        )

    try:
        return await check_balance_assertion(assertion_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=str(exc),
        )
@castle_api_router.delete("/api/v1/assertions/{assertion_id}")
async def api_delete_balance_assertion(
    assertion_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Remove a balance assertion (admin only).

    Returns:
        ``{"success": True, "message": "Balance assertion deleted"}``.

    Raises:
        HTTPException: 403 when the caller is not the super user; 404 when
            no assertion has the given ID.
    """
    from lnbits.settings import settings as lnbits_settings

    caller_id = wallet.wallet.user
    if caller_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can delete balance assertions",
        )

    # Look the assertion up first so a missing ID yields a 404.
    existing = await get_balance_assertion(assertion_id)
    if not existing:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Balance assertion not found",
        )

    await delete_balance_assertion(assertion_id)

    outcome = {"success": True, "message": "Balance assertion deleted"}
    return outcome
# ===== RECONCILIATION ENDPOINTS =====
@castle_api_router.get("/api/v1/reconciliation/summary")
async def api_get_reconciliation_summary(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Summarise the reconciliation state (admin only).

    Returns:
        A dict with assertion counts by status, transaction counts by flag
        and by the ``voided`` / ``review`` / ``flagged`` tags, the number of
        accounts, and a ``last_checked`` ISO timestamp. Up to 1000 assertions
        and 1000 transactions are requested.

    Raises:
        HTTPException: 403 when the caller is not the super user.
    """
    from lnbits.settings import settings as lnbits_settings

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can access reconciliation",
        )

    def _tally(items, predicate):
        # Number of items for which the predicate holds.
        return sum(1 for item in items if predicate(item))

    assertions = await get_balance_assertions(limit=1000)
    assertion_counts = {
        "total": len(assertions),
        "passed": _tally(assertions, lambda a: a.status == AssertionStatus.PASSED),
        "failed": _tally(assertions, lambda a: a.status == AssertionStatus.FAILED),
        "pending": _tally(assertions, lambda a: a.status == AssertionStatus.PENDING),
    }

    from .fava_client import get_fava_client

    fava = get_fava_client()
    transactions = await fava.query_transactions(limit=1000, include_pending=True)
    entry_counts = {
        "total": len(transactions),
        # Flags: '*' is cleared, '!' is pending.
        "cleared": _tally(transactions, lambda e: e.get("flag") == "*"),
        "pending": _tally(transactions, lambda e: e.get("flag") == "!"),
        # Special states are carried as tags rather than flags.
        "flagged": _tally(
            transactions,
            lambda e: "review" in e.get("tags", []) or "flagged" in e.get("tags", []),
        ),
        "voided": _tally(transactions, lambda e: "voided" in e.get("tags", [])),
    }

    accounts = await get_all_accounts()

    return {
        "assertions": assertion_counts,
        "entries": entry_counts,
        "accounts": {
            "total": len(accounts),
        },
        "last_checked": datetime.now().isoformat(),
    }
@castle_api_router.post("/api/v1/reconciliation/check-all")
async def api_check_all_assertions(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Re-check all balance assertions (admin only).

    Up to 1000 assertions are loaded and checked one by one. A failure while
    checking one assertion is logged and counted under ``errors`` without
    aborting the run.

    Returns:
        A dict with ``total``, ``checked``, ``passed``, ``failed`` and
        ``errors`` counters.

    Raises:
        HTTPException: 403 when the caller is not the super user.
    """
    from lnbits.settings import settings as lnbits_settings

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can run reconciliation checks",
        )

    # Get all assertions
    all_assertions = await get_balance_assertions(limit=1000)

    results = {
        "total": len(all_assertions),
        "checked": 0,
        "passed": 0,
        "failed": 0,
        "errors": 0,
    }

    for assertion in all_assertions:
        try:
            checked = await check_balance_assertion(assertion.id)
        except Exception as e:
            # Best effort: keep going, but leave a trace of what went wrong
            # instead of only bumping a counter.
            logger.error(f"Failed to check balance assertion {assertion.id}: {e}")
            results["errors"] += 1
            continue

        results["checked"] += 1
        if checked.status == AssertionStatus.PASSED:
            results["passed"] += 1
        elif checked.status == AssertionStatus.FAILED:
            results["failed"] += 1

    return results
@castle_api_router.get("/api/v1/reconciliation/discrepancies")
async def api_get_discrepancies(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Get all discrepancies (failed assertions, flagged entries) (admin only).

    An entry counts as flagged when it is tagged ``review`` or ``flagged``
    (the same rule the reconciliation summary endpoint uses) or carries the
    ``#`` flag.

    Returns:
        A dict with ``failed_assertions``, ``flagged_entries``,
        ``pending_entries`` and ``total_discrepancies`` (failed assertions
        plus flagged entries; pending entries are not counted).

    Raises:
        HTTPException: 403 when the caller is not the super user.
    """
    from lnbits.settings import settings as lnbits_settings

    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can view discrepancies",
        )

    # Get failed assertions
    failed_assertions = await get_balance_assertions(
        status=AssertionStatus.FAILED,
        limit=1000,
    )

    # Get flagged entries from Fava
    from .fava_client import get_fava_client

    fava = get_fava_client()
    all_entries = await fava.query_transactions(limit=1000, include_pending=True)

    def _is_flagged(entry: dict) -> bool:
        # The summary endpoint identifies flagged entries by tag; matching
        # only on a "#" flag left this list empty for such entries. The "#"
        # flag is still honoured so nothing previously reported disappears.
        tags = entry.get("tags") or []
        return entry.get("flag") == "#" or "review" in tags or "flagged" in tags

    flagged_entries = [e for e in all_entries if _is_flagged(e)]
    pending_entries = [e for e in all_entries if e.get("flag") == "!"]

    return {
        "failed_assertions": failed_assertions,
        "flagged_entries": flagged_entries,
        "pending_entries": pending_entries,
        "total_discrepancies": len(failed_assertions) + len(flagged_entries),
    }
# ===== AUTOMATED TASKS ENDPOINTS =====
@castle_api_router.post("/api/v1/tasks/daily-reconciliation")
async def api_run_daily_reconciliation(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Run the daily reconciliation check on demand (admin only).

    The endpoint is suitable for a cron job as well as manual use. It
    delegates to ``check_all_balance_assertions`` from the tasks module and
    returns that function's result unchanged.

    Raises:
        HTTPException: 403 when the caller is not the super user; 500 when
            the reconciliation task raises any exception.
    """
    from lnbits.settings import settings as lnbits_settings

    caller_id = wallet.wallet.user
    if caller_id != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can run daily reconciliation",
        )

    from .tasks import check_all_balance_assertions

    try:
        return await check_all_balance_assertions()
    except Exception as exc:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Error running daily reconciliation: {str(exc)}",
        )
# ===== USER EQUITY ELIGIBILITY ENDPOINTS =====
@castle_api_router.get("/api/v1/user/info")
async def api_get_user_info(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> UserInfo:
    """
    Describe the calling user, including equity eligibility.

    Returns:
        A ``UserInfo`` for the wallet's owner. When the user has no equity
        status record, ``is_equity_eligible`` is ``False`` and
        ``equity_account_name`` is ``None``.
    """
    from .crud import get_user_equity_status
    from .models import UserInfo

    current_user_id = wallet.wallet.user
    equity_status = await get_user_equity_status(current_user_id)

    if equity_status:
        eligible = equity_status.is_equity_eligible
        account_name = equity_status.equity_account_name
    else:
        eligible = False
        account_name = None

    return UserInfo(
        user_id=wallet.wallet.user,
        is_equity_eligible=eligible,
        equity_account_name=account_name,
    )
@castle_api_router.post("/api/v1/admin/equity-eligibility", status_code=HTTPStatus.CREATED)
async def api_grant_equity_eligibility(
    data: CreateUserEquityStatus,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> UserEquityStatus:
    """
    Grant equity contribution eligibility to a user (admin only).

    The calling user's ID is passed to the CRUD layer alongside ``data``.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.settings import settings as lnbits_settings
    from .crud import create_or_update_user_equity_status

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage equity eligibility",
        )

    return await create_or_update_user_equity_status(data, wallet.wallet.user)
@castle_api_router.delete("/api/v1/admin/equity-eligibility/{user_id}")
async def api_revoke_equity_eligibility(
    user_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> UserEquityStatus:
    """
    Revoke equity contribution eligibility from a user (admin only).

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user; 404
            when the CRUD layer returns no equity status for ``user_id``.
    """
    from lnbits.settings import settings as lnbits_settings
    from .crud import revoke_user_equity_eligibility

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage equity eligibility",
        )

    result = await revoke_user_equity_eligibility(user_id)
    if not result:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"User {user_id} not found in equity status records",
        )
    return result
@castle_api_router.get("/api/v1/admin/equity-eligibility")
async def api_list_equity_eligible_users(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[UserEquityStatus]:
    """
    List all equity-eligible users (admin only).

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.settings import settings as lnbits_settings
    from .crud import get_all_equity_eligible_users

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can view equity eligibility",
        )

    return await get_all_equity_eligible_users()
# ===== ACCOUNT PERMISSION ADMIN ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/permissions", status_code=HTTPStatus.CREATED)
async def api_grant_permission(
    data: CreateAccountPermission,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> AccountPermission:
    """
    Grant account permission to a user (admin only).

    The calling user's ID is passed to the CRUD layer alongside ``data``.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user; 404
            when ``data.account_id`` does not refer to an existing account.
    """
    from lnbits.settings import settings as lnbits_settings

    # Without this comparison the endpoint is open to any caller that passes
    # the admin-key dependency, which would allow granting oneself access;
    # the other admin endpoints in this module apply the same check.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage account permissions",
        )

    # Validate that account exists
    account = await get_account(data.account_id)
    if not account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Account with ID '{data.account_id}' not found",
        )

    return await create_account_permission(data, wallet.wallet.user)
@castle_api_router.get("/api/v1/admin/permissions")
async def api_list_permissions(
    user_id: str | None = None,
    account_id: str | None = None,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[AccountPermission]:
    """
    List account permissions (admin only).

    Can filter by user_id or account_id; ``user_id`` takes precedence when
    both are given. Without a filter, the permissions of every account are
    collected and de-duplicated by permission ID, keeping first-seen order.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user.
    """
    from lnbits.settings import settings as lnbits_settings

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can view account permissions",
        )

    if user_id:
        return await get_user_permissions(user_id)
    if account_id:
        return await get_account_permissions(account_id)

    # Admin overview: one query per account. A dict keyed by permission ID
    # drops duplicates while preserving insertion order.
    unique_permissions = {}
    for account in await get_all_accounts():
        for perm in await get_account_permissions(account.id):
            unique_permissions.setdefault(perm.id, perm)
    return list(unique_permissions.values())
@castle_api_router.delete("/api/v1/admin/permissions/{permission_id}")
async def api_revoke_permission(
    permission_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Revoke (delete) an account permission (admin only).

    Returns:
        A dict with ``success`` set to ``True`` and a confirmation
        ``message``.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user; 404
            when no permission has the given ID.
    """
    from lnbits.settings import settings as lnbits_settings

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage account permissions",
        )

    # Verify permission exists
    permission = await get_account_permission(permission_id)
    if not permission:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Permission with ID '{permission_id}' not found",
        )

    await delete_account_permission(permission_id)

    return {
        "success": True,
        "message": f"Permission {permission_id} revoked successfully",
    }
@castle_api_router.post("/api/v1/admin/permissions/bulk", status_code=HTTPStatus.CREATED)
async def api_bulk_grant_permissions(
    permissions: list[CreateAccountPermission],
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list[AccountPermission]:
    """
    Grant multiple account permissions at once (admin only).

    Every referenced account is validated before any permission is created,
    so an unknown account ID rejects the whole batch instead of leaving the
    items before it already granted.

    Returns:
        The created permissions, in request order.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user; 404
            when any item references an account that does not exist.
    """
    from lnbits.settings import settings as lnbits_settings

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage account permissions",
        )

    # Pass 1: validate, looking each distinct account up only once.
    verified_account_ids = set()
    for perm_data in permissions:
        if perm_data.account_id in verified_account_ids:
            continue
        account = await get_account(perm_data.account_id)
        if not account:
            raise HTTPException(
                status_code=HTTPStatus.NOT_FOUND,
                detail=f"Account with ID '{perm_data.account_id}' not found",
            )
        verified_account_ids.add(perm_data.account_id)

    # Pass 2: create.
    granted_by = wallet.wallet.user
    return [
        await create_account_permission(perm_data, granted_by)
        for perm_data in permissions
    ]
@castle_api_router.post("/api/v1/admin/permissions/bulk-grant", status_code=HTTPStatus.CREATED)
async def api_bulk_grant_permission_to_users(
    data: "BulkGrantPermission",
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> "BulkGrantResult":
    """
    Grant the same permission to multiple users at once (admin only).

    This is a convenience endpoint that grants the same account permission
    to multiple users in one operation. Useful for onboarding teams or
    granting access to a shared expense account.

    Returns:
        A ``BulkGrantResult`` with the granted permissions, a list of
        ``{"user_id", "error"}`` dicts for users that could not be granted,
        and total/success/failure counts.

    Raises:
        HTTPException: 403 when the caller is not the LNbits super user; 404
            when ``data.account_id`` does not refer to an existing account.
    """
    from lnbits.settings import settings as lnbits_settings
    from .models import BulkGrantResult

    # The admin-key dependency alone does not identify the Castle
    # administrator; mirror the explicit super-user comparison used by the
    # other admin endpoints in this module.
    if wallet.wallet.user != lnbits_settings.super_user:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="Only super user can manage account permissions",
        )

    granted = []
    failed = []

    # Validate that the account exists (no "active" state is checked here).
    account = await get_account(data.account_id)
    if not account:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Account with ID '{data.account_id}' not found",
        )

    # Grant permission to each user. Failures are deliberately collected per
    # user rather than aborting the whole operation.
    for user_id in data.user_ids:
        try:
            perm_data = CreateAccountPermission(
                user_id=user_id,
                account_id=data.account_id,
                permission_type=data.permission_type,
                expires_at=data.expires_at,
                notes=data.notes,
            )
            perm = await create_account_permission(perm_data, wallet.wallet.user)
            granted.append(perm)
        except Exception as e:
            failed.append({
                "user_id": user_id,
                "error": str(e),
            })

    return BulkGrantResult(
        granted=granted,
        failed=failed,
        total=len(data.user_ids),
        success_count=len(granted),
        failure_count=len(failed),
    )
# ===== USER PERMISSION ENDPOINTS =====
@castle_api_router.get("/api/v1/users/me/permissions")
async def api_get_user_permissions(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> list[AccountPermission]:
    """Return the account permissions held by the owner of the calling wallet."""
    current_user_id = wallet.wallet.user
    own_permissions = await get_user_permissions(current_user_id)
    return own_permissions
# ===== ACCOUNT HIERARCHY ENDPOINT =====
@castle_api_router.get("/api/v1/accounts/hierarchy")
async def api_get_account_hierarchy(
    root_account: str | None = None,
    wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> list[AccountWithPermissions]:
    """
    Get hierarchical account structure with user permissions.

    Account names are treated as ":"-separated paths. For every account
    the response carries:

    - ``level``: number of ":" separators in the name (0 for a root).
    - ``parent_account``: the name without its last segment, or None at
      level 0.
    - ``has_children``: True when another account in the (filtered) list
      has a name starting with this name followed by ":".
    - ``user_permissions``: permission types the calling user holds
      directly on the account, or None when there are none.
    - ``inherited_from``: set only when the user has no direct permission
      and ``get_user_permissions_with_inheritance`` returns a non-empty
      result for READ access.

    Args:
        root_account: When given, only this account and accounts whose
            name starts with ``root_account + ":"`` are returned
            (e.g. "Expenses" to get all expense sub-accounts).

    Returns:
        The accounts sorted by name.
    """
    all_accounts = await get_all_accounts()
    user_id = wallet.wallet.user
    user_permissions = await get_user_permissions(user_id)

    # Filter by root account if specified
    if root_account:
        subtree_prefix = root_account + ":"
        all_accounts = [
            acc for acc in all_accounts
            if acc.name == root_account or acc.name.startswith(subtree_prefix)
        ]

    # Collect every proper ancestor path of every listed account once, so
    # that "does this account have descendants?" is a set lookup instead of
    # a rescan of the whole list per account. A name N is in this set
    # exactly when some listed account's name starts with N + ":".
    ancestor_names: set[str] = set()
    for acc in all_accounts:
        segments = acc.name.split(":")
        for depth in range(1, len(segments)):
            ancestor_names.add(":".join(segments[:depth]))

    # Build hierarchy with permission metadata
    accounts_with_hierarchy = []
    for account in all_accounts:
        # Direct permissions the user holds on this exact account
        account_perms = [
            perm for perm in user_permissions if perm.account_id == account.id
        ]

        # Permissions resolved through the inheritance helper
        inherited_perms = await get_user_permissions_with_inheritance(
            user_id, account.name, PermissionType.READ
        )

        # Parse hierarchical account name to get parent and level
        parts = account.name.split(":")
        level = len(parts) - 1
        parent_account = ":".join(parts[:-1]) if level > 0 else None

        # Only report an inheritance source when nothing was granted
        # directly. Each entry is unpacked as a pair whose second item is
        # used as the source name -- presumably the account that granted
        # access; confirm against the crud helper.
        inherited_from = None
        if inherited_perms and not account_perms:
            _, inherited_from = inherited_perms[0]

        permission_types = [perm.permission_type for perm in account_perms]

        accounts_with_hierarchy.append(
            AccountWithPermissions(
                id=account.id,
                name=account.name,
                account_type=account.account_type,
                description=account.description,
                user_id=account.user_id,
                created_at=account.created_at,
                user_permissions=permission_types if permission_types else None,
                inherited_from=inherited_from,
                parent_account=parent_account,
                level=level,
                has_children=account.name in ancestor_names,
            )
        )

    # Sort by hierarchical name for natural ordering
    accounts_with_hierarchy.sort(key=lambda a: a.name)
    return accounts_with_hierarchy
# ===== ACCOUNT SYNC ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/accounts/sync")
async def api_sync_all_accounts(
    force_full_sync: bool = False,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Sync all accounts from Beancount to Castle DB (admin only).

    This ensures Castle DB has metadata entries for all accounts that exist
    in Beancount, enabling permissions and user associations to work properly.

    Args:
        force_full_sync: If True, re-check all accounts. If False, only add
            new ones. Passed through to ``sync_accounts_from_beancount``.

    Returns:
        The statistics returned by ``sync_accounts_from_beancount``; the
        ``accounts_added`` and ``accounts_skipped`` keys are read here for
        logging.

    Raises:
        HTTPException: An HTTPException raised during the sync is passed
            through unchanged; any other error becomes a 500 whose detail
            contains the error text, with the original exception chained
            as its cause.
    """
    from .account_sync import sync_accounts_from_beancount

    logger.info(
        f"Admin {wallet.wallet.user[:8]} triggered account sync "
        f"(force={force_full_sync})"
    )
    try:
        stats = await sync_accounts_from_beancount(force_full_sync=force_full_sync)
        logger.info(
            f"Account sync complete: {stats['accounts_added']} added, "
            f"{stats['accounts_skipped']} skipped"
        )
        return stats
    except HTTPException:
        # Already a deliberate HTTP response; do not rewrite it as a 500.
        raise
    except Exception as e:
        logger.error(f"Account sync failed: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Account sync failed: {str(e)}",
        ) from e
@castle_api_router.post("/api/v1/admin/accounts/sync/{account_name:path}")
async def api_sync_single_account(
    account_name: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
    """
    Sync a single account from Beancount to Castle DB (admin only).

    Useful for ensuring a specific account exists in Castle DB before
    granting permissions on it.

    Args:
        account_name: Hierarchical account name
            (e.g., "Expenses:Food:Groceries"). Declared as a ``path``
            parameter in the route.

    Returns:
        ``{success: bool, account_name: str, message: str}``. ``success``
        is False when ``sync_single_account_from_beancount`` returns a
        falsy value; the message then states that the account already
        exists or was not found in Beancount.

    Raises:
        HTTPException: An HTTPException raised during the sync is passed
            through unchanged; any other error becomes a 500 whose detail
            contains the error text, with the original exception chained
            as its cause.
    """
    from .account_sync import sync_single_account_from_beancount

    logger.info(
        f"Admin {wallet.wallet.user[:8]} triggered sync for account: {account_name}"
    )
    try:
        created = await sync_single_account_from_beancount(account_name)
    except HTTPException:
        # Already a deliberate HTTP response; do not rewrite it as a 500.
        raise
    except Exception as e:
        logger.error(f"Single account sync failed for {account_name}: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Account sync failed: {str(e)}",
        ) from e

    if created:
        return {
            "success": True,
            "account_name": account_name,
            "message": f"Account '{account_name}' synced successfully",
        }
    return {
        "success": False,
        "account_name": account_name,
        "message": f"Account '{account_name}' already exists or not found in Beancount",
    }
# ===== RBAC (ROLE-BASED ACCESS CONTROL) ENDPOINTS =====
@castle_api_router.get("/api/v1/admin/roles")
async def api_get_all_roles(
    wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list:
    """
    List every role together with usage counts (admin only).

    Each entry contains the role's own fields plus ``user_count`` (from
    ``crud.get_user_count_for_role``) and ``permission_count`` (the length
    of ``crud.get_role_permissions``). ``created_at`` is serialized with
    ``isoformat()``.
    """
    from . import crud

    summaries = []
    for entry in await crud.get_all_roles():
        # Two extra lookups per role: member count and attached permissions.
        member_total = await crud.get_user_count_for_role(entry.id)
        attached_permissions = await crud.get_role_permissions(entry.id)
        summary = dict(
            id=entry.id,
            name=entry.name,
            description=entry.description,
            is_default=entry.is_default,
            created_by=entry.created_by,
            created_at=entry.created_at.isoformat(),
            user_count=member_total,
            permission_count=len(attached_permissions),
        )
        summaries.append(summary)
    return summaries
@castle_api_router.post("/api/v1/admin/roles", status_code=HTTPStatus.CREATED)
async def api_create_role(
    data: CreateRole,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Create a new role (admin only).

    The role is created through ``crud.create_role`` with the calling
    wallet's user recorded as ``created_by``.

    Returns:
        A dict with the role's ``id``, ``name``, ``description``,
        ``is_default``, ``created_by`` and ISO-formatted ``created_at``.

    Raises:
        HTTPException: An HTTPException raised while creating the role is
            passed through unchanged; any other error becomes a 500 whose
            detail contains the error text, with the original exception
            chained as its cause.
    """
    from . import crud

    try:
        role = await crud.create_role(data, created_by=wallet.wallet.user)
        return {
            "id": role.id,
            "name": role.name,
            "description": role.description,
            "is_default": role.is_default,
            "created_by": role.created_by,
            "created_at": role.created_at.isoformat(),
        }
    except HTTPException:
        # Keep a deliberate status code instead of masking it as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to create role: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Failed to create role: {str(e)}",
        ) from e
@castle_api_router.get("/api/v1/admin/roles/{role_id}")
async def api_get_role(
    role_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Return one role with its permissions and assigned users (admin only).

    Returns:
        The role's own fields plus a ``permissions`` list (from
        ``crud.get_role_permissions``) and a ``users`` list (from
        ``crud.get_role_users``). Timestamps are ISO-formatted; a missing
        ``expires_at`` is returned as None.

    Raises:
        HTTPException: 404 when ``crud.get_role`` finds nothing for
            ``role_id``.
    """
    from . import crud

    found = await crud.get_role(role_id)
    if not found:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Role {role_id} not found"
        )

    attached_permissions = await crud.get_role_permissions(found.id)
    assignments = await crud.get_role_users(found.id)

    # Start with the role's own fields, then attach the two nested lists.
    payload = {
        "id": found.id,
        "name": found.name,
        "description": found.description,
        "is_default": found.is_default,
        "created_by": found.created_by,
        "created_at": found.created_at.isoformat(),
    }

    payload["permissions"] = []
    for p in attached_permissions:
        payload["permissions"].append({
            "id": p.id,
            "account_id": p.account_id,
            "permission_type": p.permission_type.value,
            "notes": p.notes,
            "created_at": p.created_at.isoformat(),
        })

    payload["users"] = []
    for ur in assignments:
        payload["users"].append({
            "id": ur.id,
            "user_id": ur.user_id,
            "granted_by": ur.granted_by,
            "granted_at": ur.granted_at.isoformat(),
            "expires_at": ur.expires_at.isoformat() if ur.expires_at else None,
            "notes": ur.notes,
        })

    return payload
@castle_api_router.put("/api/v1/admin/roles/{role_id}")
async def api_update_role(
    role_id: str,
    data: UpdateRole,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Apply ``data`` to an existing role (admin only).

    Returns:
        The role's ``id``, ``name``, ``description``, ``is_default``,
        ``created_by`` and ISO-formatted ``created_at`` as returned by
        ``crud.update_role``.

    Raises:
        HTTPException: 404 when ``crud.update_role`` returns a falsy value.
    """
    from . import crud

    updated = await crud.update_role(role_id, data)
    if updated:
        return {
            "id": updated.id,
            "name": updated.name,
            "description": updated.description,
            "is_default": updated.is_default,
            "created_by": updated.created_by,
            "created_at": updated.created_at.isoformat(),
        }

    # A falsy result from the update is reported as an unknown role.
    raise HTTPException(
        status_code=HTTPStatus.NOT_FOUND,
        detail=f"Role {role_id} not found"
    )
@castle_api_router.delete("/api/v1/admin/roles/{role_id}")
async def api_delete_role(
    role_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Delete a role (admin only).

    The role is fetched first so that its name can be echoed in the
    confirmation message. Per the original author's note, deletion
    cascades to role_permissions and user_roles -- that happens inside
    ``crud.delete_role`` and is not visible in this handler.

    Raises:
        HTTPException: 404 when ``crud.get_role`` finds nothing for
            ``role_id``.
    """
    from . import crud

    doomed = await crud.get_role(role_id)
    if not doomed:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Role {role_id} not found"
        )

    await crud.delete_role(role_id)

    confirmation = f"Role '{doomed.name}' deleted successfully"
    return {"success": True, "message": confirmation}
# ===== ROLE PERMISSION ENDPOINTS =====
@castle_api_router.post(
    "/api/v1/admin/roles/{role_id}/permissions",
    status_code=HTTPStatus.CREATED,
)
async def api_add_role_permission(
    role_id: str,
    data: CreateRolePermission,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Add a permission to a role (admin only).

    The ``role_id`` from the URL overrides whatever ``data.role_id`` was
    sent in the request body.

    Returns:
        A dict with the new permission's ``id``, ``role_id``,
        ``account_id``, ``permission_type`` (enum value), ``notes`` and
        ISO-formatted ``created_at``.

    Raises:
        HTTPException: 404 when the role does not exist. An HTTPException
            raised while creating the permission is passed through
            unchanged; any other error becomes a 500 whose detail contains
            the error text, with the original exception chained as its
            cause.
    """
    from . import crud

    # Verify role exists
    role = await crud.get_role(role_id)
    if not role:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Role {role_id} not found",
        )

    # The path parameter is authoritative for the target role.
    data.role_id = role_id

    try:
        permission = await crud.create_role_permission(data)
        return {
            "id": permission.id,
            "role_id": permission.role_id,
            "account_id": permission.account_id,
            "permission_type": permission.permission_type.value,
            "notes": permission.notes,
            "created_at": permission.created_at.isoformat(),
        }
    except HTTPException:
        # Keep a deliberate status code instead of masking it as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to add role permission: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Failed to add permission: {str(e)}",
        ) from e
@castle_api_router.delete("/api/v1/admin/roles/{role_id}/permissions/{permission_id}")
async def api_delete_role_permission(
    role_id: str,
    permission_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Remove a permission from a role (admin only).

    The permission is deleted only if it is listed among the permissions
    of ``role_id``; this keeps the URL's role segment meaningful and
    prevents deleting another role's permission through the wrong path.

    Returns:
        ``{"success": True, "message": "Permission removed from role"}``.

    Raises:
        HTTPException: 404 when ``permission_id`` is not among the
            permissions returned by ``crud.get_role_permissions(role_id)``.
    """
    from . import crud

    role_permissions = await crud.get_role_permissions(role_id)
    if not any(p.id == permission_id for p in role_permissions):
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Permission {permission_id} not found on role {role_id}",
        )

    await crud.delete_role_permission(permission_id)
    return {"success": True, "message": "Permission removed from role"}
# ===== USER ROLE ASSIGNMENT ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/user-roles", status_code=HTTPStatus.CREATED)
async def api_assign_user_role(
    data: AssignUserRole,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Assign a user to a role (admin only).

    The calling wallet's user is recorded as ``granted_by``.

    Returns:
        A dict with the assignment's ``id``, ``user_id``, ``role_id``,
        ``granted_by``, ISO-formatted ``granted_at``, ``expires_at``
        (ISO-formatted, or None when unset) and ``notes``.

    Raises:
        HTTPException: 404 when ``data.role_id`` does not match a role.
            An HTTPException raised while assigning is passed through
            unchanged; any other error becomes a 500 whose detail contains
            the error text, with the original exception chained as its
            cause.
    """
    from . import crud

    # Verify role exists
    role = await crud.get_role(data.role_id)
    if not role:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail=f"Role {data.role_id} not found",
        )

    try:
        user_role = await crud.assign_user_role(data, granted_by=wallet.wallet.user)
        return {
            "id": user_role.id,
            "user_id": user_role.user_id,
            "role_id": user_role.role_id,
            "granted_by": user_role.granted_by,
            "granted_at": user_role.granted_at.isoformat(),
            "expires_at": user_role.expires_at.isoformat() if user_role.expires_at else None,
            "notes": user_role.notes,
        }
    except HTTPException:
        # Keep a deliberate status code instead of masking it as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to assign user role: {e}")
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail=f"Failed to assign role: {str(e)}",
        ) from e
@castle_api_router.get("/api/v1/admin/user-roles/{user_id}")
async def api_get_user_roles(
    user_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    List the role assignments of one user, with role details (admin only).

    Every assignment returned by ``crud.get_user_roles`` is paired with its
    role via ``crud.get_role``. Assignments whose role cannot be loaded are
    left out of the result.
    """
    from . import crud

    result = []
    for assignment in await crud.get_user_roles(user_id):
        linked_role = await crud.get_role(assignment.role_id)
        if not linked_role:
            # No role behind this assignment: skip it.
            continue

        expiry = assignment.expires_at
        result.append({
            "user_role_id": assignment.id,
            "user_id": assignment.user_id,
            "role": {
                "id": linked_role.id,
                "name": linked_role.name,
                "description": linked_role.description,
                "is_default": linked_role.is_default,
            },
            "granted_by": assignment.granted_by,
            "granted_at": assignment.granted_at.isoformat(),
            "expires_at": expiry.isoformat() if expiry else None,
            "notes": assignment.notes,
        })
    return result
@castle_api_router.delete("/api/v1/admin/user-roles/{user_role_id}")
async def api_revoke_user_role(
    user_role_id: str,
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    Revoke a role assignment by its id (admin only).

    Delegates to ``crud.revoke_user_role`` and then returns a fixed
    success payload; no existence check is made in this handler.
    """
    from . import crud

    await crud.revoke_user_role(user_role_id)

    outcome = {"success": True, "message": "Role assignment revoked"}
    return outcome
@castle_api_router.get("/api/v1/admin/users/roles")
async def api_get_all_user_roles(
    wallet: WalletTypeInfo = Depends(require_admin_key),
):
    """
    List every user-role assignment (admin only).

    Each assignment from ``crud.get_all_user_roles`` is flattened to a
    dict; timestamps are ISO-formatted and a missing ``expires_at`` is
    returned as None.
    """
    from . import crud

    rows = []
    for assignment in await crud.get_all_user_roles():
        expiry = assignment.expires_at
        rows.append({
            "id": assignment.id,
            "user_id": assignment.user_id,
            "role_id": assignment.role_id,
            "granted_by": assignment.granted_by,
            "granted_at": assignment.granted_at.isoformat(),
            "expires_at": expiry.isoformat() if expiry else None,
            "notes": assignment.notes,
        })
    return rows
@castle_api_router.get("/api/v1/users/me/roles")
async def api_get_my_roles(
    wallet: WalletTypeInfo = Depends(require_invoice_key),
):
    """
    Get the current user's roles and permissions.

    Returns:
        A dict with two keys:

        - ``roles``: one entry per role assignment whose role can be
          loaded, containing the role summary, that role's permissions
          (``account_id`` and ``permission_type`` value) and the
          assignment's ``granted_at`` / ``expires_at``.
        - ``direct_permissions``: the permissions returned by
          ``crud.get_user_permissions`` for the user, independent of
          any role.

        Timestamps are ISO-formatted; a missing ``expires_at`` is None.
    """
    from . import crud

    user_id = wallet.wallet.user

    # Get user's roles
    user_roles = await crud.get_user_roles(user_id)

    # Get direct permissions
    direct_permissions = await crud.get_user_permissions(user_id)

    # Per-role permissions are loaded role by role below, so no separate
    # aggregated "permissions from roles" query is needed for this response.
    roles_data = []
    for ur in user_roles:
        role = await crud.get_role(ur.role_id)
        if not role:
            # Assignment points at a role that cannot be loaded: skip it.
            continue
        permissions = await crud.get_role_permissions(role.id)
        roles_data.append({
            "role": {
                "id": role.id,
                "name": role.name,
                "description": role.description,
            },
            "permissions": [
                {
                    "account_id": p.account_id,
                    "permission_type": p.permission_type.value,
                }
                for p in permissions
            ],
            "granted_at": ur.granted_at.isoformat(),
            "expires_at": ur.expires_at.isoformat() if ur.expires_at else None,
        })

    return {
        "roles": roles_data,
        "direct_permissions": [
            {
                "id": p.id,
                "account_id": p.account_id,
                "permission_type": p.permission_type.value,
                "granted_at": p.granted_at.isoformat(),
                "expires_at": p.expires_at.isoformat() if p.expires_at else None,
                "notes": p.notes,
            }
            for p in direct_permissions
        ],
    }