Improves the handling of pending entries by extracting and deduplicating data from Fava's query results. Adds support for displaying fiat amounts alongside entries, extracting them from the position data returned by Fava. Streamlines the receivables/payables/equity checks on the frontend by relying on the BQL query to supply account type metadata and tags.
550 lines
20 KiB
Python
550 lines
20 KiB
Python
"""
|
|
Fava API client for Castle.
|
|
|
|
This module provides an async HTTP client for interacting with Fava's JSON API.
|
|
All accounting logic is delegated to Fava/Beancount.
|
|
|
|
Fava provides a REST API for:
|
|
- Adding transactions (PUT /api/add_entries)
|
|
- Querying balances (GET /api/query)
|
|
- Balance sheets (GET /api/balance_sheet)
|
|
- Account reports (GET /api/account_report)
|
|
|
|
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
|
|
"""
|
|
|
|
import httpx
|
|
from typing import Any, Dict, List, Optional
|
|
from decimal import Decimal
|
|
from datetime import date, datetime
|
|
import logging
|
|
|
|
# Module-level logger used by FavaClient's methods and by init_fava_client().
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FavaClient:
    """
    Async client for Fava REST API.

    Fava runs as a separate web service and provides a JSON API
    for adding entries and querying ledger data.

    All accounting calculations are performed by Beancount via Fava; this
    class only builds requests and reshapes the JSON responses.

    Sign convention: ``get_account_balance`` reports the raw ledger amount,
    whereas ``get_user_balance`` and ``get_all_user_balances`` negate it so
    that a positive number means "castle owes the user".
    """

    def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
        """
        Initialize Fava client.

        Args:
            fava_url: Base URL of Fava server (e.g., http://localhost:3333)
            ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
            timeout: Request timeout in seconds
        """
        self.fava_url = fava_url.rstrip('/')
        self.ledger_slug = ledger_slug
        self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _bql_string(value: Any) -> str:
        """
        Render ``value`` as a quoted BQL string literal.

        Values without a single quote are wrapped in single quotes, which is
        exactly what the queries in this class have always sent. A value that
        contains a single quote is wrapped in double quotes instead.

        Raises:
            ValueError: If the value contains both quote characters and so
                cannot be embedded without terminating the literal early.
        """
        # NOTE(review): assumes BQL string literals have no escape syntax and
        # accept either quote character as delimiter - confirm against the
        # beancount/beanquery version deployed with Fava.
        text = str(value)
        if "'" not in text:
            return f"'{text}'"
        if '"' not in text:
            return f'"{text}"'
        raise ValueError(
            f"Value cannot be embedded in a BQL string literal: {text!r}"
        )

    @staticmethod
    def _positions_column(row: Any) -> Any:
        """
        Return the positions cell of a query result row.

        The positions are the last selected column in every balance query of
        this class, so the last cell is taken. That works for the one-column
        ``SELECT sum(position)`` row as well as for ``[account, positions]``.
        A row that is not a list/tuple is returned unchanged; an empty row
        yields an empty dict.
        """
        if isinstance(row, (list, tuple)):
            return row[-1] if row else {}
        return row

    @staticmethod
    def _position_lots(positions: Any) -> List[Any]:
        """
        Flatten the SATS part of a positions value into ``(cost_key, sats)`` pairs.

        Handles the two shapes this client understands:
        - ``{"SATS": {cost_key: amount, ...}}`` -> one pair per cost key
        - ``{"SATS": number}``                  -> a single pair with cost ``None``

        Anything else (no SATS key, unexpected type) yields an empty list.
        Amounts are converted with ``int()``.
        """
        if not (isinstance(positions, dict) and "SATS" in positions):
            return []
        sats_positions = positions["SATS"]
        if isinstance(sats_positions, dict):
            return [(cost_key, int(amount)) for cost_key, amount in sats_positions.items()]
        if isinstance(sats_positions, (int, float)):
            # Simple number (no cost basis)
            return [(None, int(sats_positions))]
        return []

    @staticmethod
    def _parse_cost_basis(cost_key: Any) -> Optional[tuple]:
        """
        Parse a cost key such as ``"100.00 EUR"`` or ``"{100.00 EUR}"``.

        Returns:
            ``(Decimal amount, currency)`` or ``None`` when the key carries no
            cost basis (empty, the literal ``"SATS"``, or not exactly two
            whitespace-separated tokens).

        Raises:
            ValueError: If the amount token is not a valid decimal number.
        """
        if not cost_key or cost_key == "SATS":
            return None
        parts = cost_key.strip('{}').split()
        if len(parts) != 2:
            return None
        try:
            return Decimal(parts[0]), parts[1]
        except (ArithmeticError, ValueError) as exc:
            # Decimal() signals bad input with decimal.InvalidOperation, which
            # derives from ArithmeticError rather than ValueError.
            raise ValueError(f"Unparseable cost basis: {cost_key!r}") from exc

    @classmethod
    def _tally_positions(cls, positions: Any, fiat_totals: Dict[str, Decimal]) -> int:
        """
        Convert one account's positions to the user's perspective.

        Ledger amounts are negated:
        - Payable (Liability): negative in Beancount -> positive (castle owes user)
        - Receivable (Asset): positive in Beancount -> negative (user owes castle)

        Fiat amounts found in cost keys are added to ``fiat_totals`` (mutated
        in place) with the same orientation. A cost key whose amount cannot be
        parsed is logged and skipped; its sats are still counted.

        Returns:
            The account's sats from the user's perspective.
        """
        user_sats = 0
        for cost_key, amount_int in cls._position_lots(positions):
            user_sats += -amount_int

            try:
                parsed = cls._parse_cost_basis(cost_key)
            except ValueError:
                logger.warning(f"Could not parse cost basis: {cost_key}")
                continue
            if parsed is None:
                continue

            fiat_amount, fiat_currency = parsed
            # Cost basis is always positive, derive sign from amount ...
            if amount_int < 0:
                fiat_amount = -fiat_amount
            # ... then apply the same negation as for the sats amount.
            fiat_totals[fiat_currency] = fiat_totals.get(fiat_currency, Decimal(0)) + (-fiat_amount)
        return user_sats

    async def _run_query(self, query: str) -> Dict[str, Any]:
        """
        Execute a BQL query through ``GET {base_url}/query``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If Fava returns an error status (logged first)
            httpx.RequestError: If the connection fails (logged first)
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.base_url}/query",
                    params={"query_string": query}
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            logger.error(f"Fava connection error: {e}")
            raise

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """
        Submit a new journal entry to Fava.

        Args:
            entry: Beancount entry dict (format per Fava API spec)
                Must include:
                - t: "Transaction" (required by Fava)
                - date: "YYYY-MM-DD"
                - flag: "*" (cleared) or "!" (pending)
                - narration: str
                - postings: list of posting dicts
                - payee: str (empty string, not None)
                - tags: list of str
                - links: list of str
                - meta: dict

        Returns:
            Response from Fava ({"data": "Stored 1 entries.", "mtime": "..."})

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            entry = {
                "t": "Transaction",
                "date": "2025-01-15",
                "flag": "*",
                "payee": "Store",
                "narration": "Purchase",
                "postings": [
                    {"account": "Expenses:Food", "amount": "50.00 EUR"},
                    {"account": "Assets:Cash", "amount": "-50.00 EUR"}
                ],
                "tags": [],
                "links": [],
                "meta": {"user_id": "abc123"}
            }
            result = await fava_client.add_entry(entry)
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.put(
                    f"{self.base_url}/add_entries",
                    json={"entries": [entry]},
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()
                result = response.json()

                logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
                return result

        except httpx.HTTPStatusError as e:
            logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            logger.error(f"Fava connection error: {e}")
            raise

    async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
        """
        Get balance for a specific account.

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")

        Returns:
            Dict with:
            - sats: int (raw ledger balance in satoshis, not negated)
            - positions: the positions value exactly as returned by Fava

        Raises:
            ValueError: If account_name cannot be quoted for BQL
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
            # Returns: {
            #     "sats": 200000,
            #     "positions": {"SATS": {"{100.00 EUR}": 200000}}
            # }
        """
        query = f"SELECT sum(position) WHERE account = {self._bql_string(account_name)}"
        data = await self._run_query(query)

        rows = data['data']['rows']
        if not rows:
            return {"sats": 0, "positions": {}}

        # The query selects a single column, so the positions are the last
        # (and only) cell of the first row - index 1 would be out of range.
        positions = self._positions_column(rows[0])

        # Sum all SATS amounts (across different cost bases)
        total_sats = sum(amount for _, amount in self._position_lots(positions))

        return {
            "sats": total_sats,
            "positions": positions
        }

    async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
        """
        Get user's total balance (what castle owes user).

        Aggregates every account whose name matches ``User-<first 8 chars of
        user_id>``, e.g.:
        - Liabilities:Payable:User-{user_id} (positive = castle owes)
        - Assets:Receivable:User-{user_id} (positive = user owes, so negate)

        Args:
            user_id: User ID

        Returns:
            {
                "balance": int (sats, positive = castle owes user),
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [list of account dicts with balances]
            }

        Raises:
            ValueError: If user_id cannot be quoted for BQL
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails
        """
        # Query for all accounts matching user
        pattern = self._bql_string(f"User-{user_id[:8]}")
        query = f"""
        SELECT account, sum(position)
        WHERE account ~ {pattern}
        GROUP BY account
        """
        data = await self._run_query(query)

        total_sats = 0
        fiat_balances: Dict[str, Decimal] = {}
        accounts = []

        for row in data['data']['rows']:
            account_name = row[0]
            positions = row[1]  # {"SATS": {cost: amount, ...}}

            account_sats = self._tally_positions(positions, fiat_balances)
            total_sats += account_sats
            accounts.append(
                {"account": account_name, "sats": account_sats, "positions": positions}
            )

        return {
            "balance": total_sats,
            "fiat_balances": fiat_balances,
            "accounts": accounts
        }

    async def get_all_user_balances(self) -> List[Dict[str, Any]]:
        """
        Get balances for all users (admin view).

        The user id is whatever follows ":User-" in the account name; rows
        whose account name lacks that marker are skipped.

        Returns:
            [
                {
                    "user_id": "abc123",
                    "balance": 100000,
                    "fiat_balances": {"EUR": Decimal("100.50")},
                    "accounts": [...]
                },
                ...
            ]

        Raises:
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails
        """
        query = """
        SELECT account, sum(position)
        WHERE account ~ 'Payable:User-|Receivable:User-'
        GROUP BY account
        """
        data = await self._run_query(query)

        # Group by user_id (dict keeps first-seen order)
        user_data: Dict[str, Dict[str, Any]] = {}

        for row in data['data']['rows']:
            account_name = row[0]
            positions = row[1]

            # e.g., "Liabilities:Payable:User-abc123" -> "abc123"
            if ":User-" not in account_name:
                continue
            user_id = account_name.split(":User-")[1]

            if user_id not in user_data:
                user_data[user_id] = {
                    "user_id": user_id,
                    "balance": 0,
                    "fiat_balances": {},
                    "accounts": []
                }
            summary = user_data[user_id]

            account_sats = self._tally_positions(positions, summary["fiat_balances"])
            summary["balance"] += account_sats
            summary["accounts"].append(
                {"account": account_name, "sats": account_sats, "positions": positions}
            )

        return list(user_data.values())

    async def check_fava_health(self) -> bool:
        """
        Check if Fava is running and accessible.

        Uses a fixed 2 second timeout (not ``self.timeout``) and never raises:
        any exception is logged as a warning and reported as unhealthy.

        Returns:
            True if ``GET {base_url}/changed`` answers with status 200, False otherwise
        """
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                response = await client.get(
                    f"{self.base_url}/changed"
                )
                return response.status_code == 200
        except Exception as e:
            # Deliberate best-effort: a health probe must not propagate errors.
            logger.warning(f"Fava health check failed: {e}")
            return False

    async def query_transactions(
        self,
        account_pattern: Optional[str] = None,
        limit: int = 100,
        include_pending: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Query transactions from Fava/Beancount.

        Args:
            account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123")
            limit: Maximum number of rows to return
            include_pending: Include pending transactions (flag='!')

        Returns:
            List of row dictionaries keyed by the column names Fava reports.
            Pending rows are filtered out after the query's LIMIT has been
            applied, so fewer than ``limit`` rows may come back when
            ``include_pending`` is False.

        Raises:
            ValueError: If account_pattern cannot be quoted for BQL
            httpx.HTTPStatusError: If Fava returns an error
            httpx.RequestError: If connection fails

        Example:
            # All transactions
            txns = await fava.query_transactions()

            # User's transactions
            txns = await fava.query_transactions(account_pattern="User-abc123")

            # Account transactions
            txns = await fava.query_transactions(account_pattern="Assets:Receivable:User-abc")
        """
        # Build Beancount query
        if account_pattern:
            query = (
                f"SELECT * WHERE account ~ {self._bql_string(account_pattern)} "
                f"ORDER BY date DESC LIMIT {limit}"
            )
        else:
            query = f"SELECT * ORDER BY date DESC LIMIT {limit}"

        result = await self._run_query(query)

        # Fava query API returns: {"data": {"rows": [...], "types": [...]}}
        data = result.get("data", {})
        rows = data.get("rows", [])
        column_names = [t.get("name") for t in data.get("types", [])]

        transactions = []
        for row in rows:
            if isinstance(row, list) and len(row) == len(column_names):
                # Rows are arrays, convert to dict using column names
                txn = dict(zip(column_names, row))
            elif isinstance(row, dict):
                # Already a dict (shouldn't happen with BQL, but handle it)
                txn = row
            else:
                # Unrecognised row shape: skip it
                continue

            if not include_pending and txn.get("flag", "*") == "!":
                continue
            transactions.append(txn)

        return transactions[:limit]

    async def get_account_transactions(
        self,
        account_name: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get transactions affecting a specific account.

        The account name is used as a regex pattern (after escaping colons),
        so it matches every account whose name contains it.

        Args:
            account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting this account
        """
        return await self.query_transactions(
            account_pattern=account_name.replace(":", "\\:"),  # Escape colons for regex
            limit=limit
        )

    async def get_user_transactions(
        self,
        user_id: str,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all transactions affecting a user's accounts.

        Accounts are matched by the pattern ``User-<first 8 chars of user_id>``.

        Args:
            user_id: User ID
            limit: Maximum number of transactions

        Returns:
            List of transactions affecting user's accounts
        """
        return await self.query_transactions(
            account_pattern=f"User-{user_id[:8]}",
            limit=limit
        )
|
|
|
|
|
|
# Singleton instance (configured from settings)
|
|
# Stays None until init_fava_client() assigns it; get_fava_client() raises RuntimeError while it is None.
_fava_client: Optional[FavaClient] = None
|
|
|
|
|
|
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the module-wide FavaClient and store it as the shared instance.

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client

    client = FavaClient(fava_url, ledger_slug, timeout)
    _fava_client = client

    # Log the arguments as given (not the client's normalised base_url).
    message = f"Fava client initialized: {fava_url}/{ledger_slug}"
    logger.info(message)
|
|
|
|
|
|
def get_fava_client() -> FavaClient:
    """
    Return the shared FavaClient created by init_fava_client().

    Returns:
        FavaClient instance

    Raises:
        RuntimeError: If init_fava_client() has not been called yet
    """
    client = _fava_client
    if client is not None:
        return client

    # Nothing configured yet: fail loudly instead of handing back None.
    message = (
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )
    raise RuntimeError(message)
|