From 4a3922895eda7edda057cc4d5af858ab8c07a78c Mon Sep 17 00:00:00 2001 From: padreug Date: Tue, 11 Nov 2025 01:28:59 +0100 Subject: [PATCH] Integrate account sync with API, background tasks, and user creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Integration Components: 1. Manual API Endpoints (admin-only): - POST /api/v1/admin/accounts/sync (full sync) - POST /api/v1/admin/accounts/sync/{account_name} (single account) 2. Scheduled Background Sync: - Hourly background task (wait_for_account_sync) - Registered in castle_start() lifecycle - Automatically syncs new accounts from Beancount to Castle DB 3. Auto-sync on User Account Creation: - Updated get_or_create_user_account() in crud.py - Uses sync_single_account_from_beancount() for consistency - Ensures receivable/payable accounts are synced when users register Flow: - User associates wallet → creates receivable/payable in Beancount → syncs to Castle DB → permissions can be granted - Admin manually syncs → all Beancount accounts added to Castle DB - Hourly task → catches any accounts created directly in Beancount This ensures Beancount remains the source of truth while Castle DB maintains metadata for permissions and user associations. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- __init__.py | 6 ++++ crud.py | 44 ++++++++++++++++++++++------- tasks.py | 53 ++++++++++++++++++++++++++++++++++ views_api.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 173 insertions(+), 10 deletions(-) diff --git a/__init__.py b/__init__.py index 1a68806..6209e9d 100644 --- a/__init__.py +++ b/__init__.py @@ -36,6 +36,7 @@ def castle_start(): from lnbits.tasks import create_permanent_unique_task from .fava_client import init_fava_client from .models import CastleSettings + from .tasks import wait_for_account_sync # Initialize Fava client with default settings # (Will be re-initialized if admin updates settings) @@ -55,5 +56,10 @@ def castle_start(): task = create_permanent_unique_task("ext_castle", wait_for_paid_invoices) scheduled_tasks.append(task) + # Start account sync task (runs hourly) + sync_task = create_permanent_unique_task("ext_castle_account_sync", wait_for_account_sync) + scheduled_tasks.append(sync_task) + logger.info("Castle account sync task started (runs hourly)") + __all__ = ["castle_ext", "castle_static_files", "db", "castle_start", "castle_stop"] diff --git a/crud.py b/crud.py index 1db4814..c4b418f 100644 --- a/crud.py +++ b/crud.py @@ -209,18 +209,42 @@ async def get_or_create_user_account( logger.error(f"[FAVA ERROR] Could not check/create account in Fava: {e}", exc_info=True) # Continue anyway - account creation in Castle DB is still useful for metadata - # Create account in Castle DB for metadata tracking (only if it doesn't exist) + # Ensure account exists in Castle DB (sync from Beancount if needed) + # This uses the account sync module for consistency if not account: - logger.info(f"[CASTLE DB] Creating account in Castle DB: {account_name}") - account = await create_account( - CreateAccount( - name=account_name, - account_type=account_type, - description=f"User-specific {account_type.value} account", - user_id=user_id, - ) + logger.info(f"[CASTLE DB] Syncing account from Beancount to Castle DB: {account_name}") + from .account_sync import sync_single_account_from_beancount + + # Sync from Beancount to Castle DB + created = await sync_single_account_from_beancount(account_name) + + if created: + logger.info(f"[CASTLE DB] Account synced from Beancount: {account_name}") + else: + logger.warning(f"[CASTLE DB] Failed to sync account from Beancount: {account_name}") + + # Fetch the account from Castle DB + account = await db.fetchone( + """ + SELECT * FROM accounts + WHERE user_id = :user_id AND account_type = :type AND name = :name + """, + {"user_id": user_id, "type": account_type.value, "name": account_name}, + Account, ) - logger.info(f"[CASTLE DB] Created account in Castle DB: {account_name}") + + if not account: + logger.error(f"[CASTLE DB] Account still not found after sync: {account_name}") + # Fallback: create directly in Castle DB if sync failed + logger.info(f"[CASTLE DB] Creating account directly in Castle DB: {account_name}") + account = await create_account( + CreateAccount( + name=account_name, + account_type=account_type, + description=f"User-specific {account_type.value} account", + user_id=user_id, + ) + ) else: logger.info(f"[CASTLE DB] Account already exists in Castle DB: {account_name}") diff --git a/tasks.py b/tasks.py index 6331a16..1a8327d 100644 --- a/tasks.py +++ b/tasks.py @@ -95,6 +95,59 @@ async def scheduled_daily_reconciliation(): raise +async def scheduled_account_sync(): + """ + Scheduled task that runs hourly to sync accounts from Beancount to Castle DB. + + This ensures Castle DB stays in sync with Beancount (source of truth) by + automatically adding any new accounts created in Beancount to Castle's + metadata database for permission tracking. + """ + from .account_sync import sync_accounts_from_beancount + + logger.info(f"[CASTLE] Running scheduled account sync at {datetime.now()}") + + try: + stats = await sync_accounts_from_beancount(force_full_sync=False) + + if stats["accounts_added"] > 0: + logger.info( + f"[CASTLE] Account sync: Added {stats['accounts_added']} new accounts" + ) + + if stats["errors"]: + logger.warning( + f"[CASTLE] Account sync: {len(stats['errors'])} errors encountered" + ) + for error in stats["errors"][:5]: # Log first 5 errors + logger.error(f" - {error}") + + return stats + + except Exception as e: + logger.error(f"[CASTLE] Error in scheduled account sync: {e}") + raise + + +async def wait_for_account_sync(): + """ + Background task that periodically syncs accounts from Beancount to Castle DB. + + Runs hourly to ensure Castle DB stays in sync with Beancount. + """ + logger.info("[CASTLE] Account sync background task started") + + while True: + try: + # Run sync + await scheduled_account_sync() + except Exception as e: + logger.error(f"[CASTLE] Account sync error: {e}") + + # Wait 1 hour before next sync + await asyncio.sleep(3600) # 3600 seconds = 1 hour + + def start_daily_reconciliation_task(): """ Initialize the daily reconciliation task. diff --git a/views_api.py b/views_api.py index 00f15bb..98e8f77 100644 --- a/views_api.py +++ b/views_api.py @@ -3055,3 +3055,83 @@ async def api_get_account_hierarchy( accounts_with_hierarchy.sort(key=lambda a: a.name) return accounts_with_hierarchy + + +# ===== ACCOUNT SYNC ENDPOINTS ===== + + +@castle_api_router.post("/api/v1/admin/accounts/sync") +async def api_sync_all_accounts( + force_full_sync: bool = False, + wallet: WalletTypeInfo = Depends(require_admin_key), +) -> dict: + """ + Sync all accounts from Beancount to Castle DB (admin only). + + This ensures Castle DB has metadata entries for all accounts that exist + in Beancount, enabling permissions and user associations to work properly. + + Args: + force_full_sync: If True, re-check all accounts. If False, only add new ones. + + Returns: + Sync statistics: {total_beancount_accounts, accounts_added, accounts_skipped, errors} + """ + from .account_sync import sync_accounts_from_beancount + + logger.info(f"Admin {wallet.wallet.user[:8]} triggered account sync (force={force_full_sync})") + + try: + stats = await sync_accounts_from_beancount(force_full_sync=force_full_sync) + logger.info(f"Account sync complete: {stats['accounts_added']} added, {stats['accounts_skipped']} skipped") + return stats + except Exception as e: + logger.error(f"Account sync failed: {e}") + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Account sync failed: {str(e)}" + ) + + +@castle_api_router.post("/api/v1/admin/accounts/sync/{account_name:path}") +async def api_sync_single_account( + account_name: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +) -> dict: + """ + Sync a single account from Beancount to Castle DB (admin only). + + Useful for ensuring a specific account exists in Castle DB before + granting permissions on it. + + Args: + account_name: Hierarchical account name (e.g., "Expenses:Food:Groceries") + + Returns: + {success: bool, account_name: str, message: str} + """ + from .account_sync import sync_single_account_from_beancount + + logger.info(f"Admin {wallet.wallet.user[:8]} triggered sync for account: {account_name}") + + try: + created = await sync_single_account_from_beancount(account_name) + + if created: + return { + "success": True, + "account_name": account_name, + "message": f"Account '{account_name}' synced successfully" + } + else: + return { + "success": False, + "account_name": account_name, + "message": f"Account '{account_name}' already exists or not found in Beancount" + } + except Exception as e: + logger.error(f"Single account sync failed for {account_name}: {e}") + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Account sync failed: {str(e)}" + )