castle/tasks.py
padreug 3add13075c Adds fiat currency metadata to payments
Adds fiat currency information to payment invoices and ledger entries.

This allows for tracking the fiat value of transactions and provides a more complete financial picture. Calculates the fiat amount proportionally based on the user's balance and includes the fiat currency, amount, and exchange rates in the invoice's extra data. This data is then extracted and added to the ledger entry's metadata when recording the payment.
2025-11-02 02:52:41 +01:00

230 lines
8.1 KiB
Python

"""
Background tasks for the Castle accounting extension.
These tasks handle automated reconciliation checks and maintenance, and
automatically record paid Castle invoices in the accounting system.
"""
import asyncio
from asyncio import Queue
from datetime import datetime
from typing import Optional
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger
from .crud import check_balance_assertion, get_balance_assertions
from .models import AssertionStatus
async def check_all_balance_assertions() -> dict:
    """
    Check all balance assertions and return a summary of the results.

    This can be called manually or scheduled to run daily. Each assertion is
    checked independently: an exception raised while processing one assertion
    is logged and counted under ``errors`` without aborting the remaining
    checks.

    Returns:
        dict: Summary of check results with the keys ``task_id``,
        ``timestamp``, ``total``, ``checked``, ``passed``, ``failed``,
        ``errors`` and ``failed_assertions`` (one dict per failed assertion
        holding its id, account id, expected/actual sats and the difference).
    """
    from lnbits.helpers import urlsafe_short_hash

    # The query is issued with limit=1000, so a single run looks at no more
    # assertions than that call returns.
    all_assertions = await get_balance_assertions(limit=1000)

    results = {
        "task_id": urlsafe_short_hash(),
        "timestamp": datetime.now().isoformat(),
        "total": len(all_assertions),
        "checked": 0,
        "passed": 0,
        "failed": 0,
        "errors": 0,
        "failed_assertions": [],
    }

    for assertion in all_assertions:
        try:
            checked = await check_balance_assertion(assertion.id)
            results["checked"] += 1
            if checked.status == AssertionStatus.PASSED:
                results["passed"] += 1
            elif checked.status == AssertionStatus.FAILED:
                results["failed"] += 1
                results["failed_assertions"].append({
                    "id": assertion.id,
                    "account_id": assertion.account_id,
                    "expected_sats": assertion.expected_balance_sats,
                    "actual_sats": checked.checked_balance_sats,
                    "difference_sats": checked.difference_sats,
                })
        except Exception as e:
            # Best effort: one broken assertion must not stop the whole run.
            results["errors"] += 1
            logger.error(f"Error checking assertion {assertion.id}: {e}")

    # Log results through the extension logger (same channel as the invoice
    # listener) instead of bare stdout.
    if results["failed"] > 0:
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['failed']} FAILED assertions!"
        )
        for failed in results["failed_assertions"]:
            logger.warning(
                f"  - Account {failed['account_id']}: "
                f"expected {failed['expected_sats']}, got {failed['actual_sats']}"
            )
    if results["errors"] > 0:
        # Do not report a clean run when some assertions could not be checked.
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['errors']} assertions "
            f"could not be checked due to errors"
        )
    if results["failed"] == 0 and results["errors"] == 0:
        logger.info(
            f"[CASTLE] Daily reconciliation check: All {results['passed']} assertions passed ✓"
        )

    return results
async def scheduled_daily_reconciliation():
    """
    Run the daily reconciliation: check all balance assertions once.

    This function is meant to be called by a scheduler (cron, systemd timer,
    etc.) or by the LNbits background task system.

    Returns:
        dict: The summary returned by ``check_all_balance_assertions``.

    Raises:
        Exception: Any error raised during the check is logged and re-raised
        so the caller can react to it.
    """
    logger.info(
        f"[CASTLE] Running scheduled daily reconciliation check at {datetime.now()}"
    )
    try:
        results = await check_all_balance_assertions()

        # TODO: Send notifications if there are failures
        # This could send email, webhook, or in-app notification
        if results["failed"] > 0:
            logger.warning(
                f"[CASTLE] WARNING: {results['failed']} balance assertions failed!"
            )
            # Future: Send alert notification

        return results
    except Exception as e:
        # logger.exception keeps the traceback, which a plain message would lose.
        logger.exception(f"[CASTLE] Error in scheduled reconciliation: {e}")
        raise
def start_daily_reconciliation_task():
    """
    Initialize the daily reconciliation task.

    This can be called from the extension's __init__.py or configured
    to run via external cron job. At present it is a placeholder: it only
    logs a message and does not schedule anything itself.

    For cron setup:
        # Run daily at 2 AM
        0 2 * * * curl -X POST http://localhost:5000/castle/api/v1/tasks/daily-reconciliation -H "X-Api-Key: YOUR_ADMIN_KEY"
    """
    # In a production system, you would register this with LNbits task scheduler
    # For now, it can be triggered manually via API endpoint
    logger.info("[CASTLE] Daily reconciliation task registered")
async def wait_for_paid_invoices():
    """
    Background task that listens for paid invoices and automatically
    records them in the accounting system.

    This ensures payments are recorded even if the user closes their browser
    before the payment is detected by client-side polling.

    The loop never returns on its own. A failure while handling one payment
    is logged and the loop moves on to the next payment, so a single bad
    invoice does not stop the listener or drop payments still in the queue.
    Task cancellation still propagates, because ``asyncio.CancelledError``
    is not an ``Exception`` subclass.
    """
    invoice_queue = Queue()
    register_invoice_listener(invoice_queue, "ext_castle")

    while True:
        payment = await invoice_queue.get()
        try:
            await on_invoice_paid(payment)
        except Exception as e:
            # on_invoice_paid re-raises after logging; swallow here so the
            # listener keeps serving subsequent payments.
            payment_hash = getattr(payment, "payment_hash", "unknown")
            logger.error(
                f"[CASTLE] Failed to process paid invoice {payment_hash}, "
                f"continuing to listen: {e}"
            )
async def on_invoice_paid(payment: Payment) -> None:
    """
    Handle a paid Castle invoice by automatically creating a journal entry.

    Called by ``wait_for_paid_invoices`` for every payment taken from the
    listener queue. Payments whose ``extra`` data is not tagged ``"castle"``
    are ignored, as are Castle payments without a ``user_id`` and payments
    whose hash already has a journal entry (idempotency).

    The recorded entry debits ``Assets:Bitcoin:Lightning`` and credits the
    user's "Accounts Receivable" account with the paid amount in satoshis.
    Fiat fields found in the invoice's extra data are copied onto both lines.

    Args:
        payment: The paid invoice as delivered by the invoice listener.

    Raises:
        Exception: Any error raised while building or storing the journal
        entry is logged (with traceback) and re-raised.
    """
    # Only process Castle-specific payments
    if not payment.extra or payment.extra.get("tag") != "castle":
        return

    user_id = payment.extra.get("user_id")
    if not user_id:
        logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata")
        return

    # Check if payment already recorded (idempotency)
    from .crud import get_journal_entry_by_reference

    existing = await get_journal_entry_by_reference(payment.payment_hash)
    if existing:
        logger.info(f"Payment {payment.payment_hash} already recorded, skipping")
        return

    logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]}")

    try:
        # Import here to avoid circular dependencies
        from .crud import create_journal_entry, get_account_by_name, get_or_create_user_account
        from .models import AccountType, CreateEntryLine, CreateJournalEntry, JournalEntryFlag

        # Convert amount from millisatoshis to satoshis
        amount_sats = payment.amount // 1000

        # Extract fiat metadata from invoice (if present); payment.extra is
        # known to be non-empty here because of the guard at the top.
        line_metadata = _fiat_line_metadata(payment.extra)

        # Get user's receivable account (what user owes)
        user_receivable = await get_or_create_user_account(
            user_id, AccountType.ASSET, "Accounts Receivable"
        )

        # Get lightning account
        lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
        if not lightning_account:
            logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
            return

        # Create journal entry to record payment
        # DR Assets:Bitcoin:Lightning, CR Assets:Receivable (User)
        # This reduces what the user owes
        entry_meta = {
            "source": "lightning_payment",
            "created_via": "auto_invoice_listener",
            "payment_hash": payment.payment_hash,
            "payer_user_id": user_id,
        }

        entry_data = CreateJournalEntry(
            description=f"Lightning payment from user {user_id[:8]}",
            reference=payment.payment_hash,
            flag=JournalEntryFlag.CLEARED,
            meta=entry_meta,
            lines=[
                CreateEntryLine(
                    account_id=lightning_account.id,
                    debit=amount_sats,
                    credit=0,
                    description="Lightning payment received",
                    # Separate copies so the two lines never share one dict.
                    metadata=dict(line_metadata),
                ),
                CreateEntryLine(
                    account_id=user_receivable.id,
                    debit=0,
                    credit=amount_sats,
                    description="Payment applied to balance",
                    metadata=dict(line_metadata),
                ),
            ],
        )

        entry = await create_journal_entry(entry_data, user_id)
        logger.info(f"Successfully recorded journal entry {entry.id} for payment {payment.payment_hash}")
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Error recording Castle payment {payment.payment_hash}: {e}")
        raise


def _fiat_line_metadata(extra: dict) -> dict:
    """Build entry-line metadata from the fiat fields of an invoice's extra data.

    Returns an empty dict unless both ``fiat_currency`` and ``fiat_amount``
    are present and truthy; ``fiat_amount`` is stored as a string.
    """
    fiat_currency = extra.get("fiat_currency")
    fiat_amount = extra.get("fiat_amount")
    if not (fiat_currency and fiat_amount):
        return {}
    return {
        "fiat_currency": fiat_currency,
        "fiat_amount": str(fiat_amount),
        "fiat_rate": extra.get("fiat_rate"),
        "btc_rate": extra.get("btc_rate"),
    }