castle/helper/import_beancount.py
padreug 4b327a0aab Extends account lookup for user accounts
Implements account lookup logic for user-specific accounts,
specifically Liabilities:Payable and Assets:Receivable.

This allows the system to automatically map Beancount accounts
to corresponding accounts in the Castle system based on user ID.

Improves error messages when user accounts are not properly configured.
2025-11-08 23:51:07 +01:00

586 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Beancount to Castle Import Script
Imports Beancount ledger transactions into Castle accounting extension.
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.
Usage:
python import_beancount.py <ledger.beancount> [--dry-run]
Example:
python import_beancount.py my_ledger.beancount --dry-run
python import_beancount.py my_ledger.beancount
"""
import requests
import csv
import os
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, Optional
# ===== CONFIGURATION =====
# LNbits URL and API Key
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "48d787d862484a6c89d6a557b4d5be9d")
# Rates CSV file (looks in same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")
# User ID mappings: Equity account name -> Castle user ID (wallet ID)
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
"Pat": "75be145a42884b22b60bf97510ed46e3",
"Coco": "375ec158ceca46de86cf6561ca20f881",
"Charlie": "921340b802104c25901eae6c420b1ba1",
}
# ===== RATE LOOKUP =====
class RateLookup:
"""Load and lookup BTC/EUR rates from CSV file"""
def __init__(self, csv_file: str):
self.rates = {}
self._load_csv(csv_file)
def _load_csv(self, csv_file: str):
"""Load rates from CSV file"""
if not os.path.exists(csv_file):
raise FileNotFoundError(
f"Rates CSV file not found: {csv_file}\n"
f"Please create btc_eur_rates.csv in the same directory as this script."
)
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
date = datetime.strptime(row['date'], '%Y-%m-%d').date()
# Handle comma as thousands separator
rate_str = row['btc_eur_rate'].replace(',', '').replace(' ', '')
rate = float(rate_str)
self.rates[date] = rate
if not self.rates:
raise ValueError(f"No rates loaded from {csv_file}")
print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
print(f" Date range: {min(self.rates.keys())} to {max(self.rates.keys())}")
def get_rate(self, date: datetime.date, fallback_days: int = 7) -> Optional[float]:
"""
Get BTC/EUR rate for a specific date.
If exact date not found, tries nearby dates within fallback_days.
Args:
date: Date to lookup
fallback_days: How many days to look back/forward if exact date missing
Returns:
BTC/EUR rate or None if not found
"""
# Try exact date first
if date in self.rates:
return self.rates[date]
# Try nearby dates (prefer earlier dates)
for days_offset in range(1, fallback_days + 1):
# Try earlier date first
earlier = date - timedelta(days=days_offset)
if earlier in self.rates:
print(f" ⚠️ Using rate from {earlier} for {date} (exact date not found)")
return self.rates[earlier]
# Try later date
later = date + timedelta(days=days_offset)
if later in self.rates:
print(f" ⚠️ Using rate from {later} for {date} (exact date not found)")
return self.rates[later]
return None
# ===== ACCOUNT LOOKUP =====
class AccountLookup:
"""Fetch and lookup Castle accounts from API"""
def __init__(self, lnbits_url: str, api_key: str):
self.accounts = {} # name -> account_id
self.accounts_by_user = {} # user_id -> {account_type -> account_id}
self.account_details = [] # Full account objects
self._fetch_accounts(lnbits_url, api_key)
def _fetch_accounts(self, lnbits_url: str, api_key: str):
"""Fetch all accounts from Castle API"""
url = f"{lnbits_url}/castle/api/v1/accounts"
headers = {"X-Api-Key": api_key}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
accounts_list = response.json()
# Build mappings
for account in accounts_list:
name = account.get('name')
account_id = account.get('id')
user_id = account.get('user_id')
account_type = account.get('account_type')
self.account_details.append(account)
# Name -> ID mapping
if name and account_id:
self.accounts[name] = account_id
# User -> Account Type -> ID mapping (for equity accounts)
if user_id and account_type:
if user_id not in self.accounts_by_user:
self.accounts_by_user[user_id] = {}
self.accounts_by_user[user_id][account_type] = account_id
print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")
except requests.RequestException as e:
raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}")
def get_account_id(self, account_name: str) -> Optional[str]:
"""
Get Castle account ID for a Beancount account name.
Special handling for user-specific accounts:
- "Liabilities:Payable:Pat" -> looks up Pat's user_id and finds their Castle payable account
- "Assets:Receivable:Pat" -> looks up Pat's user_id and finds their Castle receivable account
Args:
account_name: Beancount account name (e.g., "Expenses:Food:Supplies", "Liabilities:Payable:Pat", "Assets:Receivable:Pat")
Returns:
Castle account UUID or None if not found
"""
# Check if this is a Liabilities:Payable:<name> account
# Map Beancount Liabilities:Payable:Pat to Castle Liabilities:Payable:User-<id>
if account_name.startswith("Liabilities:Payable:"):
user_name = extract_user_from_user_account(account_name)
if user_name:
# Look up user's actual user_id
user_id = USER_MAPPINGS.get(user_name)
if user_id:
# Find this user's liability (payable) account
# This is the Liabilities:Payable:User-<id> account in Castle
if user_id in self.accounts_by_user:
liability_account_id = self.accounts_by_user[user_id].get('liability')
if liability_account_id:
return liability_account_id
# If not found, provide helpful error
raise ValueError(
f"User '{user_name}' (ID: {user_id}) does not have a payable account.\n"
f"This should have been created when they configured their wallet.\n"
f"Please configure the wallet for user ID: {user_id}"
)
# Check if this is an Assets:Receivable:<name> account
# Map Beancount Assets:Receivable:Pat to Castle Assets:Receivable:User-<id>
elif account_name.startswith("Assets:Receivable:"):
user_name = extract_user_from_user_account(account_name)
if user_name:
# Look up user's actual user_id
user_id = USER_MAPPINGS.get(user_name)
if user_id:
# Find this user's asset (receivable) account
# This is the Assets:Receivable:User-<id> account in Castle
if user_id in self.accounts_by_user:
asset_account_id = self.accounts_by_user[user_id].get('asset')
if asset_account_id:
return asset_account_id
# If not found, provide helpful error
raise ValueError(
f"User '{user_name}' (ID: {user_id}) does not have a receivable account.\n"
f"This should have been created when they configured their wallet.\n"
f"Please configure the wallet for user ID: {user_id}"
)
# Normal account lookup by name
return self.accounts.get(account_name)
def list_accounts(self):
"""Print all available accounts"""
print("\n📋 Available accounts:")
for name in sorted(self.accounts.keys()):
print(f" - {name}")
# ===== CONVERSION FUNCTIONS =====
def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
"""Convert EUR to satoshis using BTC/EUR rate"""
btc_amount = eur_amount / Decimal(str(btc_eur_rate))
sats = btc_amount * Decimal(100_000_000)
return int(sats.quantize(Decimal('1')))
def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
"""Build metadata dict for Castle entry line"""
abs_eur = abs(eur_amount)
abs_sats = abs(eur_to_sats(abs_eur, btc_eur_rate))
# fiat_rate = sats per EUR
fiat_rate = float(abs_sats) / float(abs_eur) if abs_eur > 0 else 0
return {
"fiat_currency": "EUR",
"fiat_amount": str(abs_eur.quantize(Decimal("0.001"))),
"fiat_rate": fiat_rate,
"btc_rate": btc_eur_rate
}
# ===== BEANCOUNT PARSER =====
def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
"""
Parse a Beancount transaction.
Expected format:
2025-07-06 * "Foix market"
Expenses:Groceries 69.40 EUR
Equity:Pat
"""
lines = txn_text.strip().split('\n')
if not lines:
return None
# Skip leading comments to find the transaction header
header_line_idx = 0
for i, line in enumerate(lines):
stripped = line.strip()
# Skip comments and empty lines
if not stripped or stripped.startswith(';'):
continue
# Found the first non-comment line
header_line_idx = i
break
# Parse header line
header = lines[header_line_idx].strip()
# Handle both * and ! flags
if '*' in header:
parts = header.split('*')
flag = '*'
elif '!' in header:
parts = header.split('!')
flag = '!'
else:
return None
date_str = parts[0].strip()
description = parts[1].strip().strip('"')
try:
date = datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
return None
# Parse postings (start after the header line)
postings = []
for line in lines[header_line_idx + 1:]:
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith(';'):
continue
# Parse posting line
parts = line.split()
if not parts:
continue
account = parts[0]
# Check if amount is specified
if len(parts) >= 3 and parts[-1] == 'EUR':
# Strip commas from amount (e.g., "1,500.00" -> "1500.00")
amount_str = parts[-2].replace(',', '')
eur_amount = Decimal(amount_str)
else:
# No amount specified - will be calculated to balance
eur_amount = None
postings.append({
'account': account,
'eur_amount': eur_amount
})
# Calculate missing amounts (Beancount auto-balance)
# TODO: Support auto-balancing for transactions with >2 postings
# For now, only handles simple 2-posting transactions
if len(postings) == 2:
if postings[0]['eur_amount'] and not postings[1]['eur_amount']:
postings[1]['eur_amount'] = -postings[0]['eur_amount']
elif postings[1]['eur_amount'] and not postings[0]['eur_amount']:
postings[0]['eur_amount'] = -postings[1]['eur_amount']
return {
'date': date,
'description': description,
'postings': postings
}
# ===== HELPER FUNCTIONS =====
def extract_user_from_user_account(account_name: str) -> Optional[str]:
"""
Extract user name from user-specific accounts (Payable or Receivable).
Examples:
"Liabilities:Payable:Pat" -> "Pat"
"Assets:Receivable:Alice" -> "Alice"
"Expenses:Food" -> None
Returns:
User name or None if not a user-specific account
"""
if account_name.startswith("Liabilities:Payable:"):
parts = account_name.split(":")
if len(parts) >= 3:
return parts[2]
elif account_name.startswith("Assets:Receivable:"):
parts = account_name.split(":")
if len(parts) >= 3:
return parts[2]
return None
def determine_user_id(postings: list) -> Optional[str]:
"""
Determine which user ID to use for this transaction based on user-specific accounts.
Args:
postings: List of posting dicts with 'account' key
Returns:
User ID (wallet ID) from USER_MAPPINGS, or None if no user account found
"""
for posting in postings:
user_name = extract_user_from_user_account(posting['account'])
if user_name:
user_id = USER_MAPPINGS.get(user_name)
if not user_id:
raise ValueError(
f"No user ID mapping found for '{user_name}'.\n"
f"Please add '{user_name}' to USER_MAPPINGS in the script."
)
return user_id
# No user-specific account found - this shouldn't happen for typical transactions
return None
# ===== CASTLE CONVERTER =====
def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
"""Convert parsed Beancount transaction to Castle format"""
# Determine which user this transaction is for (based on user-specific accounts)
user_id = determine_user_id(parsed['postings'])
if not user_id:
raise ValueError(
f"Could not determine user ID for transaction.\n"
f"Transactions must have a Liabilities:Payable:<name> or Assets:Receivable:<name> account.\n"
f"Examples: Liabilities:Payable:Pat, Assets:Receivable:Pat"
)
# Build entry lines
lines = []
for posting in parsed['postings']:
account_id = account_lookup.get_account_id(posting['account'])
if not account_id:
raise ValueError(
f"No account found in Castle with name '{posting['account']}'.\n"
f"Please create this account in Castle first."
)
eur_amount = posting['eur_amount']
if eur_amount is None:
raise ValueError(f"Could not determine amount for {posting['account']}")
# Convert EUR to sats
sats = eur_to_sats(eur_amount, btc_eur_rate)
# Build metadata
metadata = build_metadata(eur_amount, btc_eur_rate)
lines.append({
"account_id": account_id,
"amount": sats, # Positive = debit, negative = credit
"description": posting['account'],
"metadata": metadata
})
return {
"description": parsed['description'],
"entry_date": parsed['date'].isoformat(),
"reference": f"import-{parsed['date'].strftime('%Y%m%d')}-{parsed['description'][:20].replace(' ', '-')}",
"flag": "*",
"meta": {
"source": "beancount_import",
"imported_at": datetime.now().isoformat(),
"btc_eur_rate": btc_eur_rate,
"user_id": user_id # Track which user this transaction is for
},
"lines": lines
}
# ===== API UPLOAD =====
def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
"""Upload journal entry to Castle API"""
if dry_run:
print(f"\n[DRY RUN] Entry preview:")
print(f" Description: {entry['description']}")
print(f" Date: {entry['entry_date']}")
print(f" BTC/EUR Rate: {entry['meta']['btc_eur_rate']:,.2f}")
total_sats = 0
for line in entry['lines']:
sign = '+' if line['amount'] > 0 else ''
print(f" {line['description']}: {sign}{line['amount']:,} sats "
f"({line['metadata']['fiat_amount']} EUR)")
total_sats += line['amount']
print(f" Balance check: {total_sats} (should be 0)")
return {"id": "dry-run"}
url = f"{LNBITS_URL}/castle/api/v1/entries"
headers = {
"X-Api-Key": api_key,
"Content-Type": "application/json"
}
response = requests.post(url, json=entry, headers=headers)
response.raise_for_status()
return response.json()
# ===== MAIN IMPORT FUNCTION =====
def import_beancount_file(beancount_file: str, dry_run: bool = False):
"""Import transactions from Beancount file using rates from CSV"""
# Validate configuration
if not ADMIN_API_KEY:
print("❌ Error: CASTLE_ADMIN_KEY not set!")
print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
return
# Load rates
try:
rate_lookup = RateLookup(RATES_CSV_FILE)
except (FileNotFoundError, ValueError) as e:
print(f"❌ Error loading rates: {e}")
return
# Load accounts from Castle
try:
account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
except (ConnectionError, ValueError) as e:
print(f"❌ Error loading accounts: {e}")
return
# Show user mappings and verify equity accounts exist
print(f"\n👥 User ID mappings and equity accounts:")
for name, user_id in USER_MAPPINGS.items():
has_equity = user_id in account_lookup.accounts_by_user and 'equity' in account_lookup.accounts_by_user[user_id]
status = "" if has_equity else ""
print(f" {status} {name}{user_id} {'(has equity account)' if has_equity else '(NO EQUITY ACCOUNT - create in Castle!)'}")
# Read beancount file
if not os.path.exists(beancount_file):
print(f"❌ Error: Beancount file not found: {beancount_file}")
return
with open(beancount_file, 'r', encoding='utf-8') as f:
content = f.read()
# Split by blank lines to get individual transactions
transactions = [t.strip() for t in content.split('\n\n') if t.strip()]
print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
if dry_run:
print("🔍 [DRY RUN MODE] No changes will be made\n")
success_count = 0
error_count = 0
skip_count = 0
skipped_items = [] # Track what was skipped
for i, txn_text in enumerate(transactions, 1):
try:
# Try to parse the transaction
parsed = parse_beancount_transaction(txn_text)
if not parsed:
# Not a valid transaction (likely a directive, option, or comment block)
skip_count += 1
first_line = txn_text.split('\n')[0][:60]
skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
continue
# Look up rate for this transaction's date
btc_eur_rate = rate_lookup.get_rate(parsed['date'].date())
if not btc_eur_rate:
raise ValueError(f"No BTC/EUR rate found for {parsed['date'].date()}")
castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
result = upload_entry(castle_entry, ADMIN_API_KEY, dry_run)
# Get user name for display
user_name = None
for posting in parsed['postings']:
user_name = extract_user_from_user_account(posting['account'])
if user_name:
break
user_info = f" (User: {user_name})" if user_name else ""
print(f"✅ Transaction {i}: {parsed['date'].date()} - {parsed['description'][:35]}{user_info} "
f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
success_count += 1
except Exception as e:
print(f"❌ Transaction {i} failed: {e}")
print(f" Content: {txn_text[:100]}...")
error_count += 1
print(f"\n{'='*70}")
print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
print(f"{'='*70}")
# Show details of skipped entries
if skipped_items:
print(f"\n⏭️ Skipped entries:")
for item in skipped_items:
print(f" {item}")
if success_count > 0 and not dry_run:
print(f"\n✅ Successfully imported {success_count} transactions to Castle!")
# ===== MAIN =====
if __name__ == "__main__":
import sys
print("=" * 70)
print("🏰 Beancount to Castle Import Script")
print("=" * 70)
if len(sys.argv) < 2:
print("\nUsage: python import_beancount.py <ledger.beancount> [--dry-run]")
print("\nExample:")
print(" python import_beancount.py my_ledger.beancount --dry-run")
print(" python import_beancount.py my_ledger.beancount")
print("\nConfiguration:")
print(f" LNBITS_URL: {LNBITS_URL}")
print(f" RATES_CSV: {RATES_CSV_FILE}")
print(f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}")
sys.exit(1)
beancount_file = sys.argv[1]
dry_run = "--dry-run" in sys.argv
import_beancount_file(beancount_file, dry_run)