diff --git a/fava_client.py b/fava_client.py new file mode 100644 index 0000000..ed4cf4d --- /dev/null +++ b/fava_client.py @@ -0,0 +1,438 @@ +""" +Fava API client for Castle. + +This module provides an async HTTP client for interacting with Fava's JSON API. +All accounting logic is delegated to Fava/Beancount. + +Fava provides a REST API for: +- Adding transactions (PUT /api/add_entries) +- Querying balances (GET /api/query) +- Balance sheets (GET /api/balance_sheet) +- Account reports (GET /api/account_report) + +See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py +""" + +import httpx +from typing import Any, Dict, List, Optional +from decimal import Decimal +from datetime import date, datetime +import logging + +logger = logging.getLogger(__name__) + + +class FavaClient: + """ + Async client for Fava REST API. + + Fava runs as a separate web service and provides a JSON API + for adding entries and querying ledger data. + + All accounting calculations are performed by Beancount via Fava. + """ + + def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0): + """ + Initialize Fava client. + + Args: + fava_url: Base URL of Fava server (e.g., http://localhost:3333) + ledger_slug: URL-safe ledger identifier (e.g., castle-accounting) + timeout: Request timeout in seconds + """ + self.fava_url = fava_url.rstrip('/') + self.ledger_slug = ledger_slug + self.base_url = f"{self.fava_url}/{self.ledger_slug}/api" + self.timeout = timeout + + async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: + """ + Submit a new journal entry to Fava. + + Args: + entry: Beancount entry dict (format per Fava API spec) + Must include: + - t: "Transaction" (required by Fava) + - date: "YYYY-MM-DD" + - flag: "*" (cleared) or "!" (pending) + - narration: str + - postings: list of posting dicts + - payee: str (empty string, not None) + - tags: list of str + - links: list of str + - meta: dict + + Returns: + Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."}) + + Raises: + httpx.HTTPStatusError: If Fava returns an error + httpx.RequestError: If connection fails + + Example: + entry = { + "t": "Transaction", + "date": "2025-01-15", + "flag": "*", + "payee": "Store", + "narration": "Purchase", + "postings": [ + {"account": "Expenses:Food", "amount": "50.00 EUR"}, + {"account": "Assets:Cash", "amount": "-50.00 EUR"} + ], + "tags": [], + "links": [], + "meta": {"user_id": "abc123"} + } + result = await fava_client.add_entry(entry) + """ + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.put( + f"{self.base_url}/add_entries", + json={"entries": [entry]}, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + result = response.json() + + logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") + return result + + except httpx.HTTPStatusError as e: + logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise + + async def get_account_balance(self, account_name: str) -> Dict[str, Any]: + """ + Get balance for a specific account. + + Args: + account_name: Full account name (e.g., "Assets:Receivable:User-abc123") + + Returns: + Dict with: + - sats: int (balance in satoshis) + - positions: dict (currency → amount with cost basis) + + Example: + balance = await fava_client.get_account_balance("Assets:Receivable:User-abc") + # Returns: { + # "sats": 200000, + # "positions": {"SATS": {"{100.00 EUR}": 200000}} + # } + """ + query = f"SELECT sum(position) WHERE account = '{account_name}'" + + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/query", + params={"query_string": query} + ) + response.raise_for_status() + data = response.json() + + if not data['data']['rows']: + return {"sats": 0, "positions": {}} + + # Fava returns: [[account, {"SATS": {cost: amount}}]] + positions = data['data']['rows'][0][1] if data['data']['rows'] else {} + + # Sum up all SATS positions + total_sats = 0 + if isinstance(positions, dict) and "SATS" in positions: + sats_positions = positions["SATS"] + if isinstance(sats_positions, dict): + # Sum all amounts (with different cost bases) + total_sats = sum(int(amount) for amount in sats_positions.values()) + elif isinstance(sats_positions, (int, float)): + # Simple number (no cost basis) + total_sats = int(sats_positions) + + return { + "sats": total_sats, + "positions": positions + } + + except httpx.HTTPStatusError as e: + logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise + + async def get_user_balance(self, user_id: str) -> Dict[str, Any]: + """ + Get user's total balance (what castle owes user). + + Aggregates: + - Liabilities:Payable:User-{user_id} (positive = castle owes) + - Assets:Receivable:User-{user_id} (positive = user owes, so negate) + + Args: + user_id: User ID + + Returns: + { + "balance": int (sats, positive = castle owes user), + "fiat_balances": {"EUR": Decimal("100.50")}, + "accounts": [list of account dicts with balances] + } + """ + # Query for all accounts matching user + query = f""" + SELECT account, sum(position) + WHERE account ~ 'User-{user_id[:8]}' + GROUP BY account + """ + + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/query", + params={"query_string": query} + ) + response.raise_for_status() + data = response.json() + + # Calculate user balance + total_sats = 0 + fiat_balances = {} + accounts = [] + + for row in data['data']['rows']: + account_name = row[0] + positions = row[1] # {"SATS": {cost: amount, ...}} + + account_balance = {"account": account_name, "sats": 0, "positions": positions} + + # Process SATS positions + if isinstance(positions, dict) and "SATS" in positions: + sats_positions = positions["SATS"] + + if isinstance(sats_positions, dict): + # Positions with cost basis: {"100.00 EUR": 200000, ...} + for cost_str, amount in sats_positions.items(): + amount_int = int(amount) + + # Apply sign based on account type + if "Payable" in account_name: + # Liability: positive = castle owes user + total_sats += amount_int + account_balance["sats"] += amount_int + elif "Receivable" in account_name: + # Asset: positive = user owes castle (subtract from user balance) + total_sats -= amount_int + account_balance["sats"] -= amount_int + + # Extract fiat amount from cost basis + # Format: "100.00 EUR" or "{100.00 EUR}" + if cost_str and cost_str != "SATS": + cost_clean = cost_str.strip('{}') + parts = cost_clean.split() + if len(parts) == 2: + try: + fiat_amount = Decimal(parts[0]) + fiat_currency = parts[1] + + if fiat_currency not in fiat_balances: + fiat_balances[fiat_currency] = Decimal(0) + + # Apply same sign logic + if "Payable" in account_name: + fiat_balances[fiat_currency] += fiat_amount + elif "Receivable" in account_name: + fiat_balances[fiat_currency] -= fiat_amount + except (ValueError, IndexError): + logger.warning(f"Could not parse cost basis: {cost_str}") + + elif isinstance(sats_positions, (int, float)): + # Simple number (no cost basis) + amount_int = int(sats_positions) + if "Payable" in account_name: + total_sats += amount_int + account_balance["sats"] += amount_int + elif "Receivable" in account_name: + total_sats -= amount_int + account_balance["sats"] -= amount_int + + accounts.append(account_balance) + + return { + "balance": total_sats, + "fiat_balances": fiat_balances, + "accounts": accounts + } + + except httpx.HTTPStatusError as e: + logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise + + async def get_all_user_balances(self) -> List[Dict[str, Any]]: + """ + Get balances for all users (admin view). + + Returns: + [ + { + "user_id": "abc123", + "balance": 100000, + "fiat_balances": {"EUR": Decimal("100.50")}, + "accounts": [...] + }, + ... + ] + """ + query = """ + SELECT account, sum(position) + WHERE account ~ 'Payable:User-|Receivable:User-' + GROUP BY account + """ + + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/query", + params={"query_string": query} + ) + response.raise_for_status() + data = response.json() + + # Group by user_id + user_data = {} + + for row in data['data']['rows']: + account_name = row[0] + positions = row[1] + + # Extract user_id from account name + # e.g., "Liabilities:Payable:User-abc123" → "abc123..." + if ":User-" in account_name: + user_id = account_name.split(":User-")[1] + else: + continue + + if user_id not in user_data: + user_data[user_id] = { + "user_id": user_id, + "balance": 0, + "fiat_balances": {}, + "accounts": [] + } + + account_info = {"account": account_name, "sats": 0, "positions": positions} + + # Process positions + if isinstance(positions, dict) and "SATS" in positions: + sats_positions = positions["SATS"] + + if isinstance(sats_positions, dict): + for cost_str, amount in sats_positions.items(): + amount_int = int(amount) + + if "Payable" in account_name: + user_data[user_id]["balance"] += amount_int + account_info["sats"] += amount_int + elif "Receivable" in account_name: + user_data[user_id]["balance"] -= amount_int + account_info["sats"] -= amount_int + + # Extract fiat + if cost_str and cost_str != "SATS": + cost_clean = cost_str.strip('{}') + parts = cost_clean.split() + if len(parts) == 2: + try: + fiat_amount = Decimal(parts[0]) + fiat_currency = parts[1] + + if fiat_currency not in user_data[user_id]["fiat_balances"]: + user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0) + + if "Payable" in account_name: + user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount + elif "Receivable" in account_name: + user_data[user_id]["fiat_balances"][fiat_currency] -= fiat_amount + except (ValueError, IndexError): + pass + + elif isinstance(sats_positions, (int, float)): + amount_int = int(sats_positions) + if "Payable" in account_name: + user_data[user_id]["balance"] += amount_int + account_info["sats"] += amount_int + elif "Receivable" in account_name: + user_data[user_id]["balance"] -= amount_int + account_info["sats"] -= amount_int + + user_data[user_id]["accounts"].append(account_info) + + return list(user_data.values()) + + except httpx.HTTPStatusError as e: + logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise + + async def check_fava_health(self) -> bool: + """ + Check if Fava is running and accessible. + + Returns: + True if Fava responds, False otherwise + """ + try: + async with httpx.AsyncClient(timeout=2.0) as client: + response = await client.get( + f"{self.base_url}/changed" + ) + return response.status_code == 200 + except Exception as e: + logger.warning(f"Fava health check failed: {e}") + return False + + +# Singleton instance (configured from settings) +_fava_client: Optional[FavaClient] = None + + +def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0): + """ + Initialize the global Fava client. + + Args: + fava_url: Base URL of Fava server + ledger_slug: Ledger identifier + timeout: Request timeout in seconds + """ + global _fava_client + _fava_client = FavaClient(fava_url, ledger_slug, timeout) + logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}") + + +def get_fava_client() -> FavaClient: + """ + Get the configured Fava client. + + Returns: + FavaClient instance + + Raises: + RuntimeError: If client not initialized + """ + if _fava_client is None: + raise RuntimeError( + "Fava client not initialized. Call init_fava_client() first. " + "Castle requires Fava for all accounting operations." + ) + return _fava_client