""" Account Synchronization Module Syncs accounts from Beancount (source of truth) to Castle DB (metadata store). This implements the hybrid approach: - Beancount owns account existence (Open directives) - Castle DB stores permissions and user associations - Background sync keeps them in sync Related: ACCOUNTS-TABLE-REMOVAL-FEASIBILITY.md - Phase 2 implementation """ from datetime import datetime from typing import Optional from loguru import logger from .crud import create_account, get_account_by_name, get_all_accounts from .fava_client import get_fava_client from .models import AccountType, CreateAccount def infer_account_type_from_name(account_name: str) -> AccountType: """ Infer Beancount account type from hierarchical name. Args: account_name: Hierarchical account name (e.g., "Expenses:Food:Groceries") Returns: AccountType enum value Examples: "Assets:Cash" → AccountType.ASSET "Liabilities:PayPal" → AccountType.LIABILITY "Expenses:Food" → AccountType.EXPENSE "Income:Services" → AccountType.REVENUE "Equity:Opening-Balances" → AccountType.EQUITY """ root = account_name.split(":")[0] type_map = { "Assets": AccountType.ASSET, "Liabilities": AccountType.LIABILITY, "Expenses": AccountType.EXPENSE, "Income": AccountType.REVENUE, "Equity": AccountType.EQUITY, } # Default to ASSET if unknown (shouldn't happen with valid Beancount) return type_map.get(root, AccountType.ASSET) def extract_user_id_from_account_name(account_name: str) -> Optional[str]: """ Extract user ID from account name if it's a user-specific account. Args: account_name: Hierarchical account name Returns: User ID if found, None otherwise Examples: "Assets:Receivable:User-abc123def" → "abc123def456ghi789" "Liabilities:Payable:User-abc123" → "abc123def456ghi789" "Expenses:Food" → None """ if ":User-" not in account_name: return None # Extract the part after "User-" parts = account_name.split(":User-") if len(parts) < 2: return None # First 8 characters are the user ID prefix user_id_prefix = parts[1] # For now, return the prefix (could look up full user ID from DB if needed) # Note: get_or_create_user_account() uses 8-char prefix in account names return user_id_prefix async def sync_accounts_from_beancount(force_full_sync: bool = False) -> dict: """ Sync accounts from Beancount to Castle DB. This ensures Castle DB has metadata entries for all accounts that exist in Beancount, enabling permissions and user associations to work properly. Args: force_full_sync: If True, re-check all accounts. If False, only add new ones. Returns: dict with sync statistics: { "total_beancount_accounts": 150, "total_castle_accounts": 148, "accounts_added": 2, "accounts_updated": 0, "accounts_skipped": 148, "errors": [] } """ logger.info("Starting account sync from Beancount to Castle DB") fava = get_fava_client() # Get all accounts from Beancount try: beancount_accounts = await fava.get_all_accounts() except Exception as e: logger.error(f"Failed to fetch accounts from Beancount: {e}") return { "total_beancount_accounts": 0, "total_castle_accounts": 0, "accounts_added": 0, "accounts_updated": 0, "accounts_skipped": 0, "errors": [str(e)], } # Get all accounts from Castle DB castle_accounts = await get_all_accounts() castle_account_names = {acc.name for acc in castle_accounts} stats = { "total_beancount_accounts": len(beancount_accounts), "total_castle_accounts": len(castle_accounts), "accounts_added": 0, "accounts_updated": 0, "accounts_skipped": 0, "errors": [], } # Sync each Beancount account to Castle DB for bc_account in beancount_accounts: account_name = bc_account["account"] # Skip if already in Castle DB (unless force_full_sync) if account_name in castle_account_names and not force_full_sync: stats["accounts_skipped"] += 1 continue try: # Check if account exists (for force_full_sync) existing = await get_account_by_name(account_name) if existing: # Account exists - could update metadata here if needed stats["accounts_skipped"] += 1 logger.debug(f"Account already exists: {account_name}") continue # Create new account in Castle DB account_type = infer_account_type_from_name(account_name) user_id = extract_user_id_from_account_name(account_name) # Get description from Beancount metadata if available description = None if "meta" in bc_account and isinstance(bc_account["meta"], dict): description = bc_account["meta"].get("description") await create_account( CreateAccount( name=account_name, account_type=account_type, description=description, user_id=user_id, ) ) stats["accounts_added"] += 1 logger.info(f"Added account from Beancount: {account_name}") except Exception as e: error_msg = f"Failed to sync account {account_name}: {e}" logger.error(error_msg) stats["errors"].append(error_msg) logger.info( f"Account sync complete: " f"{stats['accounts_added']} added, " f"{stats['accounts_skipped']} skipped, " f"{len(stats['errors'])} errors" ) return stats async def sync_single_account_from_beancount(account_name: str) -> bool: """ Sync a single account from Beancount to Castle DB. Useful for ensuring a specific account exists in Castle DB before granting permissions on it. Args: account_name: Hierarchical account name (e.g., "Expenses:Food") Returns: True if account was created/updated, False if it already existed or failed """ logger.debug(f"Syncing single account: {account_name}") # Check if already exists existing = await get_account_by_name(account_name) if existing: logger.debug(f"Account already exists: {account_name}") return False # Get from Beancount fava = get_fava_client() try: all_accounts = await fava.get_all_accounts() bc_account = next( (acc for acc in all_accounts if acc["account"] == account_name), None ) if not bc_account: logger.error(f"Account not found in Beancount: {account_name}") return False # Create in Castle DB account_type = infer_account_type_from_name(account_name) user_id = extract_user_id_from_account_name(account_name) description = None if "meta" in bc_account and isinstance(bc_account["meta"], dict): description = bc_account["meta"].get("description") await create_account( CreateAccount( name=account_name, account_type=account_type, description=description, user_id=user_id, ) ) logger.info(f"Created account from Beancount: {account_name}") return True except Exception as e: logger.error(f"Failed to sync account {account_name}: {e}") return False async def ensure_account_exists_in_castle(account_name: str) -> bool: """ Ensure account exists in Castle DB, creating from Beancount if needed. This is the recommended function to call before granting permissions. Args: account_name: Hierarchical account name Returns: True if account exists (or was created), False if failed """ # Check Castle DB first existing = await get_account_by_name(account_name) if existing: return True # Try to sync from Beancount return await sync_single_account_from_beancount(account_name) # Background sync task (can be scheduled with cron or async scheduler) async def scheduled_account_sync(): """ Scheduled task to sync accounts from Beancount to Castle DB. Run this periodically (e.g., every hour) to keep Castle DB in sync with Beancount. Example with APScheduler: from apscheduler.schedulers.asyncio import AsyncIOScheduler scheduler = AsyncIOScheduler() scheduler.add_job( scheduled_account_sync, 'interval', hours=1, # Run every hour id='account_sync' ) scheduler.start() """ logger.info("Running scheduled account sync") try: stats = await sync_accounts_from_beancount(force_full_sync=False) if stats["accounts_added"] > 0: logger.info( f"Scheduled sync: Added {stats['accounts_added']} new accounts" ) if stats["errors"]: logger.warning( f"Scheduled sync: {len(stats['errors'])} errors encountered" ) return stats except Exception as e: logger.error(f"Scheduled account sync failed: {e}") raise