From c086916be8ddd710a409c8f8a170446dab7f13f5 Mon Sep 17 00:00:00 2001 From: padreug Date: Tue, 11 Nov 2025 23:47:13 +0100 Subject: [PATCH] Add RBAC API endpoints - Phase 2A MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented comprehensive REST API for role-based access control: Role Management Endpoints (Admin only): - GET /api/v1/admin/roles - List all roles with user/permission counts - POST /api/v1/admin/roles - Create new role - GET /api/v1/admin/roles/{role_id} - Get role details with permissions and users - PUT /api/v1/admin/roles/{role_id} - Update role (name, description, is_default) - DELETE /api/v1/admin/roles/{role_id} - Delete role (cascades to permissions/assignments) Role Permission Endpoints (Admin only): - POST /api/v1/admin/roles/{role_id}/permissions - Add permission to role - DELETE /api/v1/admin/roles/{role_id}/permissions/{permission_id} - Remove permission User Role Assignment Endpoints (Admin only): - POST /api/v1/admin/user-roles - Assign user to role (with optional expiration) - GET /api/v1/admin/user-roles/{user_id} - Get user's role assignments - DELETE /api/v1/admin/user-roles/{user_role_id} - Revoke role assignment User Endpoints: - GET /api/v1/users/me/roles - Get current user's roles and effective permissions (includes both role-based and direct permissions) All endpoints include: - Proper error handling with HTTP status codes - Admin key requirement for management operations - Rich response data with timestamps and metadata - Role details enriched with user counts and permission counts Next: Implement Roles tab UI and JavaScript integration 🤖 Generated with Claude Code --- views_api.py | 346 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 346 insertions(+) diff --git a/views_api.py b/views_api.py index 1a87917..e6735f4 100644 --- a/views_api.py +++ b/views_api.py @@ -3219,3 +3219,349 @@ async def api_sync_single_account( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Account sync failed: {str(e)}" ) + + +# ===== RBAC (ROLE-BASED ACCESS CONTROL) ENDPOINTS ===== + + +@castle_api_router.get("/api/v1/admin/roles") +async def api_get_all_roles( + wallet: WalletTypeInfo = Depends(require_admin_key), +) -> list: + """Get all roles (admin only)""" + from . import crud + + roles = await crud.get_all_roles() + + # Enrich each role with user count and permission count + enriched_roles = [] + for role in roles: + user_count = await crud.get_user_count_for_role(role.id) + permissions = await crud.get_role_permissions(role.id) + + enriched_roles.append({ + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + "created_by": role.created_by, + "created_at": role.created_at.isoformat(), + "user_count": user_count, + "permission_count": len(permissions), + }) + + return enriched_roles + + +@castle_api_router.post("/api/v1/admin/roles", status_code=HTTPStatus.CREATED) +async def api_create_role( + data: CreateRole, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Create a new role (admin only)""" + from . import crud + + try: + role = await crud.create_role(data, created_by=wallet.wallet.user) + return { + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + "created_by": role.created_by, + "created_at": role.created_at.isoformat(), + } + except Exception as e: + logger.error(f"Failed to create role: {e}") + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Failed to create role: {str(e)}" + ) + + +@castle_api_router.get("/api/v1/admin/roles/{role_id}") +async def api_get_role( + role_id: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Get a specific role with its permissions and users (admin only)""" + from . import crud + + role = await crud.get_role(role_id) + if not role: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Role {role_id} not found" + ) + + permissions = await crud.get_role_permissions(role.id) + user_roles = await crud.get_role_users(role.id) + + return { + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + "created_by": role.created_by, + "created_at": role.created_at.isoformat(), + "permissions": [ + { + "id": p.id, + "account_id": p.account_id, + "permission_type": p.permission_type.value, + "notes": p.notes, + "created_at": p.created_at.isoformat(), + } + for p in permissions + ], + "users": [ + { + "id": ur.id, + "user_id": ur.user_id, + "granted_by": ur.granted_by, + "granted_at": ur.granted_at.isoformat(), + "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, + "notes": ur.notes, + } + for ur in user_roles + ], + } + + +@castle_api_router.put("/api/v1/admin/roles/{role_id}") +async def api_update_role( + role_id: str, + data: UpdateRole, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Update a role (admin only)""" + from . import crud + + role = await crud.update_role(role_id, data) + if not role: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Role {role_id} not found" + ) + + return { + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + "created_by": role.created_by, + "created_at": role.created_at.isoformat(), + } + + +@castle_api_router.delete("/api/v1/admin/roles/{role_id}") +async def api_delete_role( + role_id: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Delete a role (admin only) - cascades to role_permissions and user_roles""" + from . import crud + + role = await crud.get_role(role_id) + if not role: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Role {role_id} not found" + ) + + await crud.delete_role(role_id) + return {"success": True, "message": f"Role '{role.name}' deleted successfully"} + + +# ===== ROLE PERMISSION ENDPOINTS ===== + + +@castle_api_router.post("/api/v1/admin/roles/{role_id}/permissions", status_code=HTTPStatus.CREATED) +async def api_add_role_permission( + role_id: str, + data: CreateRolePermission, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Add a permission to a role (admin only)""" + from . import crud + + # Verify role exists + role = await crud.get_role(role_id) + if not role: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Role {role_id} not found" + ) + + # Ensure data has correct role_id + data.role_id = role_id + + try: + permission = await crud.create_role_permission(data) + return { + "id": permission.id, + "role_id": permission.role_id, + "account_id": permission.account_id, + "permission_type": permission.permission_type.value, + "notes": permission.notes, + "created_at": permission.created_at.isoformat(), + } + except Exception as e: + logger.error(f"Failed to add role permission: {e}") + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Failed to add permission: {str(e)}" + ) + + +@castle_api_router.delete("/api/v1/admin/roles/{role_id}/permissions/{permission_id}") +async def api_delete_role_permission( + role_id: str, + permission_id: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Remove a permission from a role (admin only)""" + from . import crud + + await crud.delete_role_permission(permission_id) + return {"success": True, "message": "Permission removed from role"} + + +# ===== USER ROLE ASSIGNMENT ENDPOINTS ===== + + +@castle_api_router.post("/api/v1/admin/user-roles", status_code=HTTPStatus.CREATED) +async def api_assign_user_role( + data: AssignUserRole, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Assign a user to a role (admin only)""" + from . import crud + + # Verify role exists + role = await crud.get_role(data.role_id) + if not role: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Role {data.role_id} not found" + ) + + try: + user_role = await crud.assign_user_role(data, granted_by=wallet.wallet.user) + return { + "id": user_role.id, + "user_id": user_role.user_id, + "role_id": user_role.role_id, + "granted_by": user_role.granted_by, + "granted_at": user_role.granted_at.isoformat(), + "expires_at": user_role.expires_at.isoformat() if user_role.expires_at else None, + "notes": user_role.notes, + } + except Exception as e: + logger.error(f"Failed to assign user role: {e}") + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Failed to assign role: {str(e)}" + ) + + +@castle_api_router.get("/api/v1/admin/user-roles/{user_id}") +async def api_get_user_roles( + user_id: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Get all roles assigned to a user (admin only)""" + from . import crud + + user_roles = await crud.get_user_roles(user_id) + + # Enrich with role details + enriched = [] + for ur in user_roles: + role = await crud.get_role(ur.role_id) + if role: + enriched.append({ + "user_role_id": ur.id, + "user_id": ur.user_id, + "role": { + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + }, + "granted_by": ur.granted_by, + "granted_at": ur.granted_at.isoformat(), + "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, + "notes": ur.notes, + }) + + return enriched + + +@castle_api_router.delete("/api/v1/admin/user-roles/{user_role_id}") +async def api_revoke_user_role( + user_role_id: str, + wallet: WalletTypeInfo = Depends(require_admin_key), +): + """Revoke a user's role assignment (admin only)""" + from . import crud + + await crud.revoke_user_role(user_role_id) + return {"success": True, "message": "Role assignment revoked"} + + +@castle_api_router.get("/api/v1/users/me/roles") +async def api_get_my_roles( + wallet: WalletTypeInfo = Depends(require_invoice_key), +): + """Get current user's roles and effective permissions""" + from . import crud + + user_id = wallet.wallet.user + + # Get user's roles + user_roles = await crud.get_user_roles(user_id) + + # Get permissions from roles + role_permissions_list = await crud.get_user_permissions_from_roles(user_id) + + # Get direct permissions + direct_permissions = await crud.get_user_permissions(user_id) + + # Build response + roles_data = [] + for ur in user_roles: + role = await crud.get_role(ur.role_id) + if role: + permissions = await crud.get_role_permissions(role.id) + roles_data.append({ + "role": { + "id": role.id, + "name": role.name, + "description": role.description, + }, + "permissions": [ + { + "account_id": p.account_id, + "permission_type": p.permission_type.value, + } + for p in permissions + ], + "granted_at": ur.granted_at.isoformat(), + "expires_at": ur.expires_at.isoformat() if ur.expires_at else None, + }) + + return { + "roles": roles_data, + "direct_permissions": [ + { + "id": p.id, + "account_id": p.account_id, + "permission_type": p.permission_type.value, + "granted_at": p.granted_at.isoformat(), + "expires_at": p.expires_at.isoformat() if p.expires_at else None, + "notes": p.notes, + } + for p in direct_permissions + ], + }