673 lines
25 KiB
Python
Executable file
673 lines
25 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Beancount to Castle Import Script
|
|
|
|
⚠️ NOTE: This script is for ONE-OFF MIGRATION purposes only.
|
|
|
|
Now that Castle uses Fava/Beancount as the single source of truth,
|
|
the data flow is: Castle → Fava/Beancount (not the reverse).
|
|
|
|
This script was used for initial data import from existing Beancount files.
|
|
|
|
Future disposition:
|
|
- DELETE if no longer needed for migrations
|
|
- REPURPOSE for bidirectional sync if that becomes a requirement
|
|
- ARCHIVE to misc-docs/old-helpers/ if keeping for reference
|
|
|
|
Imports Beancount ledger transactions into Castle accounting extension.
|
|
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.
|
|
|
|
Usage:
|
|
python import_beancount.py <ledger.beancount> [--dry-run]
|
|
|
|
Example:
|
|
python import_beancount.py my_ledger.beancount --dry-run
|
|
python import_beancount.py my_ledger.beancount
|
|
"""
|
|
import requests
|
|
import csv
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, Optional
|
|
|
|
# ===== CONFIGURATION =====
|
|
|
|
# LNbits URL and API key.
# Both come from the environment. The admin key deliberately has NO built-in
# default: a key committed to source control is a leaked credential, and a
# non-empty default would make the "CASTLE_ADMIN_KEY not set" check in
# import_beancount_file() unreachable.
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "")

# Rates CSV file (looked up in the same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")

# User ID mappings: user name as it appears in Beancount account names
# (e.g. the "Pat" in "Equity:Pat") -> Castle user ID (wallet ID)
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
    "Pat": "75be145a42884b22b60bf97510ed46e3",
    "Coco": "375ec158ceca46de86cf6561ca20f881",
    "Charlie": "921340b802104c25901eae6c420b1ba1",
}
|
|
|
|
# ===== RATE LOOKUP =====
|
|
|
|
class RateLookup:
    """Load daily BTC/EUR rates from a CSV file and look them up by date.

    The CSV must have a header row containing the columns ``date``
    (``YYYY-MM-DD``) and ``btc_eur_rate``. Rates may use commas or spaces as
    thousands separators (e.g. ``"95,123.45"``).
    """

    # Columns that must be present in the CSV header.
    REQUIRED_COLUMNS = ("date", "btc_eur_rate")

    def __init__(self, csv_file: str):
        """Load all rates from ``csv_file``.

        Raises:
            FileNotFoundError: If ``csv_file`` does not exist.
            ValueError: If the file is malformed or contains no rates.
        """
        self.rates = {}  # date object -> rate as float
        self._load_csv(csv_file)

    def _load_csv(self, csv_file: str):
        """Populate ``self.rates`` from the CSV file.

        Every data problem is reported as ``ValueError`` (with the offending
        line number) so that callers catching ``(FileNotFoundError,
        ValueError)`` see a clean message instead of a stray ``KeyError`` or
        ``AttributeError``.
        """
        if not os.path.exists(csv_file):
            raise FileNotFoundError(
                f"Rates CSV file not found: {csv_file}\n"
                f"Please create btc_eur_rates.csv in the same directory as this script."
            )

        # utf-8-sig reads plain UTF-8 too, but also strips a leading BOM that
        # would otherwise turn the first header into "\ufeffdate".
        with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)

            header = reader.fieldnames or []
            missing = [col for col in self.REQUIRED_COLUMNS if col not in header]
            if missing:
                raise ValueError(
                    f"Rates CSV {csv_file} is missing required column(s): "
                    f"{', '.join(missing)}"
                )

            for row in reader:
                # DictReader fills absent trailing fields with None.
                raw_date = (row.get('date') or '').strip()
                raw_rate = row.get('btc_eur_rate') or ''
                try:
                    date = datetime.strptime(raw_date, '%Y-%m-%d').date()
                    # Handle comma (or space) as thousands separator
                    rate = float(raw_rate.replace(',', '').replace(' ', ''))
                except ValueError as e:
                    raise ValueError(
                        f"Invalid rate row at line {reader.line_num} of {csv_file}: {e}"
                    ) from e
                self.rates[date] = rate

        if not self.rates:
            raise ValueError(f"No rates loaded from {csv_file}")

        print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
        print(f" Date range: {min(self.rates.keys())} to {max(self.rates.keys())}")

    # NOTE: only the ``datetime`` class is imported in this file, so the
    # annotation ``datetime.date`` below does not name the ``date`` class;
    # the argument is a plain date object (the caller passes
    # ``parsed['date'].date()``).
    def get_rate(self, date: datetime.date, fallback_days: int = 7) -> Optional[float]:
        """
        Get BTC/EUR rate for a specific date.

        If the exact date is missing, the nearest date within
        ``fallback_days`` is used. At equal distance the earlier date wins.

        Args:
            date: Date to look up
            fallback_days: How many days to look back/forward if exact date missing

        Returns:
            BTC/EUR rate or None if not found
        """
        # Try exact date first
        if date in self.rates:
            return self.rates[date]

        # Widen the search one day at a time, earlier date before later date
        for days_offset in range(1, fallback_days + 1):
            for candidate in (
                date - timedelta(days=days_offset),
                date + timedelta(days=days_offset),
            ):
                if candidate in self.rates:
                    print(f" ⚠️ Using rate from {candidate} for {date} (exact date not found)")
                    return self.rates[candidate]

        return None
|
|
|
|
# ===== ACCOUNT LOOKUP =====
|
|
|
|
class AccountLookup:
    """Fetch Castle accounts from the API and resolve Beancount account names.

    All accounts are fetched once at construction time and indexed two ways:
    by account name, and by (user ID, account type).
    """

    # Seconds to wait for the Castle API. Without a timeout, requests can
    # block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    # One rule per kind of user-specific Beancount account:
    # (Beancount name prefix, Castle account_type, error message template
    #  used when the mapped user has no account of that type).
    _USER_ACCOUNT_RULES = (
        (
            "Liabilities:Payable:",
            "liability",
            "User '{user_name}' (ID: {user_id}) does not have a payable account.\n"
            "This should have been created when they configured their wallet.\n"
            "Please configure the wallet for user ID: {user_id}",
        ),
        (
            "Assets:Receivable:",
            "asset",
            "User '{user_name}' (ID: {user_id}) does not have a receivable account.\n"
            "This should have been created when they configured their wallet.\n"
            "Please configure the wallet for user ID: {user_id}",
        ),
        (
            "Equity:",
            "equity",
            "User '{user_name}' (ID: {user_id}) does not have an equity account.\n"
            "Equity eligibility must be enabled for this user in Castle.\n"
            "Please enable equity for user ID: {user_id}",
        ),
    )

    def __init__(self, lnbits_url: str, api_key: str):
        """Fetch and index all accounts.

        Raises:
            ConnectionError: If the Castle API request fails.
        """
        self.accounts = {}  # name -> account_id
        self.accounts_by_user = {}  # user_id -> {account_type -> account_id}
        self.account_details = []  # Full account objects
        self._fetch_accounts(lnbits_url, api_key)

    def _fetch_accounts(self, lnbits_url: str, api_key: str):
        """Fetch all accounts from the Castle API and build the lookup tables."""
        url = f"{lnbits_url}/castle/api/v1/accounts"
        headers = {"X-Api-Key": api_key}

        # Only the network call belongs in the try block.
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            accounts_list = response.json()
        except requests.RequestException as e:
            # Chain the cause so the original traceback is not lost.
            raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}") from e

        for account in accounts_list:
            name = account.get('name')
            account_id = account.get('id')
            user_id = account.get('user_id')
            account_type = account.get('account_type')

            self.account_details.append(account)

            # Name -> ID mapping
            if name and account_id:
                self.accounts[name] = account_id

            # User -> Account Type -> ID mapping. If a user has several
            # accounts of the same type, the last one listed wins.
            if user_id and account_type:
                self.accounts_by_user.setdefault(user_id, {})[account_type] = account_id

        print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")

    def get_account_id(self, account_name: str) -> Optional[str]:
        """
        Get Castle account ID for a Beancount account name.

        User-specific accounts are resolved through USER_MAPPINGS:
        - "Liabilities:Payable:Pat" -> Pat's Castle 'liability' account
        - "Assets:Receivable:Pat" -> Pat's Castle 'asset' account
        - "Equity:Pat" -> Pat's Castle 'equity' account

        If the name segment is not a key of USER_MAPPINGS (for example a
        non-user equity account), the plain name lookup is used instead.

        Args:
            account_name: Beancount account name (e.g., "Expenses:Food:Supplies",
                "Liabilities:Payable:Pat", "Assets:Receivable:Pat", "Equity:Pat")

        Returns:
            Castle account ID, or None if not found

        Raises:
            ValueError: If the name maps to a known user who has no Castle
                account of the required type.
        """
        for prefix, account_type, missing_template in self._USER_ACCOUNT_RULES:
            if not account_name.startswith(prefix):
                continue

            user_name = extract_user_from_user_account(account_name)
            user_id = USER_MAPPINGS.get(user_name) if user_name else None
            if user_id:
                account_id = self.accounts_by_user.get(user_id, {}).get(account_type)
                if account_id:
                    return account_id
                # Known user without the required account: helpful error
                raise ValueError(
                    missing_template.format(user_name=user_name, user_id=user_id)
                )
            # Unmapped name: fall through to the normal lookup
            break

        # Normal account lookup by name
        return self.accounts.get(account_name)

    def list_accounts(self):
        """Print all available account names in sorted order."""
        print("\n📋 Available accounts:")
        for name in sorted(self.accounts):
            print(f" - {name}")
|
|
|
|
# ===== CONVERSION FUNCTIONS =====
|
|
|
|
def sanitize_link(text: str) -> str:
    """
    Turn arbitrary text into a string usable as a Beancount link.

    Only A-Z, a-z, 0-9, '-', '_', '/' and '.' are kept. Every other
    character becomes a hyphen, runs of hyphens are collapsed to one, and
    hyphens at either end are dropped.

    Examples:
        >>> sanitize_link("Test (pending)")
        'Test-pending'
        >>> sanitize_link("Invoice #123")
        'Invoice-123'
        >>> sanitize_link("import-20250623-Action Ressourcerie")
        'import-20250623-Action-Ressourcerie'
    """
    import re

    # Step 1: swap each disallowed character for a hyphen.
    hyphenated = re.sub(r'[^A-Za-z0-9\-_/.]', '-', text)
    # Step 2: squeeze hyphen runs, then trim the ends.
    return re.sub(r'-+', '-', hyphenated).strip('-')
|
|
|
|
def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
    """Return the satoshi value of ``eur_amount`` at ``btc_eur_rate`` EUR per BTC.

    The sign of ``eur_amount`` is preserved and the result is rounded to a
    whole number of satoshis.
    """
    # Go through str() so the float rate becomes its short decimal form.
    rate = Decimal(str(btc_eur_rate))
    sats_per_btc = Decimal(100_000_000)
    exact_sats = (eur_amount / rate) * sats_per_btc
    whole_sats = exact_sats.quantize(Decimal('1'))
    return int(whole_sats)
|
|
|
|
def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
    """
    Build metadata dict for Castle entry line.

    The API will extract fiat_currency and fiat_amount and use them
    to create proper EUR-based postings.

    Args:
        eur_amount: Posting amount in EUR; only its absolute value is stored.
        btc_eur_rate: BTC/EUR rate used for the conversion, stored for reference.

    Returns:
        Dict with string values for "fiat_currency", "fiat_amount"
        (absolute value, two decimal places) and "btc_rate".
    """
    # The satoshi amount is sent separately as the line's "amount", so it is
    # not recomputed here (the previous unused conversion was dropped).
    abs_eur = abs(eur_amount)

    return {
        "fiat_currency": "EUR",
        "fiat_amount": str(abs_eur.quantize(Decimal("0.01"))),  # Store as string for JSON
        "btc_rate": str(btc_eur_rate),  # Store exchange rate for reference
    }
|
|
|
|
# ===== BEANCOUNT PARSER =====
|
|
|
|
def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
    """
    Parse a Beancount transaction.

    Expected format:
        2025-07-06 * "Foix market"
          Expenses:Groceries 69.40 EUR
          Equity:Pat

    Leading comment lines (starting with ';') are skipped, and inline
    ';' comments after a posting are ignored. Only EUR amounts are
    recognised. If exactly one posting has no amount, it is set to the
    negated sum of the others (Beancount auto-balance).

    Args:
        txn_text: Text of one blank-line-separated block of the ledger.

    Returns:
        Dict with 'date' (datetime), 'description' (str) and 'postings'
        (list of {'account': str, 'eur_amount': Decimal or None}), or None
        if the block is not a transaction (directive, option, comments).
    """
    lines = txn_text.strip().split('\n')

    # Skip leading comments and blank lines to find the transaction header
    header = None
    header_line_idx = 0
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped and not stripped.startswith(';'):
            header = stripped
            header_line_idx = i
            break
    if header is None:
        return None

    # The flag is the FIRST '*' or '!' on the header line. Splitting only
    # there keeps any '*' or '!' inside the description intact.
    flag_positions = [pos for pos in (header.find('*'), header.find('!')) if pos != -1]
    if not flag_positions:
        return None
    flag_idx = min(flag_positions)

    date_str = header[:flag_idx].strip()
    description = header[flag_idx + 1:].strip().strip('"')

    try:
        date = datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        # Not a dated transaction header (e.g. a directive mentioning '*')
        return None

    # Parse postings (start after the header line)
    postings = []
    for line in lines[header_line_idx + 1:]:
        # Drop full-line and inline comments
        line = line.split(';', 1)[0].strip()
        if not line:
            continue

        parts = line.split()
        account = parts[0]

        # Check if amount is specified
        if len(parts) >= 3 and parts[-1] == 'EUR':
            # Strip commas from amount (e.g., "1,500.00" -> "1500.00")
            eur_amount = Decimal(parts[-2].replace(',', ''))
        else:
            # No amount specified - will be calculated to balance
            eur_amount = None

        postings.append({
            'account': account,
            'eur_amount': eur_amount
        })

    # Beancount auto-balance: one posting may omit its amount. Compare with
    # None rather than truthiness so an explicit 0 is not mistaken for
    # "missing" and overwritten.
    missing = [p for p in postings if p['eur_amount'] is None]
    if len(missing) == 1 and len(postings) >= 2:
        known_total = sum(
            (p['eur_amount'] for p in postings if p['eur_amount'] is not None),
            Decimal('0'),
        )
        missing[0]['eur_amount'] = -known_total

    return {
        'date': date,
        'description': description,
        'postings': postings
    }
|
|
|
|
# ===== HELPER FUNCTIONS =====
|
|
|
|
def extract_user_from_user_account(account_name: str) -> Optional[str]:
    """
    Pull the user name out of a user-specific account name.

    Recognised account families and the segment that holds the name:
        "Liabilities:Payable:<name>"  -> third segment
        "Assets:Receivable:<name>"    -> third segment
        "Equity:<name>"               -> second segment

    Examples:
        "Liabilities:Payable:Pat" -> "Pat"
        "Assets:Receivable:Alice" -> "Alice"
        "Equity:Pat" -> "Pat"
        "Expenses:Food" -> None

    Returns:
        User name or None if not a user-specific account
    """
    # (prefix, index of the colon-separated segment holding the user name)
    families = (
        ("Liabilities:Payable:", 2),
        ("Assets:Receivable:", 2),
        ("Equity:", 1),
    )
    for prefix, name_index in families:
        if account_name.startswith(prefix):
            segments = account_name.split(":")
            return segments[name_index] if len(segments) > name_index else None
    return None
|
|
|
|
def determine_user_id(postings: list) -> Optional[str]:
    """
    Pick the Castle user ID a transaction belongs to.

    The postings are scanned in order. The first one whose account is
    user-specific (payable, receivable or equity) decides the result.

    Args:
        postings: List of posting dicts with 'account' key

    Returns:
        User ID (wallet ID) from USER_MAPPINGS, or None if no posting uses a
        user-specific account

    Raises:
        ValueError: If the first user name found has no entry in USER_MAPPINGS.
    """
    for entry in postings:
        name = extract_user_from_user_account(entry['account'])
        if not name:
            continue

        mapped_id = USER_MAPPINGS.get(name)
        if mapped_id:
            return mapped_id

        raise ValueError(
            f"No user ID mapping found for '{name}'.\n"
            f"Please add '{name}' to USER_MAPPINGS in the script."
        )

    # No posting referenced a user-specific account
    return None
|
|
|
|
# ===== CASTLE CONVERTER =====
|
|
|
|
def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
    """
    Turn a parsed Beancount transaction into a Castle journal-entry payload.

    Each posting becomes one line carrying a signed satoshi amount plus fiat
    metadata (currency, absolute EUR amount, rate).

    Args:
        parsed: Result of parse_beancount_transaction().
        btc_eur_rate: BTC/EUR rate to convert the EUR amounts with.
        account_lookup: Resolver from Beancount account names to Castle IDs.

    Returns:
        Dict ready to be posted to the Castle entries endpoint.

    Raises:
        ValueError: If no user can be determined, an account is unknown to
            Castle, or a posting has no amount.
    """
    # The transaction must be attributable to a user
    owner_id = determine_user_id(parsed['postings'])
    if not owner_id:
        raise ValueError(
            "Could not determine user ID for transaction.\n"
            "Transactions must have a user-specific account:\n"
            " - Liabilities:Payable:<name> (for payables)\n"
            " - Assets:Receivable:<name> (for receivables)\n"
            " - Equity:<name> (for equity)\n"
            "Examples: Liabilities:Payable:Pat, Assets:Receivable:Pat, Equity:Pat"
        )

    # One Castle line per Beancount posting
    entry_lines = []
    for item in parsed['postings']:
        beancount_name = item['account']

        castle_id = account_lookup.get_account_id(beancount_name)
        if not castle_id:
            raise ValueError(
                f"No account found in Castle with name '{beancount_name}'.\n"
                f"Please create this account in Castle first."
            )

        amount_eur = item['eur_amount']
        if amount_eur is None:
            raise ValueError(f"Could not determine amount for {beancount_name}")

        entry_lines.append({
            "account_id": castle_id,
            # Signed satoshi amount: positive = debit, negative = credit
            "amount": eur_to_sats(amount_eur, btc_eur_rate),
            "description": beancount_name,
            # Fiat details the API reads back (fiat_currency, fiat_amount)
            "metadata": build_metadata(amount_eur, btc_eur_rate),
        })

    txn_date = parsed['date']
    narration = parsed['description']
    # Link-safe slug from the first 30 characters of the description
    link_suffix = sanitize_link(narration[:30])

    return {
        "description": narration,
        "entry_date": txn_date.isoformat(),
        "reference": f"import-{txn_date.strftime('%Y%m%d')}-{link_suffix}",
        "flag": "*",
        "meta": {
            "source": "beancount_import",
            "imported_at": datetime.now().isoformat(),
            "btc_eur_rate": str(btc_eur_rate),
            "user_id": owner_id,  # Which user this transaction is for
        },
        "lines": entry_lines,
    }
|
|
|
|
# ===== API UPLOAD =====
|
|
|
|
def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
    """Upload a journal entry to the Castle API, or preview it in dry-run mode.

    Args:
        entry: Entry payload as built by convert_to_castle_entry().
        api_key: Castle API key sent in the X-Api-Key header.
        dry_run: If True, only print a preview and send nothing.

    Returns:
        The decoded JSON response, or {"id": "dry-run"} in dry-run mode.

    Raises:
        requests.exceptions.HTTPError: If the API answers with an error status.
        Exception: Any other request failure is printed and re-raised.
    """
    if dry_run:
        print(f"\n[DRY RUN] Entry preview:")
        print(f" Description: {entry['description']}")
        print(f" Date: {entry['entry_date']}")
        # The rate is stored as a string in the entry's meta, so it must be
        # converted before applying a numeric format spec (formatting the
        # string directly raises ValueError).
        rate = float(entry['meta']['btc_eur_rate'])
        print(f" BTC/EUR Rate: {rate:,.2f}")
        total_sats = 0
        for line in entry['lines']:
            sign = '+' if line['amount'] > 0 else ''
            print(f" {line['description']}: {sign}{line['amount']:,} sats "
                  f"({line['metadata']['fiat_amount']} EUR)")
            total_sats += line['amount']
        print(f" Balance check: {total_sats} (should be 0)")
        return {"id": "dry-run"}

    url = f"{LNBITS_URL}/castle/api/v1/entries"
    headers = {
        "X-Api-Key": api_key,
        "Content-Type": "application/json"
    }

    try:
        # A timeout keeps the import from hanging on an unresponsive server.
        response = requests.post(url, json=entry, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f" ❌ HTTP Error: {e}")
        # HTTPError comes from raise_for_status(), so `response` is bound here
        if response.text:
            print(f" Response: {response.text}")
        raise
    except Exception as e:
        print(f" ❌ Error: {e}")
        raise
|
|
|
|
# ===== MAIN IMPORT FUNCTION =====
|
|
|
|
def import_beancount_file(beancount_file: str, dry_run: bool = False):
    """Import every transaction of a Beancount file into Castle.

    Loads the rate table and the Castle account list, splits the ledger into
    blank-line-separated blocks, and converts and uploads each block that
    parses as a transaction. Problems are reported on stdout; a failing
    transaction does not stop the run.

    Args:
        beancount_file: Path of the ledger to import.
        dry_run: If True, entries are previewed instead of uploaded.
    """
    # --- Preconditions: API key, rates, accounts -------------------------
    if not ADMIN_API_KEY:
        print("❌ Error: CASTLE_ADMIN_KEY not set!")
        print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
        return

    try:
        rate_lookup = RateLookup(RATES_CSV_FILE)
    except (FileNotFoundError, ValueError) as exc:
        print(f"❌ Error loading rates: {exc}")
        return

    try:
        account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
    except (ConnectionError, ValueError) as exc:
        print(f"❌ Error loading accounts: {exc}")
        return

    # --- Report which mapped users have an equity account -----------------
    print(f"\n👥 User ID mappings and equity accounts:")
    for name, user_id in USER_MAPPINGS.items():
        if 'equity' in account_lookup.accounts_by_user.get(user_id, {}):
            status, remark = "✅", '(has equity account)'
        else:
            status, remark = "❌", '(NO EQUITY ACCOUNT - create in Castle!)'
        print(f" {status} {name} → {user_id} {remark}")

    # --- Read the ledger --------------------------------------------------
    if not os.path.exists(beancount_file):
        print(f"❌ Error: Beancount file not found: {beancount_file}")
        return

    with open(beancount_file, 'r', encoding='utf-8') as ledger:
        content = ledger.read()

    # Blocks are separated by blank lines; drop blocks that are only whitespace
    transactions = []
    for chunk in content.split('\n\n'):
        chunk = chunk.strip()
        if chunk:
            transactions.append(chunk)

    print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
    if dry_run:
        print("🔍 [DRY RUN MODE] No changes will be made\n")

    # --- Convert and upload block by block --------------------------------
    success_count = 0
    error_count = 0
    skipped_items = []  # One description per skipped block

    for i, txn_text in enumerate(transactions, start=1):
        try:
            parsed = parse_beancount_transaction(txn_text)
            if not parsed:
                # Not a transaction (likely a directive, option, or comment block)
                first_line = txn_text.split('\n')[0][:60]
                skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
                continue

            txn_day = parsed['date'].date()
            btc_eur_rate = rate_lookup.get_rate(txn_day)
            if not btc_eur_rate:
                raise ValueError(f"No BTC/EUR rate found for {txn_day}")

            castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
            upload_entry(castle_entry, ADMIN_API_KEY, dry_run)

            # First user name found in the postings, for display only
            user_name = next(
                (
                    found
                    for found in (
                        extract_user_from_user_account(p['account'])
                        for p in parsed['postings']
                    )
                    if found
                ),
                None,
            )
            user_info = f" (User: {user_name})" if user_name else ""

            print(f"✅ Transaction {i}: {txn_day} - {parsed['description'][:35]}{user_info} "
                  f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
            success_count += 1

        except Exception as exc:
            # Per-transaction boundary: report and carry on with the next block
            print(f"❌ Transaction {i} failed: {exc}")
            print(f" Content: {txn_text[:100]}...")
            error_count += 1

    # --- Summary ----------------------------------------------------------
    skip_count = len(skipped_items)
    rule = '=' * 70
    print(f"\n{rule}")
    print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
    print(f"{rule}")

    if skipped_items:
        print(f"\n⏭️ Skipped entries:")
        for item in skipped_items:
            print(f" {item}")

    if success_count > 0 and not dry_run:
        print(f"\n✅ Successfully imported {success_count} transactions to Castle!")
        print(f"\n💡 Note: Transactions are stored in EUR with SATS in metadata.")
        print(f" Check Fava to see the imported entries.")
|
|
|
|
# ===== MAIN =====
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
print("=" * 70)
|
|
print("🏰 Beancount to Castle Import Script")
|
|
print("=" * 70)
|
|
|
|
if len(sys.argv) < 2:
|
|
print("\nUsage: python import_beancount.py <ledger.beancount> [--dry-run]")
|
|
print("\nExample:")
|
|
print(" python import_beancount.py my_ledger.beancount --dry-run")
|
|
print(" python import_beancount.py my_ledger.beancount")
|
|
print("\nConfiguration:")
|
|
print(f" LNBITS_URL: {LNBITS_URL}")
|
|
print(f" RATES_CSV: {RATES_CSV_FILE}")
|
|
print(f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}")
|
|
sys.exit(1)
|
|
|
|
beancount_file = sys.argv[1]
|
|
dry_run = "--dry-run" in sys.argv
|
|
|
|
import_beancount_file(beancount_file, dry_run)
|