#!/usr/bin/env python3
"""
Beancount to Castle Import Script

Imports Beancount ledger transactions into Castle accounting extension.
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.

Usage:
    python import_beancount.py <beancount_file> [--dry-run]

Example:
    python import_beancount.py my_ledger.beancount --dry-run
    python import_beancount.py my_ledger.beancount
"""

import csv
import os
import re
import sys
from datetime import date as _date
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation
from typing import Dict, List, Optional

# NOTE: `requests` (third-party) is imported inside the two functions that
# talk to the Castle API, so the pure parsing/conversion helpers in this
# module can be imported and unit-tested without the HTTP dependency.

# ===== CONFIGURATION =====

# LNbits URL and API Key
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
# NOTE(security): the fallback below is a literal key committed to source.
# It is kept so existing setups keep working, but it means the
# "CASTLE_ADMIN_KEY not set" check in import_beancount_file() can never
# trigger. Prefer setting CASTLE_ADMIN_KEY and rotating this key.
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "48d787d862484a6c89d6a557b4d5be9d")

# Seconds to wait for the Castle API before giving up. Without a timeout,
# `requests` waits indefinitely on an unresponsive server.
REQUEST_TIMEOUT = 30

# Rates CSV file (looks in same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")

# User ID mappings: Equity account name -> Castle user ID (wallet ID)
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
    "Pat": "75be145a42884b22b60bf97510ed46e3",
    "Coco": "375ec158ceca46de86cf6561ca20f881",
    "Charlie": "921340b802104c25901eae6c420b1ba1",
}

# User-specific Beancount account prefixes.
# Each rule is (prefix, Castle account_type, error template used when the
# mapped user has no Castle account of that type).
USER_ACCOUNT_RULES = (
    (
        "Liabilities:Payable:",
        "liability",
        "User '{user_name}' (ID: {user_id}) does not have a payable account.\n"
        "This should have been created when they configured their wallet.\n"
        "Please configure the wallet for user ID: {user_id}",
    ),
    (
        "Assets:Receivable:",
        "asset",
        "User '{user_name}' (ID: {user_id}) does not have a receivable account.\n"
        "This should have been created when they configured their wallet.\n"
        "Please configure the wallet for user ID: {user_id}",
    ),
    (
        "Equity:",
        "equity",
        "User '{user_name}' (ID: {user_id}) does not have an equity account.\n"
        "Equity eligibility must be enabled for this user in Castle.\n"
        "Please enable equity for user ID: {user_id}",
    ),
)

# "<date> <flag> <rest>" where flag is * or !. Only the FIRST flag character
# after the date is treated as the flag, so descriptions may contain * or !.
_TXN_HEADER_RE = re.compile(r'^(\d{4}-\d{1,2}-\d{1,2})\s*([*!])\s*(.*)$')

# Beancount metadata keys start with a lowercase letter and end with ':'
# (account names start with an uppercase letter).
_METADATA_KEY_RE = re.compile(r'^[a-z][A-Za-z0-9_-]*:$')


# ===== RATE LOOKUP =====

class RateLookup:
    """Load and lookup BTC/EUR rates from CSV file.

    The CSV must have a header row with the columns ``date`` (YYYY-MM-DD)
    and ``btc_eur_rate``. Loaded rates are kept in ``self.rates`` as a
    ``{datetime.date: float}`` mapping.
    """

    def __init__(self, csv_file: str):
        self.rates = {}  # date -> BTC/EUR rate
        self._load_csv(csv_file)

    def _load_csv(self, csv_file: str):
        """Load rates from CSV file.

        Raises:
            FileNotFoundError: if ``csv_file`` does not exist.
            ValueError: if a row is malformed (missing column, bad date,
                non-numeric or non-positive rate) or no rates were loaded.
        """
        if not os.path.exists(csv_file):
            raise FileNotFoundError(
                f"Rates CSV file not found: {csv_file}\n"
                f"Please create btc_eur_rates.csv in the same directory as this script."
            )

        # utf-8-sig transparently drops a BOM (common in spreadsheet exports)
        # that would otherwise turn the first header into '\ufeffdate'.
        # newline='' is what the csv module expects for its input files.
        with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                try:
                    day = datetime.strptime(row['date'].strip(), '%Y-%m-%d').date()
                    # Handle comma as thousands separator
                    rate_str = row['btc_eur_rate'].replace(',', '').replace(' ', '')
                    rate = float(rate_str)
                except (KeyError, AttributeError, ValueError) as e:
                    # KeyError: column missing; AttributeError: short row
                    # (DictReader fills missing fields with None).
                    raise ValueError(
                        f"Invalid rate row at line {reader.line_num} of {csv_file}: {e!r}"
                    ) from e
                if not rate > 0:
                    # A zero/negative/NaN rate would break the EUR -> sats division.
                    raise ValueError(
                        f"Non-positive rate at line {reader.line_num} of {csv_file}: {rate_str}"
                    )
                self.rates[day] = rate

        if not self.rates:
            raise ValueError(f"No rates loaded from {csv_file}")

        print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
        print(f" Date range: {min(self.rates)} to {max(self.rates)}")

    def get_rate(self, date: _date, fallback_days: int = 7) -> Optional[float]:
        """
        Get BTC/EUR rate for a specific date.
        If exact date not found, tries nearby dates within fallback_days.

        Args:
            date: Date to lookup
            fallback_days: How many days to look back/forward if exact date missing

        Returns:
            BTC/EUR rate or None if not found
        """
        # Try exact date first
        if date in self.rates:
            return self.rates[date]

        # Try nearby dates, widening one day at a time (prefer earlier dates)
        for days_offset in range(1, fallback_days + 1):
            for nearby in (date - timedelta(days=days_offset),
                           date + timedelta(days=days_offset)):
                if nearby in self.rates:
                    print(f" ⚠️ Using rate from {nearby} for {date} (exact date not found)")
                    return self.rates[nearby]

        return None


# ===== ACCOUNT LOOKUP =====

class AccountLookup:
    """Fetch and lookup Castle accounts from API"""

    def __init__(self, lnbits_url: str, api_key: str):
        self.accounts = {}  # name -> account_id
        self.accounts_by_user = {}  # user_id -> {account_type -> account_id}
        self.account_details = []  # Full account objects
        self._fetch_accounts(lnbits_url, api_key)

    def _fetch_accounts(self, lnbits_url: str, api_key: str):
        """Fetch all accounts from Castle API and build the lookup tables.

        Raises:
            ConnectionError: if the HTTP request fails or returns an error status.
        """
        import requests  # third-party; see NOTE at the top of the module

        url = f"{lnbits_url}/castle/api/v1/accounts"
        headers = {"X-Api-Key": api_key}

        try:
            response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            accounts_list = response.json()
        except requests.RequestException as e:
            raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}") from e

        # Build mappings
        for account in accounts_list:
            name = account.get('name')
            account_id = account.get('id')
            user_id = account.get('user_id')
            account_type = account.get('account_type')

            self.account_details.append(account)

            # Name -> ID mapping
            if name and account_id:
                self.accounts[name] = account_id

            # User -> Account Type -> ID mapping (for user-specific accounts).
            # If a user has several accounts of one type, the last one wins.
            if user_id and account_type:
                self.accounts_by_user.setdefault(user_id, {})[account_type] = account_id

        print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")

    def get_account_id(self, account_name: str) -> Optional[str]:
        """
        Get Castle account ID for a Beancount account name.

        Special handling for user-specific accounts (see USER_ACCOUNT_RULES):
        - "Liabilities:Payable:Pat" -> Pat's user_id -> their 'liability' account
        - "Assets:Receivable:Pat"   -> Pat's user_id -> their 'asset' account
        - "Equity:Pat"              -> Pat's user_id -> their 'equity' account

        If the user name has no entry in USER_MAPPINGS, the account is looked
        up by its plain name like any other account.

        Args:
            account_name: Beancount account name (e.g., "Expenses:Food:Supplies",
                "Liabilities:Payable:Pat", "Assets:Receivable:Pat", "Equity:Pat")

        Returns:
            Castle account ID or None if not found

        Raises:
            ValueError: if the user is mapped but has no Castle account of
                the required type.
        """
        for prefix, account_type, missing_msg in USER_ACCOUNT_RULES:
            if not account_name.startswith(prefix):
                continue

            user_name = extract_user_from_user_account(account_name)
            user_id = USER_MAPPINGS.get(user_name) if user_name else None
            if user_id:
                account_id = self.accounts_by_user.get(user_id, {}).get(account_type)
                if account_id:
                    return account_id
                # Mapped user without the expected account: explain how to fix it
                raise ValueError(missing_msg.format(user_name=user_name, user_id=user_id))
            break  # prefixes are mutually exclusive

        # Normal account lookup by name
        return self.accounts.get(account_name)

    def list_accounts(self):
        """Print all available accounts"""
        print("\n📋 Available accounts:")
        for name in sorted(self.accounts):
            print(f" - {name}")


# ===== CONVERSION FUNCTIONS =====

def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
    """Convert EUR to satoshis using BTC/EUR rate.

    The result is rounded to a whole number of sats with the decimal
    context's default rounding; the sign of ``eur_amount`` is preserved.
    """
    btc_amount = eur_amount / Decimal(str(btc_eur_rate))
    sats = btc_amount * Decimal(100_000_000)
    return int(sats.quantize(Decimal('1')))


def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
    """Build metadata dict for Castle entry line.

    Amounts are recorded as absolute values; ``fiat_rate`` is sats per EUR
    (0 when the EUR amount is zero).
    """
    abs_eur = abs(eur_amount)
    abs_sats = abs(eur_to_sats(abs_eur, btc_eur_rate))

    # fiat_rate = sats per EUR
    fiat_rate = float(abs_sats) / float(abs_eur) if abs_eur > 0 else 0

    return {
        "fiat_currency": "EUR",
        "fiat_amount": str(abs_eur.quantize(Decimal("0.001"))),
        "fiat_rate": fiat_rate,
        "btc_rate": btc_eur_rate
    }


# ===== BEANCOUNT PARSER =====

def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
    """
    Parse a Beancount transaction.

    Expected format:
        2025-07-06 * "Foix market"
          Expenses:Groceries  69.40 EUR
          Equity:Pat

    Leading comment lines, inline ``;`` comments on postings and metadata
    lines (``key: value``) are ignored. If exactly one posting has no amount,
    it is set to the negated sum of the others (Beancount auto-balance).

    Returns:
        ``{'date': datetime, 'description': str, 'postings': [...]}`` where
        each posting is ``{'account': str, 'eur_amount': Decimal or None}``,
        or None if the text is not a ``*`` / ``!`` transaction.

    Raises:
        ValueError: if a posting carries an amount that is not a plain
            ``<number> EUR`` (silently treating it as "no amount" would
            auto-balance the transaction with the wrong figure).
    """
    lines = txn_text.strip().split('\n')

    # Skip leading comments/blank lines to find the transaction header
    header_line_idx = None
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped and not stripped.startswith(';'):
            header_line_idx = i
            break
    if header_line_idx is None:
        return None

    # Parse header line (handles both * and ! flags)
    match = _TXN_HEADER_RE.match(lines[header_line_idx].strip())
    if not match:
        return None
    date_str, _flag, rest = match.groups()
    description = rest.strip().strip('"')

    try:
        date = datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        return None

    # Parse postings (start after the header line)
    postings = []
    for raw_line in lines[header_line_idx + 1:]:
        # Drop full-line and trailing comments
        line = raw_line.split(';', 1)[0].strip()
        if not line:
            continue

        tokens = line.split()
        if _METADATA_KEY_RE.match(tokens[0]):
            continue  # transaction/posting metadata, not a posting

        account = tokens[0]
        if len(tokens) == 1:
            # No amount specified - will be calculated to balance
            eur_amount = None
        elif len(tokens) >= 3 and tokens[-1] == 'EUR':
            # Strip commas from amount (e.g., "1,500.00" -> "1500.00")
            amount_str = tokens[-2].replace(',', '')
            try:
                eur_amount = Decimal(amount_str)
            except InvalidOperation as e:
                raise ValueError(f"Invalid amount '{tokens[-2]}' in posting: {line}") from e
        else:
            raise ValueError(f"Unsupported posting amount (only '<number> EUR' is handled): {line}")

        postings.append({
            'account': account,
            'eur_amount': eur_amount
        })

    # Calculate the missing amount (Beancount auto-balance): at most one
    # posting may omit its amount. `is None` matters here - a posting with an
    # explicit 0 amount is falsy but is NOT missing.
    missing = [p for p in postings if p['eur_amount'] is None]
    if len(postings) >= 2 and len(missing) == 1:
        known_total = sum(
            (p['eur_amount'] for p in postings if p['eur_amount'] is not None),
            Decimal(0),
        )
        missing[0]['eur_amount'] = -known_total

    return {
        'date': date,
        'description': description,
        'postings': postings
    }


# ===== HELPER FUNCTIONS =====

def extract_user_from_user_account(account_name: str) -> Optional[str]:
    """
    Extract user name from user-specific accounts (Payable, Receivable, or Equity).

    Examples:
        "Liabilities:Payable:Pat" -> "Pat"
        "Assets:Receivable:Alice" -> "Alice"
        "Equity:Pat" -> "Pat"
        "Expenses:Food" -> None

    Returns:
        User name or None if not a user-specific account
    """
    for prefix, _account_type, _missing_msg in USER_ACCOUNT_RULES:
        if account_name.startswith(prefix):
            # The user name is the component right after the prefix
            user_name = account_name[len(prefix):].split(":", 1)[0]
            return user_name or None
    return None


def determine_user_id(postings: list) -> Optional[str]:
    """
    Determine which user ID to use for this transaction based on user-specific accounts.

    The first posting whose account is user-specific decides the user.

    Args:
        postings: List of posting dicts with 'account' key

    Returns:
        User ID (wallet ID) from USER_MAPPINGS, or None if no user account found

    Raises:
        ValueError: if that user name has no entry in USER_MAPPINGS.
    """
    for posting in postings:
        user_name = extract_user_from_user_account(posting['account'])
        if user_name:
            user_id = USER_MAPPINGS.get(user_name)
            if not user_id:
                raise ValueError(
                    f"No user ID mapping found for '{user_name}'.\n"
                    f"Please add '{user_name}' to USER_MAPPINGS in the script."
                )
            return user_id

    # No user-specific account found - this shouldn't happen for typical transactions
    return None


# ===== CASTLE CONVERTER =====

def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
    """Convert parsed Beancount transaction to Castle format.

    Args:
        parsed: Result of parse_beancount_transaction().
        btc_eur_rate: BTC/EUR rate used to convert every posting.
        account_lookup: Resolves Beancount account names to Castle account IDs.

    Returns:
        Journal entry dict ready to be posted to the Castle entries endpoint.

    Raises:
        ValueError: if no user can be determined, an account is unknown in
            Castle, or a posting has no amount.
    """
    # Determine which user this transaction is for (based on user-specific accounts)
    user_id = determine_user_id(parsed['postings'])
    if not user_id:
        raise ValueError(
            f"Could not determine user ID for transaction.\n"
            f"Transactions must have a user-specific account:\n"
            f" - Liabilities:Payable:<name> (for payables)\n"
            f" - Assets:Receivable:<name> (for receivables)\n"
            f" - Equity:<name> (for equity)\n"
            f"Examples: Liabilities:Payable:Pat, Assets:Receivable:Pat, Equity:Pat"
        )

    # Build entry lines
    lines = []
    eur_total = Decimal(0)
    for posting in parsed['postings']:
        account_id = account_lookup.get_account_id(posting['account'])
        if not account_id:
            raise ValueError(
                f"No account found in Castle with name '{posting['account']}'.\n"
                f"Please create this account in Castle first."
            )

        eur_amount = posting['eur_amount']
        if eur_amount is None:
            raise ValueError(f"Could not determine amount for {posting['account']}")
        eur_total += eur_amount

        lines.append({
            "account_id": account_id,
            "amount": eur_to_sats(eur_amount, btc_eur_rate),  # Positive = debit, negative = credit
            "description": posting['account'],
            "metadata": build_metadata(eur_amount, btc_eur_rate)
        })

    # Each line is rounded to whole sats independently, so an entry that
    # balances in EUR can be off by a sat or two. Put that rounding residue
    # on the largest line so the entry balances in sats as well. Entries that
    # do not balance in EUR are left untouched so the imbalance stays visible.
    residual_sats = sum(line['amount'] for line in lines)
    if residual_sats and eur_total == 0:
        largest = max(lines, key=lambda line: abs(line['amount']))
        largest['amount'] -= residual_sats

    return {
        "description": parsed['description'],
        "entry_date": parsed['date'].isoformat(),
        "reference": f"import-{parsed['date'].strftime('%Y%m%d')}-{parsed['description'][:20].replace(' ', '-')}",
        "flag": "*",
        "meta": {
            "source": "beancount_import",
            "imported_at": datetime.now().isoformat(),
            "btc_eur_rate": btc_eur_rate,
            "user_id": user_id  # Track which user this transaction is for
        },
        "lines": lines
    }


# ===== API UPLOAD =====

def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
    """Upload journal entry to Castle API.

    In dry-run mode nothing is sent: the entry is printed together with a
    sats balance check and ``{"id": "dry-run"}`` is returned.

    Returns:
        The decoded JSON response of the Castle API.

    Raises:
        requests.RequestException: if the request fails or returns an error status.
    """
    if dry_run:
        print(f"\n[DRY RUN] Entry preview:")
        print(f" Description: {entry['description']}")
        print(f" Date: {entry['entry_date']}")
        print(f" BTC/EUR Rate: {entry['meta']['btc_eur_rate']:,.2f}")
        total_sats = 0
        for line in entry['lines']:
            sign = '+' if line['amount'] > 0 else ''
            print(f" {line['description']}: {sign}{line['amount']:,} sats "
                  f"({line['metadata']['fiat_amount']} EUR)")
            total_sats += line['amount']
        print(f" Balance check: {total_sats} (should be 0)")
        return {"id": "dry-run"}

    import requests  # third-party; see NOTE at the top of the module

    url = f"{LNBITS_URL}/castle/api/v1/entries"
    headers = {
        "X-Api-Key": api_key,
        "Content-Type": "application/json"
    }

    response = requests.post(url, json=entry, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


# ===== MAIN IMPORT FUNCTION =====

def import_beancount_file(beancount_file: str, dry_run: bool = False):
    """Import transactions from Beancount file using rates from CSV.

    Configuration or loading problems are reported on stdout and end the
    import early. A failure in one transaction is reported and counted, and
    the import continues with the next one.
    """
    # Validate configuration
    if not ADMIN_API_KEY:
        print("❌ Error: CASTLE_ADMIN_KEY not set!")
        print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
        return

    # Load rates
    try:
        rate_lookup = RateLookup(RATES_CSV_FILE)
    except (FileNotFoundError, ValueError) as e:
        print(f"❌ Error loading rates: {e}")
        return

    # Load accounts from Castle
    try:
        account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
    except (ConnectionError, ValueError) as e:
        print(f"❌ Error loading accounts: {e}")
        return

    # Show user mappings and verify equity accounts exist
    print(f"\n👥 User ID mappings and equity accounts:")
    for name, user_id in USER_MAPPINGS.items():
        has_equity = 'equity' in account_lookup.accounts_by_user.get(user_id, {})
        status = "✅" if has_equity else "❌"
        print(f" {status} {name} → {user_id} {'(has equity account)' if has_equity else '(NO EQUITY ACCOUNT - create in Castle!)'}")

    # Read beancount file
    if not os.path.exists(beancount_file):
        print(f"❌ Error: Beancount file not found: {beancount_file}")
        return

    with open(beancount_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split by blank lines to get individual transactions. A separator line
    # that contains only spaces/tabs counts as blank too.
    transactions = [t.strip() for t in re.split(r'\n\s*\n', content) if t.strip()]

    print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
    if dry_run:
        print("🔍 [DRY RUN MODE] No changes will be made\n")

    success_count = 0
    error_count = 0
    skip_count = 0
    skipped_items = []  # Track what was skipped

    for i, txn_text in enumerate(transactions, 1):
        try:
            # Try to parse the transaction
            parsed = parse_beancount_transaction(txn_text)
            if not parsed:
                # Not a valid transaction (likely a directive, option, or comment block)
                skip_count += 1
                first_line = txn_text.split('\n')[0][:60]
                skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
                continue

            # Look up rate for this transaction's date
            btc_eur_rate = rate_lookup.get_rate(parsed['date'].date())
            if not btc_eur_rate:
                raise ValueError(f"No BTC/EUR rate found for {parsed['date'].date()}")

            castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
            upload_entry(castle_entry, ADMIN_API_KEY, dry_run)

            # Get user name for display (first user-specific posting)
            user_name = next(
                (name for name in (extract_user_from_user_account(p['account'])
                                   for p in parsed['postings']) if name),
                None,
            )
            user_info = f" (User: {user_name})" if user_name else ""
            print(f"✅ Transaction {i}: {parsed['date'].date()} - {parsed['description'][:35]}{user_info} "
                  f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
            success_count += 1

        except Exception as e:
            # Per-transaction boundary: report, count, and keep importing
            print(f"❌ Transaction {i} failed: {e}")
            print(f" Content: {txn_text[:100]}...")
            error_count += 1

    print(f"\n{'='*70}")
    print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
    print(f"{'='*70}")

    # Show details of skipped entries
    if skipped_items:
        print(f"\n⏭️ Skipped entries:")
        for item in skipped_items:
            print(f" {item}")

    if success_count > 0 and not dry_run:
        print(f"\n✅ Successfully imported {success_count} transactions to Castle!")


# ===== MAIN =====

def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: ``import_beancount.py <beancount_file> [--dry-run]``.

    The Beancount file is the first argument that is not a ``--flag``, so
    ``--dry-run`` may be given before or after it.
    """
    argv = sys.argv if argv is None else argv

    print("=" * 70)
    print("🏰 Beancount to Castle Import Script")
    print("=" * 70)

    positional = [arg for arg in argv[1:] if not arg.startswith("--")]
    if not positional:
        print("\nUsage: python import_beancount.py <beancount_file> [--dry-run]")
        print("\nExample:")
        print(" python import_beancount.py my_ledger.beancount --dry-run")
        print(" python import_beancount.py my_ledger.beancount")
        print("\nConfiguration:")
        print(f" LNBITS_URL: {LNBITS_URL}")
        print(f" RATES_CSV: {RATES_CSV_FILE}")
        print(f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}")
        sys.exit(1)

    import_beancount_file(positional[0], "--dry-run" in argv[1:])


if __name__ == "__main__":
    main()