castle/helper/import_beancount.py
padreug a6b67b7416 Improves Beancount entry generation and sanitization
Adds a function to sanitize strings for use as Beancount links,
ensuring compatibility with Beancount's link restrictions.

Refactors the journal entry creation process to use EUR-based
postings when fiat currency is provided, improving accuracy
and consistency. The legacy SATS-based fallback is retained for
cases without fiat currency information.

Adjusts reference generation for Beancount entries using the
sanitized description.
2025-11-10 11:35:41 +01:00

661 lines
24 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Beancount to Castle Import Script
Imports Beancount ledger transactions into Castle accounting extension.
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.
Usage:
python import_beancount.py <ledger.beancount> [--dry-run]
Example:
python import_beancount.py my_ledger.beancount --dry-run
python import_beancount.py my_ledger.beancount
"""
import requests
import csv
import os
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, Optional
# ===== CONFIGURATION =====
# LNbits URL and API Key
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "48d787d862484a6c89d6a557b4d5be9d")
# Rates CSV file (looks in same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")
# User ID mappings: Equity account name -> Castle user ID (wallet ID)
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
"Pat": "75be145a42884b22b60bf97510ed46e3",
"Coco": "375ec158ceca46de86cf6561ca20f881",
"Charlie": "921340b802104c25901eae6c420b1ba1",
}
# ===== RATE LOOKUP =====
class RateLookup:
"""Load and lookup BTC/EUR rates from CSV file"""
def __init__(self, csv_file: str):
self.rates = {}
self._load_csv(csv_file)
def _load_csv(self, csv_file: str):
"""Load rates from CSV file"""
if not os.path.exists(csv_file):
raise FileNotFoundError(
f"Rates CSV file not found: {csv_file}\n"
f"Please create btc_eur_rates.csv in the same directory as this script."
)
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
date = datetime.strptime(row['date'], '%Y-%m-%d').date()
# Handle comma as thousands separator
rate_str = row['btc_eur_rate'].replace(',', '').replace(' ', '')
rate = float(rate_str)
self.rates[date] = rate
if not self.rates:
raise ValueError(f"No rates loaded from {csv_file}")
print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
print(f" Date range: {min(self.rates.keys())} to {max(self.rates.keys())}")
def get_rate(self, date: datetime.date, fallback_days: int = 7) -> Optional[float]:
"""
Get BTC/EUR rate for a specific date.
If exact date not found, tries nearby dates within fallback_days.
Args:
date: Date to lookup
fallback_days: How many days to look back/forward if exact date missing
Returns:
BTC/EUR rate or None if not found
"""
# Try exact date first
if date in self.rates:
return self.rates[date]
# Try nearby dates (prefer earlier dates)
for days_offset in range(1, fallback_days + 1):
# Try earlier date first
earlier = date - timedelta(days=days_offset)
if earlier in self.rates:
print(f" ⚠️ Using rate from {earlier} for {date} (exact date not found)")
return self.rates[earlier]
# Try later date
later = date + timedelta(days=days_offset)
if later in self.rates:
print(f" ⚠️ Using rate from {later} for {date} (exact date not found)")
return self.rates[later]
return None
# ===== ACCOUNT LOOKUP =====
class AccountLookup:
"""Fetch and lookup Castle accounts from API"""
def __init__(self, lnbits_url: str, api_key: str):
self.accounts = {} # name -> account_id
self.accounts_by_user = {} # user_id -> {account_type -> account_id}
self.account_details = [] # Full account objects
self._fetch_accounts(lnbits_url, api_key)
def _fetch_accounts(self, lnbits_url: str, api_key: str):
"""Fetch all accounts from Castle API"""
url = f"{lnbits_url}/castle/api/v1/accounts"
headers = {"X-Api-Key": api_key}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
accounts_list = response.json()
# Build mappings
for account in accounts_list:
name = account.get('name')
account_id = account.get('id')
user_id = account.get('user_id')
account_type = account.get('account_type')
self.account_details.append(account)
# Name -> ID mapping
if name and account_id:
self.accounts[name] = account_id
# User -> Account Type -> ID mapping (for equity accounts)
if user_id and account_type:
if user_id not in self.accounts_by_user:
self.accounts_by_user[user_id] = {}
self.accounts_by_user[user_id][account_type] = account_id
print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")
except requests.RequestException as e:
raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}")
def get_account_id(self, account_name: str) -> Optional[str]:
"""
Get Castle account ID for a Beancount account name.
Special handling for user-specific accounts:
- "Liabilities:Payable:Pat" -> looks up Pat's user_id and finds their Castle payable account
- "Assets:Receivable:Pat" -> looks up Pat's user_id and finds their Castle receivable account
- "Equity:Pat" -> looks up Pat's user_id and finds their Castle equity account
Args:
account_name: Beancount account name (e.g., "Expenses:Food:Supplies", "Liabilities:Payable:Pat", "Assets:Receivable:Pat", "Equity:Pat")
Returns:
Castle account UUID or None if not found
"""
# Check if this is a Liabilities:Payable:<name> account
# Map Beancount Liabilities:Payable:Pat to Castle Liabilities:Payable:User-<id>
if account_name.startswith("Liabilities:Payable:"):
user_name = extract_user_from_user_account(account_name)
if user_name:
# Look up user's actual user_id
user_id = USER_MAPPINGS.get(user_name)
if user_id:
# Find this user's liability (payable) account
# This is the Liabilities:Payable:User-<id> account in Castle
if user_id in self.accounts_by_user:
liability_account_id = self.accounts_by_user[user_id].get('liability')
if liability_account_id:
return liability_account_id
# If not found, provide helpful error
raise ValueError(
f"User '{user_name}' (ID: {user_id}) does not have a payable account.\n"
f"This should have been created when they configured their wallet.\n"
f"Please configure the wallet for user ID: {user_id}"
)
# Check if this is an Assets:Receivable:<name> account
# Map Beancount Assets:Receivable:Pat to Castle Assets:Receivable:User-<id>
elif account_name.startswith("Assets:Receivable:"):
user_name = extract_user_from_user_account(account_name)
if user_name:
# Look up user's actual user_id
user_id = USER_MAPPINGS.get(user_name)
if user_id:
# Find this user's asset (receivable) account
# This is the Assets:Receivable:User-<id> account in Castle
if user_id in self.accounts_by_user:
asset_account_id = self.accounts_by_user[user_id].get('asset')
if asset_account_id:
return asset_account_id
# If not found, provide helpful error
raise ValueError(
f"User '{user_name}' (ID: {user_id}) does not have a receivable account.\n"
f"This should have been created when they configured their wallet.\n"
f"Please configure the wallet for user ID: {user_id}"
)
# Check if this is an Equity:<name> account
# Map Beancount Equity:Pat to Castle Equity:User-<id>
elif account_name.startswith("Equity:"):
user_name = extract_user_from_user_account(account_name)
if user_name:
# Look up user's actual user_id
user_id = USER_MAPPINGS.get(user_name)
if user_id:
# Find this user's equity account
# This is the Equity:User-<id> account in Castle
if user_id in self.accounts_by_user:
equity_account_id = self.accounts_by_user[user_id].get('equity')
if equity_account_id:
return equity_account_id
# If not found, provide helpful error
raise ValueError(
f"User '{user_name}' (ID: {user_id}) does not have an equity account.\n"
f"Equity eligibility must be enabled for this user in Castle.\n"
f"Please enable equity for user ID: {user_id}"
)
# Normal account lookup by name
return self.accounts.get(account_name)
def list_accounts(self):
"""Print all available accounts"""
print("\n📋 Available accounts:")
for name in sorted(self.accounts.keys()):
print(f" - {name}")
# ===== CONVERSION FUNCTIONS =====
def sanitize_link(text: str) -> str:
"""
Sanitize a string to make it valid for Beancount links.
Beancount links can only contain: A-Z, a-z, 0-9, -, _, /, .
All other characters are replaced with hyphens.
Examples:
>>> sanitize_link("Test (pending)")
'Test-pending'
>>> sanitize_link("Invoice #123")
'Invoice-123'
>>> sanitize_link("import-20250623-Action Ressourcerie")
'import-20250623-Action-Ressourcerie'
"""
import re
# Replace any character that's not alphanumeric, dash, underscore, slash, or period with a hyphen
sanitized = re.sub(r'[^A-Za-z0-9\-_/.]', '-', text)
# Remove consecutive hyphens
sanitized = re.sub(r'-+', '-', sanitized)
# Remove leading/trailing hyphens
sanitized = sanitized.strip('-')
return sanitized
def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
"""Convert EUR to satoshis using BTC/EUR rate"""
btc_amount = eur_amount / Decimal(str(btc_eur_rate))
sats = btc_amount * Decimal(100_000_000)
return int(sats.quantize(Decimal('1')))
def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
"""
Build metadata dict for Castle entry line.
The API will extract fiat_currency and fiat_amount and use them
to create proper EUR-based postings with SATS in metadata.
"""
abs_eur = abs(eur_amount)
abs_sats = abs(eur_to_sats(abs_eur, btc_eur_rate))
return {
"fiat_currency": "EUR",
"fiat_amount": str(abs_eur.quantize(Decimal("0.01"))), # Store as string for JSON
"btc_rate": str(btc_eur_rate) # Store exchange rate for reference
}
# ===== BEANCOUNT PARSER =====
def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
"""
Parse a Beancount transaction.
Expected format:
2025-07-06 * "Foix market"
Expenses:Groceries 69.40 EUR
Equity:Pat
"""
lines = txn_text.strip().split('\n')
if not lines:
return None
# Skip leading comments to find the transaction header
header_line_idx = 0
for i, line in enumerate(lines):
stripped = line.strip()
# Skip comments and empty lines
if not stripped or stripped.startswith(';'):
continue
# Found the first non-comment line
header_line_idx = i
break
# Parse header line
header = lines[header_line_idx].strip()
# Handle both * and ! flags
if '*' in header:
parts = header.split('*')
flag = '*'
elif '!' in header:
parts = header.split('!')
flag = '!'
else:
return None
date_str = parts[0].strip()
description = parts[1].strip().strip('"')
try:
date = datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
return None
# Parse postings (start after the header line)
postings = []
for line in lines[header_line_idx + 1:]:
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith(';'):
continue
# Parse posting line
parts = line.split()
if not parts:
continue
account = parts[0]
# Check if amount is specified
if len(parts) >= 3 and parts[-1] == 'EUR':
# Strip commas from amount (e.g., "1,500.00" -> "1500.00")
amount_str = parts[-2].replace(',', '')
eur_amount = Decimal(amount_str)
else:
# No amount specified - will be calculated to balance
eur_amount = None
postings.append({
'account': account,
'eur_amount': eur_amount
})
# Calculate missing amounts (Beancount auto-balance)
# TODO: Support auto-balancing for transactions with >2 postings
# For now, only handles simple 2-posting transactions
if len(postings) == 2:
if postings[0]['eur_amount'] and not postings[1]['eur_amount']:
postings[1]['eur_amount'] = -postings[0]['eur_amount']
elif postings[1]['eur_amount'] and not postings[0]['eur_amount']:
postings[0]['eur_amount'] = -postings[1]['eur_amount']
return {
'date': date,
'description': description,
'postings': postings
}
# ===== HELPER FUNCTIONS =====
def extract_user_from_user_account(account_name: str) -> Optional[str]:
"""
Extract user name from user-specific accounts (Payable, Receivable, or Equity).
Examples:
"Liabilities:Payable:Pat" -> "Pat"
"Assets:Receivable:Alice" -> "Alice"
"Equity:Pat" -> "Pat"
"Expenses:Food" -> None
Returns:
User name or None if not a user-specific account
"""
if account_name.startswith("Liabilities:Payable:"):
parts = account_name.split(":")
if len(parts) >= 3:
return parts[2]
elif account_name.startswith("Assets:Receivable:"):
parts = account_name.split(":")
if len(parts) >= 3:
return parts[2]
elif account_name.startswith("Equity:"):
parts = account_name.split(":")
if len(parts) >= 2:
return parts[1]
return None
def determine_user_id(postings: list) -> Optional[str]:
"""
Determine which user ID to use for this transaction based on user-specific accounts.
Args:
postings: List of posting dicts with 'account' key
Returns:
User ID (wallet ID) from USER_MAPPINGS, or None if no user account found
"""
for posting in postings:
user_name = extract_user_from_user_account(posting['account'])
if user_name:
user_id = USER_MAPPINGS.get(user_name)
if not user_id:
raise ValueError(
f"No user ID mapping found for '{user_name}'.\n"
f"Please add '{user_name}' to USER_MAPPINGS in the script."
)
return user_id
# No user-specific account found - this shouldn't happen for typical transactions
return None
# ===== CASTLE CONVERTER =====
def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
"""
Convert parsed Beancount transaction to Castle format.
Sends SATS amounts with fiat metadata. The Castle API will automatically
convert to EUR-based postings with SATS stored in metadata.
"""
# Determine which user this transaction is for (based on user-specific accounts)
user_id = determine_user_id(parsed['postings'])
if not user_id:
raise ValueError(
f"Could not determine user ID for transaction.\n"
f"Transactions must have a user-specific account:\n"
f" - Liabilities:Payable:<name> (for payables)\n"
f" - Assets:Receivable:<name> (for receivables)\n"
f" - Equity:<name> (for equity)\n"
f"Examples: Liabilities:Payable:Pat, Assets:Receivable:Pat, Equity:Pat"
)
# Build entry lines
lines = []
for posting in parsed['postings']:
account_id = account_lookup.get_account_id(posting['account'])
if not account_id:
raise ValueError(
f"No account found in Castle with name '{posting['account']}'.\n"
f"Please create this account in Castle first."
)
eur_amount = posting['eur_amount']
if eur_amount is None:
raise ValueError(f"Could not determine amount for {posting['account']}")
# Convert EUR to sats (amount sent to API)
sats = eur_to_sats(eur_amount, btc_eur_rate)
# Build metadata (API will extract fiat_currency and fiat_amount)
metadata = build_metadata(eur_amount, btc_eur_rate)
lines.append({
"account_id": account_id,
"amount": sats, # Positive = debit, negative = credit
"description": posting['account'],
"metadata": metadata
})
# Create sanitized reference link
desc_part = sanitize_link(parsed['description'][:30])
return {
"description": parsed['description'],
"entry_date": parsed['date'].isoformat(),
"reference": f"import-{parsed['date'].strftime('%Y%m%d')}-{desc_part}",
"flag": "*",
"meta": {
"source": "beancount_import",
"imported_at": datetime.now().isoformat(),
"btc_eur_rate": str(btc_eur_rate),
"user_id": user_id # Track which user this transaction is for
},
"lines": lines
}
# ===== API UPLOAD =====
def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
"""Upload journal entry to Castle API"""
if dry_run:
print(f"\n[DRY RUN] Entry preview:")
print(f" Description: {entry['description']}")
print(f" Date: {entry['entry_date']}")
print(f" BTC/EUR Rate: {entry['meta']['btc_eur_rate']:,.2f}")
total_sats = 0
for line in entry['lines']:
sign = '+' if line['amount'] > 0 else ''
print(f" {line['description']}: {sign}{line['amount']:,} sats "
f"({line['metadata']['fiat_amount']} EUR)")
total_sats += line['amount']
print(f" Balance check: {total_sats} (should be 0)")
return {"id": "dry-run"}
url = f"{LNBITS_URL}/castle/api/v1/entries"
headers = {
"X-Api-Key": api_key,
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=entry, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
print(f" ❌ HTTP Error: {e}")
if response.text:
print(f" Response: {response.text}")
raise
except Exception as e:
print(f" ❌ Error: {e}")
raise
# ===== MAIN IMPORT FUNCTION =====
def import_beancount_file(beancount_file: str, dry_run: bool = False):
"""Import transactions from Beancount file using rates from CSV"""
# Validate configuration
if not ADMIN_API_KEY:
print("❌ Error: CASTLE_ADMIN_KEY not set!")
print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
return
# Load rates
try:
rate_lookup = RateLookup(RATES_CSV_FILE)
except (FileNotFoundError, ValueError) as e:
print(f"❌ Error loading rates: {e}")
return
# Load accounts from Castle
try:
account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
except (ConnectionError, ValueError) as e:
print(f"❌ Error loading accounts: {e}")
return
# Show user mappings and verify equity accounts exist
print(f"\n👥 User ID mappings and equity accounts:")
for name, user_id in USER_MAPPINGS.items():
has_equity = user_id in account_lookup.accounts_by_user and 'equity' in account_lookup.accounts_by_user[user_id]
status = "" if has_equity else ""
print(f" {status} {name}{user_id} {'(has equity account)' if has_equity else '(NO EQUITY ACCOUNT - create in Castle!)'}")
# Read beancount file
if not os.path.exists(beancount_file):
print(f"❌ Error: Beancount file not found: {beancount_file}")
return
with open(beancount_file, 'r', encoding='utf-8') as f:
content = f.read()
# Split by blank lines to get individual transactions
transactions = [t.strip() for t in content.split('\n\n') if t.strip()]
print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
if dry_run:
print("🔍 [DRY RUN MODE] No changes will be made\n")
success_count = 0
error_count = 0
skip_count = 0
skipped_items = [] # Track what was skipped
for i, txn_text in enumerate(transactions, 1):
try:
# Try to parse the transaction
parsed = parse_beancount_transaction(txn_text)
if not parsed:
# Not a valid transaction (likely a directive, option, or comment block)
skip_count += 1
first_line = txn_text.split('\n')[0][:60]
skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
continue
# Look up rate for this transaction's date
btc_eur_rate = rate_lookup.get_rate(parsed['date'].date())
if not btc_eur_rate:
raise ValueError(f"No BTC/EUR rate found for {parsed['date'].date()}")
castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
result = upload_entry(castle_entry, ADMIN_API_KEY, dry_run)
# Get user name for display
user_name = None
for posting in parsed['postings']:
user_name = extract_user_from_user_account(posting['account'])
if user_name:
break
user_info = f" (User: {user_name})" if user_name else ""
print(f"✅ Transaction {i}: {parsed['date'].date()} - {parsed['description'][:35]}{user_info} "
f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
success_count += 1
except Exception as e:
print(f"❌ Transaction {i} failed: {e}")
print(f" Content: {txn_text[:100]}...")
error_count += 1
print(f"\n{'='*70}")
print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
print(f"{'='*70}")
# Show details of skipped entries
if skipped_items:
print(f"\n⏭️ Skipped entries:")
for item in skipped_items:
print(f" {item}")
if success_count > 0 and not dry_run:
print(f"\n✅ Successfully imported {success_count} transactions to Castle!")
print(f"\n💡 Note: Transactions are stored in EUR with SATS in metadata.")
print(f" Check Fava to see the imported entries.")
# ===== MAIN =====
if __name__ == "__main__":
import sys
print("=" * 70)
print("🏰 Beancount to Castle Import Script")
print("=" * 70)
if len(sys.argv) < 2:
print("\nUsage: python import_beancount.py <ledger.beancount> [--dry-run]")
print("\nExample:")
print(" python import_beancount.py my_ledger.beancount --dry-run")
print(" python import_beancount.py my_ledger.beancount")
print("\nConfiguration:")
print(f" LNBITS_URL: {LNBITS_URL}")
print(f" RATES_CSV: {RATES_CSV_FILE}")
print(f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}")
sys.exit(1)
beancount_file = sys.argv[1]
dry_run = "--dry-run" in sys.argv
import_beancount_file(beancount_file, dry_run)