Integration Components:
1. Manual API Endpoints (admin-only):
- POST /api/v1/admin/accounts/sync (full sync)
- POST /api/v1/admin/accounts/sync/{account_name} (single account)
2. Scheduled Background Sync:
- Hourly background task (wait_for_account_sync)
- Registered in castle_start() lifecycle
- Automatically syncs new accounts from Beancount to Castle DB
3. Auto-sync on User Account Creation:
- Updated get_or_create_user_account() in crud.py
- Uses sync_single_account_from_beancount() for consistency
- Ensures receivable/payable accounts are synced when users register
Flow:
- User associates wallet → creates receivable/payable in Beancount
→ syncs to Castle DB → permissions can be granted
- Admin manually syncs → all Beancount accounts added to Castle DB
- Hourly task → catches any accounts created directly in Beancount
This ensures Beancount remains the source of truth while Castle DB
maintains metadata for permissions and user associations.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""
|
|
Background tasks for Castle accounting extension.
|
|
These tasks handle automated reconciliation checks, periodic account sync
from Beancount, and recording of paid Castle invoices.
|
|
"""
|
|
|
|
import asyncio
|
|
from asyncio import Queue
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from lnbits.core.models import Payment
|
|
from lnbits.tasks import register_invoice_listener
|
|
from loguru import logger
|
|
|
|
from .crud import check_balance_assertion, get_balance_assertions
|
|
from .models import AssertionStatus
|
|
|
|
|
|
async def check_all_balance_assertions() -> dict:
    """
    Check all balance assertions and return a summary of the results.

    This can be called manually or scheduled to run daily. At most 1000
    assertions are fetched and re-checked one by one. An exception raised
    while checking a single assertion is counted under ``errors`` and logged;
    it does not abort the remaining checks.

    Returns:
        dict: Summary of check results with the keys ``task_id``,
        ``timestamp``, ``total``, ``checked``, ``passed``, ``failed``,
        ``errors`` and ``failed_assertions`` (one dict per failed assertion
        holding ``id``, ``account_id``, ``expected_sats``, ``actual_sats``
        and ``difference_sats``).
    """
    from lnbits.helpers import urlsafe_short_hash

    # Get all assertions
    all_assertions = await get_balance_assertions(limit=1000)

    results = {
        "task_id": urlsafe_short_hash(),
        "timestamp": datetime.now().isoformat(),
        "total": len(all_assertions),
        "checked": 0,
        "passed": 0,
        "failed": 0,
        "errors": 0,
        "failed_assertions": [],
    }

    for assertion in all_assertions:
        try:
            checked = await check_balance_assertion(assertion.id)
            results["checked"] += 1

            # Any status other than PASSED/FAILED only counts as "checked".
            if checked.status == AssertionStatus.PASSED:
                results["passed"] += 1
            elif checked.status == AssertionStatus.FAILED:
                results["failed"] += 1
                results["failed_assertions"].append({
                    "id": assertion.id,
                    "account_id": assertion.account_id,
                    "expected_sats": assertion.expected_balance_sats,
                    "actual_sats": checked.checked_balance_sats,
                    "difference_sats": checked.difference_sats,
                })
        except Exception as e:
            results["errors"] += 1
            logger.error(f"Error checking assertion {assertion.id}: {e}")

    # Log results
    if results["failed"] > 0:
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['failed']} FAILED assertions!"
        )
        for failed in results["failed_assertions"]:
            logger.warning(
                f" - Account {failed['account_id']}: "
                f"expected {failed['expected_sats']}, got {failed['actual_sats']}"
            )

    if results["errors"] > 0:
        # Assertions that raised were never verified, so do not report success.
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['errors']} assertions "
            "could not be checked due to errors"
        )

    if results["failed"] == 0 and results["errors"] == 0:
        logger.info(
            f"[CASTLE] Daily reconciliation check: All {results['passed']} assertions passed ✓"
        )

    return results
|
|
|
|
|
|
async def scheduled_daily_reconciliation():
    """
    Scheduled task that runs daily to check all balance assertions.

    This function is meant to be called by a scheduler (cron, systemd timer,
    etc.) or by the LNbits background task system.

    Returns:
        dict: The summary produced by ``check_all_balance_assertions()``.

    Raises:
        Exception: Any error raised by the check is logged and re-raised.
    """
    logger.info(f"[CASTLE] Running scheduled daily reconciliation check at {datetime.now()}")

    try:
        results = await check_all_balance_assertions()

        # TODO: Send notifications if there are failures
        # This could send email, webhook, or in-app notification
        if results["failed"] > 0:
            logger.warning(
                f"[CASTLE] WARNING: {results['failed']} balance assertions failed!"
            )
            # Future: Send alert notification

        return results
    except Exception as e:
        logger.error(f"[CASTLE] Error in scheduled reconciliation: {e}")
        raise
|
|
|
|
|
|
async def scheduled_account_sync():
    """
    Run one incremental account sync from Beancount into Castle DB.

    Calls ``sync_accounts_from_beancount(force_full_sync=False)``, logs how
    many accounts were added and up to the first five reported errors, and
    hands the stats mapping back to the caller.

    Returns:
        The stats object returned by ``sync_accounts_from_beancount``; this
        function reads its ``accounts_added`` and ``errors`` entries.

    Raises:
        Exception: Anything raised during the sync or while reporting on it
        is logged and re-raised.
    """
    from .account_sync import sync_accounts_from_beancount

    logger.info(f"[CASTLE] Running scheduled account sync at {datetime.now()}")

    try:
        sync_stats = await sync_accounts_from_beancount(force_full_sync=False)

        added_count = sync_stats["accounts_added"]
        if added_count > 0:
            logger.info(
                f"[CASTLE] Account sync: Added {added_count} new accounts"
            )

        sync_errors = sync_stats["errors"]
        if sync_errors:
            logger.warning(
                f"[CASTLE] Account sync: {len(sync_errors)} errors encountered"
            )
            # Only the first 5 errors are logged to keep the output bounded.
            for sync_error in sync_errors[:5]:
                logger.error(f" - {sync_error}")

        return sync_stats

    except Exception as exc:
        logger.error(f"[CASTLE] Error in scheduled account sync: {exc}")
        raise
|
|
|
|
|
|
async def wait_for_account_sync():
    """
    Long-running loop that syncs accounts from Beancount to Castle DB.

    Each iteration runs ``scheduled_account_sync()`` and then sleeps for one
    hour. A failed sync is logged and the loop keeps going, so a single bad
    run does not stop future syncs. The coroutine never returns on its own.
    """
    sync_interval_seconds = 3600  # 3600 seconds = 1 hour

    logger.info("[CASTLE] Account sync background task started")

    while True:
        try:
            await scheduled_account_sync()
        except Exception as exc:
            logger.error(f"[CASTLE] Account sync error: {exc}")

        # Pause until the next hourly run
        await asyncio.sleep(sync_interval_seconds)
|
|
|
|
|
|
def start_daily_reconciliation_task():
    """
    Initialize the daily reconciliation task.

    Currently this only logs that the task is registered; no scheduler is
    set up here. It can be called from the extension's __init__.py, or the
    reconciliation can be triggered by an external cron job.

    For cron setup:
        # Run daily at 2 AM
        0 2 * * * curl -X POST http://localhost:5000/castle/api/v1/tasks/daily-reconciliation -H "X-Api-Key: YOUR_ADMIN_KEY"
    """
    logger.info("[CASTLE] Daily reconciliation task registered")
    # In a production system, you would register this with LNbits task scheduler
    # For now, it can be triggered manually via API endpoint
|
|
|
|
|
|
async def wait_for_paid_invoices():
    """
    Listen for paid invoices and pass each one to ``on_invoice_paid``.

    A fresh queue is registered as the "ext_castle" invoice listener, then
    payments are pulled from it one at a time, forever. Handling payments in
    a background task means they are recorded even if the user closes their
    browser before client-side polling notices the payment.
    """
    paid_queue = Queue()
    register_invoice_listener(paid_queue, "ext_castle")

    while True:
        # Block until the next payment arrives, then process it.
        await on_invoice_paid(await paid_queue.get())
|
|
|
|
|
|
async def _castle_payment_already_recorded(fava, payment_hash: str) -> bool:
    """
    Return True if Fava's journal already has an entry linked to this payment.

    The check is best-effort: if Fava cannot be reached, answers with a
    non-200 status, or returns something unexpected, a warning is logged and
    False is returned so the caller proceeds with recording.
    """
    import httpx

    # Note: We can't use BQL query with `links ~ 'pattern'` because links is a set type
    # and BQL doesn't support regex matching on sets. Instead, fetch entries and filter in Python.
    link_to_find = f"ln-{payment_hash[:16]}"

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            # Get recent entries from Fava's journal endpoint
            response = await client.get(
                f"{fava.base_url}/api/journal",
                params={"time": ""},  # Get all entries
            )

        if response.status_code != 200:
            # Previously ignored silently; surface it so a skipped
            # duplicate check is visible in the logs.
            logger.warning(
                f"Fava duplicate check for payment {payment_hash} returned "
                f"HTTP {response.status_code}; continuing without it"
            )
            return False

        entries = response.json().get("entries", [])
        # Check if any entry has our payment link
        return any(link_to_find in entry.get("links", []) for entry in entries)

    except Exception as e:
        logger.warning(f"Could not check Fava for duplicate payment: {e}")
        # Continue anyway - the caller records the payment regardless
        return False


async def on_invoice_paid(payment: Payment) -> None:
    """
    Handle a paid Castle invoice by automatically submitting it to Fava.

    Payments whose ``extra`` metadata is missing or not tagged "castle" are
    ignored. For Castle payments the function:

    1. requires a ``user_id`` in the metadata (warns and returns otherwise),
    2. skips payments that already have a matching ``ln-<hash prefix>`` link
       in Fava's journal (idempotency, best-effort),
    3. requires ``fiat_currency`` and ``fiat_amount`` in the metadata,
    4. reads the user's balance from Fava to split it into receivable and
       payable totals, and
    5. formats a net settlement entry and submits it with ``fava.add_entry``.

    Args:
        payment: The paid invoice as delivered by the invoice listener.

    Raises:
        Exception: Any error raised while building or submitting the entry is
        logged and re-raised.
    """
    # Only process Castle-specific payments
    if not payment.extra or payment.extra.get("tag") != "castle":
        return

    user_id = payment.extra.get("user_id")
    if not user_id:
        logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata")
        return

    # Check if payment already recorded (idempotency)
    from .fava_client import get_fava_client

    fava = get_fava_client()

    if await _castle_payment_already_recorded(fava, payment.payment_hash):
        logger.info(f"Payment {payment.payment_hash} already recorded in Fava, skipping")
        return

    logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava")

    try:
        from decimal import Decimal
        from .crud import get_account_by_name, get_or_create_user_account
        from .models import AccountType
        from .beancount_format import format_net_settlement_entry

        # Convert amount from millisatoshis to satoshis
        amount_sats = payment.amount // 1000

        # Extract fiat metadata from invoice (if present).
        # payment.extra is known to be non-empty here (checked above).
        fiat_currency = payment.extra.get("fiat_currency")
        fiat_amount_str = payment.extra.get("fiat_amount")
        fiat_amount = Decimal(str(fiat_amount_str)) if fiat_amount_str else None

        if not fiat_currency or not fiat_amount:
            logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata")
            return

        # Get user's current balance to determine receivables and payables
        balance = await fava.get_user_balance(user_id)
        fiat_balances = balance.get("fiat_balances", {})
        total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0))

        # Determine receivables and payables based on balance
        # Positive balance = user owes castle (receivable)
        # Negative balance = castle owes user (payable)
        if total_fiat_balance > 0:
            # User owes castle
            total_receivable = total_fiat_balance
            total_payable = Decimal(0)
        else:
            # Castle owes user
            total_receivable = Decimal(0)
            total_payable = abs(total_fiat_balance)

        logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")

        # Get account names
        user_receivable = await get_or_create_user_account(
            user_id, AccountType.ASSET, "Accounts Receivable"
        )
        user_payable = await get_or_create_user_account(
            user_id, AccountType.LIABILITY, "Accounts Payable"
        )
        lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
        if not lightning_account:
            logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
            return

        # Format as net settlement transaction
        entry = format_net_settlement_entry(
            user_id=user_id,
            payment_account=lightning_account.name,
            receivable_account=user_receivable.name,
            payable_account=user_payable.name,
            amount_sats=amount_sats,
            net_fiat_amount=fiat_amount,
            total_receivable_fiat=total_receivable,
            total_payable_fiat=total_payable,
            fiat_currency=fiat_currency,
            description=f"Lightning payment settlement from user {user_id[:8]}",
            entry_date=datetime.now().date(),
            payment_hash=payment.payment_hash,
            reference=payment.payment_hash,
        )

        # Submit to Fava
        result = await fava.add_entry(entry)

        logger.info(
            f"Successfully recorded payment {payment.payment_hash} to Fava: "
            f"{result.get('data', 'Unknown')}"
        )

    except Exception as e:
        logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}")
        raise
|