castle/fava_client.py
padreug 1362ada362 Rejects pending expense entries by voiding them
Instead of deleting pending expense entries, marks them as voided by adding a #voided tag.
This ensures an audit trail while excluding them from balances.

Updates the Fava client to use 'params' for the delete request.
2025-11-10 01:06:51 +01:00

705 lines
26 KiB
Python

"""
Fava API client for Castle.
This module provides an async HTTP client for interacting with Fava's JSON API.
All accounting logic is delegated to Fava/Beancount.
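Fava provides a REST API for:
- Adding transactions (PUT /api/add_entries)
- Querying balances (GET /api/query)
- Balance sheets (GET /api/balance_sheet)
- Account reports (GET /api/account_report)
This client additionally calls:
- Listing journal entries (GET /api/journal)
- Fetching an entry's context and source text (GET /api/context)
- Rewriting or deleting an entry's source (PUT / DELETE /api/source_slice)
- A reachability probe used as health check (GET /api/changed)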
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
"""
import logging
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Optional, Tuple

import httpx
# Module-level logger named after this module; used for all Fava client logging.
logger = logging.getLogger(__name__)
class FavaClient:
    """
    Async client for Fava REST API.

    Fava runs as a separate web service and provides a JSON API
    for adding entries and querying ledger data.
    All accounting calculations are performed by Beancount via Fava.

    Every call opens a short-lived ``httpx.AsyncClient``. HTTP status errors
    and connection errors are logged and then re-raised to the caller.
    """

    def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
        """
        Initialize Fava client.

        Args:
            fava_url: Base URL of Fava server (e.g., http://localhost:3333)
            ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
            timeout: Request timeout in seconds
        """
        # Trailing slashes are dropped so base_url never contains "//".
        self.fava_url = fava_url.rstrip('/')
        self.ledger_slug = ledger_slug
        self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _request(
        self,
        method: str,
        endpoint: str,
        *,
        error_label: str,
        **kwargs: Any
    ) -> Any:
        """
        Send one request to ``{base_url}/{endpoint}`` and return the parsed JSON body.

        Args:
            method: HTTP verb ("GET", "PUT", "DELETE")
            endpoint: Path below the ledger's /api prefix (e.g., "query")
            error_label: Word used in the log line "Fava <label> error: ..."
            **kwargs: Passed through to httpx (params=, json=, headers=)

        Raises:
            httpx.HTTPStatusError: If Fava returns an error status
            httpx.RequestError: If connection fails
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(
                    method, f"{self.base_url}/{endpoint}", **kwargs
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(
                "Fava %s error: %s - %s",
                error_label, e.response.status_code, e.response.text
            )
            raise
        except httpx.RequestError as e:
            logger.error("Fava connection error: %s", e)
            raise

    async def _query(self, query: str) -> Any:
        """Run a BQL query through GET /api/query and return the JSON response."""
        return await self._request(
            "GET", "query", error_label="query", params={"query_string": query}
        )

    @staticmethod
    def _bql_string(value: Any) -> str:
        """
        Wrap ``value`` in single quotes for embedding in a BQL query.

        Raises:
            ValueError: If the text contains a single quote. Such a value would
                terminate the literal early and change the query, so it is
                rejected instead of being sent to Fava.
        """
        text = str(value)
        if "'" in text:
            raise ValueError(f"Value cannot be embedded in a BQL query: {text!r}")
        return f"'{text}'"

    @classmethod
    def _build_transactions_query(
        cls,
        account_pattern: Optional[str],
        limit: int,
        include_pending: bool
    ) -> str:
        """Build the BQL statement used by query_transactions()."""
        conditions = []
        if account_pattern:
            conditions.append(f"account ~ {cls._bql_string(account_pattern)}")
        if not include_pending:
            # Filter before LIMIT is applied; filtering only on the client
            # would shrink the page below `limit` whenever pending rows exist.
            conditions.append("flag != '!'")
        where = f" WHERE {' AND '.join(conditions)}" if conditions else ""
        return f"SELECT *{where} ORDER BY date DESC LIMIT {int(limit)}"

    @staticmethod
    def _positions_from_row(row: Any) -> Any:
        """
        Pick the positions value out of a single balance-query row.

        The balance query selects one column, so a one-element row holds the
        positions directly; a two-element row is read as [account, positions].
        """
        if not row:
            return {}
        return row[1] if len(row) > 1 else row[0]

    @staticmethod
    def _parse_cost_basis(cost_str: str) -> Optional[Tuple[Decimal, str]]:
        """
        Parse a cost-basis key such as "100.00 EUR" or "{100.00 EUR}".

        Returns:
            (amount, currency), or None when the text is not exactly two
            whitespace-separated tokens or the first token is not a number.
        """
        parts = cost_str.strip('{}').split()
        if len(parts) != 2:
            return None
        try:
            # Decimal() signals bad input with InvalidOperation, not ValueError.
            return Decimal(parts[0]), parts[1]
        except (InvalidOperation, ValueError):
            return None

    @classmethod
    def _summarize_positions(cls, positions: Any, fiat_balances: Dict[str, Decimal]) -> int:
        """
        Fold one account's SATS positions into a user-perspective balance.

        Beancount amounts are negated for the user's point of view:
        - Payable (Liability): negative in Beancount → positive (castle owes user)
        - Receivable (Asset): positive in Beancount → negative (user owes castle)

        Args:
            positions: Positions value of a query row; either
                {"SATS": {cost: amount, ...}} or {"SATS": number}
            fiat_balances: Running currency → Decimal totals, updated in place

        Returns:
            The account's SATS total after negation (0 if there are no SATS)
        """
        if not isinstance(positions, dict) or "SATS" not in positions:
            return 0
        sats_positions = positions["SATS"]
        if isinstance(sats_positions, (int, float)):
            # Simple number (no cost basis)
            return -int(sats_positions)
        if not isinstance(sats_positions, dict):
            return 0

        total = 0
        for cost_key, amount in sats_positions.items():
            units = int(amount)
            total -= units
            if not cost_key or cost_key == "SATS":
                continue
            parsed = cls._parse_cost_basis(cost_key)
            if parsed is None:
                logger.warning("Could not parse cost basis: %s", cost_key)
                continue
            fiat_amount, currency = parsed
            # The parsed cost is treated as unsigned: take the sign from the
            # SATS amount, then negate it like the SATS total above.
            signed_fiat = fiat_amount if units < 0 else -fiat_amount
            fiat_balances[currency] = fiat_balances.get(currency, Decimal(0)) + signed_fiat
        return total

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """
        Submit a new journal entry to Fava.

        Args:
            entry: Beancount entry dict (format per Fava API spec)
                Must include:
                - t: "Transaction" (required by Fava)
                - date: "YYYY-MM-DD"
                - flag: "*" (cleared) or "!" (pending)
                - narration: str
                - postings: list of posting dicts
                - payee: str (empty string, not None)
                - tags: list of str
                - links: list of str
                - meta: dict

        Returns:
            Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."})

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            entry = {
                "t": "Transaction",
                "date": "2025-01-15",
                "flag": "*",
                "payee": "Store",
                "narration": "Purchase",
                "postings": [
                    {"account": "Expenses:Food", "amount": "50.00 EUR"},
                    {"account": "Assets:Cash", "amount": "-50.00 EUR"}
                ],
                "tags": [],
                "links": [],
                "meta": {"user_id": "abc123"}
            }
            result = await fava_client.add_entry(entry)
        """
        result = await self._request(
            "PUT",
            "add_entries",
            error_label="HTTP",
            json={"entries": [entry]},
            headers={"Content-Type": "application/json"}
        )
        logger.info("Added entry to Fava: %s", result.get('data', 'Unknown'))
        return result

    async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
        """
        Get balance for a specific account (excluding pending transactions).

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")

        Returns:
            Dict with:
            - sats: int (balance in satoshis, Beancount sign, not negated)
            - positions: dict (currency → amount with cost basis)

        Raises:
            ValueError: If account_name contains a single quote
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Note:
            Excludes pending transactions (flag='!') from balance calculation.

        Example:
            balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
            # Returns: {
            #   "sats": 200000,
            #   "positions": {"SATS": {"{100.00 EUR}": 200000}}
            # }
        """
        query = (
            f"SELECT sum(position) WHERE account = {self._bql_string(account_name)}"
            " AND flag != '!'"
        )
        data = await self._query(query)
        rows = data['data']['rows']
        if not rows:
            return {"sats": 0, "positions": {}}

        positions = self._positions_from_row(rows[0])
        total_sats = 0
        if isinstance(positions, dict) and "SATS" in positions:
            sats_positions = positions["SATS"]
            if isinstance(sats_positions, dict):
                # Sum all amounts (with different cost bases)
                total_sats = sum(int(amount) for amount in sats_positions.values())
            elif isinstance(sats_positions, (int, float)):
                # Simple number (no cost basis)
                total_sats = int(sats_positions)
        return {"sats": total_sats, "positions": positions}

    async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
        """
        Get user's total balance (what castle owes user).

        Aggregates every account whose name matches ``User-<first 8 chars of user_id>``:
        - Liabilities:Payable:User-{user_id} (negative balance = castle owes)
        - Assets:Receivable:User-{user_id} (positive balance = user owes)

        Args:
            user_id: User ID (only its first 8 characters are used for matching)

        Returns:
            {
                "balance": int (sats, positive = castle owes user),
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [list of account dicts with balances]
            }

        Raises:
            ValueError: If user_id contains a single quote
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Note:
            Excludes pending transactions (flag='!') from balance calculation.
        """
        pattern = self._bql_string(f"User-{user_id[:8]}")
        query = (
            "SELECT account, sum(position) "
            f"WHERE account ~ {pattern} AND flag != '!' "
            "GROUP BY account"
        )
        data = await self._query(query)

        total_sats = 0
        fiat_balances: Dict[str, Decimal] = {}
        accounts = []
        for row in data['data']['rows']:
            account_name, positions = row[0], row[1]
            account_sats = self._summarize_positions(positions, fiat_balances)
            total_sats += account_sats
            accounts.append(
                {"account": account_name, "sats": account_sats, "positions": positions}
            )
        return {
            "balance": total_sats,
            "fiat_balances": fiat_balances,
            "accounts": accounts
        }

    async def get_all_user_balances(self) -> List[Dict[str, Any]]:
        """
        Get balances for all users (admin view).

        Returns:
            [
                {
                    "user_id": "abc123",
                    "balance": 100000,
                    "fiat_balances": {"EUR": Decimal("100.50")},
                    "accounts": [...]
                },
                ...
            ]
            ``user_id`` is whatever follows ":User-" in the account name.

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Note:
            Excludes pending transactions (flag='!') from balance calculation.
        """
        query = (
            "SELECT account, sum(position) "
            "WHERE account ~ 'Payable:User-|Receivable:User-' AND flag != '!' "
            "GROUP BY account"
        )
        data = await self._query(query)

        user_data: Dict[str, Dict[str, Any]] = {}
        for row in data['data']['rows']:
            account_name, positions = row[0], row[1]
            # e.g., "Liabilities:Payable:User-abc123" → "abc123"
            if ":User-" not in account_name:
                continue
            user_id = account_name.split(":User-")[1]
            summary = user_data.setdefault(
                user_id,
                {"user_id": user_id, "balance": 0, "fiat_balances": {}, "accounts": []}
            )
            account_sats = self._summarize_positions(positions, summary["fiat_balances"])
            summary["balance"] += account_sats
            summary["accounts"].append(
                {"account": account_name, "sats": account_sats, "positions": positions}
            )
        return list(user_data.values())

    async def check_fava_health(self, timeout: float = 2.0) -> bool:
        """
        Check if Fava is running and accessible.

        Args:
            timeout: Seconds to wait for the probe (independent of self.timeout)

        Returns:
            True if GET /api/changed answers with status 200, False otherwise
        """
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.get(f"{self.base_url}/changed")
                return response.status_code == 200
        except Exception as e:
            # Deliberately broad: a health probe reports False, it never raises.
            logger.warning("Fava health check failed: %s", e)
            return False

    async def query_transactions(
        self,
        account_pattern: Optional[str] = None,
        limit: int = 100,
        include_pending: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Query transactions from Fava/Beancount.

        Args:
            account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123")
            limit: Maximum number of transactions to return
            include_pending: Include pending transactions (flag='!')

        Returns:
            List of row dicts (column name → value) as returned by the query,
            newest first.

        Raises:
            ValueError: If account_pattern contains a single quote
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            # All transactions
            txns = await fava.query_transactions()
            # User's transactions
            txns = await fava.query_transactions(account_pattern="User-abc123")
            # Account transactions
            txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc")
        """
        query = self._build_transactions_query(account_pattern, limit, include_pending)
        result = await self._query(query)

        # Fava query API returns: {"data": {"rows": [...], "types": [...]}}
        data = result.get("data", {})
        rows = data.get("rows", [])
        column_names = [t.get("name") for t in data.get("types", [])]

        transactions = []
        for row in rows:
            if isinstance(row, list) and len(row) == len(column_names):
                # Rows are arrays, convert to dict using column names
                txn = dict(zip(column_names, row))
            elif isinstance(row, dict):
                # Already a dict (shouldn't happen with BQL, but handle it)
                txn = row
            else:
                continue
            # Second line of defence; the query already excludes pending rows.
            if not include_pending and txn.get("flag", "*") == "!":
                continue
            transactions.append(txn)
        return transactions[:limit]

    async def get_account_transactions(
        self,
        account_name: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all transactions affecting a specific account.

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting this account

        NOTE(review): the name is sent as an unanchored regex pattern, so longer
        account names containing this one presumably match too — confirm.
        """
        return await self.query_transactions(
            account_pattern=account_name.replace(":", "\\:"),  # Escape colons for regex
            limit=limit
        )

    async def get_user_transactions(
        self,
        user_id: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all transactions affecting a user's accounts.

        Args:
            user_id: User ID (only its first 8 characters are used for matching)
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting user's accounts
        """
        return await self.query_transactions(
            account_pattern=f"User-{user_id[:8]}",
            limit=limit
        )

    async def get_journal_entries(self) -> List[Dict[str, Any]]:
        """
        Get all journal entries from Fava (with entry hashes).

        Returns:
            The "data" list of GET /api/journal ([] if the key is missing).

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            entries = await fava.get_journal_entries()
            # Each entry has: entry_hash, date, flag, narration, tags, links, etc.
        """
        result = await self._request("GET", "journal", error_label="journal")
        return result.get("data", [])

    async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
        """
        Get entry context including source text and sha256sum.

        Args:
            entry_hash: Entry hash from get_journal_entries()

        Returns:
            {
                "entry": {...},  # Serialized entry
                "slice": "2025-01-15 ! \\"Description\\"...",  # Beancount source text
                "sha256sum": "abc123...",  # For concurrency control
                "balances_before": {...},
                "balances_after": {...}
            }

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            context = await fava.get_entry_context("abc123")
            source = context["slice"]
            sha256sum = context["sha256sum"]
        """
        result = await self._request(
            "GET", "context", error_label="context", params={"entry_hash": entry_hash}
        )
        return result.get("data", {})

    async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
        """
        Update an entry's source text (e.g., change flag from ! to *).

        Args:
            entry_hash: Entry hash
            new_source: Modified Beancount source text
            sha256sum: Current sha256sum from get_entry_context() for concurrency control

        Returns:
            New sha256sum after update ("" if the response has no "data")

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            context = await fava.get_entry_context("abc123")
            source = context["slice"]
            sha256 = context["sha256sum"]
            new_source = source.replace("2025-01-15 !", "2025-01-15 *")
            new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
        """
        result = await self._request(
            "PUT",
            "source_slice",
            error_label="update",
            json={
                "entry_hash": entry_hash,
                "source": new_source,
                "sha256sum": sha256sum
            }
        )
        return result.get("data", "")

    async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
        """
        Delete an entry from the Beancount file.

        Args:
            entry_hash: Entry hash
            sha256sum: Current sha256sum for concurrency control

        Returns:
            Success message ("" if the response has no "data")

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            context = await fava.get_entry_context("abc123")
            await fava.delete_entry("abc123", context["sha256sum"])
        """
        # The identifiers travel as query parameters, not as a request body.
        result = await self._request(
            "DELETE",
            "source_slice",
            error_label="delete",
            params={
                "entry_hash": entry_hash,
                "sha256sum": sha256sum
            }
        )
        return result.get("data", "")
# Module-wide FavaClient singleton. Stays None until init_fava_client() assigns
# it; get_fava_client() returns it and raises RuntimeError while it is None.
_fava_client: Optional[FavaClient] = None
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the module-wide Fava client and store it for get_fava_client().

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client
    client = FavaClient(fava_url=fava_url, ledger_slug=ledger_slug, timeout=timeout)
    _fava_client = client
    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
def get_fava_client() -> FavaClient:
    """
    Return the module-wide Fava client set up by init_fava_client().

    Returns:
        FavaClient instance

    Raises:
        RuntimeError: If init_fava_client() has not been called yet
    """
    client = _fava_client
    if client is not None:
        return client
    raise RuntimeError(
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )