""" Fava API client for Castle. This module provides an async HTTP client for interacting with Fava's JSON API. All accounting logic is delegated to Fava/Beancount. Fava provides a REST API for: - Adding transactions (PUT /api/add_entries) - Adding accounts via Open directives (PUT /api/add_entries) - Querying balances (GET /api/query) - Balance sheets (GET /api/balance_sheet) - Account reports (GET /api/account_report) - Updating/deleting entries (PUT/DELETE /api/source_slice) See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py """ import httpx from typing import Any, Dict, List, Optional from decimal import Decimal from datetime import date, datetime from loguru import logger class FavaClient: """ Async client for Fava REST API. Fava runs as a separate web service and provides a JSON API for adding entries and querying ledger data. All accounting calculations are performed by Beancount via Fava. """ def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0): """ Initialize Fava client. Args: fava_url: Base URL of Fava server (e.g., http://localhost:3333) ledger_slug: URL-safe ledger identifier (e.g., castle-accounting) timeout: Request timeout in seconds """ self.fava_url = fava_url.rstrip('/') self.ledger_slug = ledger_slug self.base_url = f"{self.fava_url}/{self.ledger_slug}/api" self.timeout = timeout async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: """ Submit a new journal entry to Fava. Args: entry: Beancount entry dict (format per Fava API spec) Must include: - t: "Transaction" (required by Fava) - date: "YYYY-MM-DD" - flag: "*" (cleared) or "!" (pending) - narration: str - postings: list of posting dicts - payee: str (empty string, not None) - tags: list of str - links: list of str - meta: dict Returns: Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."}) Raises: httpx.HTTPStatusError: If Fava returns an error httpx.RequestError: If connection fails Example: entry = { "t": "Transaction", "date": "2025-01-15", "flag": "*", "payee": "Store", "narration": "Purchase", "postings": [ {"account": "Expenses:Food", "amount": "50.00 EUR"}, {"account": "Assets:Cash", "amount": "-50.00 EUR"} ], "tags": [], "links": [], "meta": {"user_id": "abc123"} } result = await fava_client.add_entry(entry) """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.put( f"{self.base_url}/add_entries", json={"entries": [entry]}, headers={"Content-Type": "application/json"} ) response.raise_for_status() result = response.json() logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") return result except httpx.HTTPStatusError as e: logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_account_balance(self, account_name: str) -> Dict[str, Any]: """ Get balance for a specific account (excluding pending transactions). Args: account_name: Full account name (e.g., "Assets:Receivable:User-abc123") Returns: Dict with: - sats: int (balance in satoshis) - positions: dict (currency → amount with cost basis) Note: Excludes pending transactions (flag='!') from balance calculation. Only cleared/completed transactions (flag='*') are included. Example: balance = await fava_client.get_account_balance("Assets:Receivable:User-abc") # Returns: { # "sats": 200000, # "positions": {"SATS": {"{100.00 EUR}": 200000}} # } """ query = f"SELECT sum(position) WHERE account = '{account_name}' AND flag != '!'" try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query} ) response.raise_for_status() data = response.json() if not data['data']['rows']: return {"sats": 0, "positions": {}} # Fava returns: [[account, {"SATS": {cost: amount}}]] positions = data['data']['rows'][0][1] if data['data']['rows'] else {} # Sum up all SATS positions total_sats = 0 if isinstance(positions, dict) and "SATS" in positions: sats_positions = positions["SATS"] if isinstance(sats_positions, dict): # Sum all amounts (with different cost bases) total_sats = sum(int(amount) for amount in sats_positions.values()) elif isinstance(sats_positions, (int, float)): # Simple number (no cost basis) total_sats = int(sats_positions) return { "sats": total_sats, "positions": positions } except httpx.HTTPStatusError as e: logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_user_balance(self, user_id: str) -> Dict[str, Any]: """ Get user's balance from castle's perspective. Aggregates: - Liabilities:Payable:User-{user_id} (negative = castle owes user) - Assets:Receivable:User-{user_id} (positive = user owes castle) Args: user_id: User ID Returns: { "balance": int (sats, positive = user owes castle, negative = castle owes user), "fiat_balances": {"EUR": Decimal("100.50")}, "accounts": [list of account dicts with balances] } Note: Excludes pending transactions (flag='!') from balance calculation. Only cleared/completed transactions (flag='*') are included. """ # Get all journal entries for this user all_entries = await self.get_journal_entries() total_sats = 0 fiat_balances = {} accounts_dict = {} # Track balances per account for entry in all_entries: # Skip non-transactions, pending (!), and voided if entry.get("t") != "Transaction": continue if entry.get("flag") == "!": continue if "voided" in entry.get("tags", []): continue # Process postings for this user for posting in entry.get("postings", []): account_name = posting.get("account", "") # Only process this user's accounts (account names use first 8 chars of user_id) if f":User-{user_id[:8]}" not in account_name: continue if "Payable" not in account_name and "Receivable" not in account_name: continue # Parse amount string: can be EUR, USD, or SATS amount_str = posting.get("amount", "") if not isinstance(amount_str, str) or not amount_str: continue import re # Try to extract EUR/USD amount first (new format) fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): # Direct EUR/USD amount (new approach) fiat_amount = Decimal(fiat_match.group(1)) fiat_currency = fiat_match.group(2) if fiat_currency not in fiat_balances: fiat_balances[fiat_currency] = Decimal(0) fiat_balances[fiat_currency] += fiat_amount # Also track SATS equivalent from metadata if available posting_meta = posting.get("meta", {}) sats_equiv = posting_meta.get("sats-equivalent") if sats_equiv: sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv) total_sats += sats_amount if account_name not in accounts_dict: accounts_dict[account_name] = {"account": account_name, "sats": 0} accounts_dict[account_name]["sats"] += sats_amount else: # Old format: SATS with cost/price notation - extract SATS amount sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str) if sats_match: sats_amount = int(sats_match.group(1)) total_sats += sats_amount # Track per account if account_name not in accounts_dict: accounts_dict[account_name] = {"account": account_name, "sats": 0} accounts_dict[account_name]["sats"] += sats_amount # Try to extract fiat from metadata or cost syntax (backward compatibility) posting_meta = posting.get("meta", {}) fiat_amount_total_str = posting_meta.get("fiat-amount-total") fiat_currency_meta = posting_meta.get("fiat-currency") if fiat_amount_total_str and fiat_currency_meta: # Use exact total from metadata fiat_total = Decimal(fiat_amount_total_str) fiat_currency = fiat_currency_meta if fiat_currency not in fiat_balances: fiat_balances[fiat_currency] = Decimal(0) # Apply the same sign as the SATS amount if sats_match: sats_amount_for_sign = int(sats_match.group(1)) if sats_amount_for_sign < 0: fiat_total = -fiat_total fiat_balances[fiat_currency] += fiat_total logger.info(f"User {user_id[:8]} balance: {total_sats} sats, fiat: {dict(fiat_balances)}") return { "balance": total_sats, "fiat_balances": fiat_balances, "accounts": list(accounts_dict.values()) } async def get_all_user_balances(self) -> List[Dict[str, Any]]: """ Get balances for all users (admin view). Returns: [ { "user_id": "abc123", "balance": 100000, "fiat_balances": {"EUR": Decimal("100.50")}, "accounts": [...] }, ... ] Note: Excludes pending transactions (flag='!') and voided (tag #voided) from balance calculation. Only cleared/completed transactions (flag='*') are included. """ # Get all journal entries and calculate balances from postings all_entries = await self.get_journal_entries() # Group by user_id user_data = {} for entry in all_entries: # Skip non-transactions, pending (!), and voided if entry.get("t") != "Transaction": continue if entry.get("flag") == "!": continue if "voided" in entry.get("tags", []): continue # Process postings for posting in entry.get("postings", []): account_name = posting.get("account", "") # Only process user accounts (Payable or Receivable) if ":User-" not in account_name: continue if "Payable" not in account_name and "Receivable" not in account_name: continue # Extract user_id from account name user_id = account_name.split(":User-")[1] if user_id not in user_data: user_data[user_id] = { "user_id": user_id, "balance": 0, "fiat_balances": {}, "accounts": [] } # Parse amount string: can be EUR/USD directly (new format) or "SATS {EUR}" (old format) amount_str = posting.get("amount", "") if not isinstance(amount_str, str) or not amount_str: continue import re # Try to extract EUR/USD amount first (new format) fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): # Direct EUR/USD amount (new approach) fiat_amount = Decimal(fiat_match.group(1)) fiat_currency = fiat_match.group(2) if fiat_currency not in user_data[user_id]["fiat_balances"]: user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0) user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount # Also track SATS equivalent from metadata if available posting_meta = posting.get("meta", {}) sats_equiv = posting_meta.get("sats-equivalent") if sats_equiv: sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv) user_data[user_id]["balance"] += sats_amount else: # Old format: SATS with cost/price notation sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str) if sats_match: sats_amount = int(sats_match.group(1)) user_data[user_id]["balance"] += sats_amount # Extract fiat from cost syntax or metadata (backward compatibility) posting_meta = posting.get("meta", {}) fiat_amount_total_str = posting_meta.get("fiat-amount-total") fiat_currency_meta = posting_meta.get("fiat-currency") if fiat_amount_total_str and fiat_currency_meta: fiat_total = Decimal(fiat_amount_total_str) fiat_currency = fiat_currency_meta if fiat_currency not in user_data[user_id]["fiat_balances"]: user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0) # Apply the same sign as the SATS amount if sats_match: sats_amount_for_sign = int(sats_match.group(1)) if sats_amount_for_sign < 0: fiat_total = -fiat_total user_data[user_id]["fiat_balances"][fiat_currency] += fiat_total return list(user_data.values()) async def check_fava_health(self) -> bool: """ Check if Fava is running and accessible. Returns: True if Fava responds, False otherwise """ try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get( f"{self.base_url}/changed" ) return response.status_code == 200 except Exception as e: logger.warning(f"Fava health check failed: {e}") return False async def query_transactions( self, account_pattern: Optional[str] = None, limit: int = 100, include_pending: bool = True ) -> List[Dict[str, Any]]: """ Query transactions from Fava/Beancount. Args: account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123") limit: Maximum number of transactions to return include_pending: Include pending transactions (flag='!') Returns: List of transaction dictionaries with date, description, postings, etc. Example: # All transactions txns = await fava.query_transactions() # User's transactions txns = await fava.query_transactions(account_pattern="User-abc123") # Account transactions txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc") """ # Build Beancount query if account_pattern: query = f"SELECT * WHERE account ~ '{account_pattern}' ORDER BY date DESC LIMIT {limit}" else: query = f"SELECT * ORDER BY date DESC LIMIT {limit}" try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query} ) response.raise_for_status() result = response.json() # Fava query API returns: {"data": {"rows": [...], "types": [...]}} data = result.get("data", {}) rows = data.get("rows", []) types = data.get("types", []) # Build column name mapping column_names = [t.get("name") for t in types] # Transform Fava's query result to transaction list transactions = [] for row in rows: # Rows are arrays, convert to dict using column names if isinstance(row, list) and len(row) == len(column_names): txn = dict(zip(column_names, row)) # Filter by flag if needed flag = txn.get("flag", "*") if not include_pending and flag == "!": continue transactions.append(txn) elif isinstance(row, dict): # Already a dict (shouldn't happen with BQL, but handle it) flag = row.get("flag", "*") if not include_pending and flag == "!": continue transactions.append(row) return transactions[:limit] except httpx.HTTPStatusError as e: logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def query_bql(self, query_string: str) -> Dict[str, Any]: """ Execute arbitrary Beancount Query Language (BQL) query. This is a general-purpose method for executing BQL queries against Fava/Beancount. Use this for efficient aggregations, filtering, and data retrieval. ⚠️ LIMITATION: BQL can only query position amounts and transaction-level data. It CANNOT access posting metadata (like 'sats-equivalent'). For Castle's current ledger format where SATS are stored in metadata, manual aggregation is required. See: misc-docs/BQL-BALANCE-QUERIES.md for detailed analysis and test results. FUTURE CONSIDERATION: If Castle's ledger format changes to use SATS as position amounts (instead of metadata), BQL could provide significant performance benefits. Args: query_string: BQL query (e.g., "SELECT account, sum(position) WHERE account ~ 'User-abc'") Returns: { "rows": [[col1, col2, ...], ...], "types": [{"name": "col1", "type": "str"}, ...], "column_names": ["col1", "col2", ...] } Example: result = await fava.query_bql("SELECT account, sum(position) WHERE account ~ 'User-abc'") for row in result["rows"]: account, balance = row print(f"{account}: {balance}") See: https://beancount.github.io/docs/beancount_query_language.html """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/query", params={"query_string": query_string} ) response.raise_for_status() result = response.json() # Fava returns: {"data": {"rows": [...], "types": [...]}} data = result.get("data", {}) rows = data.get("rows", []) types = data.get("types", []) column_names = [t.get("name") for t in types] return { "rows": rows, "types": types, "column_names": column_names } except httpx.HTTPStatusError as e: logger.error(f"BQL query error: {e.response.status_code} - {e.response.text}") logger.error(f"Query was: {query_string}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_user_balance_bql(self, user_id: str) -> Dict[str, Any]: """ Get user balance using BQL (efficient, replaces 115-line manual aggregation). ⚠️ NOT CURRENTLY USED: This method cannot access SATS balances stored in posting metadata. It only queries position amounts (EUR/USD). For Castle's current ledger format, use get_user_balance() instead (manual aggregation with caching). This method uses Beancount Query Language for server-side filtering and aggregation, which would provide 5-10x performance improvement IF SATS were stored as position amounts instead of metadata. FUTURE CONSIDERATION: If Castle's ledger format changes to store SATS as position amounts (e.g., "100000 SATS {100.00 EUR}"), this method would become feasible and provide significant performance benefits. See: misc-docs/BQL-BALANCE-QUERIES.md for detailed test results and analysis. Args: user_id: User ID Returns: { "balance": int (sats), # Currently returns 0 (cannot access metadata) "fiat_balances": {"EUR": Decimal("100.50"), ...}, # Works correctly "accounts": [{"account": "...", "sats": 150000}, ...] } Example: balance = await fava.get_user_balance_bql("af983632") print(f"Balance: {balance['balance']} sats") # Will be 0 with current ledger format """ from decimal import Decimal import re # Build BQL query for this user's Payable/Receivable accounts user_id_prefix = user_id[:8] query = f""" SELECT account, sum(position) as balance WHERE account ~ ':User-{user_id_prefix}' AND (account ~ 'Payable' OR account ~ 'Receivable') AND flag != '!' GROUP BY account """ result = await self.query_bql(query) # Process results total_sats = 0 fiat_balances = {} accounts = [] for row in result["rows"]: account_name, position = row # Position can be: # - Dict: {"SATS": "150000", "EUR": "145.50"} # - String: "150000 SATS" or "145.50 EUR" if isinstance(position, dict): # Extract SATS sats_str = position.get("SATS", "0") sats_amount = int(sats_str) if sats_str else 0 total_sats += sats_amount accounts.append({ "account": account_name, "sats": sats_amount }) # Extract fiat currencies for currency in ["EUR", "USD", "GBP"]: if currency in position: fiat_str = position[currency] fiat_amount = Decimal(fiat_str) if fiat_str else Decimal(0) if currency not in fiat_balances: fiat_balances[currency] = Decimal(0) fiat_balances[currency] += fiat_amount elif isinstance(position, str): # Single currency (parse "150000 SATS" or "145.50 EUR") sats_match = re.match(r'^(-?\d+)\s+SATS$', position) if sats_match: sats_amount = int(sats_match.group(1)) total_sats += sats_amount accounts.append({ "account": account_name, "sats": sats_amount }) else: fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): fiat_amount = Decimal(fiat_match.group(1)) currency = fiat_match.group(2) if currency not in fiat_balances: fiat_balances[currency] = Decimal(0) fiat_balances[currency] += fiat_amount logger.info(f"User {user_id[:8]} balance (BQL): {total_sats} sats, fiat: {dict(fiat_balances)}") return { "balance": total_sats, "fiat_balances": fiat_balances, "accounts": accounts } async def get_all_user_balances_bql(self) -> List[Dict[str, Any]]: """ Get balances for all users using BQL (efficient admin view). ⚠️ NOT CURRENTLY USED: This method cannot access SATS balances stored in posting metadata. It only queries position amounts (EUR/USD). For Castle's current ledger format, use get_all_user_balances() instead (manual aggregation with caching). This method uses Beancount Query Language to query all user balances in a single efficient query, which would be faster than fetching all entries IF SATS were stored as position amounts instead of metadata. FUTURE CONSIDERATION: If Castle's ledger format changes to store SATS as position amounts, this method would provide significant performance benefits for admin views. See: misc-docs/BQL-BALANCE-QUERIES.md for detailed test results and analysis. Returns: [ { "user_id": "abc123", "balance": 100000, # Currently 0 (cannot access metadata) "fiat_balances": {"EUR": Decimal("100.50")}, # Works correctly "accounts": [{"account": "...", "sats": 150000}, ...] }, ... ] Example: all_balances = await fava.get_all_user_balances_bql() for user in all_balances: print(f"{user['user_id']}: {user['balance']} sats") # Will be 0 with current format """ from decimal import Decimal import re # BQL query for ALL user accounts query = """ SELECT account, sum(position) as balance WHERE (account ~ 'Payable:User-' OR account ~ 'Receivable:User-') AND flag != '!' GROUP BY account """ result = await self.query_bql(query) # Group by user_id user_data = {} for row in result["rows"]: account_name, position = row # Extract user_id from account name # Format: "Liabilities:Payable:User-abc12345" or "Assets:Receivable:User-abc12345" if ":User-" not in account_name: continue user_id_with_prefix = account_name.split(":User-")[1] # User ID is the first 8 chars (our standard) user_id = user_id_with_prefix[:8] if user_id not in user_data: user_data[user_id] = { "user_id": user_id, "balance": 0, "fiat_balances": {}, "accounts": [] } # Process position (same logic as single-user query) if isinstance(position, dict): sats_str = position.get("SATS", "0") sats_amount = int(sats_str) if sats_str else 0 user_data[user_id]["balance"] += sats_amount user_data[user_id]["accounts"].append({ "account": account_name, "sats": sats_amount }) for currency in ["EUR", "USD", "GBP"]: if currency in position: fiat_str = position[currency] fiat_amount = Decimal(fiat_str) if fiat_str else Decimal(0) if currency not in user_data[user_id]["fiat_balances"]: user_data[user_id]["fiat_balances"][currency] = Decimal(0) user_data[user_id]["fiat_balances"][currency] += fiat_amount elif isinstance(position, str): # Single currency (parse "150000 SATS" or "145.50 EUR") sats_match = re.match(r'^(-?\d+)\s+SATS$', position) if sats_match: sats_amount = int(sats_match.group(1)) user_data[user_id]["balance"] += sats_amount user_data[user_id]["accounts"].append({ "account": account_name, "sats": sats_amount }) else: fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position) if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'): fiat_amount = Decimal(fiat_match.group(1)) currency = fiat_match.group(2) if currency not in user_data[user_id]["fiat_balances"]: user_data[user_id]["fiat_balances"][currency] = Decimal(0) user_data[user_id]["fiat_balances"][currency] += fiat_amount logger.info(f"Fetched balances for {len(user_data)} users (BQL)") return list(user_data.values()) async def get_account_transactions( self, account_name: str, limit: int = 100 ) -> List[Dict[str, Any]]: """ Get all transactions affecting a specific account. Args: account_name: Full account name (e.g., "Assets:Receivable:User-abc123") limit: Maximum number of transactions Returns: List of transactions affecting this account """ return await self.query_transactions( account_pattern=account_name.replace(":", "\\:"), # Escape colons for regex limit=limit ) async def get_user_transactions( self, user_id: str, limit: int = 100 ) -> List[Dict[str, Any]]: """ Get all transactions affecting a user's accounts. Args: user_id: User ID limit: Maximum number of transactions Returns: List of transactions affecting user's accounts """ return await self.query_transactions( account_pattern=f"User-{user_id[:8]}", limit=limit ) async def get_journal_entries(self) -> List[Dict[str, Any]]: """ Get all journal entries from Fava (with entry hashes). Returns: List of all entries (transactions, opens, closes, etc.) with entry_hash field. Example: entries = await fava.get_journal_entries() # Each entry has: entry_hash, date, flag, narration, tags, links, etc. """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get(f"{self.base_url}/journal") response.raise_for_status() result = response.json() entries = result.get("data", []) logger.info(f"Fava /journal returned {len(entries)} entries") # Log transactions with "Lightning payment" in narration lightning_entries = [e for e in entries if "Lightning payment" in e.get("narration", "")] logger.info(f"Found {len(lightning_entries)} Lightning payment entries in journal") return entries except httpx.HTTPStatusError as e: logger.error(f"Fava journal error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]: """ Get entry context including source text and sha256sum. Args: entry_hash: Entry hash from get_journal_entries() Returns: { "entry": {...}, # Serialized entry "slice": "2025-01-15 ! \"Description\"...", # Beancount source text "sha256sum": "abc123...", # For concurrency control "balances_before": {...}, "balances_after": {...} } Example: context = await fava.get_entry_context("abc123") source = context["slice"] sha256sum = context["sha256sum"] """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/context", params={"entry_hash": entry_hash} ) response.raise_for_status() result = response.json() return result.get("data", {}) except httpx.HTTPStatusError as e: logger.error(f"Fava context error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str: """ Update an entry's source text (e.g., change flag from ! to *). Args: entry_hash: Entry hash new_source: Modified Beancount source text sha256sum: Current sha256sum from get_entry_context() for concurrency control Returns: New sha256sum after update Example: # Get context context = await fava.get_entry_context("abc123") source = context["slice"] sha256 = context["sha256sum"] # Change flag new_source = source.replace("2025-01-15 !", "2025-01-15 *") # Update new_sha256 = await fava.update_entry_source("abc123", new_source, sha256) """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.put( f"{self.base_url}/source_slice", json={ "entry_hash": entry_hash, "source": new_source, "sha256sum": sha256sum } ) response.raise_for_status() result = response.json() return result.get("data", "") except httpx.HTTPStatusError as e: logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def delete_entry(self, entry_hash: str, sha256sum: str) -> str: """ Delete an entry from the Beancount file. Args: entry_hash: Entry hash sha256sum: Current sha256sum for concurrency control Returns: Success message Example: context = await fava.get_entry_context("abc123") await fava.delete_entry("abc123", context["sha256sum"]) """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.delete( f"{self.base_url}/source_slice", params={ "entry_hash": entry_hash, "sha256sum": sha256sum } ) response.raise_for_status() result = response.json() return result.get("data", "") except httpx.HTTPStatusError as e: logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise async def add_account( self, account_name: str, currencies: list[str], opening_date: Optional[date] = None, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Add an account to the Beancount ledger via an Open directive. NOTE: Fava's /api/add_entries endpoint does NOT support Open directives. This method uses /api/source to directly edit the Beancount file. Args: account_name: Full account name (e.g., "Assets:Receivable:User-abc123") currencies: List of currencies for this account (e.g., ["EUR", "SATS"]) opening_date: Date to open the account (defaults to today) metadata: Optional metadata for the account Returns: Response from Fava ({"data": "new_sha256sum", "mtime": "..."}) Example: # Add a user's receivable account result = await fava.add_account( account_name="Assets:Receivable:User-abc123", currencies=["EUR", "SATS", "USD"], metadata={"user_id": "abc123", "description": "User receivables"} ) # Add a user's payable account result = await fava.add_account( account_name="Liabilities:Payable:User-abc123", currencies=["EUR", "SATS"] ) """ from datetime import date as date_type if opening_date is None: opening_date = date_type.today() try: async with httpx.AsyncClient(timeout=self.timeout) as client: # Step 1: Get the main Beancount file path from Fava options_response = await client.get(f"{self.base_url}/options") options_response.raise_for_status() options_data = options_response.json()["data"] file_path = options_data["beancount_options"]["filename"] logger.debug(f"Fava main file: {file_path}") # Step 2: Get current source file response = await client.get( f"{self.base_url}/source", params={"filename": file_path} ) response.raise_for_status() source_data = response.json()["data"] sha256sum = source_data["sha256sum"] source = source_data["source"] # Step 2: Check if account already exists if f"open {account_name}" in source: logger.info(f"Account {account_name} already exists in Beancount file") return {"data": sha256sum, "mtime": source_data.get("mtime", "")} # Step 3: Find insertion point (after last Open directive AND its metadata) lines = source.split('\n') insert_index = 0 for i, line in enumerate(lines): if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line: # Found an Open directive, now skip over any metadata lines insert_index = i + 1 # Skip metadata lines (lines starting with whitespace) while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip(): insert_index += 1 # Step 4: Format Open directive as Beancount text currencies_str = ", ".join(currencies) open_lines = [ "", f"{opening_date.isoformat()} open {account_name} {currencies_str}" ] # Add metadata if provided if metadata: for key, value in metadata.items(): # Format metadata with proper indentation if isinstance(value, str): open_lines.append(f' {key}: "{value}"') else: open_lines.append(f' {key}: {value}') # Step 5: Insert into source for i, line in enumerate(open_lines): lines.insert(insert_index + i, line) new_source = '\n'.join(lines) # Step 6: Update source file via PUT /api/source update_payload = { "file_path": file_path, "source": new_source, "sha256sum": sha256sum } response = await client.put( f"{self.base_url}/source", json=update_payload, headers={"Content-Type": "application/json"} ) response.raise_for_status() result = response.json() logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}") return result except httpx.HTTPStatusError as e: logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}") raise except httpx.RequestError as e: logger.error(f"Fava connection error: {e}") raise # Singleton instance (configured from settings) _fava_client: Optional[FavaClient] = None def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0): """ Initialize the global Fava client. Args: fava_url: Base URL of Fava server ledger_slug: Ledger identifier timeout: Request timeout in seconds """ global _fava_client _fava_client = FavaClient(fava_url, ledger_slug, timeout) logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}") def get_fava_client() -> FavaClient: """ Get the configured Fava client. Returns: FavaClient instance Raises: RuntimeError: If client not initialized """ if _fava_client is None: raise RuntimeError( "Fava client not initialized. Call init_fava_client() first. " "Castle requires Fava for all accounting operations." ) return _fava_client