Improvements to equity account handling across the Castle extension:

Transaction Categorization (views_api.py):
- Prioritize equity accounts when enriching transaction entries
- Use a two-pass lookup: first search for equity accounts, then fall back to liability/asset accounts
- Ensures transactions with Equity:User-<id> accounts are correctly categorized as equity

UI Enhancements (index.html, index.js):
- Add 'Equity' filter option to the Recent Transactions table
- Display a blue "Equity" badge for equity entries (before receivable/payable badges)
- Add isEquity() helper function to identify equity account entries

Beancount Import (import_beancount.py):
- Support importing Beancount Equity:<name> accounts
- Map Beancount "Equity:Pat" to Castle "Equity:User-<id>" accounts
- Update extract_user_from_user_account() to handle the Equity: prefix
- Improve error messages to include equity account examples
- Add equity account lookup in get_account_id(), with a helpful error if equity is not enabled

These changes ensure equity accounts (representing user capital contributions) are properly distinguished from payables and receivables throughout the system.
617 lines
23 KiB
Python
Executable file
617 lines
23 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Beancount to Castle Import Script
|
|
|
|
Imports Beancount ledger transactions into Castle accounting extension.
|
|
Reads daily BTC/EUR rates from btc_eur_rates.csv in the same directory.
|
|
|
|
Usage:
|
|
python import_beancount.py <ledger.beancount> [--dry-run]
|
|
|
|
Example:
|
|
python import_beancount.py my_ledger.beancount --dry-run
|
|
python import_beancount.py my_ledger.beancount
|
|
"""
|
|
import requests
|
|
import csv
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, Optional
|
|
|
|
# ===== CONFIGURATION =====
|
|
|
|
# LNbits URL and API key.
# SECURITY: the admin key is taken from the environment only. A real key must
# never be committed as a fallback default; with an empty default the
# "CASTLE_ADMIN_KEY not set" check performed before importing can actually
# trigger instead of silently using a baked-in credential.
LNBITS_URL = os.environ.get("LNBITS_URL", "http://localhost:5000")
ADMIN_API_KEY = os.environ.get("CASTLE_ADMIN_KEY", "")

# Rates CSV file (looked up in the same directory as this script)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RATES_CSV_FILE = os.path.join(SCRIPT_DIR, "btc_eur_rates.csv")

# User ID mappings: Beancount user name (the <name> in Equity:<name>,
# Liabilities:Payable:<name>, Assets:Receivable:<name>) -> Castle user ID
# (wallet ID).
# TODO: Update these with your actual Castle user/wallet IDs
USER_MAPPINGS = {
    "Pat": "75be145a42884b22b60bf97510ed46e3",
    "Coco": "375ec158ceca46de86cf6561ca20f881",
    "Charlie": "921340b802104c25901eae6c420b1ba1",
}
|
|
|
|
# ===== RATE LOOKUP =====
|
|
|
|
class RateLookup:
    """Load daily BTC/EUR rates from a CSV file and look them up by date.

    The CSV needs a header row containing the columns ``date``
    (``YYYY-MM-DD``) and ``btc_eur_rate``. Loaded rates are stored in
    ``self.rates`` as a ``{date: float}`` mapping whose keys are produced by
    ``datetime.strptime(...).date()``.
    """

    def __init__(self, csv_file: str):
        """Load all rates from ``csv_file`` (see ``_load_csv`` for errors)."""
        self.rates = {}
        self._load_csv(csv_file)

    def _load_csv(self, csv_file: str):
        """Populate ``self.rates`` from ``csv_file``.

        Raises:
            FileNotFoundError: if ``csv_file`` does not exist.
            ValueError: if a row lacks the ``date`` / ``btc_eur_rate``
                columns, if a date or rate cannot be parsed, or if the file
                yields no rates at all.
        """
        if not os.path.exists(csv_file):
            raise FileNotFoundError(
                f"Rates CSV file not found: {csv_file}\n"
                f"Please create btc_eur_rates.csv in the same directory as this script."
            )

        # 'utf-8-sig' reads plain UTF-8 as well, but also drops a leading BOM
        # which would otherwise turn the first header into '\ufeffdate'.
        with open(csv_file, 'r', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)
            for row in reader:
                raw_date = row.get('date')
                raw_rate = row.get('btc_eur_rate')
                # A missing column or a short row shows up as None here.
                # Report it as ValueError so callers that handle bad rate
                # files do not have to know about KeyError/AttributeError.
                if raw_date is None or raw_rate is None:
                    raise ValueError(
                        f"{csv_file}, line {reader.line_num}: "
                        f"expected columns 'date' and 'btc_eur_rate'"
                    )

                day = datetime.strptime(raw_date.strip(), '%Y-%m-%d').date()
                # Handle comma as thousands separator
                rate_str = raw_rate.replace(',', '').replace(' ', '')
                self.rates[day] = float(rate_str)

        if not self.rates:
            raise ValueError(f"No rates loaded from {csv_file}")

        print(f"📊 Loaded {len(self.rates)} daily rates from {os.path.basename(csv_file)}")
        print(f" Date range: {min(self.rates.keys())} to {max(self.rates.keys())}")

    def get_rate(self, date: datetime.date, fallback_days: int = 7) -> Optional[float]:
        """
        Get BTC/EUR rate for a specific date.
        If the exact date is not found, tries nearby dates within fallback_days.

        Args:
            date: Date to look up. Must compare equal to the ``date`` keys of
                ``self.rates`` (pass ``some_datetime.date()``, not a datetime).
            fallback_days: How many days to look back/forward if exact date missing

        Returns:
            BTC/EUR rate or None if not found
        """
        # Try exact date first
        if date in self.rates:
            return self.rates[date]

        # Walk outwards one day at a time; at equal distance the earlier
        # date wins because it is checked first.
        for days_offset in range(1, fallback_days + 1):
            offset = timedelta(days=days_offset)
            for candidate in (date - offset, date + offset):
                if candidate in self.rates:
                    print(f" ⚠️ Using rate from {candidate} for {date} (exact date not found)")
                    return self.rates[candidate]

        return None
|
|
|
|
# ===== ACCOUNT LOOKUP =====
|
|
|
|
class AccountLookup:
    """Fetch Castle accounts from the API and resolve Beancount names to IDs.

    Attributes:
        accounts: account name -> account ID.
        accounts_by_user: user ID -> {account_type -> account ID}; when a
            user has several accounts of one type the last one listed wins.
        account_details: the raw account objects as returned by the API.
    """

    # Seconds to wait for the Castle API before giving up; without a timeout
    # an unresponsive server would block the import indefinitely.
    REQUEST_TIMEOUT = 30

    _WALLET_HINT = (
        "This should have been created when they configured their wallet.\n"
        "Please configure the wallet for user ID: {user_id}"
    )
    _EQUITY_HINT = (
        "Equity eligibility must be enabled for this user in Castle.\n"
        "Please enable equity for user ID: {user_id}"
    )

    # (Beancount prefix, key in accounts_by_user[user_id],
    #  wording for the error message, remediation hint)
    _USER_ACCOUNT_RULES = (
        ("Liabilities:Payable:", "liability", "a payable", _WALLET_HINT),
        ("Assets:Receivable:", "asset", "a receivable", _WALLET_HINT),
        ("Equity:", "equity", "an equity", _EQUITY_HINT),
    )

    def __init__(self, lnbits_url: str, api_key: str):
        """Fetch all accounts immediately (see ``_fetch_accounts``)."""
        self.accounts = {}  # name -> account_id
        self.accounts_by_user = {}  # user_id -> {account_type -> account_id}
        self.account_details = []  # Full account objects
        self._fetch_accounts(lnbits_url, api_key)

    def _fetch_accounts(self, lnbits_url: str, api_key: str):
        """Fetch all accounts from the Castle API and build the lookup maps.

        Raises:
            ConnectionError: if the HTTP request fails or returns an error
                status; the underlying ``requests`` exception is chained.
        """
        url = f"{lnbits_url}/castle/api/v1/accounts"
        headers = {"X-Api-Key": api_key}

        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            accounts_list = response.json()
        except requests.RequestException as e:
            raise ConnectionError(f"Failed to fetch accounts from Castle API: {e}") from e

        # Build mappings
        for account in accounts_list:
            name = account.get('name')
            account_id = account.get('id')
            user_id = account.get('user_id')
            account_type = account.get('account_type')

            self.account_details.append(account)

            # Name -> ID mapping
            if name and account_id:
                self.accounts[name] = account_id

            # User -> Account Type -> ID mapping (for user-specific accounts)
            if user_id and account_type:
                self.accounts_by_user.setdefault(user_id, {})[account_type] = account_id

        print(f"🏦 Loaded {len(self.accounts)} accounts from Castle")

    def get_account_id(self, account_name: str) -> Optional[str]:
        """
        Get Castle account ID for a Beancount account name.

        Special handling for user-specific accounts:
        - "Liabilities:Payable:Pat" -> looks up Pat's user_id and finds their Castle payable account
        - "Assets:Receivable:Pat" -> looks up Pat's user_id and finds their Castle receivable account
        - "Equity:Pat" -> looks up Pat's user_id and finds their Castle equity account

        If the name part is not present in USER_MAPPINGS, the account is
        looked up by its literal name like any other account.

        Args:
            account_name: Beancount account name (e.g., "Expenses:Food:Supplies", "Liabilities:Payable:Pat", "Assets:Receivable:Pat", "Equity:Pat")

        Returns:
            Castle account UUID or None if not found

        Raises:
            ValueError: if the user is mapped but has no Castle account of
                the required type.
        """
        for prefix, account_type, label, hint in self._USER_ACCOUNT_RULES:
            if not account_name.startswith(prefix):
                continue

            user_name = extract_user_from_user_account(account_name)
            user_id = USER_MAPPINGS.get(user_name) if user_name else None
            if user_id:
                account_id = self.accounts_by_user.get(user_id, {}).get(account_type)
                if account_id:
                    return account_id

                # Mapped user without the expected account: explain the fix.
                raise ValueError(
                    f"User '{user_name}' (ID: {user_id}) does not have {label} account.\n"
                    + hint.format(user_id=user_id)
                )
            # The prefixes are mutually exclusive, so no other rule can match.
            break

        # Normal account lookup by name
        return self.accounts.get(account_name)

    def list_accounts(self):
        """Print all available account names in sorted order."""
        print("\n📋 Available accounts:")
        for name in sorted(self.accounts):
            print(f" - {name}")
|
|
|
|
# ===== CONVERSION FUNCTIONS =====
|
|
|
|
def eur_to_sats(eur_amount: Decimal, btc_eur_rate: float) -> int:
    """Return ``eur_amount`` expressed in whole satoshis.

    The rate is the price of one BTC in EUR. The result keeps the sign of
    ``eur_amount`` and is rounded to an integer by ``Decimal.quantize``
    under the active decimal context.
    """
    sats_per_btc = Decimal(100_000_000)
    # Go through str() so the float's repr, not its binary expansion, is used.
    eur_per_btc = Decimal(str(btc_eur_rate))
    whole_sats = ((eur_amount / eur_per_btc) * sats_per_btc).quantize(Decimal('1'))
    return int(whole_sats)
|
|
|
|
def build_metadata(eur_amount: Decimal, btc_eur_rate: float) -> dict:
    """Build the metadata dict attached to one Castle entry line.

    All amounts are unsigned. ``fiat_amount`` is the EUR value as a string
    with three decimals, ``fiat_rate`` is sats per EUR (0 for a zero
    amount) and ``btc_rate`` is the BTC/EUR rate that was passed in.
    """
    magnitude_eur = abs(eur_amount)
    magnitude_sats = abs(eur_to_sats(magnitude_eur, btc_eur_rate))

    # fiat_rate = sats per EUR; guard against dividing by a zero amount
    if magnitude_eur > 0:
        sats_per_eur = float(magnitude_sats) / float(magnitude_eur)
    else:
        sats_per_eur = 0

    metadata = {"fiat_currency": "EUR"}
    metadata["fiat_amount"] = str(magnitude_eur.quantize(Decimal("0.001")))
    metadata["fiat_rate"] = sats_per_eur
    metadata["btc_rate"] = btc_eur_rate
    return metadata
|
|
|
|
# ===== BEANCOUNT PARSER =====
|
|
|
|
def parse_beancount_transaction(txn_text: str) -> Optional[Dict]:
    """
    Parse a Beancount transaction.

    Expected format:
    2025-07-06 * "Foix market"
    Expenses:Groceries 69.40 EUR
    Equity:Pat

    Leading blank lines and ';' comment lines before the header are skipped.
    The header must be ``<YYYY-MM-DD> <flag> <description>`` where the flag
    is ``*`` or ``!``; the description is everything after the flag with
    surrounding double quotes removed (so it may itself contain ``*`` or
    ``!``).

    Posting lines are ``<account> [<amount> EUR]``. Thousands separators in
    the amount and trailing ``; comments`` are ignored. A posting whose last
    token is not ``EUR`` is treated as having no amount. If exactly one
    posting has no amount, it is set to the negated sum of the others
    (Beancount's auto-balancing).

    Args:
        txn_text: Text of a single transaction block.

    Returns:
        ``{'date': datetime, 'flag': str, 'description': str,
        'postings': [{'account': str, 'eur_amount': Decimal | None}, ...]}``
        or None if the block is not a transaction (no flag or no valid date).

    Raises:
        decimal.InvalidOperation: if an ``EUR`` amount is not a valid number.
    """
    lines = txn_text.strip().split('\n')

    # The header is the first line that is neither blank nor a comment.
    header_line_idx = None
    for idx, candidate in enumerate(lines):
        text = candidate.strip()
        if text and not text.startswith(';'):
            header_line_idx = idx
            break
    if header_line_idx is None:
        return None
    header = lines[header_line_idx].strip()

    # Split at the FIRST flag character only, so that '*' or '!' inside the
    # description does not truncate it or hide the real flag.
    flag_pos = next((i for i, ch in enumerate(header) if ch in '*!'), None)
    if flag_pos is None:
        return None
    flag = header[flag_pos]
    date_str = header[:flag_pos].strip()
    description = header[flag_pos + 1:].strip().strip('"')

    try:
        date = datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        return None

    # Parse postings (start after the header line)
    postings = []
    for raw_line in lines[header_line_idx + 1:]:
        # Drop full-line and inline ';' comments before tokenising.
        posting_text = raw_line.split(';', 1)[0].strip()
        if not posting_text:
            continue

        parts = posting_text.split()
        if len(parts) >= 3 and parts[-1] == 'EUR':
            # Strip commas from amount (e.g., "1,500.00" -> "1500.00")
            eur_amount = Decimal(parts[-2].replace(',', ''))
        else:
            # No amount specified - will be calculated to balance
            eur_amount = None

        postings.append({
            'account': parts[0],
            'eur_amount': eur_amount
        })

    # Auto-balance: compare against None rather than truthiness so that an
    # explicit 0.00 amount is not mistaken for a missing one.
    missing = [p for p in postings if p['eur_amount'] is None]
    if len(missing) == 1 and len(postings) >= 2:
        known_total = sum(
            (p['eur_amount'] for p in postings if p['eur_amount'] is not None),
            Decimal('0'),
        )
        missing[0]['eur_amount'] = -known_total

    return {
        'date': date,
        'flag': flag,
        'description': description,
        'postings': postings
    }
|
|
|
|
# ===== HELPER FUNCTIONS =====
|
|
|
|
def extract_user_from_user_account(account_name: str) -> Optional[str]:
    """
    Return the user-name segment of a user-specific account name.

    Recognised account families are payables, receivables and equity:
    "Liabilities:Payable:Pat" -> "Pat"
    "Assets:Receivable:Alice" -> "Alice"
    "Equity:Pat" -> "Pat"
    "Expenses:Food" -> None

    Returns:
        The segment directly after the recognised prefix (which is an empty
        string when nothing follows the prefix), or None if the account does
        not start with one of the prefixes.
    """
    # (prefix, position of the user name after splitting on ':')
    user_segment_by_prefix = (
        ("Liabilities:Payable:", 2),
        ("Assets:Receivable:", 2),
        ("Equity:", 1),
    )
    for prefix, segment_index in user_segment_by_prefix:
        if account_name.startswith(prefix):
            # The prefix itself guarantees enough ':'-separated segments.
            return account_name.split(":")[segment_index]
    return None
|
|
|
|
def determine_user_id(postings: list) -> Optional[str]:
    """
    Find the Castle user a transaction belongs to.

    The postings are scanned in order; the first one whose account is
    user-specific (payable, receivable or equity) decides the result.

    Args:
        postings: List of posting dicts with 'account' key

    Returns:
        User ID (wallet ID) from USER_MAPPINGS, or None if no posting uses a
        user-specific account

    Raises:
        ValueError: if that first user-specific account names a user that
            has no entry in USER_MAPPINGS
    """
    for entry in postings:
        owner = extract_user_from_user_account(entry['account'])
        if not owner:
            continue

        mapped_id = USER_MAPPINGS.get(owner)
        if mapped_id:
            return mapped_id

        raise ValueError(
            f"No user ID mapping found for '{owner}'.\n"
            f"Please add '{owner}' to USER_MAPPINGS in the script."
        )

    # No user-specific account found - this shouldn't happen for typical transactions
    return None
|
|
|
|
# ===== CASTLE CONVERTER =====
|
|
|
|
def convert_to_castle_entry(parsed: dict, btc_eur_rate: float, account_lookup: AccountLookup) -> dict:
    """Convert a parsed Beancount transaction into a Castle journal entry.

    Args:
        parsed: Dict with 'date', 'description' and 'postings' keys.
        btc_eur_rate: BTC/EUR rate used to convert every posting to sats.
        account_lookup: Resolver for Beancount account names.

    Returns:
        Entry payload with description, entry_date, reference, flag, meta
        and one line per posting (amount in sats plus fiat metadata).

    Raises:
        ValueError: if no user can be determined, an account is unknown, or
            a posting has no amount.
    """
    postings = parsed['postings']

    # The entry is attributed to the user named by a user-specific account.
    owner_id = determine_user_id(postings)
    if not owner_id:
        raise ValueError(
            "Could not determine user ID for transaction.\n"
            "Transactions must have a user-specific account:\n"
            " - Liabilities:Payable:<name> (for payables)\n"
            " - Assets:Receivable:<name> (for receivables)\n"
            " - Equity:<name> (for equity)\n"
            "Examples: Liabilities:Payable:Pat, Assets:Receivable:Pat, Equity:Pat"
        )

    entry_lines = []
    for item in postings:
        beancount_name = item['account']

        castle_account = account_lookup.get_account_id(beancount_name)
        if not castle_account:
            raise ValueError(
                f"No account found in Castle with name '{beancount_name}'.\n"
                "Please create this account in Castle first."
            )

        amount_eur = item['eur_amount']
        if amount_eur is None:
            raise ValueError(f"Could not determine amount for {beancount_name}")

        entry_lines.append({
            "account_id": castle_account,
            # Positive = debit, negative = credit
            "amount": eur_to_sats(amount_eur, btc_eur_rate),
            "description": beancount_name,
            "metadata": build_metadata(amount_eur, btc_eur_rate),
        })

    txn_date = parsed['date']
    summary = parsed['description']
    # Reference: date stamp plus the first 20 characters of the description,
    # with spaces replaced by dashes.
    reference = "import-{}-{}".format(
        txn_date.strftime('%Y%m%d'),
        summary[:20].replace(' ', '-'),
    )

    return {
        "description": summary,
        "entry_date": txn_date.isoformat(),
        "reference": reference,
        "flag": "*",
        "meta": {
            "source": "beancount_import",
            "imported_at": datetime.now().isoformat(),
            "btc_eur_rate": btc_eur_rate,
            "user_id": owner_id,  # Track which user this transaction is for
        },
        "lines": entry_lines,
    }
|
|
|
|
# ===== API UPLOAD =====
|
|
|
|
def upload_entry(entry: dict, api_key: str, dry_run: bool = False) -> dict:
    """Upload a journal entry to the Castle API, or preview it in dry-run mode.

    Args:
        entry: Castle entry payload; must contain 'description',
            'entry_date', 'meta' (with 'btc_eur_rate') and 'lines'.
        api_key: Value sent in the X-Api-Key header.
        dry_run: If True, print a preview and send nothing.

    Returns:
        The decoded JSON response, or ``{"id": "dry-run"}`` in dry-run mode.

    Raises:
        requests.RequestException: on connection errors, timeouts or an
            HTTP error status.
    """
    if dry_run:
        print(f"\n[DRY RUN] Entry preview:")
        print(f" Description: {entry['description']}")
        print(f" Date: {entry['entry_date']}")
        print(f" BTC/EUR Rate: {entry['meta']['btc_eur_rate']:,.2f}")
        total_sats = 0
        for line in entry['lines']:
            sign = '+' if line['amount'] > 0 else ''
            print(f" {line['description']}: {sign}{line['amount']:,} sats "
                  f"({line['metadata']['fiat_amount']} EUR)")
            total_sats += line['amount']
        print(f" Balance check: {total_sats} (should be 0)")
        return {"id": "dry-run"}

    url = f"{LNBITS_URL}/castle/api/v1/entries"
    headers = {
        "X-Api-Key": api_key,
        "Content-Type": "application/json"
    }

    # Always bound the request: without a timeout a stalled server would
    # hang the whole import on a single entry.
    response = requests.post(url, json=entry, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
|
|
|
|
# ===== MAIN IMPORT FUNCTION =====
|
|
|
|
def _split_transactions(content: str) -> list:
    """Split ledger text into blocks separated by one or more blank lines.

    A line containing only whitespace counts as blank, so transactions
    separated by e.g. a line of trailing spaces are not merged together.
    """
    blocks = []
    current = []
    for line in content.split('\n'):
        if line.strip():
            current.append(line)
        elif current:
            blocks.append('\n'.join(current).strip())
            current = []
    if current:
        blocks.append('\n'.join(current).strip())
    return blocks


def import_beancount_file(beancount_file: str, dry_run: bool = False):
    """Import transactions from a Beancount file using rates from the CSV.

    Loads the rate table and the Castle accounts, splits the ledger into
    blank-line separated blocks, and converts/uploads every block that
    parses as a transaction. Blocks that are not transactions are skipped;
    a failure in one transaction is reported and does not stop the others.
    Progress and a final summary are printed to stdout.

    Args:
        beancount_file: Path to the Beancount ledger.
        dry_run: If True, entries are previewed instead of uploaded.
    """
    # Validate configuration
    if not ADMIN_API_KEY:
        print("❌ Error: CASTLE_ADMIN_KEY not set!")
        print(" Set it as environment variable or update ADMIN_API_KEY in the script.")
        return

    # Load rates
    try:
        rate_lookup = RateLookup(RATES_CSV_FILE)
    except (FileNotFoundError, ValueError) as e:
        print(f"❌ Error loading rates: {e}")
        return

    # Load accounts from Castle
    try:
        account_lookup = AccountLookup(LNBITS_URL, ADMIN_API_KEY)
    except (ConnectionError, ValueError) as e:
        print(f"❌ Error loading accounts: {e}")
        return

    # Show user mappings and verify equity accounts exist
    print(f"\n👥 User ID mappings and equity accounts:")
    for name, user_id in USER_MAPPINGS.items():
        has_equity = 'equity' in account_lookup.accounts_by_user.get(user_id, {})
        status = "✅" if has_equity else "❌"
        print(f" {status} {name} → {user_id} {'(has equity account)' if has_equity else '(NO EQUITY ACCOUNT - create in Castle!)'}")

    # Read beancount file
    if not os.path.exists(beancount_file):
        print(f"❌ Error: Beancount file not found: {beancount_file}")
        return

    with open(beancount_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split by blank lines to get individual transactions
    transactions = _split_transactions(content)

    print(f"\n📄 Found {len(transactions)} potential transactions in {os.path.basename(beancount_file)}")
    if dry_run:
        print("🔍 [DRY RUN MODE] No changes will be made\n")

    success_count = 0
    error_count = 0
    skip_count = 0
    skipped_items = []  # Track what was skipped

    for i, txn_text in enumerate(transactions, 1):
        # Deliberately broad: one bad transaction must not abort the run.
        try:
            # Try to parse the transaction
            parsed = parse_beancount_transaction(txn_text)
            if not parsed:
                # Not a valid transaction (likely a directive, option, or comment block)
                skip_count += 1
                first_line = txn_text.split('\n')[0][:60]
                skipped_items.append(f"Entry {i}: {first_line}... (not a transaction)")
                continue

            # Look up rate for this transaction's date
            btc_eur_rate = rate_lookup.get_rate(parsed['date'].date())
            if not btc_eur_rate:
                raise ValueError(f"No BTC/EUR rate found for {parsed['date'].date()}")

            castle_entry = convert_to_castle_entry(parsed, btc_eur_rate, account_lookup)
            upload_entry(castle_entry, ADMIN_API_KEY, dry_run)

            # Get user name for display (first user-specific posting)
            user_name = None
            for posting in parsed['postings']:
                user_name = extract_user_from_user_account(posting['account'])
                if user_name:
                    break

            user_info = f" (User: {user_name})" if user_name else ""
            print(f"✅ Transaction {i}: {parsed['date'].date()} - {parsed['description'][:35]}{user_info} "
                  f"(Rate: {btc_eur_rate:,.0f} EUR/BTC)")
            success_count += 1

        except Exception as e:
            print(f"❌ Transaction {i} failed: {e}")
            print(f" Content: {txn_text[:100]}...")
            error_count += 1

    print(f"\n{'='*70}")
    print(f"📊 Summary: {success_count} succeeded, {error_count} failed, {skip_count} skipped")
    print(f"{'='*70}")

    # Show details of skipped entries
    if skipped_items:
        print(f"\n⏭️ Skipped entries:")
        for item in skipped_items:
            print(f" {item}")

    if success_count > 0 and not dry_run:
        print(f"\n✅ Successfully imported {success_count} transactions to Castle!")
|
|
|
|
# ===== MAIN =====
|
|
|
|
if __name__ == "__main__":
    import sys

    # Banner
    rule = "=" * 70
    print(rule)
    print("🏰 Beancount to Castle Import Script")
    print(rule)

    # Without a ledger path, show usage plus the effective configuration
    # and exit with a non-zero status.
    if len(sys.argv) < 2:
        usage = (
            "\nUsage: python import_beancount.py <ledger.beancount> [--dry-run]",
            "\nExample:",
            " python import_beancount.py my_ledger.beancount --dry-run",
            " python import_beancount.py my_ledger.beancount",
            "\nConfiguration:",
            f" LNBITS_URL: {LNBITS_URL}",
            f" RATES_CSV: {RATES_CSV_FILE}",
            f" API Key set: {'Yes' if ADMIN_API_KEY else 'No (set CASTLE_ADMIN_KEY env var)'}",
        )
        for text in usage:
            print(text)
        sys.exit(1)

    # First positional argument is the ledger; --dry-run may appear anywhere.
    beancount_file = sys.argv[1]
    dry_run = "--dry-run" in sys.argv

    import_beancount_file(beancount_file, dry_run)
|