""" Fava API client for Castle. This module provides an async HTTP client for interacting with Fava's JSON API. All accounting logic is delegated to Fava/Beancount. Fava provides a REST API for: - Adding transactions (PUT /api/add_entries) - Querying balances (GET /api/query) - Balance sheets (GET /api/balance_sheet) - Account reports (GET /api/account_report) See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py """ import httpx from typing import Any, Dict, List, Optional from decimal import Decimal from datetime import date, datetime import logging logger = logging.getLogger(__name__) class FavaClient: """ Async client for Fava REST API. Fava runs as a separate web service and provides a JSON API for adding entries and querying ledger data. All accounting calculations are performed by Beancount via Fava. """ def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0): """ Initialize Fava client. Args: fava_url: Base URL of Fava server (e.g., http://localhost:3333) ledger_slug: URL-safe ledger identifier (e.g., castle-accounting) timeout: Request timeout in seconds """ self.fava_url = fava_url.rstrip('/') self.ledger_slug = ledger_slug self.base_url = f"{self.fava_url}/{self.ledger_slug}/api" self.timeout = timeout async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: """ Submit a new journal entry to Fava. Args: entry: Beancount entry dict (format per Fava API spec) Must include: - t: "Transaction" (required by Fava) - date: "YYYY-MM-DD" - flag: "*" (cleared) or "!" (pending) - narration: str - postings: list of posting dicts - payee: str (empty string, not None) - tags: list of str - links: list of str - meta: dict Returns: Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."}) Raises: httpx.HTTPStatusError: If Fava returns an error httpx.RequestError: If connection fails Example: entry = { "t": "Transaction", "date": "2025-01-15", "flag": "*", "payee": "Store", "narration": "Purchase", "postings": [ {"account": "Expenses:Food", "amount": "50.00 EUR"}, {"account": "Assets:Cash", "amount": "-50.00 EUR"} ], "tags": [], "links": [], "meta": {"user_id": "abc123"} } result = await fava_client.add_entry(entry) """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.put( f"{self.base_url}/add_entries", json={"entries": [entry]}, headers={"Content-Type": "application/json"} ) response.raise_for_status() result = response.json() logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") return result except httpx.HTTPStatusError as e: logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_account_balance(self, account_name: str) -> Dict[str, Any]: """ Get balance for a specific account. Args: account_name: Full account name (e.g., "Assets:Receivable:User-abc123") Returns: Dict with: - sats: int (balance in satoshis) - positions: dict (currency → amount with cost basis) Example: balance = await fava_client.get_account_balance("Assets:Receivable:User-abc") # Returns: { # "sats": 200000, # "positions": {"SATS": {"{100.00 EUR}": 200000}} # } """ query = f"SELECT sum(position) WHERE account = '{account_name}'" try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query} ) response.raise_for_status() data = response.json() if not data['data']['rows']: return {"sats": 0, "positions": {}} # Fava returns: [[account, {"SATS": {cost: amount}}]] positions = data['data']['rows'][0][1] if data['data']['rows'] else {} # Sum up all SATS positions total_sats = 0 if isinstance(positions, dict) and "SATS" in positions: sats_positions = positions["SATS"] if isinstance(sats_positions, dict): # Sum all amounts (with different cost bases) total_sats = sum(int(amount) for amount in sats_positions.values()) elif isinstance(sats_positions, (int, float)): # Simple number (no cost basis) total_sats = int(sats_positions) return { "sats": total_sats, "positions": positions } except httpx.HTTPStatusError as e: logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_user_balance(self, user_id: str) -> Dict[str, Any]: """ Get user's total balance (what castle owes user). Aggregates: - Liabilities:Payable:User-{user_id} (positive = castle owes) - Assets:Receivable:User-{user_id} (positive = user owes, so negate) Args: user_id: User ID Returns: { "balance": int (sats, positive = castle owes user), "fiat_balances": {"EUR": Decimal("100.50")}, "accounts": [list of account dicts with balances] } """ # Query for all accounts matching user query = f""" SELECT account, sum(position) WHERE account ~ 'User-{user_id[:8]}' GROUP BY account """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query} ) response.raise_for_status() data = response.json() # Calculate user balance total_sats = 0 fiat_balances = {} accounts = [] for row in data['data']['rows']: account_name = row[0] positions = row[1] # {"SATS": {cost: amount, ...}} account_balance = {"account": account_name, "sats": 0, "positions": positions} # Process SATS positions if isinstance(positions, dict) and "SATS" in positions: sats_positions = positions["SATS"] if isinstance(sats_positions, dict): # Positions with cost basis: {"100.00 EUR": 200000, ...} for cost_str, amount in sats_positions.items(): amount_int = int(amount) # Apply sign based on account type if "Payable" in account_name: # Liability: positive = castle owes user total_sats += amount_int account_balance["sats"] += amount_int elif "Receivable" in account_name: # Asset: positive = user owes castle (subtract from user balance) total_sats -= amount_int account_balance["sats"] -= amount_int # Extract fiat amount from cost basis # Format: "100.00 EUR" or "{100.00 EUR}" if cost_str and cost_str != "SATS": cost_clean = cost_str.strip('{}') parts = cost_clean.split() if len(parts) == 2: try: fiat_amount = Decimal(parts[0]) fiat_currency = parts[1] if fiat_currency not in fiat_balances: fiat_balances[fiat_currency] = Decimal(0) # Apply same sign logic if "Payable" in account_name: fiat_balances[fiat_currency] += fiat_amount elif "Receivable" in account_name: fiat_balances[fiat_currency] -= fiat_amount except (ValueError, IndexError): logger.warning(f"Could not parse cost basis: {cost_str}") elif isinstance(sats_positions, (int, float)): # Simple number (no cost basis) amount_int = int(sats_positions) if "Payable" in account_name: total_sats += amount_int account_balance["sats"] += amount_int elif "Receivable" in account_name: total_sats -= amount_int account_balance["sats"] -= amount_int accounts.append(account_balance) return { "balance": total_sats, "fiat_balances": fiat_balances, "accounts": accounts } except httpx.HTTPStatusError as e: logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_all_user_balances(self) -> List[Dict[str, Any]]: """ Get balances for all users (admin view). Returns: [ { "user_id": "abc123", "balance": 100000, "fiat_balances": {"EUR": Decimal("100.50")}, "accounts": [...] }, ... ] """ query = """ SELECT account, sum(position) WHERE account ~ 'Payable:User-|Receivable:User-' GROUP BY account """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query} ) response.raise_for_status() data = response.json() # Group by user_id user_data = {} for row in data['data']['rows']: account_name = row[0] positions = row[1] # Extract user_id from account name # e.g., "Liabilities:Payable:User-abc123" → "abc123..." if ":User-" in account_name: user_id = account_name.split(":User-")[1] else: continue if user_id not in user_data: user_data[user_id] = { "user_id": user_id, "balance": 0, "fiat_balances": {}, "accounts": [] } account_info = {"account": account_name, "sats": 0, "positions": positions} # Process positions if isinstance(positions, dict) and "SATS" in positions: sats_positions = positions["SATS"] if isinstance(sats_positions, dict): for cost_str, amount in sats_positions.items(): amount_int = int(amount) if "Payable" in account_name: user_data[user_id]["balance"] += amount_int account_info["sats"] += amount_int elif "Receivable" in account_name: user_data[user_id]["balance"] -= amount_int account_info["sats"] -= amount_int # Extract fiat if cost_str and cost_str != "SATS": cost_clean = cost_str.strip('{}') parts = cost_clean.split() if len(parts) == 2: try: fiat_amount = Decimal(parts[0]) fiat_currency = parts[1] if fiat_currency not in user_data[user_id]["fiat_balances"]: user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0) if "Payable" in account_name: user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount elif "Receivable" in account_name: user_data[user_id]["fiat_balances"][fiat_currency] -= fiat_amount except (ValueError, IndexError): pass elif isinstance(sats_positions, (int, float)): amount_int = int(sats_positions) if "Payable" in account_name: user_data[user_id]["balance"] += amount_int account_info["sats"] += amount_int elif "Receivable" in account_name: user_data[user_id]["balance"] -= amount_int account_info["sats"] -= amount_int user_data[user_id]["accounts"].append(account_info) return list(user_data.values()) except httpx.HTTPStatusError as e: logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def check_fava_health(self) -> bool: """ Check if Fava is running and accessible. Returns: True if Fava responds, False otherwise """ try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get( f"{self.base_url}/changed" ) return response.status_code == 200 except Exception as e: logger.warning(f"Fava health check failed: {e}") return False # Singleton instance (configured from settings) _fava_client: Optional[FavaClient] = None def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0): """ Initialize the global Fava client. Args: fava_url: Base URL of Fava server ledger_slug: Ledger identifier timeout: Request timeout in seconds """ global _fava_client _fava_client = FavaClient(fava_url, ledger_slug, timeout) logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}") def get_fava_client() -> FavaClient: """ Get the configured Fava client. Returns: FavaClient instance Raises: RuntimeError: If client not initialized """ if _fava_client is None: raise RuntimeError( "Fava client not initialized. Call init_fava_client() first. " "Castle requires Fava for all accounting operations." ) return _fava_client