"""
Account Synchronization Module

Syncs accounts from Beancount (source of truth) to Castle DB (metadata store).

This implements the hybrid approach:
- Beancount owns account existence (Open directives)
- Castle DB stores permissions and user associations
- Background sync keeps them in sync

Related: ACCOUNTS-TABLE-REMOVAL-FEASIBILITY.md - Phase 2 implementation
"""

from datetime import datetime
from typing import Optional

from loguru import logger

from .crud import (
    create_account,
    get_account_by_name,
    get_all_accounts,
    update_account_is_active,
)
from .fava_client import get_fava_client
from .models import AccountType, CreateAccount

# Maps the root segment of a Beancount account name to the Castle account type.
_ROOT_TO_ACCOUNT_TYPE = {
    "Assets": AccountType.ASSET,
    "Liabilities": AccountType.LIABILITY,
    "Expenses": AccountType.EXPENSE,
    "Income": AccountType.REVENUE,
    "Equity": AccountType.EQUITY,
}

# Marker that introduces the user-specific segment of an account name.
_USER_SEGMENT_MARKER = ":User-"


def infer_account_type_from_name(account_name: str) -> AccountType:
    """
    Infer the Beancount account type from a hierarchical account name.

    Only the root segment (the text before the first ":") is inspected.

    Args:
        account_name: Hierarchical account name (e.g., "Expenses:Food:Groceries")

    Returns:
        AccountType enum value. Unknown roots fall back to AccountType.ASSET.

    Examples:
        "Assets:Cash" → AccountType.ASSET
        "Liabilities:PayPal" → AccountType.LIABILITY
        "Expenses:Food" → AccountType.EXPENSE
        "Income:Services" → AccountType.REVENUE
        "Equity:Opening-Balances" → AccountType.EQUITY
    """
    root = account_name.partition(":")[0]
    # Default to ASSET if unknown (shouldn't happen with valid Beancount)
    return _ROOT_TO_ACCOUNT_TYPE.get(root, AccountType.ASSET)


def extract_user_id_from_account_name(account_name: str) -> Optional[str]:
    """
    Extract the user ID segment from a user-specific account name.

    The value returned is exactly the text of the ":User-<id>" segment, i.e.
    everything after the first ":User-" up to (not including) the next ":".
    It is returned as written in the account name; no lookup of a full user
    ID is performed here.

    Args:
        account_name: Hierarchical account name

    Returns:
        The user ID segment if present and non-empty, None otherwise

    Examples:
        "Assets:Receivable:User-abc123de" → "abc123de"
        "Liabilities:Payable:User-abc123de:Pending" → "abc123de"
        "Assets:Receivable:User-" → None
        "Expenses:Food" → None
    """
    _, marker, remainder = account_name.partition(_USER_SEGMENT_MARKER)
    if not marker:
        return None

    # Stop at the next ":" so that sub-accounts nested below the user segment
    # do not leak into the returned ID.
    # NOTE(review): the original comments state that
    # get_or_create_user_account() writes an 8-char ID prefix into account
    # names; that function is not visible here, so the segment is returned
    # whole rather than truncated.
    user_id_prefix = remainder.partition(":")[0]

    # An empty segment ("...:User-") carries no ID at all.
    return user_id_prefix or None


def _new_sync_stats(total_beancount: int = 0, total_castle: int = 0) -> dict:
    """Build a zeroed sync statistics dict with the given account totals."""
    return {
        "total_beancount_accounts": total_beancount,
        "total_castle_accounts": total_castle,
        "accounts_added": 0,
        "accounts_updated": 0,
        "accounts_skipped": 0,
        "accounts_deactivated": 0,
        "accounts_reactivated": 0,
        "errors": [],
    }


async def _create_account_from_beancount(bc_account: dict) -> None:
    """Create the Castle DB entry for one account record returned by Fava."""
    account_name = bc_account["account"]

    # Get description from Beancount metadata if available
    meta = bc_account.get("meta")
    description = meta.get("description") if isinstance(meta, dict) else None

    await create_account(
        CreateAccount(
            name=account_name,
            account_type=infer_account_type_from_name(account_name),
            description=description,
            user_id=extract_user_id_from_account_name(account_name),
        )
    )


async def sync_accounts_from_beancount(force_full_sync: bool = False) -> dict:
    """
    Sync accounts from Beancount to Castle DB.

    This ensures Castle DB has metadata entries for all accounts that exist in
    Beancount, enabling permissions and user associations to work properly.

    Behavior (soft delete):
    - Accounts in Beancount but not in Castle DB: Added as new entries
    - Accounts in Castle DB but not in Beancount: Marked as inactive (soft delete)
    - Inactive accounts that return to Beancount: Reactivated

    Failures on individual accounts are logged and collected in "errors"; they
    do not abort the sync. A failure to fetch the Beancount account list aborts
    the sync before any Castle DB change is made.

    Args:
        force_full_sync: Accepted for API compatibility. It is currently not
            consulted: every call performs the same full reconciliation.

    Returns:
        dict with sync statistics:
        {
            "total_beancount_accounts": 150,
            "total_castle_accounts": 148,
            "accounts_added": 2,
            "accounts_updated": 0,
            "accounts_skipped": 148,
            "accounts_deactivated": 5,
            "accounts_reactivated": 1,
            "errors": []
        }
        "accounts_updated" is always 0 at present; nothing in this function
        updates an existing account other than its active flag.
    """
    logger.info("Starting account sync from Beancount to Castle DB")

    fava = get_fava_client()

    # Get all accounts from Beancount
    try:
        beancount_accounts = await fava.get_all_accounts()
    except Exception as e:
        logger.error(f"Failed to fetch accounts from Beancount: {e}")
        failed_stats = _new_sync_stats()
        failed_stats["errors"].append(str(e))
        return failed_stats

    # Get all accounts from Castle DB (including inactive ones for sync)
    castle_accounts = await get_all_accounts(include_inactive=True)

    # Build lookup maps
    beancount_account_names = {acc["account"] for acc in beancount_accounts}
    castle_accounts_by_name = {acc.name: acc for acc in castle_accounts}

    stats = _new_sync_stats(len(beancount_accounts), len(castle_accounts))

    # Step 1: Sync accounts from Beancount to Castle DB
    for bc_account in beancount_accounts:
        account_name = bc_account["account"]

        try:
            existing = castle_accounts_by_name.get(account_name)

            if existing:
                # Account exists in Castle DB; reactivate it if it was
                # previously soft-deleted, otherwise there is nothing to do.
                if not existing.is_active:
                    await update_account_is_active(existing.id, True)
                    stats["accounts_reactivated"] += 1
                    logger.info(f"Reactivated account: {account_name}")
                else:
                    stats["accounts_skipped"] += 1
                    logger.debug(f"Account already active: {account_name}")
                continue

            # Create new account in Castle DB
            await _create_account_from_beancount(bc_account)

            stats["accounts_added"] += 1
            logger.info(f"Added account from Beancount: {account_name}")

        except Exception as e:
            error_msg = f"Failed to sync account {account_name}: {e}"
            logger.error(error_msg)
            stats["errors"].append(error_msg)

    # Step 2: Mark orphaned accounts (in Castle DB but not in Beancount) as inactive
    # NOTE(review): an empty Beancount listing deactivates every active Castle
    # account; confirm that is intended for a transiently empty ledger.
    for castle_account in castle_accounts:
        if castle_account.name in beancount_account_names:
            continue
        if not castle_account.is_active:
            # Already soft-deleted; nothing to change.
            continue

        try:
            await update_account_is_active(castle_account.id, False)
            stats["accounts_deactivated"] += 1
            logger.info(f"Deactivated orphaned account: {castle_account.name}")
        except Exception as e:
            error_msg = f"Failed to deactivate account {castle_account.name}: {e}"
            logger.error(error_msg)
            stats["errors"].append(error_msg)

    logger.info(
        f"Account sync complete: "
        f"{stats['accounts_added']} added, "
        f"{stats['accounts_reactivated']} reactivated, "
        f"{stats['accounts_deactivated']} deactivated, "
        f"{stats['accounts_skipped']} skipped, "
        f"{len(stats['errors'])} errors"
    )

    return stats


async def sync_single_account_from_beancount(account_name: str) -> bool:
    """
    Sync a single account from Beancount to Castle DB.

    Useful for ensuring a specific account exists in Castle DB before
    granting permissions on it.

    Args:
        account_name: Hierarchical account name (e.g., "Expenses:Food")

    Returns:
        True if the account was created in Castle DB. False if it already
        existed there, was not found in Beancount, or the sync failed
        (failures are logged, not raised).
    """
    logger.debug(f"Syncing single account: {account_name}")

    # Check if already exists
    existing = await get_account_by_name(account_name)
    if existing:
        logger.debug(f"Account already exists: {account_name}")
        return False

    # Get from Beancount
    fava = get_fava_client()

    try:
        all_accounts = await fava.get_all_accounts()
        bc_account = next(
            (acc for acc in all_accounts if acc["account"] == account_name),
            None,
        )

        if not bc_account:
            logger.error(f"Account not found in Beancount: {account_name}")
            return False

        # Create in Castle DB
        await _create_account_from_beancount(bc_account)

        logger.info(f"Created account from Beancount: {account_name}")
        return True

    except Exception as e:
        logger.error(f"Failed to sync account {account_name}: {e}")
        return False


async def ensure_account_exists_in_castle(account_name: str) -> bool:
    """
    Ensure account exists in Castle DB, creating from Beancount if needed.

    This is the recommended function to call before granting permissions.

    Args:
        account_name: Hierarchical account name

    Returns:
        True if account exists (or was created), False if failed
    """
    # Check Castle DB first
    if await get_account_by_name(account_name):
        return True

    # Try to sync from Beancount
    return await sync_single_account_from_beancount(account_name)


# Background sync task (can be scheduled with cron or async scheduler)
async def scheduled_account_sync() -> dict:
    """
    Scheduled task to sync accounts from Beancount to Castle DB.

    Run this periodically (e.g., every hour) to keep Castle DB in sync with
    Beancount.

    Returns:
        The statistics dict produced by sync_accounts_from_beancount().

    Raises:
        Exception: Any exception escaping the sync is logged and re-raised.

    Example with APScheduler:
        from apscheduler.schedulers.asyncio import AsyncIOScheduler

        scheduler = AsyncIOScheduler()
        scheduler.add_job(
            scheduled_account_sync,
            'interval',
            hours=1,  # Run every hour
            id='account_sync'
        )
        scheduler.start()
    """
    logger.info("Running scheduled account sync")

    try:
        stats = await sync_accounts_from_beancount(force_full_sync=False)

        if stats["accounts_added"] > 0:
            logger.info(
                f"Scheduled sync: Added {stats['accounts_added']} new accounts"
            )

        if stats["errors"]:
            logger.warning(
                f"Scheduled sync: {len(stats['errors'])} errors encountered"
            )

        return stats

    except Exception as e:
        logger.error(f"Scheduled account sync failed: {e}")
        raise