Implements a script to import Beancount ledger transactions into the Castle accounting extension. The script fetches BTC/EUR rates, retrieves accounts from the Castle API, maps users, parses Beancount transactions, converts EUR to sats, and uploads the data to Castle. Adds error handling, dry-run mode, and detailed logging for improved usability. Displays equity account status and validates the existence of user equity accounts.
555 lines
19 KiB
Python
Executable file
555 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Beancount to Castle Import Script
|
|
|
|
Imports Beancount ledger transactions into Castle accounting extension.
|
|
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.
|
|
|
|
Usage:
|
|
python import_beancount.py <ledger.beancount> [--dry-run]
|
|
|
|
Example:
|
|
python import_beancount.py my_ledger.beancount --dry-run
|
|
python import_beancount.py my_ledger.beancount
|
|
"""
|
|
import requests
|
|
import csv
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, Optional
|
|
|
|
# ===== CONFIGURATION =====
|
|
|
|
# LNbits URL and API Key
# Both values are read from the environment (LNBITS_URL / CASTLE_ADMIN_KEY)
# and fall back to the literal defaults below when the variable is unset.
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
# NOTE(review): the fallback here is a literal key committed to source. Because
# it is non-empty, the `if not ADMIN_API_KEY` guard in import_beancount_file
# only fires when CASTLE_ADMIN_KEY is explicitly set to an empty string.
# Consider rotating this key and defaulting to "" instead — confirm with owner.
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "48d787d862484a6c89d6a557b4d5be9d")

# Rates CSV file (looks in same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")

# User ID mappings: Equity account name -> Castle user ID (wallet ID)
# The key is the <name> part of an "Equity:<name>" Beancount account.
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
    "Pat": "75be145a42884b22b60bf97510ed46e3",
    "Coco": "375ec158ceca46de86cf6561ca20f881",
    "Charlie": "921340b802104c25901eae6c420b1ba1",
}
|
|
|
|
# ===== RATE LOOKUP =====
|
|
|
|
class RateLookup:
    """Load daily BTC/EUR rates from a CSV file and look them up by date.

    The CSV needs a header row with at least the columns ``date``
    (``YYYY-MM-DD``) and ``btc_eur_rate`` (EUR per BTC). Commas and spaces in
    the rate are treated as thousands separators and ignored.

    ``self.rates`` maps ``datetime.date`` objects to the rate as ``float``.
    """

    def __init__(self, csv_file: str):
        """Load every rate found in ``csv_file``.

        Raises:
            FileNotFoundError: If ``csv_file`` does not exist.
            ValueError: If required columns are missing, a row is malformed,
                or the file yields no rates at all.
        """
        self.rates = {}
        self._load_csv(csv_file)

    def _load_csv(self, csv_file: str):
        """Load rates from CSV file into ``self.rates``.

        All content problems are reported as ``ValueError`` so that callers
        catching ``(FileNotFoundError, ValueError)`` see a readable message
        instead of an unexpected ``KeyError``/``AttributeError``.
        """
        if not os.path.exists(csv_file):
            raise FileNotFoundError(
                f"Rates CSV file not found: {csv_file}\n"
                f"Please create btc_eur_rates.csv in the same directory as this script."
            )

        required_columns = ("date", "btc_eur_rate")

        # utf-8-sig drops a leading BOM (common in spreadsheet exports) that
        # would otherwise be glued onto the first header name; newline='' is
        # what the csv module expects from the underlying file object.
        with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)

            header = reader.fieldnames or []
            missing = [col for col in required_columns if col not in header]
            if missing:
                raise ValueError(
                    f"Rates CSV {csv_file} is missing required column(s): "
                    f"{', '.join(missing)}"
                )

            for row in reader:
                try:
                    day = datetime.strptime(row['date'], '%Y-%m-%d').date()
                    # Handle comma as thousands separator
                    rate_str = row['btc_eur_rate'].replace(',', '').replace(' ', '')
                    rate = float(rate_str)
                except (ValueError, TypeError, AttributeError) as e:
                    # TypeError/AttributeError occur when a short row leaves a
                    # column as None.
                    raise ValueError(
                        f"Invalid row at line {reader.line_num} of {csv_file}: {e}"
                    ) from e
                self.rates[day] = rate

        if not self.rates:
            raise ValueError(f"No rates loaded from {csv_file}")

        print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
        print(f" Date range: {min(self.rates.keys())} to {max(self.rates.keys())}")

    # NOTE: ``datetime`` is the class imported from the datetime module, so the
    # annotation below resolves to its ``date()`` method rather than the
    # ``date`` type; callers are expected to pass a ``datetime.date`` value.
    def get_rate(self, date: datetime.date, fallback_days: int = 7) -> Optional[float]:
        """
        Get BTC/EUR rate for a specific date.

        If the exact date is missing, the search widens one day at a time up
        to ``fallback_days``; at each distance the earlier date is tried
        before the later one, and a warning is printed when a neighbour is
        used.

        Args:
            date: Date to lookup
            fallback_days: How many days to look back/forward if exact date missing

        Returns:
            BTC/EUR rate or None if not found
        """
        # Try exact date first
        if date in self.rates:
            return self.rates[date]

        # Try nearby dates (prefer earlier dates)
        for days_offset in range(1, fallback_days + 1):
            earlier = date - timedelta(days=days_offset)
            if earlier in self.rates:
                print(f" ⚠️ Using rate from {earlier} for {date} (exact date not found)")
                return self.rates[earlier]

            later = date + timedelta(days=days_offset)
            if later in self.rates:
                print(f" ⚠️ Using rate from {later} for {date} (exact date not found)")
                return self.rates[later]

        return None
|
|
|
|
# ===== ACCOUNT LOOKUP =====
|
|
|
|
class AccountLookup:
    """Fetch Castle accounts once and resolve Beancount account names to IDs."""

    # Seconds to wait for the Castle API before giving up; without a timeout
    # requests would block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def __init__(self, lnbits_url: str, api_key: str):
        """Fetch all accounts from the Castle API at ``lnbits_url``.

        Raises:
            ConnectionError: If the accounts request fails.
        """
        self.accounts = {}  # name -> account_id
        self.accounts_by_user = {}  # user_id -> {account_type -> account_id}
        self.account_details = []  # Full account objects
        self._fetch_accounts(lnbits_url, api_key)

    def _fetch_accounts(self, lnbits_url: str, api_key: str):
        """Fetch all accounts from Castle API and build the lookup tables."""
        url = f"{lnbits_url}/castle/api/v1/accounts"
        headers = {"X-Api-Key": api_key}

        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            accounts_list = response.json()
        except requests.RequestException as e:
            # Chain the cause so the underlying HTTP/network error stays visible.
            raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}") from e

        # Build mappings
        for account in accounts_list:
            name = account.get('name')
            account_id = account.get('id')
            user_id = account.get('user_id')
            account_type = account.get('account_type')

            self.account_details.append(account)

            # Name -> ID mapping
            if name and account_id:
                self.accounts[name] = account_id

            # User -> Account Type -> ID mapping (for equity accounts)
            if user_id and account_type:
                self.accounts_by_user.setdefault(user_id, {})[account_type] = account_id

        print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")

    def get_account_id(self, account_name: str) -> Optional[str]:
        """
        Get Castle account ID for a Beancount account name.

        Special handling for Equity accounts:
        - "Equity:Pat" -> looks up Pat's user_id in USER_MAPPINGS and returns
          that user's account of type 'equity'.
        - If the name after "Equity:" has no entry in USER_MAPPINGS, the
          lookup falls back to the plain name -> ID table.

        Args:
            account_name: Beancount account name (e.g., "Expenses:Food:Supplies" or "Equity:Pat")

        Returns:
            Castle account UUID or None if not found

        Raises:
            ValueError: If the mapped user has no equity account in Castle.
        """
        if account_name.startswith("Equity:"):
            user_name = extract_user_from_equity_account(account_name)
            user_id = USER_MAPPINGS.get(user_name) if user_name else None
            if user_id:
                equity_account_id = self.accounts_by_user.get(user_id, {}).get('equity')
                if equity_account_id:
                    return equity_account_id

                # Known user without an equity account: provide helpful error
                raise ValueError(
                    f"User '{user_name}' (ID: {user_id}) does not have an equity account.\n"
                    f"Please enable equity eligibility for this user in Castle first."
                )

        # Normal account lookup by name
        return self.accounts.get(account_name)

    def list_accounts(self):
        """Print all available accounts"""
        print("\n📋 Available accounts:")
        for name in sorted(self.accounts):
            print(f" - {name}")
|
|
|
|
# ===== CONVERSION FUNCTIONS =====
|
|
|
|
def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
    """Return ``eur_amount`` expressed in whole satoshis at ``btc_eur_rate``.

    The float rate goes through ``str`` before becoming a ``Decimal`` so its
    printed value is used rather than its binary expansion. The result is
    rounded to an integer with the decimal context's rounding mode and keeps
    the sign of ``eur_amount``.
    """
    sats_per_btc = Decimal(100_000_000)
    rate = Decimal(str(btc_eur_rate))
    whole_sats = (eur_amount / rate * sats_per_btc).quantize(Decimal('1'))
    return int(whole_sats)
|
|
|
|
def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
    """Build the per-line metadata dict attached to a Castle entry line.

    Works on absolute values, so the sign of ``eur_amount`` is not reflected
    in the result. ``fiat_rate`` is the effective sats-per-EUR after the sats
    amount has been rounded, or 0 when the EUR amount is zero.
    """
    fiat = abs(eur_amount)
    sats = abs(eur_to_sats(fiat, btc_eur_rate))

    # fiat_rate = sats per EUR
    if fiat > 0:
        sats_per_eur = float(sats) / float(fiat)
    else:
        sats_per_eur = 0

    metadata = {
        "fiat_currency": "EUR",
        "fiat_amount": str(fiat.quantize(Decimal("0.001"))),
        "fiat_rate": sats_per_eur,
        "btc_rate": btc_eur_rate,
    }
    return metadata
|
|
|
|
# ===== BEANCOUNT PARSER =====
|
|
|
|
def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
    """
    Parse a Beancount transaction.

    Expected format:
        2025-07-06 * "Foix market"
          Expenses:Groceries  69.40 EUR
          Equity:Pat

    Leading comment (``;``) and blank lines are skipped. The header must be
    ``<YYYY-MM-DD> <flag> <description>`` where the flag is ``*`` or ``!``.
    Posting amounts are recognised only when the posting ends in
    ``<number> EUR``; anything else is treated as "no amount given".
    Inline ``; comments`` after a posting are ignored.

    If exactly one posting has no amount, it is filled in with the negated
    sum of the others (Beancount auto-balance), for any number of postings.

    Args:
        txn_text: Text of one ledger block.

    Returns:
        ``{'date': datetime, 'description': str, 'postings': [...]}`` where
        each posting is ``{'account': str, 'eur_amount': Decimal or None}``,
        or None if the block is not a dated ``*``/``!`` transaction.
    """
    lines = txn_text.strip().split('\n')

    # Skip leading comments and empty lines to find the transaction header.
    header_line_idx = None
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped and not stripped.startswith(';'):
            header_line_idx = i
            break
    if header_line_idx is None:
        return None

    header = lines[header_line_idx].strip()

    # The flag is the FIRST '*' or '!' in the header. A date holds only digits
    # and dashes, so any later '*'/'!' belongs to the description and must not
    # be used as a split point.
    flag_positions = [pos for pos in (header.find('*'), header.find('!')) if pos != -1]
    if not flag_positions:
        return None
    flag_idx = min(flag_positions)

    date_str = header[:flag_idx].strip()
    description = header[flag_idx + 1:].strip().strip('"')

    try:
        date = datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        # Directives such as "open"/"note" end up here and are not transactions.
        return None

    # Parse postings (start after the header line)
    postings = []
    for raw_line in lines[header_line_idx + 1:]:
        # Drop full-line and inline comments; account names never contain ';'.
        line = raw_line.split(';', 1)[0].strip()
        if not line:
            continue

        parts = line.split()
        account = parts[0]

        if len(parts) >= 3 and parts[-1] == 'EUR':
            # Strip commas from amount (e.g., "1,500.00" -> "1500.00")
            eur_amount = Decimal(parts[-2].replace(',', ''))
        else:
            # No amount specified - will be calculated to balance
            eur_amount = None

        postings.append({
            'account': account,
            'eur_amount': eur_amount
        })

    # Beancount auto-balance: fill in the single missing amount, if any.
    # Compare against None explicitly so an explicit 0 EUR is kept as given.
    missing = [p for p in postings if p['eur_amount'] is None]
    if len(postings) >= 2 and len(missing) == 1:
        known_total = sum(
            (p['eur_amount'] for p in postings if p['eur_amount'] is not None),
            Decimal(0),
        )
        missing[0]['eur_amount'] = -known_total

    return {
        'date': date,
        'description': description,
        'postings': postings
    }
|
|
|
|
# ===== HELPER FUNCTIONS =====
|
|
|
|
def extract_user_from_equity_account(account_name: str) -> Optional[str]:
    """
    Return the user component of an ``Equity:<user>`` account name.

    Examples:
        "Equity:Pat" -> "Pat"
        "Equity:Alice" -> "Alice"
        "Equity:Pat:Sub" -> "Pat"
        "Expenses:Food" -> None

    Returns:
        The segment right after "Equity:" (which may be an empty string for
        a bare "Equity:"), or None if the name does not start with "Equity:".
    """
    prefix = "Equity:"
    if not account_name.startswith(prefix):
        return None
    # Only the first segment after the prefix names the user.
    remainder = account_name[len(prefix):]
    return remainder.split(":", 1)[0]
|
|
|
|
def determine_user_id(postings: list) -> Optional[str]:
    """
    Pick the Castle user ID for a transaction from its first Equity posting.

    Postings are scanned in order; the first one whose account is
    ``Equity:<name>`` decides the result.

    Args:
        postings: List of posting dicts with 'account' key

    Returns:
        User ID (wallet ID) from USER_MAPPINGS, or None if no Equity account found

    Raises:
        ValueError: If the Equity user name has no entry in USER_MAPPINGS.
    """
    for entry in postings:
        name = extract_user_from_equity_account(entry['account'])
        if not name:
            continue

        mapped_id = USER_MAPPINGS.get(name)
        if mapped_id:
            return mapped_id

        raise ValueError(
            f"No user ID mapping found for '{name}'.\n"
            f"Please add '{name}' to USER_MAPPINGS in the script."
        )

    # No Equity account found - this shouldn't happen for typical transactions
    return None
|
|
|
|
# ===== CASTLE CONVERTER =====
|
|
|
|
def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
    """Turn a parsed Beancount transaction into a Castle journal-entry payload.

    Every posting becomes one entry line whose amount is the EUR amount
    converted to sats at ``btc_eur_rate`` (sign preserved).

    Raises:
        ValueError: If no Equity user can be determined, a posting's account
            is unknown to Castle, or a posting has no amount.
    """
    # The owning user comes from the transaction's Equity:<name> posting.
    user_id = determine_user_id(parsed['postings'])
    if not user_id:
        raise ValueError(
            f"Could not determine user ID for transaction.\n"
            f"Transactions must have an Equity:<name> account (e.g., Equity:Pat)."
        )

    def _to_line(posting: dict) -> dict:
        """Build one Castle entry line; checks run in account-then-amount order."""
        account_name = posting['account']

        account_id = account_lookup.get_account_id(account_name)
        if not account_id:
            raise ValueError(
                f"No account found in Castle with name '{account_name}'.\n"
                f"Please create this account in Castle first."
            )

        eur_amount = posting['eur_amount']
        if eur_amount is None:
            raise ValueError(f"Could not determine amount for {account_name}")

        return {
            "account_id": account_id,
            "amount": eur_to_sats(eur_amount, btc_eur_rate),  # Positive = debit, negative = credit
            "description": account_name,
            "metadata": build_metadata(eur_amount, btc_eur_rate),
        }

    entry_lines = [_to_line(posting) for posting in parsed['postings']]

    txn_date = parsed['date']
    txn_description = parsed['description']
    date_tag = txn_date.strftime('%Y%m%d')
    slug = txn_description[:20].replace(' ', '-')

    return {
        "description": txn_description,
        "entry_date": txn_date.isoformat(),
        "reference": f"import-{date_tag}-{slug}",
        "flag": "*",
        "meta": {
            "source": "beancount_import",
            "imported_at": datetime.now().isoformat(),
            "btc_eur_rate": btc_eur_rate,
            "user_id": user_id,  # Track which user this transaction is for
        },
        "lines": entry_lines,
    }
|
|
|
|
# ===== API UPLOAD =====
|
|
|
|
def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
    """Upload journal entry to Castle API.

    In dry-run mode nothing is sent: the entry is printed together with the
    sum of its line amounts and a placeholder result is returned.

    Args:
        entry: Castle entry payload (see convert_to_castle_entry).
        api_key: Value sent in the X-Api-Key header.
        dry_run: When True, only print a preview.

    Returns:
        The decoded JSON response, or ``{"id": "dry-run"}`` in dry-run mode.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            response.
    """
    if dry_run:
        print(f"\n[DRY RUN] Entry preview:")
        print(f" Description: {entry['description']}")
        print(f" Date: {entry['entry_date']}")
        print(f" BTC/EUR Rate: {entry['meta']['btc_eur_rate']:,.2f}")
        total_sats = 0
        for line in entry['lines']:
            sign = '+' if line['amount'] > 0 else ''
            print(f" {line['description']}: {sign}{line['amount']:,} sats "
                  f"({line['metadata']['fiat_amount']} EUR)")
            total_sats += line['amount']
        print(f" Balance check: {total_sats} (should be 0)")
        return {"id": "dry-run"}

    url = f"{LNBITS_URL}/castle/api/v1/entries"
    headers = {
        "X-Api-Key": api_key,
        "Content-Type": "application/json"
    }

    # A timeout keeps one stalled request from hanging the whole import;
    # requests waits indefinitely when none is given.
    response = requests.post(url, json=entry, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
|
|
|
|
# ===== MAIN IMPORT FUNCTION =====
|
|
|
|
def _split_transaction_blocks(content: str) -> list:
    """Split ledger text into blocks separated by blank or whitespace-only lines."""
    blocks = []
    current = []
    for line in content.split('\n'):
        if line.strip():
            current.append(line)
        elif current:
            blocks.append('\n'.join(current).strip())
            current = []
    if current:
        blocks.append('\n'.join(current).strip())
    return blocks


def import_beancount_file(beancount_file: str, dry_run: bool = False):
    """Import transactions from Beancount file using rates from CSV.

    Steps: load the BTC/EUR rates, fetch the Castle accounts, report which
    mapped users have an equity account, then parse, convert and upload each
    block of the ledger. Blocks that are not transactions are skipped; a
    failure in one transaction is reported and does not stop the others.
    Progress and a final summary are printed.

    Args:
        beancount_file: Path to the Beancount ledger.
        dry_run: When True, entries are previewed instead of uploaded.

    Returns:
        None. Configuration or loading problems are printed and end the run
        early.
    """
    # Validate configuration
    if not ADMIN_API_KEY:
        print("❌ Error: CASTLE_ADMIN_KEY not set!")
        print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
        return

    # Load rates
    try:
        rate_lookup = RateLookup(RATES_CSV_FILE)
    except (FileNotFoundError, ValueError) as e:
        print(f"❌ Error loading rates: {e}")
        return

    # Load accounts from Castle
    try:
        account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
    except (ConnectionError, ValueError) as e:
        print(f"❌ Error loading accounts: {e}")
        return

    # Show user mappings and verify equity accounts exist
    print(f"\n👥 User ID mappings and equity accounts:")
    for name, user_id in USER_MAPPINGS.items():
        has_equity = user_id in account_lookup.accounts_by_user and 'equity' in account_lookup.accounts_by_user[user_id]
        status = "✅" if has_equity else "❌"
        print(f" {status} {name} → {user_id} {'(has equity account)' if has_equity else '(NO EQUITY ACCOUNT - create in Castle!)'}")

    # Read beancount file
    if not os.path.exists(beancount_file):
        print(f"❌ Error: Beancount file not found: {beancount_file}")
        return

    with open(beancount_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split by blank lines to get individual transactions. Lines holding only
    # whitespace count as blank, so two transactions separated by such a line
    # are not merged into one block.
    transactions = _split_transaction_blocks(content)

    print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
    if dry_run:
        print("🔍 [DRY RUN MODE] No changes will be made\n")

    success_count = 0
    error_count = 0
    skip_count = 0
    skipped_items = []  # Track what was skipped

    for i, txn_text in enumerate(transactions, 1):
        # Broad catch is deliberate: one bad transaction must not abort the
        # rest of the import; the error is reported and counted.
        try:
            parsed = parse_beancount_transaction(txn_text)
            if not parsed:
                # Not a valid transaction (likely a directive, option, or comment block)
                skip_count += 1
                first_line = txn_text.split('\n')[0][:60]
                skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
                continue

            # Look up rate for this transaction's date
            txn_date = parsed['date'].date()
            btc_eur_rate = rate_lookup.get_rate(txn_date)
            if not btc_eur_rate:
                raise ValueError(f"No BTC/EUR rate found for {txn_date}")

            castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
            upload_entry(castle_entry, ADMIN_API_KEY, dry_run)

            # Get user name for display: first posting with an Equity user.
            user_name = None
            for posting in parsed['postings']:
                user_name = extract_user_from_equity_account(posting['account'])
                if user_name:
                    break

            user_info = f" (User: {user_name})" if user_name else ""
            print(f"✅ Transaction {i}: {txn_date} - {parsed['description'][:35]}{user_info} "
                  f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
            success_count += 1

        except Exception as e:
            print(f"❌ Transaction {i} failed: {e}")
            print(f" Content: {txn_text[:100]}...")
            error_count += 1

    print(f"\n{'='*70}")
    print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
    print(f"{'='*70}")

    # Show details of skipped entries
    if skipped_items:
        print(f"\n⏭️ Skipped entries:")
        for item in skipped_items:
            print(f" {item}")

    if success_count > 0 and not dry_run:
        print(f"\n✅ Successfully imported {success_count} transactions to Castle!")
|
|
|
|
# ===== MAIN =====
|
|
|
|
# Script entry point: prints a banner, then either the usage text (when no
# ledger path is given) or runs the import.
if __name__ == "__main__":
    import sys

    print("=" * 70)
    print("🏰 Beancount to Castle Import Script")
    print("=" * 70)

    # Separate the flag from positional arguments so "--dry-run" may appear
    # before or after the ledger path and is never mistaken for the path.
    cli_args = sys.argv[1:]
    dry_run = "--dry-run" in cli_args
    positional = [arg for arg in cli_args if arg != "--dry-run"]

    if not positional:
        print("\nUsage: python import_beancount.py <ledger.beancount> [--dry-run]")
        print("\nExample:")
        print(" python import_beancount.py my_ledger.beancount --dry-run")
        print(" python import_beancount.py my_ledger.beancount")
        print("\nConfiguration:")
        print(f" LNBITS_URL: {LNBITS_URL}")
        print(f" RATES_CSV: {RATES_CSV_FILE}")
        print(f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}")
        sys.exit(1)

    beancount_file = positional[0]

    import_beancount_file(beancount_file, dry_run)
|