From 52c6c3f8f1a840501e291c6ce5081ae7d23fa3fd Mon Sep 17 00:00:00 2001 From: padreug Date: Wed, 12 Nov 2025 03:00:17 +0100 Subject: [PATCH] Fix RBAC role-based permissions for accounts endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed critical bugs preventing users from seeing accounts through their assigned roles: 1. **Fixed duplicate function definition** (crud.py) - Removed duplicate auto_assign_default_role() that only took 1 parameter - Kept correct version with proper signature and logging - Added get_all_user_roles() helper function 2. **Added role-based permissions to accounts endpoint** (views_api.py) - Previously only checked direct user permissions - Now retrieves and combines both direct AND role permissions - Auto-assigns default role to new users on first access 3. **Fixed permission inheritance logic** (views_api.py) - Inheritance check now uses combined permissions (direct + role) - Previously only checked direct user permissions for parents - Users can now inherit access to child accounts via role permissions Changes enable proper RBAC functionality: - Users with "Employee" role (or any role) now see permitted accounts - Permission inheritance works correctly with role-based permissions - Auto-assignment of default role on first Castle access 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crud.py | 85 ++++++++++++++++++++++++++++++++++++---------------- views_api.py | 84 ++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 134 insertions(+), 35 deletions(-) diff --git a/crud.py b/crud.py index 1f9dbf4..e4aeb5e 100644 --- a/crud.py +++ b/crud.py @@ -1502,6 +1502,31 @@ async def get_user_roles(user_id: str) -> list[UserRole]: ] +async def get_all_user_roles() -> list[UserRole]: + """Get all active user role assignments""" + rows = await db.fetchall( + """ + SELECT * FROM user_roles + WHERE (expires_at IS NULL OR expires_at > :now) + ORDER BY user_id, granted_at DESC + """, + {"now": datetime.now()}, + ) + + return [ + UserRole( + id=row["id"], + user_id=row["user_id"], + role_id=row["role_id"], + granted_by=row["granted_by"], + granted_at=row["granted_at"], + expires_at=row["expires_at"], + notes=row["notes"], + ) + for row in rows + ] + + async def get_role_users(role_id: str) -> list[UserRole]: """Get all users assigned to a role""" rows = await db.fetchall( @@ -1550,6 +1575,41 @@ async def get_role_count_for_user(user_id: str) -> int: return row["count"] if row else 0 +async def auto_assign_default_role(user_id: str, assigned_by: str) -> UserRole | None: + """ + Auto-assign the default role to a user if they don't have any roles yet. + Returns the created UserRole if assigned, None if user already has roles or no default role exists. + """ + from loguru import logger + + logger.info(f"[AUTO-ASSIGN] Checking auto-assignment for user {user_id}") + + # Check if user already has any roles + user_role_count = await get_role_count_for_user(user_id) + logger.info(f"[AUTO-ASSIGN] User {user_id} has {user_role_count} roles") + if user_role_count > 0: + logger.info(f"[AUTO-ASSIGN] User {user_id} already has roles, skipping auto-assignment") + return None + + # Find the default role + default_role = await get_default_role() + if not default_role: + logger.warning(f"[AUTO-ASSIGN] No default role found, cannot auto-assign for user {user_id}") + return None + + logger.info(f"[AUTO-ASSIGN] Found default role: {default_role.name} (id: {default_role.id})") + + # Assign the default role + data = AssignUserRole( + user_id=user_id, + role_id=default_role.id, + notes="Auto-assigned default role on first access", + ) + result = await assign_user_role(data, assigned_by) + logger.info(f"[AUTO-ASSIGN] Successfully assigned role {default_role.name} to user {user_id}") + return result + + async def get_user_count_for_role(role_id: str) -> int: """Get count of users assigned to a role""" row = await db.fetchone( @@ -1601,28 +1661,3 @@ async def check_user_has_role_permission( return True return False - - -async def auto_assign_default_role(user_id: str) -> Optional[UserRole]: - """ - Auto-assign the default role to a new user. - Returns the UserRole if a default role exists and was assigned, None otherwise. - """ - default_role = await get_default_role() - if not default_role: - return None - - # Check if user already has this role - user_roles = await get_user_roles(user_id) - if any(ur.role_id == default_role.id for ur in user_roles): - return None - - # Assign the default role - return await assign_user_role( - AssignUserRole( - user_id=user_id, - role_id=default_role.id, - notes="Auto-assigned default role", - ), - granted_by="system", - ) diff --git a/views_api.py b/views_api.py index e6735f4..baaadc6 100644 --- a/views_api.py +++ b/views_api.py @@ -45,6 +45,7 @@ from .models import ( AccountType, AccountWithPermissions, AssertionStatus, + AssignUserRole, BalanceAssertion, BulkGrantPermission, BulkGrantResult, @@ -55,6 +56,8 @@ from .models import ( CreateEntryLine, CreateJournalEntry, CreateManualPaymentRequest, + CreateRole, + CreateRolePermission, CreateUserEquityStatus, ExpenseEntry, GeneratePaymentInvoice, @@ -66,11 +69,17 @@ from .models import ( ReceivableEntry, RecordPayment, RevenueEntry, + Role, + RolePermission, + RoleWithPermissions, SettleReceivable, + UpdateRole, UserBalance, UserEquityStatus, UserInfo, + UserRole, UserWalletSettings, + UserWithRoles, ) from .services import get_settings, get_user_wallet, update_settings, update_user_wallet @@ -141,12 +150,19 @@ async def api_get_accounts( - Returns AccountWithPermissions objects when filter_by_user=true, otherwise Account objects """ from lnbits.settings import settings as lnbits_settings + from . import crud all_accounts = await get_all_accounts() user_id = wallet.wallet.user is_super_user = user_id == lnbits_settings.super_user + # Auto-assign default role if user has no roles (only for non-super users) + if not is_super_user: + assigned_role = await crud.auto_assign_default_role(user_id, "system") + if assigned_role: + logger.info(f"[ACCOUNTS] Auto-assigned role to user {user_id}") + # Super users bypass permission filtering - they see everything if not filter_by_user or is_super_user: # Filter out virtual accounts if requested (default behavior for user views) @@ -157,28 +173,52 @@ async def api_get_accounts( # Filter by user permissions # NOTE: Do NOT filter out virtual accounts yet - they're needed for inheritance logic + # Get direct user permissions user_permissions = await get_user_permissions(user_id) + # Get role-based permissions + role_permissions_list = await crud.get_user_permissions_from_roles(user_id) + # Flatten role permissions into a single list + role_perms = [] + for role, perms in role_permissions_list: + role_perms.extend(perms) + + # Combine direct and role-based permissions + all_permissions = list(user_permissions) + role_perms + + logger.info(f"[ACCOUNTS] User {user_id} has {len(user_permissions)} direct permissions and {len(role_perms)} role permissions (total: {len(all_permissions)})") + if role_perms: + logger.info(f"[ACCOUNTS] Role permissions: {[(p.account_id, p.permission_type) for p in role_perms]}") + logger.info(f"[ACCOUNTS] Total accounts in system: {len(all_accounts)}") + if len(all_accounts) > 0: + logger.info(f"[ACCOUNTS] Sample account IDs: {[acc.id for acc in all_accounts[:5]]}") + # Get set of account IDs the user has any permission on - permitted_account_ids = {perm.account_id for perm in user_permissions} + permitted_account_ids = {perm.account_id for perm in all_permissions} # Build list of accounts with permission metadata accounts_with_permissions = [] for account in all_accounts: - # Check if user has direct permission on this account + # Check if user has permission on this account (direct or from role) account_perms = [ - perm for perm in user_permissions if perm.account_id == account.id + perm for perm in all_permissions if perm.account_id == account.id ] - # Check if user has inherited permission from parent account (any permission type) - # Try each permission type to see if user has inherited access + # Check if user has inherited permission from parent account (using combined permissions) + # Check both direct and role-based permissions for parent accounts inherited_perms = [] - for perm_type in [PermissionType.READ, PermissionType.SUBMIT_EXPENSE, PermissionType.MANAGE]: - perms = await get_user_permissions_with_inheritance( - user_id, account.name, perm_type - ) - inherited_perms.extend(perms) + for perm in all_permissions: + # Get the account for this permission + perm_account = await get_account(perm.account_id) + if not perm_account: + continue + + # Check if this permission's account is a parent of the current account + # e.g., "Expenses:Supplies" is parent of "Expenses:Supplies:Food" + if account.name.startswith(perm_account.name + ":"): + # Inherited permission from parent account + inherited_perms.append((perm, perm_account.name)) # Determine if account should be included has_access = bool(account_perms) or bool(inherited_perms) @@ -228,6 +268,7 @@ async def api_get_accounts( acc for acc in accounts_with_permissions if not acc.is_virtual ] + logger.info(f"[ACCOUNTS] Returning {len(accounts_with_permissions)} accounts for user {user_id}") return accounts_with_permissions @@ -3510,6 +3551,29 @@ async def api_revoke_user_role( return {"success": True, "message": "Role assignment revoked"} +@castle_api_router.get("/api/v1/admin/users/roles") +async def api_get_all_user_roles( + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Get all user role assignments (admin only)""" + from . import crud + + user_roles = await crud.get_all_user_roles() + + return [ + { + "id": ur.id, + "user_id": ur.user_id, + "role_id": ur.role_id, + "granted_by": ur.granted_by, + "granted_at": ur.granted_at.isoformat(), + "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, + "notes": ur.notes, + } + for ur in user_roles + ] + + @castle_api_router.get("/api/v1/users/me/roles") async def api_get_my_roles( wallet: WalletTypeInfo = Depends(require_invoice_key),