Integrate account sync with API, background tasks, and user creation

Integration Components:
1. Manual API Endpoints (admin-only):
   - POST /api/v1/admin/accounts/sync (full sync)
   - POST /api/v1/admin/accounts/sync/{account_name} (single account)

2. Scheduled Background Sync:
   - Hourly background task (wait_for_account_sync)
   - Registered in castle_start() lifecycle
   - Automatically syncs new accounts from Beancount to Castle DB

3. Auto-sync on User Account Creation:
   - Updated get_or_create_user_account() in crud.py
   - Uses sync_single_account_from_beancount() for consistency
   - Ensures receivable/payable accounts are synced when users register

Flow:
- User associates wallet → creates receivable/payable in Beancount
  → syncs to Castle DB → permissions can be granted
- Admin manually syncs → all Beancount accounts added to Castle DB
- Hourly task → catches any accounts created directly in Beancount

This ensures Beancount remains the source of truth while Castle DB
maintains metadata for permissions and user associations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
padreug 2025-11-11 01:28:59 +01:00
parent cbdd5f3779
commit 4a3922895e
4 changed files with 173 additions and 10 deletions

View file

@ -36,6 +36,7 @@ def castle_start():
from lnbits.tasks import create_permanent_unique_task from lnbits.tasks import create_permanent_unique_task
from .fava_client import init_fava_client from .fava_client import init_fava_client
from .models import CastleSettings from .models import CastleSettings
from .tasks import wait_for_account_sync
# Initialize Fava client with default settings # Initialize Fava client with default settings
# (Will be re-initialized if admin updates settings) # (Will be re-initialized if admin updates settings)
@ -55,5 +56,10 @@ def castle_start():
task = create_permanent_unique_task("ext_castle", wait_for_paid_invoices) task = create_permanent_unique_task("ext_castle", wait_for_paid_invoices)
scheduled_tasks.append(task) scheduled_tasks.append(task)
# Start account sync task (runs hourly)
sync_task = create_permanent_unique_task("ext_castle_account_sync", wait_for_account_sync)
scheduled_tasks.append(sync_task)
logger.info("Castle account sync task started (runs hourly)")
__all__ = ["castle_ext", "castle_static_files", "db", "castle_start", "castle_stop"] __all__ = ["castle_ext", "castle_static_files", "db", "castle_start", "castle_stop"]

44
crud.py
View file

@ -209,18 +209,42 @@ async def get_or_create_user_account(
logger.error(f"[FAVA ERROR] Could not check/create account in Fava: {e}", exc_info=True) logger.error(f"[FAVA ERROR] Could not check/create account in Fava: {e}", exc_info=True)
# Continue anyway - account creation in Castle DB is still useful for metadata # Continue anyway - account creation in Castle DB is still useful for metadata
# Create account in Castle DB for metadata tracking (only if it doesn't exist) # Ensure account exists in Castle DB (sync from Beancount if needed)
# This uses the account sync module for consistency
if not account: if not account:
logger.info(f"[CASTLE DB] Creating account in Castle DB: {account_name}") logger.info(f"[CASTLE DB] Syncing account from Beancount to Castle DB: {account_name}")
account = await create_account( from .account_sync import sync_single_account_from_beancount
CreateAccount(
name=account_name, # Sync from Beancount to Castle DB
account_type=account_type, created = await sync_single_account_from_beancount(account_name)
description=f"User-specific {account_type.value} account",
user_id=user_id, if created:
) logger.info(f"[CASTLE DB] Account synced from Beancount: {account_name}")
else:
logger.warning(f"[CASTLE DB] Failed to sync account from Beancount: {account_name}")
# Fetch the account from Castle DB
account = await db.fetchone(
"""
SELECT * FROM accounts
WHERE user_id = :user_id AND account_type = :type AND name = :name
""",
{"user_id": user_id, "type": account_type.value, "name": account_name},
Account,
) )
logger.info(f"[CASTLE DB] Created account in Castle DB: {account_name}")
if not account:
logger.error(f"[CASTLE DB] Account still not found after sync: {account_name}")
# Fallback: create directly in Castle DB if sync failed
logger.info(f"[CASTLE DB] Creating account directly in Castle DB: {account_name}")
account = await create_account(
CreateAccount(
name=account_name,
account_type=account_type,
description=f"User-specific {account_type.value} account",
user_id=user_id,
)
)
else: else:
logger.info(f"[CASTLE DB] Account already exists in Castle DB: {account_name}") logger.info(f"[CASTLE DB] Account already exists in Castle DB: {account_name}")

View file

@ -95,6 +95,59 @@ async def scheduled_daily_reconciliation():
raise raise
async def scheduled_account_sync():
"""
Scheduled task that runs hourly to sync accounts from Beancount to Castle DB.
This ensures Castle DB stays in sync with Beancount (source of truth) by
automatically adding any new accounts created in Beancount to Castle's
metadata database for permission tracking.
"""
from .account_sync import sync_accounts_from_beancount
logger.info(f"[CASTLE] Running scheduled account sync at {datetime.now()}")
try:
stats = await sync_accounts_from_beancount(force_full_sync=False)
if stats["accounts_added"] > 0:
logger.info(
f"[CASTLE] Account sync: Added {stats['accounts_added']} new accounts"
)
if stats["errors"]:
logger.warning(
f"[CASTLE] Account sync: {len(stats['errors'])} errors encountered"
)
for error in stats["errors"][:5]: # Log first 5 errors
logger.error(f" - {error}")
return stats
except Exception as e:
logger.error(f"[CASTLE] Error in scheduled account sync: {e}")
raise
async def wait_for_account_sync():
"""
Background task that periodically syncs accounts from Beancount to Castle DB.
Runs hourly to ensure Castle DB stays in sync with Beancount.
"""
logger.info("[CASTLE] Account sync background task started")
while True:
try:
# Run sync
await scheduled_account_sync()
except Exception as e:
logger.error(f"[CASTLE] Account sync error: {e}")
# Wait 1 hour before next sync
await asyncio.sleep(3600) # 3600 seconds = 1 hour
def start_daily_reconciliation_task(): def start_daily_reconciliation_task():
""" """
Initialize the daily reconciliation task. Initialize the daily reconciliation task.

View file

@ -3055,3 +3055,83 @@ async def api_get_account_hierarchy(
accounts_with_hierarchy.sort(key=lambda a: a.name) accounts_with_hierarchy.sort(key=lambda a: a.name)
return accounts_with_hierarchy return accounts_with_hierarchy
# ===== ACCOUNT SYNC ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/accounts/sync")
async def api_sync_all_accounts(
force_full_sync: bool = False,
wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
"""
Sync all accounts from Beancount to Castle DB (admin only).
This ensures Castle DB has metadata entries for all accounts that exist
in Beancount, enabling permissions and user associations to work properly.
Args:
force_full_sync: If True, re-check all accounts. If False, only add new ones.
Returns:
Sync statistics: {total_beancount_accounts, accounts_added, accounts_skipped, errors}
"""
from .account_sync import sync_accounts_from_beancount
logger.info(f"Admin {wallet.wallet.user[:8]} triggered account sync (force={force_full_sync})")
try:
stats = await sync_accounts_from_beancount(force_full_sync=force_full_sync)
logger.info(f"Account sync complete: {stats['accounts_added']} added, {stats['accounts_skipped']} skipped")
return stats
except Exception as e:
logger.error(f"Account sync failed: {e}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Account sync failed: {str(e)}"
)
@castle_api_router.post("/api/v1/admin/accounts/sync/{account_name:path}")
async def api_sync_single_account(
account_name: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
) -> dict:
"""
Sync a single account from Beancount to Castle DB (admin only).
Useful for ensuring a specific account exists in Castle DB before
granting permissions on it.
Args:
account_name: Hierarchical account name (e.g., "Expenses:Food:Groceries")
Returns:
{success: bool, account_name: str, message: str}
"""
from .account_sync import sync_single_account_from_beancount
logger.info(f"Admin {wallet.wallet.user[:8]} triggered sync for account: {account_name}")
try:
created = await sync_single_account_from_beancount(account_name)
if created:
return {
"success": True,
"account_name": account_name,
"message": f"Account '{account_name}' synced successfully"
}
else:
return {
"success": False,
"account_name": account_name,
"message": f"Account '{account_name}' already exists or not found in Beancount"
}
except Exception as e:
logger.error(f"Single account sync failed for {account_name}: {e}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Account sync failed: {str(e)}"
)