1167 lines
45 KiB
Python
1167 lines
45 KiB
Python
"""
|
|
Fava API client for Castle.
|
|
|
|
This module provides an async HTTP client for interacting with Fava's JSON API.
|
|
All accounting logic is delegated to Fava/Beancount.
|
|
|
|
Fava provides a REST API for:
|
|
- Adding transactions (PUT /api/add_entries)
|
|
- Adding accounts via Open directives (by editing the ledger file through /api/source; /api/add_entries does not accept Open directives)
|
|
- Querying balances (GET /api/query)
|
|
- Balance sheets (GET /api/balance_sheet)
|
|
- Account reports (GET /api/account_report)
|
|
- Updating/deleting entries (PUT/DELETE /api/source_slice)
|
|
|
|
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
|
|
"""
|
|
|
|
import httpx
|
|
from typing import Any, Dict, List, Optional
|
|
from decimal import Decimal
|
|
from datetime import date, datetime
|
|
from loguru import logger
|
|
|
|
|
|
class FavaClient:
|
|
"""
|
|
Async client for Fava REST API.
|
|
|
|
Fava runs as a separate web service and provides a JSON API
|
|
for adding entries and querying ledger data.
|
|
|
|
All accounting calculations are performed by Beancount via Fava.
|
|
"""
|
|
|
|
def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Store the connection settings for one Fava ledger.

    Args:
        fava_url: Root URL of the Fava server (e.g., http://localhost:3333);
            any trailing '/' characters are removed
        ledger_slug: Ledger identifier used as a path segment of the API URL
        timeout: Request timeout in seconds
    """
    server_root = fava_url.rstrip('/')

    self.timeout = timeout
    self.ledger_slug = ledger_slug
    self.fava_url = server_root
    # All API calls are made relative to <server>/<ledger>/api
    self.base_url = f"{server_root}/{ledger_slug}/api"
|
|
|
|
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Send one journal entry to Fava's add_entries endpoint.

    The entry is wrapped as {"entries": [entry]} and sent with PUT.

    Args:
        entry: Beancount entry dict in the shape Fava expects, e.g.
            {
                "t": "Transaction",
                "date": "2025-01-15",
                "flag": "*",            # "*" cleared, "!" pending
                "payee": "Store",       # empty string rather than None
                "narration": "Purchase",
                "postings": [
                    {"account": "Expenses:Food", "amount": "50.00 EUR"},
                    {"account": "Assets:Cash", "amount": "-50.00 EUR"}
                ],
                "tags": [],
                "links": [],
                "meta": {"user_id": "abc123"}
            }

    Returns:
        Parsed JSON body of Fava's response

    Raises:
        httpx.HTTPStatusError: If Fava answers with an error status
        httpx.RequestError: If the request could not be completed
    """
    endpoint = f"{self.base_url}/add_entries"
    payload = {"entries": [entry]}
    json_headers = {"Content-Type": "application/json"}

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.put(endpoint, json=payload, headers=json_headers)
            response.raise_for_status()
            stored = response.json()

            logger.info(f"Added entry to Fava: {stored.get('data', 'Unknown')}")
            return stored

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
    """
    Get the balance of one account, leaving out pending transactions.

    Runs a BQL ``sum(position)`` query restricted to ``account_name`` and to
    postings whose transaction flag is not '!', then totals the SATS part of
    the returned position.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")

    Returns:
        Dict with:
        - sats: int, sum of every SATS amount found in the position
        - positions: the position value exactly as returned by Fava
          ({} when the query returns no rows)

    Raises:
        ValueError: If account_name contains a single quote and so cannot be
            embedded in the quoted BQL literal
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
        # {"sats": 200000, "positions": {"SATS": {"{100.00 EUR}": 200000}}}
    """
    # The name is spliced into a single-quoted BQL literal; a quote inside it
    # would end the literal early and change the meaning of the query.
    if "'" in account_name:
        raise ValueError(f"Invalid account name for BQL query: {account_name!r}")

    query = f"SELECT sum(position) WHERE account = '{account_name}' AND flag != '!'"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            data = response.json()

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    rows = data.get("data", {}).get("rows", [])
    if not rows or not rows[0]:
        return {"sats": 0, "positions": {}}

    # The query selects a single column, so the aggregated position is the
    # last cell of the first row. Reading the last cell also keeps working
    # for a two-column [account, position] row.
    positions = rows[0][-1]

    total_sats = 0
    if isinstance(positions, dict) and "SATS" in positions:
        sats_positions = positions["SATS"]
        if isinstance(sats_positions, dict):
            # One amount per cost basis; add them all up
            total_sats = sum(int(amount) for amount in sats_positions.values())
        elif isinstance(sats_positions, (int, float)):
            # Plain number (no cost basis)
            total_sats = int(sats_positions)

    return {
        "sats": total_sats,
        "positions": positions
    }
|
|
|
|
async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
    """
    Get a user's balance from castle's perspective.

    Fetches every journal entry and sums the postings on accounts whose name
    contains ":User-<first 8 chars of user_id>" together with "Payable" or
    "Receivable". Entries that are not transactions, are pending (flag '!')
    or carry the "voided" tag are skipped.

    Two posting formats are understood:
    - new: the amount is fiat ("50.00 EUR"); the sats value comes from the
      posting's "sats-equivalent" metadata and takes the sign of the fiat
      amount
    - old: the amount starts with "<int> SATS"; the fiat value comes from the
      "fiat-amount-total" / "fiat-currency" metadata and takes the sign of
      the SATS amount

    Args:
        user_id: User ID (only the first 8 characters are matched)

    Returns:
        {
            "balance": int (sats, positive = user owes castle, negative = castle owes user),
            "fiat_balances": {"EUR": Decimal("100.50")},
            "accounts": [{"account": name, "sats": int}, ...]
        }

    Raises:
        httpx.HTTPStatusError / httpx.RequestError: propagated from
            get_journal_entries()
    """
    import re

    # Loop-invariant work is done once here rather than once per posting.
    fiat_pattern = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    sats_pattern = re.compile(r'^(-?\d+)\s+SATS')
    user_marker = f":User-{user_id[:8]}"

    all_entries = await self.get_journal_entries()

    total_sats = 0
    fiat_balances = {}
    accounts_dict = {}  # account name -> {"account": ..., "sats": ...}

    def add_sats(account_name, sats_amount):
        # Add to both the grand total and the per-account total.
        nonlocal total_sats
        total_sats += sats_amount
        slot = accounts_dict.setdefault(account_name, {"account": account_name, "sats": 0})
        slot["sats"] += sats_amount

    def add_fiat(currency, amount):
        # Accumulate a fiat amount, starting each currency at zero.
        fiat_balances[currency] = fiat_balances.get(currency, Decimal(0)) + amount

    for entry in all_entries:
        # Skip non-transactions, pending (!), and voided
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        if "voided" in (entry.get("tags") or []):
            continue

        for posting in entry.get("postings", []):
            account_name = posting.get("account", "")

            # Only this user's Payable/Receivable accounts count
            if user_marker not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            # A posting may carry "meta": null; treat that like no metadata
            posting_meta = posting.get("meta") or {}

            fiat_match = fiat_pattern.match(amount_str)
            if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                # New format: fiat amount directly in the posting
                fiat_amount = Decimal(fiat_match.group(1))
                add_fiat(fiat_match.group(2), fiat_amount)

                sats_equiv = posting_meta.get("sats-equivalent")
                if sats_equiv:
                    # Sign follows the fiat amount (non-positive -> negative)
                    sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                    add_sats(account_name, sats_amount)
                continue

            # Old format: SATS with cost/price notation
            sats_match = sats_pattern.match(amount_str)
            if sats_match:
                add_sats(account_name, int(sats_match.group(1)))

            fiat_amount_total_str = posting_meta.get("fiat-amount-total")
            fiat_currency_meta = posting_meta.get("fiat-currency")
            if fiat_amount_total_str and fiat_currency_meta:
                fiat_total = Decimal(fiat_amount_total_str)
                # Apply the same sign as the SATS amount
                if sats_match and int(sats_match.group(1)) < 0:
                    fiat_total = -fiat_total
                add_fiat(fiat_currency_meta, fiat_total)

    logger.info(f"User {user_id[:8]} balance: {total_sats} sats, fiat: {dict(fiat_balances)}")
    return {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": list(accounts_dict.values())
    }
|
|
|
|
async def get_all_user_balances(self) -> List[Dict[str, Any]]:
    """
    Get balances for all users (admin view).

    Fetches every journal entry and sums the postings on accounts whose name
    contains ":User-" together with "Payable" or "Receivable", grouped by the
    text that follows ":User-" in the account name. Entries that are not
    transactions, are pending (flag '!') or carry the "voided" tag are
    skipped.

    Posting formats are the same two handled by get_user_balance(): fiat
    amounts with "sats-equivalent" metadata, or "<int> SATS ..." amounts with
    "fiat-amount-total" / "fiat-currency" metadata.

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [{"account": name, "sats": int}, ...]
            },
            ...
        ]
        A user appears as soon as one of their accounts is seen, even if none
        of their postings has a parsable amount.

    Raises:
        httpx.HTTPStatusError / httpx.RequestError: propagated from
            get_journal_entries()
    """
    import re

    # Loop-invariant work is done once here rather than once per posting.
    fiat_pattern = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    sats_pattern = re.compile(r'^(-?\d+)\s+SATS')

    all_entries = await self.get_journal_entries()

    user_data = {}
    # account name -> the {"account", "sats"} dict already placed in the
    # owning user's "accounts" list (an account name maps to one user only)
    account_slots = {}

    def add_sats(record, account_name, sats_amount):
        # Add to the user's total and to the per-account breakdown.
        record["balance"] += sats_amount
        slot = account_slots.get(account_name)
        if slot is None:
            slot = {"account": account_name, "sats": 0}
            account_slots[account_name] = slot
            record["accounts"].append(slot)
        slot["sats"] += sats_amount

    def add_fiat(record, currency, amount):
        # Accumulate a fiat amount, starting each currency at zero.
        balances = record["fiat_balances"]
        balances[currency] = balances.get(currency, Decimal(0)) + amount

    for entry in all_entries:
        # Skip non-transactions, pending (!), and voided
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        if "voided" in (entry.get("tags") or []):
            continue

        for posting in entry.get("postings", []):
            account_name = posting.get("account", "")

            # Only user accounts (Payable or Receivable)
            if ":User-" not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            user_id = account_name.split(":User-")[1]
            record = user_data.get(user_id)
            if record is None:
                record = user_data[user_id] = {
                    "user_id": user_id,
                    "balance": 0,
                    "fiat_balances": {},
                    "accounts": []
                }

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            # A posting may carry "meta": null; treat that like no metadata
            posting_meta = posting.get("meta") or {}

            fiat_match = fiat_pattern.match(amount_str)
            if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                # New format: fiat amount directly in the posting
                fiat_amount = Decimal(fiat_match.group(1))
                add_fiat(record, fiat_match.group(2), fiat_amount)

                sats_equiv = posting_meta.get("sats-equivalent")
                if sats_equiv:
                    # Sign follows the fiat amount (non-positive -> negative)
                    sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                    add_sats(record, account_name, sats_amount)
                continue

            # Old format: SATS with cost/price notation
            sats_match = sats_pattern.match(amount_str)
            if sats_match:
                add_sats(record, account_name, int(sats_match.group(1)))

            fiat_amount_total_str = posting_meta.get("fiat-amount-total")
            fiat_currency_meta = posting_meta.get("fiat-currency")
            if fiat_amount_total_str and fiat_currency_meta:
                fiat_total = Decimal(fiat_amount_total_str)
                # Apply the same sign as the SATS amount
                if sats_match and int(sats_match.group(1)) < 0:
                    fiat_total = -fiat_total
                add_fiat(record, fiat_currency_meta, fiat_total)

    return list(user_data.values())
|
|
|
|
async def check_fava_health(self) -> bool:
    """
    Probe Fava's "changed" endpoint to see whether the server answers.

    Uses a fixed 2-second timeout instead of the client's configured one.

    Returns:
        True when the probe comes back with HTTP 200; False for any other
        status or when any exception occurs (the exception is logged as a
        warning, never raised)
    """
    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            probe = await client.get(f"{self.base_url}/changed")
            is_up = probe.status_code == 200
        return is_up
    except Exception as e:
        logger.warning(f"Fava health check failed: {e}")
        return False
|
|
|
|
async def query_transactions(
    self,
    account_pattern: Optional[str] = None,
    limit: int = 100,
    include_pending: bool = True
) -> List[Dict[str, Any]]:
    """
    Query transactions from Fava/Beancount.

    Builds a ``SELECT * ... ORDER BY date DESC LIMIT <limit>`` BQL query and
    turns each result row into a dict keyed by column name.

    Args:
        account_pattern: Optional regex matched against the account
            (e.g., "User-abc123"); it is placed verbatim inside a quoted BQL
            literal
        limit: Maximum number of rows to return
        include_pending: When False, rows flagged '!' are excluded. The
            exclusion is part of the query itself, so the limit counts only
            rows that are kept; a client-side check remains as a safety net.

    Returns:
        List of row dicts (column name -> value). Rows whose length does not
        match the column list, and rows that are neither list nor dict, are
        dropped.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        txns = await fava.query_transactions()
        txns = await fava.query_transactions(account_pattern="User-abc123")
    """
    conditions = []
    if account_pattern:
        conditions.append(f"account ~ '{account_pattern}'")
    if not include_pending:
        # Filtering only after the fetch would let pending rows use up the
        # LIMIT and return fewer cleared rows than requested.
        conditions.append("flag != '!'")

    where_clause = ""
    if conditions:
        where_clause = " WHERE " + " AND ".join(conditions)
    query = f"SELECT *{where_clause} ORDER BY date DESC LIMIT {limit}"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            result = response.json()

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    # Fava query API returns: {"data": {"rows": [...], "types": [...]}}
    data = result.get("data", {})
    rows = data.get("rows", [])
    column_names = [t.get("name") for t in data.get("types", [])]

    transactions = []
    for row in rows:
        if isinstance(row, list) and len(row) == len(column_names):
            txn = dict(zip(column_names, row))
        elif isinstance(row, dict):
            # Already keyed by column name
            txn = row
        else:
            continue

        if not include_pending and txn.get("flag", "*") == "!":
            continue
        transactions.append(txn)

    return transactions[:limit]
|
|
|
|
async def query_bql(self, query_string: str) -> Dict[str, Any]:
    """
    Run a Beancount Query Language (BQL) query through Fava's query endpoint.

    General-purpose entry point for aggregation, filtering and retrieval.

    LIMITATION (carried over from the original notes): BQL only sees position
    amounts and transaction-level data, not posting metadata such as
    'sats-equivalent'. While Castle keeps SATS in metadata, balances still
    need manual aggregation. See docs/BQL-BALANCE-QUERIES.md. If the ledger
    ever stores SATS as position amounts, BQL could replace that work.

    Args:
        query_string: BQL query
            (e.g., "SELECT account, sum(position) WHERE account ~ 'User-abc'")

    Returns:
        {
            "rows": [[col1, col2, ...], ...],
            "types": [{"name": "col1", "type": "str"}, ...],
            "column_names": ["col1", "col2", ...]
        }

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        result = await fava.query_bql("SELECT account, sum(position) WHERE account ~ 'User-abc'")
        for account, balance in result["rows"]:
            print(f"{account}: {balance}")

    See:
        https://beancount.github.io/docs/beancount_query_language.html
    """
    endpoint = f"{self.base_url}/query"
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(endpoint, params={"query_string": query_string})
            response.raise_for_status()

            # Fava returns: {"data": {"rows": [...], "types": [...]}}
            payload = response.json().get("data", {})
            column_types = payload.get("types", [])

            return {
                "rows": payload.get("rows", []),
                "types": column_types,
                "column_names": [col.get("name") for col in column_types]
            }

    except httpx.HTTPStatusError as e:
        logger.error(f"BQL query error: {e.response.status_code} - {e.response.text}")
        logger.error(f"Query was: {query_string}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def get_user_balance_bql(self, user_id: str) -> Dict[str, Any]:
    """
    Compute one user's balance with a single grouped BQL query.

    NOT CURRENTLY USED (per the original notes): the query only sees position
    amounts, so SATS kept in posting metadata are invisible and "balance" is
    0 with the current ledger format; use get_user_balance() instead. The
    method would become useful if SATS were stored as position amounts
    (e.g., "100000 SATS {100.00 EUR}"). See docs/BQL-BALANCE-QUERIES.md.

    The query selects, per account, the summed position of non-pending
    postings on accounts matching ":User-<first 8 chars of user_id>" and
    "Payable" or "Receivable".

    Args:
        user_id: User ID (only the first 8 characters are used)

    Returns:
        {
            "balance": int (sats),
            "fiat_balances": {"EUR": Decimal("100.50"), ...},
            "accounts": [{"account": "...", "sats": 150000}, ...]
        }

    Example:
        balance = await fava.get_user_balance_bql("af983632")
        print(f"Balance: {balance['balance']} sats")
    """
    from decimal import Decimal
    import re

    short_id = user_id[:8]
    query = f"""
    SELECT account, sum(position) as balance
    WHERE account ~ ':User-{short_id}'
      AND (account ~ 'Payable' OR account ~ 'Receivable')
      AND flag != '!'
    GROUP BY account
    """

    result = await self.query_bql(query)

    sats_total = 0
    fiat_balances = {}
    accounts = []

    for account_name, position in result["rows"]:
        # A position arrives either as a dict such as
        # {"SATS": "150000", "EUR": "145.50"} or as a single string such as
        # "150000 SATS" / "145.50 EUR"; anything else is ignored.
        if isinstance(position, dict):
            raw_sats = position.get("SATS", "0")
            sats_here = int(raw_sats) if raw_sats else 0
            sats_total += sats_here
            accounts.append({"account": account_name, "sats": sats_here})

            for code in ("EUR", "USD", "GBP"):
                if code not in position:
                    continue
                raw_fiat = position[code]
                value = Decimal(raw_fiat) if raw_fiat else Decimal(0)
                fiat_balances[code] = fiat_balances.get(code, Decimal(0)) + value
            continue

        if not isinstance(position, str):
            continue

        sats_hit = re.match(r'^(-?\d+)\s+SATS$', position)
        if sats_hit:
            sats_here = int(sats_hit.group(1))
            sats_total += sats_here
            accounts.append({"account": account_name, "sats": sats_here})
            continue

        fiat_hit = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position)
        if fiat_hit and fiat_hit.group(2) in ('EUR', 'USD', 'GBP'):
            value = Decimal(fiat_hit.group(1))
            code = fiat_hit.group(2)
            fiat_balances[code] = fiat_balances.get(code, Decimal(0)) + value

    logger.info(f"User {user_id[:8]} balance (BQL): {sats_total} sats, fiat: {dict(fiat_balances)}")

    return {
        "balance": sats_total,
        "fiat_balances": fiat_balances,
        "accounts": accounts
    }
|
|
|
|
async def get_all_user_balances_bql(self) -> List[Dict[str, Any]]:
    """
    Compute every user's balance with a single grouped BQL query.

    NOT CURRENTLY USED (per the original notes): the query only sees position
    amounts, so SATS kept in posting metadata are invisible and "balance" is
    0 with the current ledger format; use get_all_user_balances() instead.
    The method would become useful if SATS were stored as position amounts.
    See docs/BQL-BALANCE-QUERIES.md.

    Rows are grouped by the first 8 characters that follow ":User-" in the
    account name.

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [{"account": "...", "sats": 150000}, ...]
            },
            ...
        ]

    Example:
        all_balances = await fava.get_all_user_balances_bql()
        for user in all_balances:
            print(f"{user['user_id']}: {user['balance']} sats")
    """
    from decimal import Decimal
    import re

    query = """
    SELECT account, sum(position) as balance
    WHERE (account ~ 'Payable:User-' OR account ~ 'Receivable:User-')
      AND flag != '!'
    GROUP BY account
    """

    result = await self.query_bql(query)

    user_data = {}

    for account_name, position in result["rows"]:
        # Account names look like "Liabilities:Payable:User-abc12345"
        if ":User-" not in account_name:
            continue

        # Keep only the first 8 characters after the marker as the user id
        user_id = account_name.split(":User-")[1][:8]

        if user_id not in user_data:
            user_data[user_id] = {
                "user_id": user_id,
                "balance": 0,
                "fiat_balances": {},
                "accounts": []
            }
        record = user_data[user_id]
        fiat_totals = record["fiat_balances"]

        # Same position handling as the single-user query: a dict of
        # currency -> amount, or one "<amount> <currency>" string.
        if isinstance(position, dict):
            raw_sats = position.get("SATS", "0")
            sats_here = int(raw_sats) if raw_sats else 0
            record["balance"] += sats_here
            record["accounts"].append({"account": account_name, "sats": sats_here})

            for code in ("EUR", "USD", "GBP"):
                if code not in position:
                    continue
                raw_fiat = position[code]
                value = Decimal(raw_fiat) if raw_fiat else Decimal(0)
                fiat_totals[code] = fiat_totals.get(code, Decimal(0)) + value
            continue

        if not isinstance(position, str):
            continue

        sats_hit = re.match(r'^(-?\d+)\s+SATS$', position)
        if sats_hit:
            sats_here = int(sats_hit.group(1))
            record["balance"] += sats_here
            record["accounts"].append({"account": account_name, "sats": sats_here})
            continue

        fiat_hit = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', position)
        if fiat_hit and fiat_hit.group(2) in ('EUR', 'USD', 'GBP'):
            value = Decimal(fiat_hit.group(1))
            code = fiat_hit.group(2)
            fiat_totals[code] = fiat_totals.get(code, Decimal(0)) + value

    logger.info(f"Fetched balances for {len(user_data)} users (BQL)")

    return list(user_data.values())
|
|
|
|
async def get_account_transactions(
    self,
    account_name: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch transactions for one account via query_transactions().

    The account name is used as the account pattern after every ':' has been
    prefixed with a backslash.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
        limit: Maximum number of transactions

    Returns:
        Whatever query_transactions() returns for that pattern
    """
    # Same result as replacing each ":" with "\:"
    escaped_name = "\\:".join(account_name.split(":"))
    matches = await self.query_transactions(
        account_pattern=escaped_name,
        limit=limit
    )
    return matches
|
|
|
|
async def get_user_transactions(
    self,
    user_id: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch transactions touching a user's accounts via query_transactions().

    The account pattern is "User-" followed by the first 8 characters of the
    user ID.

    Args:
        user_id: User ID
        limit: Maximum number of transactions

    Returns:
        Whatever query_transactions() returns for that pattern
    """
    short_id = user_id[:8]
    matches = await self.query_transactions(
        account_pattern=f"User-{short_id}",
        limit=limit
    )
    return matches
|
|
|
|
async def get_all_accounts(self) -> List[Dict[str, Any]]:
    """
    List the distinct account names known to Beancount/Fava.

    Runs "SELECT DISTINCT account" through query_bql() and wraps each
    non-empty name in a dict. "meta" is always an empty dict because the
    query does not fetch account metadata.

    Returns:
        [
            {"account": "Assets:Cash", "meta": {}},
            {"account": "Expenses:Food", "meta": {}},
            ...
        ]

    Raises:
        Any exception from query_bql() or from processing its result, after
        logging it.

    Example:
        accounts = await fava.get_all_accounts()
        for acc in accounts:
            print(acc["account"])  # "Assets:Cash"
    """
    try:
        result = await self.query_bql("SELECT DISTINCT account")

        # A row is normally a one-element list; a dict row is read by key
        names = (
            row[0] if isinstance(row, list) else row.get("account")
            for row in result["rows"]
        )
        accounts = [{"account": name, "meta": {}} for name in names if name]

        logger.debug(f"Fava returned {len(accounts)} accounts via BQL")
        return accounts

    except Exception as e:
        logger.error(f"Failed to fetch accounts via BQL: {e}")
        raise
|
|
|
|
async def get_journal_entries(self) -> List[Dict[str, Any]]:
    """
    Download the full journal from Fava's "journal" endpoint.

    Also logs, at info level, the number of entries received and how many of
    them have "Lightning payment" in their narration.

    Returns:
        The "data" list of the response (empty list when the key is absent).
        Per the original notes it holds all entry kinds (transactions, opens,
        closes, ...), each with an entry_hash field.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        entries = await fava.get_journal_entries()
    """
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(f"{self.base_url}/journal")
            response.raise_for_status()
            entries = response.json().get("data", [])

            logger.info(f"Fava /journal returned {len(entries)} entries")

            # Count entries whose narration mentions a Lightning payment
            lightning_count = sum(
                1 for item in entries
                if "Lightning payment" in item.get("narration", "")
            )
            logger.info(f"Found {lightning_count} Lightning payment entries in journal")

            return entries

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava journal error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
    """
    Fetch the context of one entry from Fava's "context" endpoint.

    Args:
        entry_hash: Entry hash, as found on entries from get_journal_entries()

    Returns:
        The "data" object of the response ({} when the key is absent). Per
        the original notes it looks like:
        {
            "entry": {...},             # Serialized entry
            "slice": "2025-01-15 ! ...", # Beancount source text
            "sha256sum": "abc123...",   # For concurrency control
            "balances_before": {...},
            "balances_after": {...}
        }

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        context = await fava.get_entry_context("abc123")
        source = context["slice"]
        sha256sum = context["sha256sum"]
    """
    endpoint = f"{self.base_url}/context"
    lookup = {"entry_hash": entry_hash}

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(endpoint, params=lookup)
            response.raise_for_status()
            body = response.json()
            return body.get("data", {})

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava context error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
    """
    Replace an entry's source text through Fava's "source_slice" endpoint
    (for example to change its flag from ! to *).

    Args:
        entry_hash: Entry hash
        new_source: Modified Beancount source text for the entry
        sha256sum: sha256sum obtained from get_entry_context(), sent along
            for concurrency control

    Returns:
        The "data" value of the response ("" when the key is absent); per the
        original notes this is the new sha256sum

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        context = await fava.get_entry_context("abc123")
        new_source = context["slice"].replace("2025-01-15 !", "2025-01-15 *")
        new_sha256 = await fava.update_entry_source("abc123", new_source, context["sha256sum"])
    """
    endpoint = f"{self.base_url}/source_slice"
    payload = {
        "entry_hash": entry_hash,
        "source": new_source,
        "sha256sum": sha256sum
    }

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.put(endpoint, json=payload)
            response.raise_for_status()
            body = response.json()
            return body.get("data", "")

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
    """
    Remove an entry from the Beancount file through Fava's "source_slice"
    endpoint.

    Args:
        entry_hash: Entry hash
        sha256sum: Current sha256sum, sent along for concurrency control

    Returns:
        The "data" value of the response ("" when the key is absent)

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the connection fails

    Example:
        context = await fava.get_entry_context("abc123")
        await fava.delete_entry("abc123", context["sha256sum"])
    """
    endpoint = f"{self.base_url}/source_slice"
    target = {
        "entry_hash": entry_hash,
        "sha256sum": sha256sum
    }

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.delete(endpoint, params=target)
            response.raise_for_status()
            body = response.json()
            return body.get("data", "")

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def add_account(
    self,
    account_name: str,
    currencies: list[str],
    opening_date: Optional[date] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Add an account to the Beancount ledger via an Open directive.

    NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
    This method uses /api/source to directly edit the Beancount file.

    The main ledger file is fetched, scanned for Open directives, and the
    new directive is inserted after the last one found (including its
    indented metadata lines). If no Open directive exists, it is inserted
    at the top of the file. If the file already contains an Open directive
    for exactly this account name, nothing is written.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
        currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
        opening_date: Date to open the account (defaults to today)
        metadata: Optional metadata for the account. String values are
            written as quoted, escaped Beancount strings; bools are written
            as TRUE/FALSE; any other value is written via str().

    Returns:
        Response from Fava ({"data": "new_sha256sum", "mtime": "..."}).
        When the account already exists, the current sha256sum and mtime
        of the unmodified file are returned in the same shape.

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status
            (logged, then re-raised)
        httpx.RequestError: The request could not be completed
            (logged, then re-raised)

    Example:
        # Add a user's receivable account
        result = await fava.add_account(
            account_name="Assets:Receivable:User-abc123",
            currencies=["EUR", "SATS", "USD"],
            metadata={"user_id": "abc123", "description": "User receivables"}
        )

        # Add a user's payable account
        result = await fava.add_account(
            account_name="Liabilities:Payable:User-abc123",
            currencies=["EUR", "SATS"]
        )
    """
    import re  # local import: `re` is not among the module-level imports

    if opening_date is None:
        opening_date = date.today()

    # An Open directive starts at column 0 with a date (any year), followed
    # by the keyword and the account name. Capturing the whole name lets us
    # compare it exactly instead of by substring/prefix.
    open_directive = re.compile(r"^\d{4}[-/]\d{2}[-/]\d{2}\s+open\s+(\S+)")

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Step 1: Get the main Beancount file path from Fava
            options_response = await client.get(f"{self.base_url}/options")
            options_response.raise_for_status()
            options_data = options_response.json()["data"]
            file_path = options_data["beancount_options"]["filename"]

            logger.debug(f"Fava main file: {file_path}")

            # Step 2: Get current source file
            response = await client.get(
                f"{self.base_url}/source",
                params={"filename": file_path}
            )
            response.raise_for_status()
            source_data = response.json()["data"]

            sha256sum = source_data["sha256sum"]
            source = source_data["source"]

            # Step 3: Scan the Open directives. This both detects an
            # existing directive for this exact account and finds the
            # insertion point (after the last Open directive AND its metadata).
            lines = source.split('\n')
            insert_index = 0
            for i, line in enumerate(lines):
                found = open_directive.match(line)
                if found is None:
                    continue

                if found.group(1) == account_name:
                    logger.info(f"Account {account_name} already exists in Beancount file")
                    return {"data": sha256sum, "mtime": source_data.get("mtime", "")}

                insert_index = i + 1
                # Skip metadata lines (non-blank lines starting with whitespace)
                while (
                    insert_index < len(lines)
                    and lines[insert_index].startswith((' ', '\t'))
                    and lines[insert_index].strip()
                ):
                    insert_index += 1

            # Step 4: Format Open directive as Beancount text
            currencies_str = ", ".join(currencies)
            open_lines = [
                "",
                # rstrip avoids a dangling space when no currencies are given
                f"{opening_date.isoformat()} open {account_name} {currencies_str}".rstrip()
            ]

            # Add metadata if provided
            for key, value in (metadata or {}).items():
                if isinstance(value, bool):
                    # Checked before other types: Beancount spells booleans
                    # TRUE/FALSE, not Python's True/False.
                    rendered = "TRUE" if value else "FALSE"
                elif isinstance(value, str):
                    # Escape characters that would otherwise terminate or
                    # split the quoted string in the ledger file.
                    escaped = (
                        value.replace("\\", "\\\\")
                        .replace('"', '\\"')
                        .replace("\n", "\\n")
                    )
                    rendered = f'"{escaped}"'
                else:
                    rendered = str(value)
                open_lines.append(f' {key}: {rendered}')

            # Step 5: Insert into source
            lines[insert_index:insert_index] = open_lines
            new_source = '\n'.join(lines)

            # Step 6: Update source file via PUT /api/source
            update_payload = {
                "file_path": file_path,
                "source": new_source,
                "sha256sum": sha256sum
            }

            response = await client.put(
                f"{self.base_url}/source",
                json=update_payload,
                headers={"Content-Type": "application/json"}
            )
            response.raise_for_status()
            result = response.json()

            logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}")
            return result

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
|
|
# Singleton instance (configured from settings)
|
|
# Starts as None; init_fava_client() assigns the shared FavaClient, and
# get_fava_client() raises RuntimeError while this is still None.
_fava_client: Optional[FavaClient] = None
|
|
|
|
|
|
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the module-wide Fava client and store it in `_fava_client`.

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client

    client = FavaClient(fava_url, ledger_slug, timeout)
    _fava_client = client

    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
|
|
|
|
|
|
def get_fava_client() -> FavaClient:
    """
    Return the module-wide Fava client.

    Returns:
        The FavaClient instance stored by init_fava_client()

    Raises:
        RuntimeError: If no client has been stored yet
    """
    client = _fava_client
    if client is not None:
        return client

    message = (
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )
    raise RuntimeError(message)
|