castle/tasks.py
padreug 4a3922895e Integrate account sync with API, background tasks, and user creation
Integration Components:
1. Manual API Endpoints (admin-only):
   - POST /api/v1/admin/accounts/sync (full sync)
   - POST /api/v1/admin/accounts/sync/{account_name} (single account)

2. Scheduled Background Sync:
   - Hourly background task (wait_for_account_sync)
   - Registered in castle_start() lifecycle
   - Automatically syncs new accounts from Beancount to Castle DB

3. Auto-sync on User Account Creation:
   - Updated get_or_create_user_account() in crud.py
   - Uses sync_single_account_from_beancount() for consistency
   - Ensures receivable/payable accounts are synced when users register

Flow:
- User associates wallet → creates receivable/payable in Beancount
  → syncs to Castle DB → permissions can be granted
- Admin manually syncs → all Beancount accounts added to Castle DB
- Hourly task → catches any accounts created directly in Beancount

This ensures Beancount remains the source of truth while Castle DB
maintains metadata for permissions and user associations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 01:28:59 +01:00

317 lines
11 KiB
Python

"""
Background tasks for Castle accounting extension.
These tasks handle automated reconciliation checks and maintenance.
"""
import asyncio
from asyncio import Queue
from datetime import datetime
from typing import Optional
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger
from .crud import check_balance_assertion, get_balance_assertions
from .models import AssertionStatus
async def check_all_balance_assertions() -> dict:
"""
Check all balance assertions and return results.
This can be called manually or scheduled to run daily.
Returns:
dict: Summary of check results
"""
from lnbits.helpers import urlsafe_short_hash
# Get all assertions
all_assertions = await get_balance_assertions(limit=1000)
results = {
"task_id": urlsafe_short_hash(),
"timestamp": datetime.now().isoformat(),
"total": len(all_assertions),
"checked": 0,
"passed": 0,
"failed": 0,
"errors": 0,
"failed_assertions": [],
}
for assertion in all_assertions:
try:
checked = await check_balance_assertion(assertion.id)
results["checked"] += 1
if checked.status == AssertionStatus.PASSED:
results["passed"] += 1
elif checked.status == AssertionStatus.FAILED:
results["failed"] += 1
results["failed_assertions"].append({
"id": assertion.id,
"account_id": assertion.account_id,
"expected_sats": assertion.expected_balance_sats,
"actual_sats": checked.checked_balance_sats,
"difference_sats": checked.difference_sats,
})
except Exception as e:
results["errors"] += 1
print(f"Error checking assertion {assertion.id}: {e}")
# Log results
if results["failed"] > 0:
print(f"[CASTLE] Daily reconciliation check: {results['failed']} FAILED assertions!")
for failed in results["failed_assertions"]:
print(f" - Account {failed['account_id']}: expected {failed['expected_sats']}, got {failed['actual_sats']}")
else:
print(f"[CASTLE] Daily reconciliation check: All {results['passed']} assertions passed ✓")
return results
async def scheduled_daily_reconciliation():
"""
Scheduled task that runs daily to check all balance assertions.
This function is meant to be called by a scheduler (cron, systemd timer, etc.)
or by LNbits background task system.
"""
print(f"[CASTLE] Running scheduled daily reconciliation check at {datetime.now()}")
try:
results = await check_all_balance_assertions()
# TODO: Send notifications if there are failures
# This could send email, webhook, or in-app notification
if results["failed"] > 0:
print(f"[CASTLE] WARNING: {results['failed']} balance assertions failed!")
# Future: Send alert notification
return results
except Exception as e:
print(f"[CASTLE] Error in scheduled reconciliation: {e}")
raise
async def scheduled_account_sync():
"""
Scheduled task that runs hourly to sync accounts from Beancount to Castle DB.
This ensures Castle DB stays in sync with Beancount (source of truth) by
automatically adding any new accounts created in Beancount to Castle's
metadata database for permission tracking.
"""
from .account_sync import sync_accounts_from_beancount
logger.info(f"[CASTLE] Running scheduled account sync at {datetime.now()}")
try:
stats = await sync_accounts_from_beancount(force_full_sync=False)
if stats["accounts_added"] > 0:
logger.info(
f"[CASTLE] Account sync: Added {stats['accounts_added']} new accounts"
)
if stats["errors"]:
logger.warning(
f"[CASTLE] Account sync: {len(stats['errors'])} errors encountered"
)
for error in stats["errors"][:5]: # Log first 5 errors
logger.error(f" - {error}")
return stats
except Exception as e:
logger.error(f"[CASTLE] Error in scheduled account sync: {e}")
raise
async def wait_for_account_sync():
"""
Background task that periodically syncs accounts from Beancount to Castle DB.
Runs hourly to ensure Castle DB stays in sync with Beancount.
"""
logger.info("[CASTLE] Account sync background task started")
while True:
try:
# Run sync
await scheduled_account_sync()
except Exception as e:
logger.error(f"[CASTLE] Account sync error: {e}")
# Wait 1 hour before next sync
await asyncio.sleep(3600) # 3600 seconds = 1 hour
def start_daily_reconciliation_task():
"""
Initialize the daily reconciliation task.
This can be called from the extension's __init__.py or configured
to run via external cron job.
For cron setup:
# Run daily at 2 AM
0 2 * * * curl -X POST http://localhost:5000/castle/api/v1/tasks/daily-reconciliation -H "X-Api-Key: YOUR_ADMIN_KEY"
"""
print("[CASTLE] Daily reconciliation task registered")
# In a production system, you would register this with LNbits task scheduler
# For now, it can be triggered manually via API endpoint
async def wait_for_paid_invoices():
"""
Background task that listens for paid invoices and automatically
records them in the accounting system.
This ensures payments are recorded even if the user closes their browser
before the payment is detected by client-side polling.
"""
invoice_queue = Queue()
register_invoice_listener(invoice_queue, "ext_castle")
while True:
payment = await invoice_queue.get()
await on_invoice_paid(payment)
async def on_invoice_paid(payment: Payment) -> None:
"""
Handle a paid Castle invoice by automatically submitting to Fava.
This function is called automatically when any invoice on the Castle wallet
is paid. It checks if the invoice is a Castle payment and records it in
Beancount via Fava.
"""
# Only process Castle-specific payments
if not payment.extra or payment.extra.get("tag") != "castle":
return
user_id = payment.extra.get("user_id")
if not user_id:
logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata")
return
# Check if payment already recorded (idempotency)
# Query Fava for existing entry with this payment hash link
from .fava_client import get_fava_client
import httpx
fava = get_fava_client()
try:
# Check if payment already recorded by fetching recent entries
# Note: We can't use BQL query with `links ~ 'pattern'` because links is a set type
# and BQL doesn't support regex matching on sets. Instead, fetch entries and filter in Python.
link_to_find = f"ln-{payment.payment_hash[:16]}"
async with httpx.AsyncClient(timeout=5.0) as client:
# Get recent entries from Fava's journal endpoint
response = await client.get(
f"{fava.base_url}/api/journal",
params={"time": ""} # Get all entries
)
if response.status_code == 200:
data = response.json()
entries = data.get('entries', [])
# Check if any entry has our payment link
for entry in entries:
entry_links = entry.get('links', [])
if link_to_find in entry_links:
logger.info(f"Payment {payment.payment_hash} already recorded in Fava, skipping")
return
except Exception as e:
logger.warning(f"Could not check Fava for duplicate payment: {e}")
# Continue anyway - Fava/Beancount will catch duplicate if it exists
logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava")
try:
from decimal import Decimal
from .crud import get_account_by_name, get_or_create_user_account
from .models import AccountType
from .beancount_format import format_net_settlement_entry
# Convert amount from millisatoshis to satoshis
amount_sats = payment.amount // 1000
# Extract fiat metadata from invoice (if present)
fiat_currency = None
fiat_amount = None
if payment.extra:
fiat_currency = payment.extra.get("fiat_currency")
fiat_amount_str = payment.extra.get("fiat_amount")
if fiat_amount_str:
fiat_amount = Decimal(str(fiat_amount_str))
if not fiat_currency or not fiat_amount:
logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata")
return
# Get user's current balance to determine receivables and payables
balance = await fava.get_user_balance(user_id)
fiat_balances = balance.get("fiat_balances", {})
total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0))
# Determine receivables and payables based on balance
# Positive balance = user owes castle (receivable)
# Negative balance = castle owes user (payable)
if total_fiat_balance > 0:
# User owes castle
total_receivable = total_fiat_balance
total_payable = Decimal(0)
else:
# Castle owes user
total_receivable = Decimal(0)
total_payable = abs(total_fiat_balance)
logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")
# Get account names
user_receivable = await get_or_create_user_account(
user_id, AccountType.ASSET, "Accounts Receivable"
)
user_payable = await get_or_create_user_account(
user_id, AccountType.LIABILITY, "Accounts Payable"
)
lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
if not lightning_account:
logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
return
# Format as net settlement transaction
entry = format_net_settlement_entry(
user_id=user_id,
payment_account=lightning_account.name,
receivable_account=user_receivable.name,
payable_account=user_payable.name,
amount_sats=amount_sats,
net_fiat_amount=fiat_amount,
total_receivable_fiat=total_receivable,
total_payable_fiat=total_payable,
fiat_currency=fiat_currency,
description=f"Lightning payment settlement from user {user_id[:8]}",
entry_date=datetime.now().date(),
payment_hash=payment.payment_hash,
reference=payment.payment_hash
)
# Submit to Fava
result = await fava.add_entry(entry)
logger.info(
f"Successfully recorded payment {payment.payment_hash} to Fava: "
f"{result.get('data', 'Unknown')}"
)
except Exception as e:
logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}")
raise