Add RBAC API endpoints - Phase 2A

Implemented comprehensive REST API for role-based access control:

Role Management Endpoints (Admin only):
- GET /api/v1/admin/roles - List all roles with user/permission counts
- POST /api/v1/admin/roles - Create new role
- GET /api/v1/admin/roles/{role_id} - Get role details with permissions and users
- PUT /api/v1/admin/roles/{role_id} - Update role (name, description, is_default)
- DELETE /api/v1/admin/roles/{role_id} - Delete role (cascades to permissions/assignments)

Role Permission Endpoints (Admin only):
- POST /api/v1/admin/roles/{role_id}/permissions - Add permission to role
- DELETE /api/v1/admin/roles/{role_id}/permissions/{permission_id} - Remove permission

User Role Assignment Endpoints (Admin only):
- POST /api/v1/admin/user-roles - Assign user to role (with optional expiration)
- GET /api/v1/admin/user-roles/{user_id} - Get user's role assignments
- DELETE /api/v1/admin/user-roles/{user_role_id} - Revoke role assignment

User Endpoints:
- GET /api/v1/users/me/roles - Get current user's roles and effective permissions
  (includes both role-based and direct permissions)

All endpoints include:
- Proper error handling with HTTP status codes
- Admin key requirement for management operations
- Rich response data with timestamps and metadata
- Role details enriched with user counts and permission counts

Next: Implement Roles tab UI and JavaScript integration

🤖 Generated with Claude Code
This commit is contained in:
padreug 2025-11-11 23:47:13 +01:00
parent 46e910ba25
commit c086916be8

View file

@ -3219,3 +3219,349 @@ async def api_sync_single_account(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Account sync failed: {str(e)}" detail=f"Account sync failed: {str(e)}"
) )
# ===== RBAC (ROLE-BASED ACCESS CONTROL) ENDPOINTS =====
@castle_api_router.get("/api/v1/admin/roles")
async def api_get_all_roles(
wallet: WalletTypeInfo = Depends(require_admin_key),
) -> list:
"""Get all roles (admin only)"""
from . import crud
roles = await crud.get_all_roles()
# Enrich each role with user count and permission count
enriched_roles = []
for role in roles:
user_count = await crud.get_user_count_for_role(role.id)
permissions = await crud.get_role_permissions(role.id)
enriched_roles.append({
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
"created_by": role.created_by,
"created_at": role.created_at.isoformat(),
"user_count": user_count,
"permission_count": len(permissions),
})
return enriched_roles
@castle_api_router.post("/api/v1/admin/roles", status_code=HTTPStatus.CREATED)
async def api_create_role(
data: CreateRole,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Create a new role (admin only)"""
from . import crud
try:
role = await crud.create_role(data, created_by=wallet.wallet.user)
return {
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
"created_by": role.created_by,
"created_at": role.created_at.isoformat(),
}
except Exception as e:
logger.error(f"Failed to create role: {e}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Failed to create role: {str(e)}"
)
@castle_api_router.get("/api/v1/admin/roles/{role_id}")
async def api_get_role(
role_id: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Get a specific role with its permissions and users (admin only)"""
from . import crud
role = await crud.get_role(role_id)
if not role:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Role {role_id} not found"
)
permissions = await crud.get_role_permissions(role.id)
user_roles = await crud.get_role_users(role.id)
return {
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
"created_by": role.created_by,
"created_at": role.created_at.isoformat(),
"permissions": [
{
"id": p.id,
"account_id": p.account_id,
"permission_type": p.permission_type.value,
"notes": p.notes,
"created_at": p.created_at.isoformat(),
}
for p in permissions
],
"users": [
{
"id": ur.id,
"user_id": ur.user_id,
"granted_by": ur.granted_by,
"granted_at": ur.granted_at.isoformat(),
"expires_at": ur.expires_at.isoformat() if ur.expires_at else None,
"notes": ur.notes,
}
for ur in user_roles
],
}
@castle_api_router.put("/api/v1/admin/roles/{role_id}")
async def api_update_role(
role_id: str,
data: UpdateRole,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Update a role (admin only)"""
from . import crud
role = await crud.update_role(role_id, data)
if not role:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Role {role_id} not found"
)
return {
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
"created_by": role.created_by,
"created_at": role.created_at.isoformat(),
}
@castle_api_router.delete("/api/v1/admin/roles/{role_id}")
async def api_delete_role(
role_id: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Delete a role (admin only) - cascades to role_permissions and user_roles"""
from . import crud
role = await crud.get_role(role_id)
if not role:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Role {role_id} not found"
)
await crud.delete_role(role_id)
return {"success": True, "message": f"Role '{role.name}' deleted successfully"}
# ===== ROLE PERMISSION ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/roles/{role_id}/permissions", status_code=HTTPStatus.CREATED)
async def api_add_role_permission(
role_id: str,
data: CreateRolePermission,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Add a permission to a role (admin only)"""
from . import crud
# Verify role exists
role = await crud.get_role(role_id)
if not role:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Role {role_id} not found"
)
# Ensure data has correct role_id
data.role_id = role_id
try:
permission = await crud.create_role_permission(data)
return {
"id": permission.id,
"role_id": permission.role_id,
"account_id": permission.account_id,
"permission_type": permission.permission_type.value,
"notes": permission.notes,
"created_at": permission.created_at.isoformat(),
}
except Exception as e:
logger.error(f"Failed to add role permission: {e}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Failed to add permission: {str(e)}"
)
@castle_api_router.delete("/api/v1/admin/roles/{role_id}/permissions/{permission_id}")
async def api_delete_role_permission(
role_id: str,
permission_id: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Remove a permission from a role (admin only)"""
from . import crud
await crud.delete_role_permission(permission_id)
return {"success": True, "message": "Permission removed from role"}
# ===== USER ROLE ASSIGNMENT ENDPOINTS =====
@castle_api_router.post("/api/v1/admin/user-roles", status_code=HTTPStatus.CREATED)
async def api_assign_user_role(
data: AssignUserRole,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Assign a user to a role (admin only)"""
from . import crud
# Verify role exists
role = await crud.get_role(data.role_id)
if not role:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Role {data.role_id} not found"
)
try:
user_role = await crud.assign_user_role(data, granted_by=wallet.wallet.user)
return {
"id": user_role.id,
"user_id": user_role.user_id,
"role_id": user_role.role_id,
"granted_by": user_role.granted_by,
"granted_at": user_role.granted_at.isoformat(),
"expires_at": user_role.expires_at.isoformat() if user_role.expires_at else None,
"notes": user_role.notes,
}
except Exception as e:
logger.error(f"Failed to assign user role: {e}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Failed to assign role: {str(e)}"
)
@castle_api_router.get("/api/v1/admin/user-roles/{user_id}")
async def api_get_user_roles(
user_id: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Get all roles assigned to a user (admin only)"""
from . import crud
user_roles = await crud.get_user_roles(user_id)
# Enrich with role details
enriched = []
for ur in user_roles:
role = await crud.get_role(ur.role_id)
if role:
enriched.append({
"user_role_id": ur.id,
"user_id": ur.user_id,
"role": {
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
},
"granted_by": ur.granted_by,
"granted_at": ur.granted_at.isoformat(),
"expires_at": ur.expires_at.isoformat() if ur.expires_at else None,
"notes": ur.notes,
})
return enriched
@castle_api_router.delete("/api/v1/admin/user-roles/{user_role_id}")
async def api_revoke_user_role(
user_role_id: str,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
"""Revoke a user's role assignment (admin only)"""
from . import crud
await crud.revoke_user_role(user_role_id)
return {"success": True, "message": "Role assignment revoked"}
@castle_api_router.get("/api/v1/users/me/roles")
async def api_get_my_roles(
wallet: WalletTypeInfo = Depends(require_invoice_key),
):
"""Get current user's roles and effective permissions"""
from . import crud
user_id = wallet.wallet.user
# Get user's roles
user_roles = await crud.get_user_roles(user_id)
# Get permissions from roles
role_permissions_list = await crud.get_user_permissions_from_roles(user_id)
# Get direct permissions
direct_permissions = await crud.get_user_permissions(user_id)
# Build response
roles_data = []
for ur in user_roles:
role = await crud.get_role(ur.role_id)
if role:
permissions = await crud.get_role_permissions(role.id)
roles_data.append({
"role": {
"id": role.id,
"name": role.name,
"description": role.description,
},
"permissions": [
{
"account_id": p.account_id,
"permission_type": p.permission_type.value,
}
for p in permissions
],
"granted_at": ur.granted_at.isoformat(),
"expires_at": ur.expires_at.isoformat() if ur.expires_at else None,
})
return {
"roles": roles_data,
"direct_permissions": [
{
"id": p.id,
"account_id": p.account_id,
"permission_type": p.permission_type.value,
"granted_at": p.granted_at.isoformat(),
"expires_at": p.expires_at.isoformat() if p.expires_at else None,
"notes": p.notes,
}
for p in direct_permissions
],
}