Refactor CRUD operations for client extension: Update database interactions to focus on the admin extension, streamline client dashboard functionalities, and enhance transaction history retrieval. Remove legacy DCA client references and improve code organization for better maintainability.

This commit is contained in:
padreug 2025-06-22 12:32:10 +02:00
parent 32e8f31b82
commit 1570e7ba95

709
crud.py
View file

@ -1,453 +1,304 @@
# Description: This file contains the CRUD operations for talking to the database.
# Description: Client extension CRUD operations - reads from admin extension database
from typing import List, Optional, Union
from datetime import datetime, timezone
from typing import List, Optional
from datetime import datetime, timedelta
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from .models import ()
from .models import (
ClientDashboardSummary,
ClientTransaction,
ClientAnalytics,
UpdateClientSettings,
)
db = Database("ext_satmachineclient")
# Connect to admin extension's database
db = Database("ext_satmachineadmin")
async def get_dca_client(client_id: str) -> Optional[DcaClient]:
return await db.fetchone(
"SELECT * FROM satmachineclient.dca_clients WHERE id = :id",
{"id": client_id},
DcaClient,
)
###################################################
############## CLIENT DASHBOARD CRUD ##############
###################################################
async def get_dca_client_by_user(user_id: str) -> Optional[DcaClient]:
return await db.fetchone(
"SELECT * FROM satmachineclient.dca_clients WHERE user_id = :user_id",
{"user_id": user_id},
DcaClient,
)
async def update_dca_client(client_id: str, data: UpdateDcaClientData) -> Optional[DcaClient]:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_dca_client(client_id)
async def get_client_dashboard_summary(user_id: str) -> Optional[ClientDashboardSummary]:
"""Get dashboard summary for a specific user"""
update_data["updated_at"] = datetime.now()
set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()])
update_data["id"] = client_id
# Get client info
client = await db.fetchone(
"SELECT * FROM satmachineadmin.dca_clients WHERE user_id = :user_id",
{"user_id": user_id}
)
await db.execute(
f"UPDATE satmachineclient.dca_clients SET {set_clause} WHERE id = :id",
update_data
)
return await get_dca_client(client_id)
async def delete_dca_client(client_id: str) -> None:
await db.execute(
"DELETE FROM satmachineclient.dca_clients WHERE id = :id",
{"id": client_id}
)
# DCA Deposit CRUD Operations
async def create_deposit(data: CreateDepositData) -> DcaDeposit:
deposit_id = urlsafe_short_hash()
await db.execute(
if not client:
return None
# Get total sats accumulated
sats_result = await db.fetchone(
"""
INSERT INTO satmachineclient.dca_deposits
(id, client_id, amount, currency, status, notes, created_at)
VALUES (:id, :client_id, :amount, :currency, :status, :notes, :created_at)
SELECT COALESCE(SUM(amount_sats), 0) as total_sats
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id AND status = 'confirmed'
""",
{"client_id": client["id"]}
)
# Get total fiat invested
fiat_result = await db.fetchone(
"""
SELECT COALESCE(SUM(amount_fiat), 0) as total_fiat
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id AND status = 'confirmed'
""",
{"client_id": client["id"]}
)
# Get current fiat balance (deposits - payments)
balance_result = await db.fetchone(
"""
SELECT
COALESCE(SUM(CASE WHEN d.status = 'confirmed' THEN d.amount ELSE 0 END), 0) as deposits,
COALESCE(SUM(CASE WHEN p.status = 'confirmed' THEN p.amount_fiat ELSE 0 END), 0) as payments
FROM satmachineadmin.dca_clients c
LEFT JOIN satmachineadmin.dca_deposits d ON c.id = d.client_id
LEFT JOIN satmachineadmin.dca_payments p ON c.id = p.client_id
WHERE c.id = :client_id
""",
{"client_id": client["id"]}
)
# Get transaction count and last transaction date
tx_stats = await db.fetchone(
"""
SELECT
COUNT(*) as tx_count,
MAX(created_at) as last_tx_date
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id AND status = 'confirmed'
""",
{"client_id": client["id"]}
)
total_sats = sats_result["total_sats"] if sats_result else 0
total_fiat = fiat_result["total_fiat"] if fiat_result else 0
deposits = balance_result["deposits"] if balance_result else 0
payments = balance_result["payments"] if balance_result else 0
# Calculate average cost basis (sats per fiat unit)
avg_cost_basis = total_sats / total_fiat if total_fiat > 0 else 0
return ClientDashboardSummary(
user_id=user_id,
total_sats_accumulated=total_sats,
total_fiat_invested=total_fiat,
average_cost_basis=avg_cost_basis,
current_fiat_balance=deposits - payments,
total_transactions=tx_stats["tx_count"] if tx_stats else 0,
dca_mode=client["dca_mode"],
dca_status=client["status"],
last_transaction_date=tx_stats["last_tx_date"] if tx_stats else None
)
async def get_client_transactions(
user_id: str,
limit: int = 50,
offset: int = 0,
transaction_type: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[ClientTransaction]:
"""Get client's transaction history with filtering"""
# Get client ID first
client = await db.fetchone(
"SELECT id FROM satmachineadmin.dca_clients WHERE user_id = :user_id",
{"user_id": user_id}
)
if not client:
return []
# Build query with filters
where_conditions = ["client_id = :client_id"]
params = {"client_id": client["id"], "limit": limit, "offset": offset}
if transaction_type:
where_conditions.append("transaction_type = :transaction_type")
params["transaction_type"] = transaction_type
if start_date:
where_conditions.append("created_at >= :start_date")
params["start_date"] = start_date
if end_date:
where_conditions.append("created_at <= :end_date")
params["end_date"] = end_date
where_clause = " AND ".join(where_conditions)
transactions = await db.fetchall(
f"""
SELECT id, amount_sats, amount_fiat, exchange_rate, transaction_type,
status, created_at, lamassu_transaction_id
FROM satmachineadmin.dca_payments
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
""",
params
)
return [
ClientTransaction(
id=tx["id"],
amount_sats=tx["amount_sats"],
amount_fiat=tx["amount_fiat"],
exchange_rate=tx["exchange_rate"],
transaction_type=tx["transaction_type"],
status=tx["status"],
created_at=tx["created_at"],
lamassu_transaction_id=tx["lamassu_transaction_id"]
)
for tx in transactions
]
async def get_client_analytics(user_id: str, time_range: str = "30d") -> Optional[ClientAnalytics]:
"""Get client performance analytics"""
# Get client ID
client = await db.fetchone(
"SELECT id FROM satmachineadmin.dca_clients WHERE user_id = :user_id",
{"user_id": user_id}
)
if not client:
return None
# Calculate date range
if time_range == "7d":
start_date = datetime.now() - timedelta(days=7)
elif time_range == "30d":
start_date = datetime.now() - timedelta(days=30)
elif time_range == "90d":
start_date = datetime.now() - timedelta(days=90)
elif time_range == "1y":
start_date = datetime.now() - timedelta(days=365)
else: # "all"
start_date = datetime(2020, 1, 1) # Arbitrary early date
# Get cost basis history (running average)
cost_basis_data = await db.fetchall(
"""
SELECT
created_at,
amount_sats,
amount_fiat,
exchange_rate,
SUM(amount_sats) OVER (ORDER BY created_at) as cumulative_sats,
SUM(amount_fiat) OVER (ORDER BY created_at) as cumulative_fiat
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id
AND status = 'confirmed'
AND created_at >= :start_date
ORDER BY created_at
""",
{"client_id": client["id"], "start_date": start_date}
)
# Build cost basis history
cost_basis_history = []
for record in cost_basis_data:
avg_cost_basis = record["cumulative_sats"] / record["cumulative_fiat"] if record["cumulative_fiat"] > 0 else 0
cost_basis_history.append({
"date": record["created_at"].isoformat(),
"average_cost_basis": avg_cost_basis,
"cumulative_sats": record["cumulative_sats"],
"cumulative_fiat": record["cumulative_fiat"]
})
# Get accumulation timeline (daily/weekly aggregation)
accumulation_data = await db.fetchall(
"""
SELECT
DATE(created_at) as date,
SUM(amount_sats) as daily_sats,
SUM(amount_fiat) as daily_fiat,
COUNT(*) as daily_transactions
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id
AND status = 'confirmed'
AND created_at >= :start_date
GROUP BY DATE(created_at)
ORDER BY date
""",
{"client_id": client["id"], "start_date": start_date}
)
accumulation_timeline = [
{
"id": deposit_id,
"client_id": data.client_id,
"amount": data.amount,
"currency": data.currency,
"status": "pending",
"notes": data.notes,
"created_at": datetime.now()
"date": record["date"],
"sats": record["daily_sats"],
"fiat": record["daily_fiat"],
"transactions": record["daily_transactions"]
}
for record in accumulation_data
]
# Get transaction frequency metrics
frequency_stats = await db.fetchone(
"""
SELECT
COUNT(*) as total_transactions,
AVG(amount_sats) as avg_sats_per_tx,
AVG(amount_fiat) as avg_fiat_per_tx,
MIN(created_at) as first_tx,
MAX(created_at) as last_tx
FROM satmachineadmin.dca_payments
WHERE client_id = :client_id AND status = 'confirmed'
""",
{"client_id": client["id"]}
)
return await get_deposit(deposit_id)
async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]:
return await db.fetchone(
"SELECT * FROM satmachineclient.dca_deposits WHERE id = :id",
{"id": deposit_id},
DcaDeposit,
)
async def get_deposits_by_client(client_id: str) -> List[DcaDeposit]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_deposits WHERE client_id = :client_id ORDER BY created_at DESC",
{"client_id": client_id},
DcaDeposit,
)
async def get_all_deposits() -> List[DcaDeposit]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_deposits ORDER BY created_at DESC",
model=DcaDeposit,
)
async def update_deposit_status(deposit_id: str, data: UpdateDepositStatusData) -> Optional[DcaDeposit]:
update_data = {
"status": data.status,
"notes": data.notes
transaction_frequency = {
"total_transactions": frequency_stats["total_transactions"] if frequency_stats else 0,
"avg_sats_per_transaction": frequency_stats["avg_sats_per_tx"] if frequency_stats else 0,
"avg_fiat_per_transaction": frequency_stats["avg_fiat_per_tx"] if frequency_stats else 0,
"first_transaction": frequency_stats["first_tx"].isoformat() if frequency_stats and frequency_stats["first_tx"] else None,
"last_transaction": frequency_stats["last_tx"].isoformat() if frequency_stats and frequency_stats["last_tx"] else None
}
if data.status == "confirmed":
update_data["confirmed_at"] = datetime.now()
set_clause = ", ".join([f"{k} = :{k}" for k, v in update_data.items() if v is not None])
filtered_data = {k: v for k, v in update_data.items() if v is not None}
filtered_data["id"] = deposit_id
await db.execute(
f"UPDATE satmachineclient.dca_deposits SET {set_clause} WHERE id = :id",
filtered_data
return ClientAnalytics(
user_id=user_id,
cost_basis_history=cost_basis_history,
accumulation_timeline=accumulation_timeline,
transaction_frequency=transaction_frequency
)
return await get_deposit(deposit_id)
# DCA Payment CRUD Operations
async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment:
payment_id = urlsafe_short_hash()
await db.execute(
"""
INSERT INTO satmachineclient.dca_payments
(id, client_id, amount_sats, amount_fiat, exchange_rate, transaction_type,
lamassu_transaction_id, payment_hash, status, created_at)
VALUES (:id, :client_id, :amount_sats, :amount_fiat, :exchange_rate, :transaction_type,
:lamassu_transaction_id, :payment_hash, :status, :created_at)
""",
{
"id": payment_id,
"client_id": data.client_id,
"amount_sats": data.amount_sats,
"amount_fiat": data.amount_fiat,
"exchange_rate": data.exchange_rate,
"transaction_type": data.transaction_type,
"lamassu_transaction_id": data.lamassu_transaction_id,
"payment_hash": data.payment_hash,
"status": "pending",
"created_at": datetime.now()
}
)
return await get_dca_payment(payment_id)
async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]:
async def get_client_by_user_id(user_id: str):
"""Get client record by user_id"""
return await db.fetchone(
"SELECT * FROM satmachineclient.dca_payments WHERE id = :id",
{"id": payment_id},
DcaPayment,
"SELECT * FROM satmachineadmin.dca_clients WHERE user_id = :user_id",
{"user_id": user_id}
)
async def get_payments_by_client(client_id: str) -> List[DcaPayment]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_payments WHERE client_id = :client_id ORDER BY created_at DESC",
{"client_id": client_id},
DcaPayment,
)
async def get_all_payments() -> List[DcaPayment]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_payments ORDER BY created_at DESC",
model=DcaPayment,
)
async def update_dca_payment_status(payment_id: str, status: str) -> None:
"""Update the status of a DCA payment"""
await db.execute(
"UPDATE satmachineclient.dca_payments SET status = :status WHERE id = :id",
{"status": status, "id": payment_id}
)
async def get_payments_by_lamassu_transaction(lamassu_transaction_id: str) -> List[DcaPayment]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_payments WHERE lamassu_transaction_id = :transaction_id",
{"transaction_id": lamassu_transaction_id},
DcaPayment,
)
# Balance and Summary Operations
async def get_client_balance_summary(client_id: str) -> ClientBalanceSummary:
# Get total confirmed deposits
total_deposits_result = await db.fetchone(
"""
SELECT COALESCE(SUM(amount), 0) as total, currency
FROM satmachineclient.dca_deposits
WHERE client_id = :client_id AND status = 'confirmed'
GROUP BY currency
""",
{"client_id": client_id}
)
# Get total payments made
total_payments_result = await db.fetchone(
"""
SELECT COALESCE(SUM(amount_fiat), 0) as total
FROM satmachineclient.dca_payments
WHERE client_id = :client_id AND status = 'confirmed'
""",
{"client_id": client_id}
)
total_deposits = total_deposits_result["total"] if total_deposits_result else 0
total_payments = total_payments_result["total"] if total_payments_result else 0
currency = total_deposits_result["currency"] if total_deposits_result else "GTQ"
return ClientBalanceSummary(
client_id=client_id,
total_deposits=total_deposits,
total_payments=total_payments,
remaining_balance=total_deposits - total_payments,
currency=currency
)
async def get_flow_mode_clients() -> List[DcaClient]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_clients WHERE dca_mode = 'flow' AND status = 'active'",
model=DcaClient,
)
async def get_fixed_mode_clients() -> List[DcaClient]:
return await db.fetchall(
"SELECT * FROM satmachineclient.dca_clients WHERE dca_mode = 'fixed' AND status = 'active'",
model=DcaClient,
)
# Lamassu Configuration CRUD Operations
async def create_lamassu_config(data: CreateLamassuConfigData) -> LamassuConfig:
config_id = urlsafe_short_hash()
# Deactivate any existing configs first (only one active config allowed)
await db.execute(
"UPDATE satmachineclient.lamassu_config SET is_active = false, updated_at = :updated_at",
{"updated_at": datetime.now()}
)
await db.execute(
"""
INSERT INTO satmachineclient.lamassu_config
(id, host, port, database_name, username, password, source_wallet_id, commission_wallet_id, is_active, created_at, updated_at,
use_ssh_tunnel, ssh_host, ssh_port, ssh_username, ssh_password, ssh_private_key)
VALUES (:id, :host, :port, :database_name, :username, :password, :source_wallet_id, :commission_wallet_id, :is_active, :created_at, :updated_at,
:use_ssh_tunnel, :ssh_host, :ssh_port, :ssh_username, :ssh_password, :ssh_private_key)
""",
{
"id": config_id,
"host": data.host,
"port": data.port,
"database_name": data.database_name,
"username": data.username,
"password": data.password,
"source_wallet_id": data.source_wallet_id,
"commission_wallet_id": data.commission_wallet_id,
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"use_ssh_tunnel": data.use_ssh_tunnel,
"ssh_host": data.ssh_host,
"ssh_port": data.ssh_port,
"ssh_username": data.ssh_username,
"ssh_password": data.ssh_password,
"ssh_private_key": data.ssh_private_key
}
)
return await get_lamassu_config(config_id)
async def get_lamassu_config(config_id: str) -> Optional[LamassuConfig]:
return await db.fetchone(
"SELECT * FROM satmachineclient.lamassu_config WHERE id = :id",
{"id": config_id},
LamassuConfig,
)
async def get_active_lamassu_config() -> Optional[LamassuConfig]:
return await db.fetchone(
"SELECT * FROM satmachineclient.lamassu_config WHERE is_active = true ORDER BY created_at DESC LIMIT 1",
model=LamassuConfig,
)
async def get_all_lamassu_configs() -> List[LamassuConfig]:
return await db.fetchall(
"SELECT * FROM satmachineclient.lamassu_config ORDER BY created_at DESC",
model=LamassuConfig,
)
async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) -> Optional[LamassuConfig]:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_lamassu_config(config_id)
update_data["updated_at"] = datetime.now()
set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()])
update_data["id"] = config_id
await db.execute(
f"UPDATE satmachineclient.lamassu_config SET {set_clause} WHERE id = :id",
update_data
)
return await get_lamassu_config(config_id)
async def update_config_test_result(config_id: str, success: bool) -> None:
utc_now = datetime.now(timezone.utc)
await db.execute(
"""
UPDATE satmachineclient.lamassu_config
SET test_connection_last = :test_time, test_connection_success = :success, updated_at = :updated_at
WHERE id = :id
""",
{
"id": config_id,
"test_time": utc_now,
"success": success,
"updated_at": utc_now
}
)
async def delete_lamassu_config(config_id: str) -> None:
await db.execute(
"DELETE FROM satmachineclient.lamassu_config WHERE id = :id",
{"id": config_id}
)
async def update_poll_start_time(config_id: str) -> None:
"""Update the last poll start time"""
utc_now = datetime.now(timezone.utc)
await db.execute(
"""
UPDATE satmachineclient.lamassu_config
SET last_poll_time = :poll_time, updated_at = :updated_at
WHERE id = :id
""",
{
"id": config_id,
"poll_time": utc_now,
"updated_at": utc_now
}
)
async def update_poll_success_time(config_id: str) -> None:
"""Update the last successful poll time"""
utc_now = datetime.now(timezone.utc)
await db.execute(
"""
UPDATE satmachineclient.lamassu_config
SET last_successful_poll = :poll_time, updated_at = :updated_at
WHERE id = :id
""",
{
"id": config_id,
"poll_time": utc_now,
"updated_at": utc_now
}
)
# Lamassu Transaction Storage CRUD Operations
async def create_lamassu_transaction(data: CreateLamassuTransactionData) -> StoredLamassuTransaction:
"""Store a processed Lamassu transaction"""
transaction_id = urlsafe_short_hash()
await db.execute(
"""
INSERT INTO satmachineclient.lamassu_transactions
(id, lamassu_transaction_id, fiat_amount, crypto_amount, commission_percentage,
discount, effective_commission, commission_amount_sats, base_amount_sats,
exchange_rate, crypto_code, fiat_code, device_id, transaction_time, processed_at,
clients_count, distributions_total_sats)
VALUES (:id, :lamassu_transaction_id, :fiat_amount, :crypto_amount, :commission_percentage,
:discount, :effective_commission, :commission_amount_sats, :base_amount_sats,
:exchange_rate, :crypto_code, :fiat_code, :device_id, :transaction_time, :processed_at,
:clients_count, :distributions_total_sats)
""",
{
"id": transaction_id,
"lamassu_transaction_id": data.lamassu_transaction_id,
"fiat_amount": data.fiat_amount,
"crypto_amount": data.crypto_amount,
"commission_percentage": data.commission_percentage,
"discount": data.discount,
"effective_commission": data.effective_commission,
"commission_amount_sats": data.commission_amount_sats,
"base_amount_sats": data.base_amount_sats,
"exchange_rate": data.exchange_rate,
"crypto_code": data.crypto_code,
"fiat_code": data.fiat_code,
"device_id": data.device_id,
"transaction_time": data.transaction_time,
"processed_at": datetime.now(),
"clients_count": 0, # Will be updated after distributions
"distributions_total_sats": 0 # Will be updated after distributions
}
)
return await get_lamassu_transaction(transaction_id)
async def get_lamassu_transaction(transaction_id: str) -> Optional[StoredLamassuTransaction]:
"""Get a stored Lamassu transaction by ID"""
return await db.fetchone(
"SELECT * FROM satmachineclient.lamassu_transactions WHERE id = :id",
{"id": transaction_id},
StoredLamassuTransaction,
)
async def get_lamassu_transaction_by_lamassu_id(lamassu_transaction_id: str) -> Optional[StoredLamassuTransaction]:
"""Get a stored Lamassu transaction by Lamassu transaction ID"""
return await db.fetchone(
"SELECT * FROM satmachineclient.lamassu_transactions WHERE lamassu_transaction_id = :lamassu_id",
{"lamassu_id": lamassu_transaction_id},
StoredLamassuTransaction,
)
async def get_all_lamassu_transactions() -> List[StoredLamassuTransaction]:
"""Get all stored Lamassu transactions"""
return await db.fetchall(
"SELECT * FROM satmachineclient.lamassu_transactions ORDER BY transaction_time DESC",
model=StoredLamassuTransaction,
)
async def update_lamassu_transaction_distribution_stats(
transaction_id: str,
clients_count: int,
distributions_total_sats: int
) -> None:
"""Update distribution statistics for a Lamassu transaction"""
await db.execute(
"""
UPDATE satmachineclient.lamassu_transactions
SET clients_count = :clients_count, distributions_total_sats = :distributions_total_sats
WHERE id = :id
""",
{
"clients_count": clients_count,
"distributions_total_sats": distributions_total_sats,
"id": transaction_id
}
)
async def update_client_dca_settings(client_id: str, settings: UpdateClientSettings) -> bool:
"""Update client DCA settings (mode, limits, status)"""
try:
update_data = {k: v for k, v in settings.dict().items() if v is not None}
if not update_data:
return True # Nothing to update
update_data["updated_at"] = datetime.now()
set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()])
update_data["id"] = client_id
await db.execute(
f"UPDATE satmachineadmin.dca_clients SET {set_clause} WHERE id = :id",
update_data
)
return True
except Exception:
return False