castle/fava_client.py
padreug bf79495ceb Optimize recent transactions with 30-day date filter
Performance improvement for large ledgers:
- Added optional 'days' parameter to get_journal_entries()
- User dashboard now fetches only last 30 days of entries
- Dramatically reduces data transfer for ledgers with 100+ entries
- Filters in Python after fetching from Fava API

Example impact: 229 entries → ~20-50 entries (typical 30-day activity)

This is a "quick win" optimization as recommended for accounting systems
with growing transaction history. Admin endpoints still fetch all entries.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 22:39:22 +01:00

1192 lines
46 KiB
Python

"""
Fava API client for Castle.
This module provides an async HTTP client for interacting with Fava's JSON API.
All accounting logic is delegated to Fava/Beancount.
Fava provides a REST API for:
- Adding transactions (PUT /api/add_entries)
- Adding accounts via Open directives (PUT /api/add_entries)
- Querying balances (GET /api/query)
- Balance sheets (GET /api/balance_sheet)
- Account reports (GET /api/account_report)
- Updating/deleting entries (PUT/DELETE /api/source_slice)
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
"""
import httpx
from typing import Any, Dict, List, Optional
from decimal import Decimal
from datetime import date, datetime
from loguru import logger
class FavaClient:
"""
Async client for Fava REST API.
Fava runs as a separate web service and provides a JSON API
for adding entries and querying ledger data.
All accounting calculations are performed by Beancount via Fava.
"""
def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
"""
Initialize Fava client.
Args:
fava_url: Base URL of Fava server (e.g., http://localhost:3333)
ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
timeout: Request timeout in seconds
"""
self.fava_url = fava_url.rstrip('/')
self.ledger_slug = ledger_slug
self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
self.timeout = timeout
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
"""
Submit a new journal entry to Fava.
Args:
entry: Beancount entry dict (format per Fava API spec)
Must include:
- t: "Transaction" (required by Fava)
- date: "YYYY-MM-DD"
- flag: "*" (cleared) or "!" (pending)
- narration: str
- postings: list of posting dicts
- payee: str (empty string, not None)
- tags: list of str
- links: list of str
- meta: dict
Returns:
Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."})
Raises:
httpx.HTTPStatusError: If Fava returns an error
httpx.RequestError: If connection fails
Example:
entry = {
"t": "Transaction",
"date": "2025-01-15",
"flag": "*",
"payee": "Store",
"narration": "Purchase",
"postings": [
{"account": "Expenses:Food", "amount": "50.00 EUR"},
{"account": "Assets:Cash", "amount": "-50.00 EUR"}
],
"tags": [],
"links": [],
"meta": {"user_id": "abc123"}
}
result = await fava_client.add_entry(entry)
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.put(
f"{self.base_url}/add_entries",
json={"entries": [entry]},
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
result = response.json()
logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
return result
except httpx.HTTPStatusError as e:
logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
"""
Get balance for a specific account (excluding pending transactions).
Args:
account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
Returns:
Dict with:
- sats: int (balance in satoshis)
- positions: dict (currency → amount with cost basis)
Note:
Excludes pending transactions (flag='!') from balance calculation.
Only cleared/completed transactions (flag='*') are included.
Example:
balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
# Returns: {
# "sats": 200000,
# "positions": {"SATS": {"{100.00 EUR}": 200000}}
# }
"""
query = f"SELECT sum(position) WHERE account = '{account_name}' AND flag != '!'"
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.base_url}/query",
params={"query_string": query}
)
response.raise_for_status()
data = response.json()
if not data['data']['rows']:
return {"sats": 0, "positions": {}}
# Fava returns: [[account, {"SATS": {cost: amount}}]]
positions = data['data']['rows'][0][1] if data['data']['rows'] else {}
# Sum up all SATS positions
total_sats = 0
if isinstance(positions, dict) and "SATS" in positions:
sats_positions = positions["SATS"]
if isinstance(sats_positions, dict):
# Sum all amounts (with different cost bases)
total_sats = sum(int(amount) for amount in sats_positions.values())
elif isinstance(sats_positions, (int, float)):
# Simple number (no cost basis)
total_sats = int(sats_positions)
return {
"sats": total_sats,
"positions": positions
}
except httpx.HTTPStatusError as e:
logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
"""
Get user's balance from castle's perspective.
Aggregates:
- Liabilities:Payable:User-{user_id} (negative = castle owes user)
- Assets:Receivable:User-{user_id} (positive = user owes castle)
Args:
user_id: User ID
Returns:
{
"balance": int (sats, positive = user owes castle, negative = castle owes user),
"fiat_balances": {"EUR": Decimal("100.50")},
"accounts": [list of account dicts with balances]
}
Note:
Excludes pending transactions (flag='!') from balance calculation.
Only cleared/completed transactions (flag='*') are included.
"""
# Get all journal entries for this user
all_entries = await self.get_journal_entries()
total_sats = 0
fiat_balances = {}
accounts_dict = {} # Track balances per account
for entry in all_entries:
# Skip non-transactions, pending (!), and voided
if entry.get("t") != "Transaction":
continue
if entry.get("flag") == "!":
continue
if "voided" in entry.get("tags", []):
continue
# Process postings for this user
for posting in entry.get("postings", []):
account_name = posting.get("account", "")
# Only process this user's accounts (account names use first 8 chars of user_id)
if f":User-{user_id[:8]}" not in account_name:
continue
if "Payable" not in account_name and "Receivable" not in account_name:
continue
# Parse amount string: can be EUR, USD, or SATS
amount_str = posting.get("amount", "")
if not isinstance(amount_str, str) or not amount_str:
continue
import re
# Try to extract EUR/USD amount first (new format)
fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str)
if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
# Direct EUR/USD amount (new approach)
fiat_amount = Decimal(fiat_match.group(1))
fiat_currency = fiat_match.group(2)
if fiat_currency not in fiat_balances:
fiat_balances[fiat_currency] = Decimal(0)
fiat_balances[fiat_currency] += fiat_amount
# Also track SATS equivalent from metadata if available
posting_meta = posting.get("meta", {})
sats_equiv = posting_meta.get("sats-equivalent")
if sats_equiv:
sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
total_sats += sats_amount
if account_name not in accounts_dict:
accounts_dict[account_name] = {"account": account_name, "sats": 0}
accounts_dict[account_name]["sats"] += sats_amount
else:
# Old format: SATS with cost/price notation - extract SATS amount
sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str)
if sats_match:
sats_amount = int(sats_match.group(1))
total_sats += sats_amount
# Track per account
if account_name not in accounts_dict:
accounts_dict[account_name] = {"account": account_name, "sats": 0}
accounts_dict[account_name]["sats"] += sats_amount
# Try to extract fiat from metadata or cost syntax (backward compatibility)
posting_meta = posting.get("meta", {})
fiat_amount_total_str = posting_meta.get("fiat-amount-total")
fiat_currency_meta = posting_meta.get("fiat-currency")
if fiat_amount_total_str and fiat_currency_meta:
# Use exact total from metadata
fiat_total = Decimal(fiat_amount_total_str)
fiat_currency = fiat_currency_meta
if fiat_currency not in fiat_balances:
fiat_balances[fiat_currency] = Decimal(0)
# Apply the same sign as the SATS amount
if sats_match:
sats_amount_for_sign = int(sats_match.group(1))
if sats_amount_for_sign < 0:
fiat_total = -fiat_total
fiat_balances[fiat_currency] += fiat_total
logger.info(f"User {user_id[:8]} balance: {total_sats} sats, fiat: {dict(fiat_balances)}")
return {
"balance": total_sats,
"fiat_balances": fiat_balances,
"accounts": list(accounts_dict.values())
}
async def get_all_user_balances(self) -> List[Dict[str, Any]]:
"""
Get balances for all users (admin view).
Returns:
[
{
"user_id": "abc123",
"balance": 100000,
"fiat_balances": {"EUR": Decimal("100.50")},
"accounts": [...]
},
...
]
Note:
Excludes pending transactions (flag='!') and voided (tag #voided) from balance calculation.
Only cleared/completed transactions (flag='*') are included.
"""
# Get all journal entries and calculate balances from postings
all_entries = await self.get_journal_entries()
# Group by user_id
user_data = {}
for entry in all_entries:
# Skip non-transactions, pending (!), and voided
if entry.get("t") != "Transaction":
continue
if entry.get("flag") == "!":
continue
if "voided" in entry.get("tags", []):
continue
# Process postings
for posting in entry.get("postings", []):
account_name = posting.get("account", "")
# Only process user accounts (Payable or Receivable)
if ":User-" not in account_name:
continue
if "Payable" not in account_name and "Receivable" not in account_name:
continue
# Extract user_id from account name
user_id = account_name.split(":User-")[1]
if user_id not in user_data:
user_data[user_id] = {
"user_id": user_id,
"balance": 0,
"fiat_balances": {},
"accounts": []
}
# Parse amount string: can be EUR/USD directly (new format) or "SATS {EUR}" (old format)
amount_str = posting.get("amount", "")
if not isinstance(amount_str, str) or not amount_str:
continue
import re
# Try to extract EUR/USD amount first (new format)
fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str)
if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
# Direct EUR/USD amount (new approach)
fiat_amount = Decimal(fiat_match.group(1))
fiat_currency = fiat_match.group(2)
if fiat_currency not in user_data[user_id]["fiat_balances"]:
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount
# Also track SATS equivalent from metadata if available
posting_meta = posting.get("meta", {})
sats_equiv = posting_meta.get("sats-equivalent")
if sats_equiv:
sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
user_data[user_id]["balance"] += sats_amount
else:
# Old format: SATS with cost/price notation
sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str)
if sats_match:
sats_amount = int(sats_match.group(1))
user_data[user_id]["balance"] += sats_amount
# Extract fiat from cost syntax or metadata (backward compatibility)
posting_meta = posting.get("meta", {})
fiat_amount_total_str = posting_meta.get("fiat-amount-total")
fiat_currency_meta = posting_meta.get("fiat-currency")
if fiat_amount_total_str and fiat_currency_meta:
fiat_total = Decimal(fiat_amount_total_str)
fiat_currency = fiat_currency_meta
if fiat_currency not in user_data[user_id]["fiat_balances"]:
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
# Apply the same sign as the SATS amount
if sats_match:
sats_amount_for_sign = int(sats_match.group(1))
if sats_amount_for_sign < 0:
fiat_total = -fiat_total
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_total
return list(user_data.values())
async def check_fava_health(self) -> bool:
"""
Check if Fava is running and accessible.
Returns:
True if Fava responds, False otherwise
"""
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.get(
f"{self.base_url}/changed"
)
return response.status_code == 200
except Exception as e:
logger.warning(f"Fava health check failed: {e}")
return False
async def query_transactions(
self,
account_pattern: Optional[str] = None,
limit: int = 100,
include_pending: bool = True
) -> List[Dict[str, Any]]:
"""
Query transactions from Fava/Beancount.
Args:
account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123")
limit: Maximum number of transactions to return
include_pending: Include pending transactions (flag='!')
Returns:
List of transaction dictionaries with date, description, postings, etc.
Example:
# All transactions
txns = await fava.query_transactions()
# User's transactions
txns = await fava.query_transactions(account_pattern="User-abc123")
# Account transactions
txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc")
"""
# Build Beancount query
if account_pattern:
query = f"SELECT * WHERE account ~ '{account_pattern}' ORDER BY date DESC LIMIT {limit}"
else:
query = f"SELECT * ORDER BY date DESC LIMIT {limit}"
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.base_url}/query",
params={"query_string": query}
)
response.raise_for_status()
result = response.json()
# Fava query API returns: {"data": {"rows": [...], "types": [...]}}
data = result.get("data", {})
rows = data.get("rows", [])
types = data.get("types", [])
# Build column name mapping
column_names = [t.get("name") for t in types]
# Transform Fava's query result to transaction list
transactions = []
for row in rows:
# Rows are arrays, convert to dict using column names
if isinstance(row, list) and len(row) == len(column_names):
txn = dict(zip(column_names, row))
# Filter by flag if needed
flag = txn.get("flag", "*")
if not include_pending and flag == "!":
continue
transactions.append(txn)
elif isinstance(row, dict):
# Already a dict (shouldn't happen with BQL, but handle it)
flag = row.get("flag", "*")
if not include_pending and flag == "!":
continue
transactions.append(row)
return transactions[:limit]
except httpx.HTTPStatusError as e:
logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def query_bql(self, query_string: str) -> Dict[str, Any]:
"""
Execute arbitrary Beancount Query Language (BQL) query.
This is a general-purpose method for executing BQL queries against Fava/Beancount.
Use this for efficient aggregations, filtering, and data retrieval.
⚠️ LIMITATION: BQL can only query position amounts and transaction-level data.
It CANNOT access posting metadata (like 'sats-equivalent'). For Castle's current
ledger format where SATS are stored in metadata, manual aggregation is required.
See: docs/BQL-BALANCE-QUERIES.md for detailed analysis and test results.
FUTURE CONSIDERATION: If Castle's ledger format changes to use SATS as position
amounts (instead of metadata), BQL could provide significant performance benefits.
Args:
query_string: BQL query (e.g., "SELECT account, sum(position) WHERE account ~ 'User-abc'")
Returns:
{
"rows": [[col1, col2, ...], ...],
"types": [{"name": "col1", "type": "str"}, ...],
"column_names": ["col1", "col2", ...]
}
Example:
result = await fava.query_bql("SELECT account, sum(position) WHERE account ~ 'User-abc'")
for row in result["rows"]:
account, balance = row
print(f"{account}: {balance}")
See:
https://beancount.github.io/docs/beancount_query_language.html
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.base_url}/query",
params={"query_string": query_string}
)
response.raise_for_status()
result = response.json()
# Fava returns: {"data": {"rows": [...], "types": [...]}}
data = result.get("data", {})
rows = data.get("rows", [])
types = data.get("types", [])
column_names = [t.get("name") for t in types]
return {
"rows": rows,
"types": types,
"column_names": column_names
}
except httpx.HTTPStatusError as e:
logger.error(f"BQL query error: {e.response.status_code} - {e.response.text}")
logger.error(f"Query was: {query_string}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def get_user_balance_bql(self, user_id: str) -> Dict[str, Any]:
"""
Get user balance using BQL (efficient, replaces 115-line manual aggregation).
⚠️ NOT CURRENTLY USED: This method cannot access SATS balances stored in posting
metadata. It only queries position amounts (EUR/USD). For Castle's current ledger
format, use get_user_balance() instead (manual aggregation with caching).
This method uses Beancount Query Language for server-side filtering and aggregation,
which would provide 5-10x performance improvement IF SATS were stored as position
amounts instead of metadata.
FUTURE CONSIDERATION: If Castle's ledger format changes to store SATS as position
amounts (e.g., "100000 SATS {100.00 EUR}"), this method would become feasible and
provide significant performance benefits.
See: docs/BQL-BALANCE-QUERIES.md for detailed test results and analysis.
Args:
user_id: User ID
Returns:
{
"balance": int (sats), # Currently returns 0 (cannot access metadata)
"fiat_balances": {"EUR": Decimal("100.50"), ...}, # Works correctly
"accounts": [{"account": "...", "sats": 150000}, ...]
}
Example:
balance = await fava.get_user_balance_bql("af983632")
print(f"Balance: {balance['balance']} sats") # Will be 0 with current ledger format
"""
from decimal import Decimal
import re
# Build BQL query for this user's Payable/Receivable accounts
user_id_prefix = user_id[:8]
query = f"""
SELECT account, sum(position) as balance
WHERE account ~ ':User-{user_id_prefix}'
AND (account ~ 'Payable' OR account ~ 'Receivable')
AND flag != '!'
GROUP BY account
"""
result = await self.query_bql(query)
# Process results
total_sats = 0
fiat_balances = {}
accounts = []
for row in result["rows"]:
account_name, position = row
# Position can be:
# - Dict: {"SATS": "150000", "EUR": "145.50"}
# - String: "150000 SATS" or "145.50 EUR"
if isinstance(position, dict):
# Extract SATS
sats_str = position.get("SATS", "0")
sats_amount = int(sats_str) if sats_str else 0
total_sats += sats_amount
accounts.append({
"account": account_name,
"sats": sats_amount
})
# Extract fiat currencies
for currency in ["EUR", "USD", "GBP"]:
if currency in position:
fiat_str = position[currency]
fiat_amount = Decimal(fiat_str) if fiat_str else Decimal(0)
if currency not in fiat_balances:
fiat_balances[currency] = Decimal(0)
fiat_balances[currency] += fiat_amount
elif isinstance(position, str):
# Single currency (parse "150000 SATS" or "145.50 EUR")
sats_match = re.match(r'^(-?\d+)\s+SATS$', position)
if sats_match:
sats_amount = int(sats_match.group(1))
total_sats += sats_amount
accounts.append({
"account": account_name,
"sats": sats_amount
})
else:
fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position)
if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
fiat_amount = Decimal(fiat_match.group(1))
currency = fiat_match.group(2)
if currency not in fiat_balances:
fiat_balances[currency] = Decimal(0)
fiat_balances[currency] += fiat_amount
logger.info(f"User {user_id[:8]} balance (BQL): {total_sats} sats, fiat: {dict(fiat_balances)}")
return {
"balance": total_sats,
"fiat_balances": fiat_balances,
"accounts": accounts
}
async def get_all_user_balances_bql(self) -> List[Dict[str, Any]]:
"""
Get balances for all users using BQL (efficient admin view).
⚠️ NOT CURRENTLY USED: This method cannot access SATS balances stored in posting
metadata. It only queries position amounts (EUR/USD). For Castle's current ledger
format, use get_all_user_balances() instead (manual aggregation with caching).
This method uses Beancount Query Language to query all user balances
in a single efficient query, which would be faster than fetching all entries IF
SATS were stored as position amounts instead of metadata.
FUTURE CONSIDERATION: If Castle's ledger format changes to store SATS as position
amounts, this method would provide significant performance benefits for admin views.
See: docs/BQL-BALANCE-QUERIES.md for detailed test results and analysis.
Returns:
[
{
"user_id": "abc123",
"balance": 100000, # Currently 0 (cannot access metadata)
"fiat_balances": {"EUR": Decimal("100.50")}, # Works correctly
"accounts": [{"account": "...", "sats": 150000}, ...]
},
...
]
Example:
all_balances = await fava.get_all_user_balances_bql()
for user in all_balances:
print(f"{user['user_id']}: {user['balance']} sats") # Will be 0 with current format
"""
from decimal import Decimal
import re
# BQL query for ALL user accounts
query = """
SELECT account, sum(position) as balance
WHERE (account ~ 'Payable:User-' OR account ~ 'Receivable:User-')
AND flag != '!'
GROUP BY account
"""
result = await self.query_bql(query)
# Group by user_id
user_data = {}
for row in result["rows"]:
account_name, position = row
# Extract user_id from account name
# Format: "Liabilities:Payable:User-abc12345" or "Assets:Receivable:User-abc12345"
if ":User-" not in account_name:
continue
user_id_with_prefix = account_name.split(":User-")[1]
# User ID is the first 8 chars (our standard)
user_id = user_id_with_prefix[:8]
if user_id not in user_data:
user_data[user_id] = {
"user_id": user_id,
"balance": 0,
"fiat_balances": {},
"accounts": []
}
# Process position (same logic as single-user query)
if isinstance(position, dict):
sats_str = position.get("SATS", "0")
sats_amount = int(sats_str) if sats_str else 0
user_data[user_id]["balance"] += sats_amount
user_data[user_id]["accounts"].append({
"account": account_name,
"sats": sats_amount
})
for currency in ["EUR", "USD", "GBP"]:
if currency in position:
fiat_str = position[currency]
fiat_amount = Decimal(fiat_str) if fiat_str else Decimal(0)
if currency not in user_data[user_id]["fiat_balances"]:
user_data[user_id]["fiat_balances"][currency] = Decimal(0)
user_data[user_id]["fiat_balances"][currency] += fiat_amount
elif isinstance(position, str):
# Single currency (parse "150000 SATS" or "145.50 EUR")
sats_match = re.match(r'^(-?\d+)\s+SATS$', position)
if sats_match:
sats_amount = int(sats_match.group(1))
user_data[user_id]["balance"] += sats_amount
user_data[user_id]["accounts"].append({
"account": account_name,
"sats": sats_amount
})
else:
fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position)
if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
fiat_amount = Decimal(fiat_match.group(1))
currency = fiat_match.group(2)
if currency not in user_data[user_id]["fiat_balances"]:
user_data[user_id]["fiat_balances"][currency] = Decimal(0)
user_data[user_id]["fiat_balances"][currency] += fiat_amount
logger.info(f"Fetched balances for {len(user_data)} users (BQL)")
return list(user_data.values())
async def get_account_transactions(
self,
account_name: str,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Get all transactions affecting a specific account.
Args:
account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
limit: Maximum number of transactions
Returns:
List of transactions affecting this account
"""
return await self.query_transactions(
account_pattern=account_name.replace(":", "\\:"), # Escape colons for regex
limit=limit
)
async def get_user_transactions(
self,
user_id: str,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Get all transactions affecting a user's accounts.
Args:
user_id: User ID
limit: Maximum number of transactions
Returns:
List of transactions affecting user's accounts
"""
return await self.query_transactions(
account_pattern=f"User-{user_id[:8]}",
limit=limit
)
async def get_all_accounts(self) -> List[Dict[str, Any]]:
"""
Get all accounts from Beancount/Fava using BQL query.
Returns:
List of account dictionaries:
[
{"account": "Assets:Cash", "meta": {...}},
{"account": "Expenses:Food", "meta": {...}},
...
]
Example:
accounts = await fava.get_all_accounts()
for acc in accounts:
print(acc["account"]) # "Assets:Cash"
"""
try:
# Use BQL to get all unique accounts
query = "SELECT DISTINCT account"
result = await self.query_bql(query)
# Convert BQL result to expected format
accounts = []
for row in result["rows"]:
account_name = row[0] if isinstance(row, list) else row.get("account")
if account_name:
accounts.append({
"account": account_name,
"meta": {} # BQL doesn't return metadata easily
})
logger.debug(f"Fava returned {len(accounts)} accounts via BQL")
return accounts
except Exception as e:
logger.error(f"Failed to fetch accounts via BQL: {e}")
raise
async def get_journal_entries(self, days: int = None) -> List[Dict[str, Any]]:
"""
Get journal entries from Fava (with entry hashes), optionally filtered by date.
Args:
days: If provided, only return entries from the last N days.
If None, returns all entries (default behavior).
Returns:
List of entries (transactions, opens, closes, etc.) with entry_hash field.
Example:
# Get all entries
entries = await fava.get_journal_entries()
# Get only last 30 days
recent = await fava.get_journal_entries(days=30)
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(f"{self.base_url}/journal")
response.raise_for_status()
result = response.json()
entries = result.get("data", [])
logger.info(f"Fava /journal returned {len(entries)} entries")
# Filter by date if requested
if days is not None:
from datetime import datetime, timedelta
cutoff_date = (datetime.now() - timedelta(days=days)).date()
filtered_entries = []
for e in entries:
entry_date_str = e.get("date")
if entry_date_str:
try:
entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d").date()
if entry_date >= cutoff_date:
filtered_entries.append(e)
except (ValueError, TypeError):
# Include entries with invalid dates (shouldn't happen)
filtered_entries.append(e)
logger.info(f"Filtered to {len(filtered_entries)} entries from last {days} days (cutoff: {cutoff_date})")
entries = filtered_entries
# Log transactions with "Lightning payment" in narration
lightning_entries = [e for e in entries if "Lightning payment" in e.get("narration", "")]
logger.info(f"Found {len(lightning_entries)} Lightning payment entries in journal")
return entries
except httpx.HTTPStatusError as e:
logger.error(f"Fava journal error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
"""
Get entry context including source text and sha256sum.
Args:
entry_hash: Entry hash from get_journal_entries()
Returns:
{
"entry": {...}, # Serialized entry
"slice": "2025-01-15 ! \"Description\"...", # Beancount source text
"sha256sum": "abc123...", # For concurrency control
"balances_before": {...},
"balances_after": {...}
}
Example:
context = await fava.get_entry_context("abc123")
source = context["slice"]
sha256sum = context["sha256sum"]
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.base_url}/context",
params={"entry_hash": entry_hash}
)
response.raise_for_status()
result = response.json()
return result.get("data", {})
except httpx.HTTPStatusError as e:
logger.error(f"Fava context error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
"""
Update an entry's source text (e.g., change flag from ! to *).
Args:
entry_hash: Entry hash
new_source: Modified Beancount source text
sha256sum: Current sha256sum from get_entry_context() for concurrency control
Returns:
New sha256sum after update
Example:
# Get context
context = await fava.get_entry_context("abc123")
source = context["slice"]
sha256 = context["sha256sum"]
# Change flag
new_source = source.replace("2025-01-15 !", "2025-01-15 *")
# Update
new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.put(
f"{self.base_url}/source_slice",
json={
"entry_hash": entry_hash,
"source": new_source,
"sha256sum": sha256sum
}
)
response.raise_for_status()
result = response.json()
return result.get("data", "")
except httpx.HTTPStatusError as e:
logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
"""
Delete an entry from the Beancount file.
Args:
entry_hash: Entry hash
sha256sum: Current sha256sum for concurrency control
Returns:
Success message
Example:
context = await fava.get_entry_context("abc123")
await fava.delete_entry("abc123", context["sha256sum"])
"""
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.delete(
f"{self.base_url}/source_slice",
params={
"entry_hash": entry_hash,
"sha256sum": sha256sum
}
)
response.raise_for_status()
result = response.json()
return result.get("data", "")
except httpx.HTTPStatusError as e:
logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def add_account(
self,
account_name: str,
currencies: list[str],
opening_date: Optional[date] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Add an account to the Beancount ledger via an Open directive.
NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
This method uses /api/source to directly edit the Beancount file.
Args:
account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
opening_date: Date to open the account (defaults to today)
metadata: Optional metadata for the account
Returns:
Response from Fava ({"data": "new_sha256sum", "mtime": "..."})
Example:
# Add a user's receivable account
result = await fava.add_account(
account_name="Assets:Receivable:User-abc123",
currencies=["EUR", "SATS", "USD"],
metadata={"user_id": "abc123", "description": "User receivables"}
)
# Add a user's payable account
result = await fava.add_account(
account_name="Liabilities:Payable:User-abc123",
currencies=["EUR", "SATS"]
)
"""
from datetime import date as date_type
if opening_date is None:
opening_date = date_type.today()
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Step 1: Get the main Beancount file path from Fava
options_response = await client.get(f"{self.base_url}/options")
options_response.raise_for_status()
options_data = options_response.json()["data"]
file_path = options_data["beancount_options"]["filename"]
logger.debug(f"Fava main file: {file_path}")
# Step 2: Get current source file
response = await client.get(
f"{self.base_url}/source",
params={"filename": file_path}
)
response.raise_for_status()
source_data = response.json()["data"]
sha256sum = source_data["sha256sum"]
source = source_data["source"]
# Step 2: Check if account already exists
if f"open {account_name}" in source:
logger.info(f"Account {account_name} already exists in Beancount file")
return {"data": sha256sum, "mtime": source_data.get("mtime", "")}
# Step 3: Find insertion point (after last Open directive AND its metadata)
lines = source.split('\n')
insert_index = 0
for i, line in enumerate(lines):
if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line:
# Found an Open directive, now skip over any metadata lines
insert_index = i + 1
# Skip metadata lines (lines starting with whitespace)
while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip():
insert_index += 1
# Step 4: Format Open directive as Beancount text
currencies_str = ", ".join(currencies)
open_lines = [
"",
f"{opening_date.isoformat()} open {account_name} {currencies_str}"
]
# Add metadata if provided
if metadata:
for key, value in metadata.items():
# Format metadata with proper indentation
if isinstance(value, str):
open_lines.append(f' {key}: "{value}"')
else:
open_lines.append(f' {key}: {value}')
# Step 5: Insert into source
for i, line in enumerate(open_lines):
lines.insert(insert_index + i, line)
new_source = '\n'.join(lines)
# Step 6: Update source file via PUT /api/source
update_payload = {
"file_path": file_path,
"source": new_source,
"sha256sum": sha256sum
}
response = await client.put(
f"{self.base_url}/source",
json=update_payload,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
result = response.json()
logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}")
return result
except httpx.HTTPStatusError as e:
logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
# Singleton instance (configured from settings)
_fava_client: Optional[FavaClient] = None
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
"""
Initialize the global Fava client.
Args:
fava_url: Base URL of Fava server
ledger_slug: Ledger identifier
timeout: Request timeout in seconds
"""
global _fava_client
_fava_client = FavaClient(fava_url, ledger_slug, timeout)
logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
def get_fava_client() -> FavaClient:
"""
Get the configured Fava client.
Returns:
FavaClient instance
Raises:
RuntimeError: If client not initialized
"""
if _fava_client is None:
raise RuntimeError(
"Fava client not initialized. Call init_fava_client() first. "
"Castle requires Fava for all accounting operations."
)
return _fava_client