castle/fava_client.py
padreug 8396331d5a Calculates user balance from journal entries
Refactors user balance calculation to directly parse journal
entries, enhancing accuracy and efficiency. This change
eliminates reliance on direct database queries and provides a
more reliable mechanism for determining user balances.

Adds logging for debugging purposes.

Also extracts and uses fiat metadata from invoice/payment extras.
2025-11-10 02:18:49 +01:00

666 lines
24 KiB
Python

"""
Fava API client for Castle.
This module provides an async HTTP client for interacting with Fava's JSON API.
All accounting logic is delegated to Fava/Beancount.
Fava provides a REST API for:
- Adding transactions (PUT /api/add_entries)
- Querying balances (GET /api/query)
- Balance sheets (GET /api/balance_sheet)
- Account reports (GET /api/account_report)
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
"""
import re
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

import httpx
from loguru import logger
class FavaClient:
    """
    Async client for Fava's JSON API.

    Fava runs as a separate web service and provides a JSON API for adding
    entries and querying ledger data. Every public coroutine opens a
    short-lived ``httpx.AsyncClient``, so instances hold no open connections.
    """

    # Signed integer SATS amount at the start of a posting amount,
    # e.g. "-36791 SATS {33.33 EUR}".
    _SATS_AMOUNT_RE = re.compile(r'^(-?\d+)\s+SATS')
    # Fiat number and currency at the start of a cost spec,
    # e.g. "{33.33 EUR, 2025-11-09}".
    _FIAT_COST_RE = re.compile(r'\{([\d.]+)\s+([A-Z]+)')
    # The health probe uses its own short timeout instead of self.timeout.
    _HEALTH_CHECK_TIMEOUT = 2.0

    def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
        """
        Initialize Fava client.

        Args:
            fava_url: Base URL of Fava server (e.g., http://localhost:3333);
                trailing slashes are stripped.
            ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
            timeout: Request timeout in seconds
        """
        self.fava_url = fava_url.rstrip('/')
        self.ledger_slug = ledger_slug
        self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _request(
        self,
        method: str,
        endpoint: str,
        *,
        error_label: str,
        **kwargs: Any
    ) -> Any:
        """
        Send one request to ``{base_url}/{endpoint}`` and return the decoded JSON.

        Args:
            method: HTTP verb ("GET", "PUT", "DELETE").
            endpoint: Path below the ledger's ``/api`` prefix (no leading slash).
            error_label: Word inserted into the "Fava <label> error" log line.
            **kwargs: Passed through to ``httpx.AsyncClient.request``
                (``params``, ``json``, ``headers``).

        Raises:
            httpx.HTTPStatusError: If Fava answers with an error status.
            httpx.RequestError: If the connection fails.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(
                    method, f"{self.base_url}/{endpoint}", **kwargs
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(
                f"Fava {error_label} error: {e.response.status_code} - {e.response.text}"
            )
            raise
        except httpx.RequestError as e:
            logger.error(f"Fava connection error: {e}")
            raise

    @staticmethod
    def _bql_string(value: Any) -> str:
        """
        Wrap ``value`` in single quotes for interpolation into a BQL query.

        Raises:
            ValueError: If the value contains a single quote, which would end
                the literal early and let the caller's text alter the query.
        """
        # NOTE(review): values are rejected rather than escaped because this
        # module does not rely on any BQL escape syntax - confirm before relaxing.
        text = str(value)
        if "'" in text:
            raise ValueError(f"Invalid character in BQL string value: {text!r}")
        return f"'{text}'"

    @staticmethod
    def _is_countable_transaction(entry: Dict[str, Any]) -> bool:
        """Return True for transactions that are neither pending ('!') nor tagged 'voided'."""
        if entry.get("t") != "Transaction":
            return False
        if entry.get("flag") == "!":
            return False
        # "tags" may be present but None; treat that like an empty list.
        return "voided" not in (entry.get("tags") or [])

    @classmethod
    def _parse_posting_amount(
        cls, amount_str: Any
    ) -> Tuple[Optional[int], Optional[Decimal], Optional[str]]:
        """
        Parse a posting amount such as "36791 SATS {33.33 EUR, 2025-11-09}".

        Returns:
            ``(sats, fiat_amount, fiat_currency)``. ``sats`` is None when the
            string does not start with an integer SATS amount; the fiat pair is
            None when no cost spec is found. The fiat amount takes the sign of
            the SATS amount. Non-string or empty input yields three Nones.
        """
        if not isinstance(amount_str, str) or not amount_str:
            return None, None, None

        sats_match = cls._SATS_AMOUNT_RE.match(amount_str)
        sats = int(sats_match.group(1)) if sats_match else None

        cost_match = cls._FIAT_COST_RE.search(amount_str)
        if not cost_match:
            return sats, None, None

        fiat_amount = Decimal(cost_match.group(1))
        if sats is not None and sats < 0:
            # The cost spec itself is unsigned; mirror the SATS sign.
            fiat_amount = -fiat_amount
        return sats, fiat_amount, cost_match.group(2)

    # ------------------------------------------------------------------
    # Writing entries
    # ------------------------------------------------------------------

    async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """
        Submit a new journal entry to Fava (PUT /api/add_entries).

        Args:
            entry: Beancount entry dict (format per Fava API spec)
                Must include:
                - t: "Transaction" (required by Fava)
                - date: "YYYY-MM-DD"
                - flag: "*" (cleared) or "!" (pending)
                - narration: str
                - postings: list of posting dicts
                - payee: str (empty string, not None)
                - tags: list of str
                - links: list of str
                - meta: dict

        Returns:
            Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."})

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            entry = {
                "t": "Transaction",
                "date": "2025-01-15",
                "flag": "*",
                "payee": "Store",
                "narration": "Purchase",
                "postings": [
                    {"account": "Expenses:Food", "amount": "50.00 EUR"},
                    {"account": "Assets:Cash", "amount": "-50.00 EUR"}
                ],
                "tags": [],
                "links": [],
                "meta": {"user_id": "abc123"}
            }
            result = await fava_client.add_entry(entry)
        """
        result = await self._request(
            "PUT",
            "add_entries",
            error_label="HTTP",
            json={"entries": [entry]},
            headers={"Content-Type": "application/json"},
        )
        logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
        return result

    # ------------------------------------------------------------------
    # Balances
    # ------------------------------------------------------------------

    async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
        """
        Get balance for a specific account (excluding pending transactions).

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")

        Returns:
            Dict with:
            - sats: int (sum of all SATS positions, 0 when there are none)
            - positions: the raw positions value taken from the first result row

        Raises:
            ValueError: If account_name contains a single quote.
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Note:
            The query filters with ``flag != '!'``, so only pending
            transactions are excluded.

        Example:
            balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
            # Returns: {
            #     "sats": 200000,
            #     "positions": {"SATS": {"{100.00 EUR}": 200000}}
            # }
        """
        query = (
            f"SELECT sum(position) WHERE account = {self._bql_string(account_name)} "
            "AND flag != '!'"
        )
        data = await self._request(
            "GET", "query", error_label="query", params={"query_string": query}
        )

        rows = data['data']['rows']
        if not rows:
            return {"sats": 0, "positions": {}}

        # Assumes Fava returns rows shaped like [[account, {"SATS": {cost: amount}}]]
        # - TODO confirm against the running Fava version.
        positions = rows[0][1]

        total_sats = 0
        sats_positions = positions.get("SATS") if isinstance(positions, dict) else None
        if isinstance(sats_positions, dict):
            # One value per cost basis; add them all up.
            total_sats = sum(int(amount) for amount in sats_positions.values())
        elif isinstance(sats_positions, (int, float)):
            # Plain number (no cost basis).
            total_sats = int(sats_positions)

        return {"sats": total_sats, "positions": positions}

    async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
        """
        Get user's balance from castle's perspective.

        Walks the whole journal and sums the postings on accounts whose name
        contains ":User-<first 8 chars of user_id>" and either "Payable" or
        "Receivable":
        - Liabilities:Payable:User-{user_id} (negative = castle owes user)
        - Assets:Receivable:User-{user_id} (positive = user owes castle)

        Args:
            user_id: User ID (only the first 8 characters are matched)

        Returns:
            {
                "balance": int (sats, positive = user owes castle, negative = castle owes user),
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [{"account": str, "sats": int}, ...]
            }

        Note:
            Pending transactions (flag='!') and transactions tagged "voided"
            are skipped; transactions with any other flag are counted.
        """
        account_marker = f":User-{user_id[:8]}"
        total_sats = 0
        fiat_balances: Dict[str, Decimal] = {}
        accounts_dict: Dict[str, Dict[str, Any]] = {}

        for entry in await self.get_journal_entries():
            if not self._is_countable_transaction(entry):
                continue
            for posting in entry.get("postings") or []:
                account_name = posting.get("account") or ""
                if account_marker not in account_name:
                    continue
                if "Payable" not in account_name and "Receivable" not in account_name:
                    continue

                sats, fiat_amount, fiat_currency = self._parse_posting_amount(
                    posting.get("amount", "")
                )
                if sats is not None:
                    total_sats += sats
                    account = accounts_dict.setdefault(
                        account_name, {"account": account_name, "sats": 0}
                    )
                    account["sats"] += sats
                if fiat_currency is not None:
                    fiat_balances[fiat_currency] = (
                        fiat_balances.get(fiat_currency, Decimal(0)) + fiat_amount
                    )
                    # Per-posting trace: debug level so a balance lookup does
                    # not emit one INFO line per journal posting.
                    logger.debug(f"Found fiat in {account_name}: {fiat_amount} {fiat_currency}, running total: {fiat_balances[fiat_currency]}")

        result = {
            "balance": total_sats,
            "fiat_balances": fiat_balances,
            "accounts": list(accounts_dict.values())
        }
        logger.info(f"Returning balance for user {user_id[:8]}: sats={total_sats}, fiat_balances={fiat_balances}")
        return result

    async def get_all_user_balances(self) -> List[Dict[str, Any]]:
        """
        Get balances for all users (admin view).

        Walks the whole journal and groups postings on ":User-" accounts
        (Payable or Receivable) by the user id segment of the account name.
        Beancount amounts are used as-is: a positive Receivable means the user
        owes castle, a negative Payable means castle owes the user.

        Returns:
            [
                {
                    "user_id": "abc123",
                    "balance": 100000,
                    "fiat_balances": {"EUR": Decimal("100.50")},
                    "accounts": []
                },
                ...
            ]
            "accounts" is always an empty list; this method does not fill it.

        Note:
            Pending transactions (flag='!') and transactions tagged "voided"
            are skipped; transactions with any other flag are counted.
        """
        user_data: Dict[str, Dict[str, Any]] = {}

        for entry in await self.get_journal_entries():
            if not self._is_countable_transaction(entry):
                continue
            for posting in entry.get("postings") or []:
                account_name = posting.get("account") or ""
                if ":User-" not in account_name:
                    continue
                if "Payable" not in account_name and "Receivable" not in account_name:
                    continue

                # The user id is the segment right after ":User-"; stop at the
                # next colon so sub-accounts map to the same user.
                user_id = account_name.split(":User-", 1)[1].split(":", 1)[0]
                summary = user_data.setdefault(
                    user_id,
                    {
                        "user_id": user_id,
                        "balance": 0,
                        "fiat_balances": {},
                        "accounts": []
                    },
                )

                sats, fiat_amount, fiat_currency = self._parse_posting_amount(
                    posting.get("amount", "")
                )
                if sats is not None:
                    summary["balance"] += sats
                if fiat_currency is not None:
                    fiat = summary["fiat_balances"]
                    fiat[fiat_currency] = fiat.get(fiat_currency, Decimal(0)) + fiat_amount

        return list(user_data.values())

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------

    async def check_fava_health(self) -> bool:
        """
        Check if Fava is running and accessible.

        Returns:
            True if GET /api/changed answers with status 200; False on any
            other status or on any exception (which is logged as a warning).
        """
        try:
            async with httpx.AsyncClient(timeout=self._HEALTH_CHECK_TIMEOUT) as client:
                response = await client.get(
                    f"{self.base_url}/changed"
                )
                return response.status_code == 200
        except Exception as e:
            # Deliberately broad: a health probe must never raise.
            logger.warning(f"Fava health check failed: {e}")
            return False

    # ------------------------------------------------------------------
    # Transaction queries
    # ------------------------------------------------------------------

    async def query_transactions(
        self,
        account_pattern: Optional[str] = None,
        limit: int = 100,
        include_pending: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Query transactions from Fava/Beancount.

        Args:
            account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123")
            limit: Maximum number of transactions to return
            include_pending: Include pending transactions (flag='!')

        Returns:
            List of row dicts keyed by the column names Fava reports.

        Raises:
            ValueError: If account_pattern contains a single quote.
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Note:
            The pending filter is applied after the query's LIMIT, so with
            include_pending=False fewer than ``limit`` rows may come back.

        Example:
            # All transactions
            txns = await fava.query_transactions()
            # User's transactions
            txns = await fava.query_transactions(account_pattern="User-abc123")
            # Account transactions
            txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc")
        """
        # Coerce so that only a number can ever reach the query text.
        limit = int(limit)
        if account_pattern:
            query = (
                f"SELECT * WHERE account ~ {self._bql_string(account_pattern)} "
                f"ORDER BY date DESC LIMIT {limit}"
            )
        else:
            query = f"SELECT * ORDER BY date DESC LIMIT {limit}"

        result = await self._request(
            "GET", "query", error_label="query", params={"query_string": query}
        )

        # Fava query API returns: {"data": {"rows": [...], "types": [...]}}
        data = result.get("data", {})
        rows = data.get("rows", [])
        column_names = [t.get("name") for t in data.get("types", [])]

        transactions = []
        for row in rows:
            if isinstance(row, list) and len(row) == len(column_names):
                # Rows are arrays; pair them with the column names.
                txn = dict(zip(column_names, row))
            elif isinstance(row, dict):
                # Already a dict (not expected from BQL, but tolerated).
                txn = row
            else:
                # Any other row shape is dropped.
                continue
            if not include_pending and txn.get("flag", "*") == "!":
                continue
            transactions.append(txn)
        return transactions[:limit]

    async def get_account_transactions(
        self,
        account_name: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all transactions affecting a specific account.

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting this account
        """
        return await self.query_transactions(
            account_pattern=account_name.replace(":", "\\:"),  # Escape colons for regex
            limit=limit
        )

    async def get_user_transactions(
        self,
        user_id: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all transactions affecting a user's accounts.

        Args:
            user_id: User ID (only the first 8 characters are used in the pattern)
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting user's accounts
        """
        return await self.query_transactions(
            account_pattern=f"User-{user_id[:8]}",
            limit=limit
        )

    # ------------------------------------------------------------------
    # Journal and source editing
    # ------------------------------------------------------------------

    async def get_journal_entries(self) -> List[Dict[str, Any]]:
        """
        Get all journal entries from Fava (GET /api/journal).

        Returns:
            The "data" value of Fava's response, or [] when it is absent.

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            entries = await fava.get_journal_entries()
            # Each entry has: entry_hash, date, flag, narration, tags, links, etc.
        """
        result = await self._request("GET", "journal", error_label="journal")
        return result.get("data", [])

    async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
        """
        Get entry context including source text and sha256sum (GET /api/context).

        Args:
            entry_hash: Entry hash from get_journal_entries()

        Returns:
            {
                "entry": {...},  # Serialized entry
                "slice": "2025-01-15 ! \\"Description\\"...",  # Beancount source text
                "sha256sum": "abc123...",  # For concurrency control
                "balances_before": {...},
                "balances_after": {...}
            }
            An empty dict when the response has no "data" key.

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            context = await fava.get_entry_context("abc123")
            source = context["slice"]
            sha256sum = context["sha256sum"]
        """
        result = await self._request(
            "GET", "context", error_label="context", params={"entry_hash": entry_hash}
        )
        return result.get("data", {})

    async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
        """
        Update an entry's source text (e.g., change flag from ! to *).

        Args:
            entry_hash: Entry hash
            new_source: Modified Beancount source text
            sha256sum: Current sha256sum from get_entry_context() for concurrency control

        Returns:
            New sha256sum after update ("" when the response has no "data" key)

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            # Get context
            context = await fava.get_entry_context("abc123")
            source = context["slice"]
            sha256 = context["sha256sum"]
            # Change flag
            new_source = source.replace("2025-01-15 !", "2025-01-15 *")
            # Update
            new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
        """
        result = await self._request(
            "PUT",
            "source_slice",
            error_label="update",
            json={
                "entry_hash": entry_hash,
                "source": new_source,
                "sha256sum": sha256sum
            },
        )
        return result.get("data", "")

    async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
        """
        Delete an entry from the Beancount file (DELETE /api/source_slice).

        Args:
            entry_hash: Entry hash
            sha256sum: Current sha256sum for concurrency control

        Returns:
            Success message ("" when the response has no "data" key)

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            context = await fava.get_entry_context("abc123")
            await fava.delete_entry("abc123", context["sha256sum"])
        """
        result = await self._request(
            "DELETE",
            "source_slice",
            error_label="delete",
            params={
                "entry_hash": entry_hash,
                "sha256sum": sha256sum
            },
        )
        return result.get("data", "")
# Singleton instance (configured from settings).
# Stays None until init_fava_client() assigns it; get_fava_client() raises
# RuntimeError while it is still None.
_fava_client: Optional[FavaClient] = None
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the module-wide FavaClient and store it as the global singleton.

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client
    client = FavaClient(fava_url, ledger_slug, timeout)
    _fava_client = client
    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
def get_fava_client() -> FavaClient:
    """
    Return the global Fava client created by init_fava_client().

    Returns:
        FavaClient instance

    Raises:
        RuntimeError: If init_fava_client() has not been called yet
    """
    client = _fava_client
    if client is not None:
        return client
    # Nothing has been stored yet: fail loudly rather than return None.
    raise RuntimeError(
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )