castle/fava_client.py
padreug ca2ce1dfcc Refactors expense tracking to use fiat amounts
Updates the expense tracking system to store payables and receivables in fiat currency within Beancount.
This ensures accurate debt representation and simplifies balance calculations.
Changes include:
- Converting `format_expense_entry` and `format_receivable_entry` to use fiat amounts.
- Introducing `format_net_settlement_entry` for net settlement payments.
- Modifying `format_payment_entry` to use cost syntax for fiat tracking.
- Adjusting Fava client to correctly process new amount formats and metadata.
- Adding average cost basis posting format

The use of fiat amounts and cost basis aims to provide better accuracy and compatibility with existing Beancount workflows.
2025-11-10 03:33:04 +01:00

713 lines
27 KiB
Python

"""
Fava API client for Castle.
This module provides an async HTTP client for interacting with Fava's JSON API.
All accounting logic is delegated to Fava/Beancount.
Fava provides a REST API for:
- Adding transactions (PUT /api/add_entries)
- Querying balances (GET /api/query)
- Balance sheets (GET /api/balance_sheet)
- Account reports (GET /api/account_report)
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
"""
import httpx
from typing import Any, Dict, List, Optional
from decimal import Decimal
from datetime import date, datetime
from loguru import logger
class FavaClient:
    """
    Async client for Fava REST API.

    Fava runs as a separate web service and provides a JSON API
    for adding entries and querying ledger data.
    All accounting calculations are performed by Beancount via Fava.

    Each request method opens its own ``httpx.AsyncClient`` inside an
    ``async with`` block, so no HTTP client object is kept on the instance
    between calls. The instance itself only stores the server URL, the
    ledger slug, the derived API base URL and the request timeout.
    """
def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Store the connection parameters for one Fava ledger.

    Nothing is sent over the network here; the values are only kept for
    the request methods.

    Args:
        fava_url: Base URL of Fava server (e.g., http://localhost:3333).
            Trailing slashes are stripped before it is stored.
        ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
        timeout: Request timeout in seconds
    """
    self.timeout = timeout
    self.ledger_slug = ledger_slug
    self.fava_url = fava_url.rstrip('/')
    # All endpoints used by this client live under <server>/<ledger>/api
    self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Store a single journal entry in the ledger through Fava.

    The entry is wrapped in a one-element ``entries`` list and sent with
    ``PUT {base_url}/add_entries``.

    Args:
        entry: Beancount entry dict (format per Fava API spec).
            Must include:
            - t: "Transaction" (required by Fava)
            - date: "YYYY-MM-DD"
            - flag: "*" (cleared) or "!" (pending)
            - narration: str
            - postings: list of posting dicts
            - payee: str (empty string, not None)
            - tags: list of str
            - links: list of str
            - meta: dict

    Returns:
        Decoded JSON response from Fava
        ({"data": "Stored 1 entries.", "mtime": "..."})

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
            (logged, then re-raised)
        httpx.RequestError: If the connection fails (logged, then re-raised)

    Example:
        entry = {
            "t": "Transaction",
            "date": "2025-01-15",
            "flag": "*",
            "payee": "Store",
            "narration": "Purchase",
            "postings": [
                {"account": "Expenses:Food", "amount": "50.00 EUR"},
                {"account": "Assets:Cash", "amount": "-50.00 EUR"}
            ],
            "tags": [],
            "links": [],
            "meta": {"user_id": "abc123"}
        }
        result = await fava_client.add_entry(entry)
    """
    endpoint = f"{self.base_url}/add_entries"
    payload = {"entries": [entry]}
    request_headers = {"Content-Type": "application/json"}

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.put(endpoint, json=payload, headers=request_headers)
            response.raise_for_status()
            result = response.json()
            logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
            return result
    except httpx.HTTPStatusError as e:
        # Fava answered, but with a 4xx/5xx status
        logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        # No usable answer at all (DNS, refused connection, timeout, ...)
        logger.error(f"Fava connection error: {e}")
        raise
async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
    """
    Get balance for a specific account (excluding pending transactions).

    Runs the BQL query
    ``SELECT sum(position) WHERE account = '<name>' AND flag != '!'``
    through ``GET {base_url}/query`` and totals the SATS part of the result.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123").
            It is interpolated verbatim into the query string, so it must
            come from a trusted source and must not contain a single quote.

    Returns:
        Dict with:
        - sats: int (balance in satoshis; 0 when the query returns no rows
          or the result holds no "SATS" key)
        - positions: the raw aggregated value returned by Fava
          (presumably currency -> {cost: amount}; the exact serialization
          depends on the Fava version - TODO confirm)

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails
        KeyError: If the response lacks the expected ``data.rows`` structure

    Note:
        Excludes pending transactions (flag='!') from balance calculation.

    Example:
        balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
        # Returns: {
        #     "sats": 200000,
        #     "positions": {"SATS": {"{100.00 EUR}": 200000}}
        # }
    """
    query = f"SELECT sum(position) WHERE account = '{account_name}' AND flag != '!'"
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            data = response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    rows = data['data']['rows']
    if not rows:
        return {"sats": 0, "positions": {}}

    # The query selects exactly one column, so the aggregated positions are
    # the last cell of the first row. Reading the last cell (instead of the
    # fixed index 1 used previously) works for a one-column row and still
    # yields the same cell for a two-column [account, positions] row.
    first_row = rows[0]
    positions = first_row[-1] if first_row else {}

    total_sats = 0
    if isinstance(positions, dict) and "SATS" in positions:
        sats_positions = positions["SATS"]
        if isinstance(sats_positions, dict):
            # One entry per cost basis; the balance is their sum
            total_sats = sum(int(amount) for amount in sats_positions.values())
        elif isinstance(sats_positions, (int, float)):
            # Simple number (no cost basis)
            total_sats = int(sats_positions)

    return {
        "sats": total_sats,
        "positions": positions
    }
async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
    """
    Get user's balance from castle's perspective.

    Walks every journal entry returned by ``get_journal_entries()`` and
    aggregates the postings on this user's Payable/Receivable accounts:
    - Liabilities:Payable:User-{user_id} (negative = castle owes user)
    - Assets:Receivable:User-{user_id} (positive = user owes castle)

    Two posting amount formats are understood:
    - "<amount> EUR|USD|GBP" (new format): the fiat amount is added to
      ``fiat_balances``; if the posting metadata has ``sats-equivalent``,
      that value is added to the SATS totals with a positive sign when the
      fiat amount is > 0 and a negative sign otherwise.
    - "<int> SATS ..." (old format): the SATS amount is added to the SATS
      totals; fiat comes from the ``fiat-amount-total`` / ``fiat-currency``
      posting metadata, negated when the SATS amount is negative.

    Args:
        user_id: User ID. Only its first 8 characters are used to match
            account names.

    Returns:
        {
            "balance": int (sats, positive = user owes castle, negative = castle owes user),
            "fiat_balances": {"EUR": Decimal("100.50")},
            "accounts": [{"account": <name>, "sats": <int>}, ...]
        }

    Raises:
        httpx.HTTPStatusError / httpx.RequestError: propagated from
            ``get_journal_entries()``.

    Note:
        Excludes pending transactions (flag='!') and entries tagged
        "voided" from balance calculation.
    """
    # Imported at function scope (not per posting) and compiled once, so the
    # loops below only do pattern matching.
    import re

    fiat_amount_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    sats_amount_re = re.compile(r'^(-?\d+)\s+SATS')
    # Account names use the first 8 chars of user_id
    user_marker = f":User-{user_id[:8]}"

    all_entries = await self.get_journal_entries()
    logger.info(f"Processing {len(all_entries)} journal entries for user {user_id[:8]}")

    total_sats = 0
    fiat_balances: Dict[str, Decimal] = {}
    accounts_dict: Dict[str, Dict[str, Any]] = {}  # SATS balance per account
    processed_count = 0

    for entry in all_entries:
        # Skip non-transactions, pending (!), and voided
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        if "voided" in (entry.get("tags") or []):
            continue

        postings = entry.get("postings") or []
        if any(user_marker in p.get("account", "") for p in postings):
            logger.info(f"Entry '{entry.get('narration', 'N/A')}' has {len(postings)} postings")

        for posting in postings:
            account_name = posting.get("account", "")
            # Only this user's Payable/Receivable accounts are relevant
            if user_marker not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            # Amount string: can be EUR, USD, GBP or SATS
            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue
            logger.info(f"Processing posting in {account_name}: amount_str='{amount_str}'")
            processed_count += 1

            # Tolerate a missing or null "meta" field
            posting_meta = posting.get("meta") or {}

            fiat_match = fiat_amount_re.match(amount_str)
            if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                # New format: the posting amount itself is the fiat amount
                fiat_amount = Decimal(fiat_match.group(1))
                fiat_currency = fiat_match.group(2)
                if fiat_currency not in fiat_balances:
                    fiat_balances[fiat_currency] = Decimal(0)
                fiat_balances[fiat_currency] += fiat_amount
                logger.info(f"Found fiat in {account_name}: {fiat_amount} {fiat_currency} (direct), running total: {fiat_balances[fiat_currency]}")

                # SATS equivalent lives in metadata; it takes the sign of
                # the fiat amount.
                sats_equiv = posting_meta.get("sats-equivalent")
                if sats_equiv:
                    sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                    total_sats += sats_amount
                    if account_name not in accounts_dict:
                        accounts_dict[account_name] = {"account": account_name, "sats": 0}
                    accounts_dict[account_name]["sats"] += sats_amount
            else:
                # Old format: SATS with cost/price notation
                sats_match = sats_amount_re.match(amount_str)
                if sats_match:
                    sats_amount = int(sats_match.group(1))
                    total_sats += sats_amount
                    if account_name not in accounts_dict:
                        accounts_dict[account_name] = {"account": account_name, "sats": 0}
                    accounts_dict[account_name]["sats"] += sats_amount

                # Fiat from metadata (backward compatibility). This is read
                # even when the SATS amount did not parse; the sign is only
                # flipped when a negative SATS amount was found.
                fiat_amount_total_str = posting_meta.get("fiat-amount-total")
                fiat_currency_meta = posting_meta.get("fiat-currency")
                if fiat_amount_total_str and fiat_currency_meta:
                    # Go through str() so a float coming from JSON does not
                    # drag its binary representation error into the Decimal.
                    fiat_total = Decimal(str(fiat_amount_total_str))
                    fiat_currency = fiat_currency_meta
                    if fiat_currency not in fiat_balances:
                        fiat_balances[fiat_currency] = Decimal(0)
                    if sats_match and int(sats_match.group(1)) < 0:
                        fiat_total = -fiat_total
                    fiat_balances[fiat_currency] += fiat_total
                    logger.info(f"Found fiat in {account_name}: {fiat_total} {fiat_currency} (from SATS metadata), running total: {fiat_balances[fiat_currency]}")

    result = {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": list(accounts_dict.values())
    }
    logger.info(f"Processed {processed_count} postings for user {user_id[:8]}")
    logger.info(f"Returning balance for user {user_id[:8]}: sats={total_sats}, fiat_balances={fiat_balances}")
    return result
async def get_all_user_balances(self) -> List[Dict[str, Any]]:
    """
    Get balances for all users (admin view).

    Walks every journal entry returned by ``get_journal_entries()`` and
    groups the postings on ``...:User-<id>`` Payable/Receivable accounts
    by user id (the text after ``:User-`` up to the next ``:``).

    Two posting amount formats are understood, matching
    ``get_user_balance``:
    - "<amount> EUR|USD|GBP" (new format): the fiat amount is added to the
      user's ``fiat_balances``; if the posting metadata has
      ``sats-equivalent``, it is added to ``balance`` with a positive sign
      when the fiat amount is > 0 and a negative sign otherwise.
    - "<int> SATS {<amount> <CUR>, ...}" (old format): the SATS amount is
      added to ``balance`` as-is and the fiat amount from the cost braces
      is added to ``fiat_balances``, negated when the SATS amount is
      negative.

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": []
            },
            ...
        ]
        ``accounts`` is created as an empty list and is never filled by
        this method.

    Raises:
        httpx.HTTPStatusError / httpx.RequestError: propagated from
            ``get_journal_entries()``.

    Note:
        Excludes pending transactions (flag='!') and voided (tag #voided)
        from balance calculation.
    """
    # Imported and compiled once instead of once per posting
    import re

    fiat_amount_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    sats_amount_re = re.compile(r'^(-?\d+)\s+SATS')
    cost_re = re.compile(r'\{([\d.]+)\s+([A-Z]+)')
    fiat_currencies = ('EUR', 'USD', 'GBP')

    all_entries = await self.get_journal_entries()

    user_data: Dict[str, Dict[str, Any]] = {}
    for entry in all_entries:
        # Skip non-transactions, pending (!), and voided
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        if "voided" in (entry.get("tags") or []):
            continue

        for posting in entry.get("postings") or []:
            account_name = posting.get("account", "")
            # Only process user accounts (Payable or Receivable)
            if ":User-" not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            # The user id is the rest of the "User-..." account component;
            # cutting at the next ":" keeps sub-accounts with their user.
            user_id = account_name.split(":User-", 1)[1].split(":", 1)[0]
            if user_id not in user_data:
                user_data[user_id] = {
                    "user_id": user_id,
                    "balance": 0,
                    "fiat_balances": {},
                    "accounts": []
                }

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            totals = user_data[user_id]
            fiat_balances = totals["fiat_balances"]

            # New format: "10.00 EUR" with the SATS value in metadata
            fiat_match = fiat_amount_re.match(amount_str)
            if fiat_match and fiat_match.group(2) in fiat_currencies:
                fiat_amount = Decimal(fiat_match.group(1))
                fiat_currency = fiat_match.group(2)
                fiat_balances[fiat_currency] = (
                    fiat_balances.get(fiat_currency, Decimal(0)) + fiat_amount
                )
                sats_equiv = (posting.get("meta") or {}).get("sats-equivalent")
                if sats_equiv:
                    # The metadata value takes the sign of the fiat amount
                    magnitude = int(sats_equiv)
                    totals["balance"] += magnitude if fiat_amount > 0 else -magnitude
                continue

            # Old format: "36791 SATS {33.33 EUR, 2025-11-09}"
            sats_match = sats_amount_re.match(amount_str)
            if sats_match:
                # For admin/castle view, use Beancount amounts as-is:
                # Receivable (asset): positive = user owes castle
                # Payable (liability): negative = castle owes user
                totals["balance"] += int(sats_match.group(1))

            cost_match = cost_re.search(amount_str)
            if cost_match:
                fiat_amount = Decimal(cost_match.group(1))
                fiat_currency = cost_match.group(2)
                # The cost is written unsigned; give it the SATS sign
                if sats_match and int(sats_match.group(1)) < 0:
                    fiat_amount = -fiat_amount
                fiat_balances[fiat_currency] = (
                    fiat_balances.get(fiat_currency, Decimal(0)) + fiat_amount
                )

    return list(user_data.values())
async def check_fava_health(self) -> bool:
    """
    Probe Fava's ``{base_url}/changed`` endpoint.

    The probe uses a fixed 2-second timeout instead of the instance
    timeout. Any exception raised while probing is logged as a warning
    and reported as "not healthy" rather than propagated.

    Returns:
        True if Fava answers with HTTP 200, False otherwise
    """
    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            response = await client.get(f"{self.base_url}/changed")
        is_up = response.status_code == 200
    except Exception as e:
        # Deliberately broad: a health check must never raise
        logger.warning(f"Fava health check failed: {e}")
        return False
    return is_up
async def query_transactions(
    self,
    account_pattern: Optional[str] = None,
    limit: int = 100,
    include_pending: bool = True
) -> List[Dict[str, Any]]:
    """
    Run a ``SELECT *`` BQL query through Fava and return the rows as dicts.

    Args:
        account_pattern: Optional regex pattern to filter accounts
            (e.g., "User-abc123"). It is placed verbatim between single
            quotes in the query string.
        limit: Maximum number of rows; used both as the query's LIMIT and
            to slice the returned list.
        include_pending: Include pending transactions (flag='!'). When
            False, pending rows are dropped after the query has run, so
            fewer than ``limit`` rows may come back.

    Returns:
        List of dicts. List rows are zipped with the column names from the
        response's ``types``; rows that are already dicts are passed
        through; any other row (including list rows whose length does not
        match the column count) is skipped.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        # All transactions
        txns = await fava.query_transactions()
        # User's transactions
        txns = await fava.query_transactions(account_pattern="User-abc123")
        # Account transactions
        txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc")
    """
    if account_pattern:
        query = f"SELECT * WHERE account ~ '{account_pattern}' ORDER BY date DESC LIMIT {limit}"
    else:
        query = f"SELECT * ORDER BY date DESC LIMIT {limit}"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            result = response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    # Expected payload: {"data": {"rows": [...], "types": [...]}}
    data = result.get("data", {})
    column_names = [column.get("name") for column in data.get("types", [])]
    column_count = len(column_names)

    transactions = []
    for row in data.get("rows", []):
        if isinstance(row, list) and len(row) == column_count:
            # Positional row: pair each cell with its column name
            txn = dict(zip(column_names, row))
        elif isinstance(row, dict):
            # Already keyed by column name
            txn = row
        else:
            continue

        # A row without a flag counts as cleared
        if txn.get("flag", "*") == "!" and not include_pending:
            continue
        transactions.append(txn)

    return transactions[:limit]
async def get_account_transactions(
    self,
    account_name: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch the transactions that touch one account.

    Delegates to ``query_transactions`` with the account name as the
    pattern, after putting a backslash in front of every colon.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
        limit: Maximum number of transactions

    Returns:
        Whatever ``query_transactions`` returns for that pattern
    """
    # Same result as replacing ":" with "\:" throughout the name
    escaped_name = "\\:".join(account_name.split(":"))
    return await self.query_transactions(account_pattern=escaped_name, limit=limit)
async def get_user_transactions(
    self,
    user_id: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch the transactions that touch any of a user's accounts.

    Delegates to ``query_transactions`` with the pattern
    ``User-<first 8 characters of user_id>``.

    Args:
        user_id: User ID (only its first 8 characters are used)
        limit: Maximum number of transactions

    Returns:
        Whatever ``query_transactions`` returns for that pattern
    """
    user_pattern = f"User-{user_id[:8]}"
    return await self.query_transactions(account_pattern=user_pattern, limit=limit)
async def get_journal_entries(self) -> List[Dict[str, Any]]:
    """
    Fetch the whole journal from ``GET {base_url}/journal``.

    Besides returning the entries, this logs how many entries came back
    and how many of them have "Lightning payment" in their narration.

    Returns:
        The ``data`` list of the response (empty list if ``data`` is
        missing): all entries (transactions, opens, closes, etc.) with
        entry_hash field.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        entries = await fava.get_journal_entries()
        # Each entry has: entry_hash, date, flag, narration, tags, links, etc.
    """
    journal_url = f"{self.base_url}/journal"
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(journal_url)
            response.raise_for_status()
            entries = response.json().get("data", [])
            logger.info(f"Fava /journal returned {len(entries)} entries")

            # Diagnostic count of Lightning payment entries
            lightning_entries = []
            for item in entries:
                if "Lightning payment" in item.get("narration", ""):
                    lightning_entries.append(item)
            logger.info(f"Found {len(lightning_entries)} Lightning payment entries in journal")

            return entries
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava journal error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
    """
    Fetch the context of one entry from ``GET {base_url}/context``.

    Args:
        entry_hash: Entry hash from get_journal_entries()

    Returns:
        The ``data`` object of the response (empty dict if missing):
        {
            "entry": {...},  # Serialized entry
            "slice": "2025-01-15 ! \\"Description\\"...",  # Beancount source text
            "sha256sum": "abc123...",  # For concurrency control
            "balances_before": {...},
            "balances_after": {...}
        }

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        context = await fava.get_entry_context("abc123")
        source = context["slice"]
        sha256sum = context["sha256sum"]
    """
    context_url = f"{self.base_url}/context"
    query_params = {"entry_hash": entry_hash}
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(context_url, params=query_params)
            response.raise_for_status()
            return response.json().get("data", {})
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava context error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
    """
    Replace an entry's source text (e.g., change flag from ! to *).

    Sends ``PUT {base_url}/source_slice`` with the entry hash, the new
    source text and the checksum as a JSON body.

    Args:
        entry_hash: Entry hash
        new_source: Modified Beancount source text
        sha256sum: Current sha256sum from get_entry_context() for concurrency control

    Returns:
        The ``data`` field of the response (empty string if missing);
        per the original documentation this is the new sha256sum.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        # Get context
        context = await fava.get_entry_context("abc123")
        source = context["slice"]
        sha256 = context["sha256sum"]
        # Change flag
        new_source = source.replace("2025-01-15 !", "2025-01-15 *")
        # Update
        new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
    """
    slice_url = f"{self.base_url}/source_slice"
    payload = {
        "entry_hash": entry_hash,
        "source": new_source,
        "sha256sum": sha256sum
    }
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.put(slice_url, json=payload)
            response.raise_for_status()
            return response.json().get("data", "")
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
    """
    Remove an entry from the Beancount file.

    Sends ``DELETE {base_url}/source_slice`` with the entry hash and the
    checksum as query parameters.

    Args:
        entry_hash: Entry hash
        sha256sum: Current sha256sum for concurrency control

    Returns:
        The ``data`` field of the response (empty string if missing);
        per the original documentation this is a success message.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        context = await fava.get_entry_context("abc123")
        await fava.delete_entry("abc123", context["sha256sum"])
    """
    slice_url = f"{self.base_url}/source_slice"
    query_params = {
        "entry_hash": entry_hash,
        "sha256sum": sha256sum
    }
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.delete(slice_url, params=query_params)
            response.raise_for_status()
            return response.json().get("data", "")
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
# Singleton instance (configured from settings).
# Starts as None; init_fava_client() assigns it and get_fava_client()
# returns it, raising RuntimeError while it is still None.
_fava_client: Optional[FavaClient] = None
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the module-wide Fava client and remember it.

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client
    client = FavaClient(fava_url, ledger_slug, timeout)
    _fava_client = client
    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
def get_fava_client() -> FavaClient:
    """
    Return the module-wide Fava client.

    Returns:
        The FavaClient instance stored by init_fava_client()

    Raises:
        RuntimeError: If init_fava_client() has not been called yet
    """
    client = _fava_client
    if client is not None:
        return client
    raise RuntimeError(
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )