Add RBAC (Role-Based Access Control) system - Phase 1

Implemented comprehensive role-based permission management system:

Database:
- Added m004_add_rbac_tables migration
- roles table: Define named permission bundles (Employee, Contractor, etc.)
- role_permissions table: Map roles to account permissions
- user_roles table: Assign users to roles with optional expiration
- Created 4 default roles: Employee (default), Contractor, Accountant, Manager

Models (models.py):
- Role, CreateRole, UpdateRole
- RolePermission, CreateRolePermission
- UserRole, AssignUserRole
- RoleWithPermissions, UserWithRoles

CRUD Operations (crud.py):
- Role management: create_role, get_role, get_all_roles, update_role, delete_role
- get_default_role() - get auto-assigned role for new users
- Role permissions: create_role_permission, get_role_permissions, delete_role_permission
- User role assignment: assign_user_role, get_user_roles, revoke_user_role
- Helper functions:
  - get_user_permissions_from_roles() - resolve user permissions via roles
  - check_user_has_role_permission() - check role-based access
  - auto_assign_default_role() - auto-assign default role to new users

Permission Resolution Order:
1. Individual account_permissions (direct grants/exceptions)
2. Role-based permissions (via user_roles → role_permissions)
3. Inherited permissions (hierarchical account names)
4. Deny by default

Next: API endpoints, UI, and permission resolution logic integration

🤖 Generated with Claude Code
This commit is contained in:
padreug 2025-11-11 23:34:28 +01:00
parent 142b26d7da
commit 46e910ba25
3 changed files with 679 additions and 0 deletions

416
crud.py
View file

@ -12,6 +12,7 @@ from .models import (
AccountPermission,
AccountType,
AssertionStatus,
AssignUserRole,
BalanceAssertion,
CastleSettings,
CreateAccount,
@ -19,15 +20,23 @@ from .models import (
CreateBalanceAssertion,
CreateEntryLine,
CreateJournalEntry,
CreateRole,
CreateRolePermission,
CreateUserEquityStatus,
EntryLine,
JournalEntry,
PermissionType,
Role,
RolePermission,
RoleWithPermissions,
StoredUserWalletSettings,
UpdateRole,
UserBalance,
UserCastleSettings,
UserEquityStatus,
UserRole,
UserWalletSettings,
UserWithRoles,
)
# Import core accounting logic
@ -1210,3 +1219,410 @@ async def get_user_permissions_with_inheritance(
applicable_permissions.append((perm, account.name))
return applicable_permissions
# ===== ROLE-BASED ACCESS CONTROL (RBAC) OPERATIONS =====
async def create_role(data: CreateRole, created_by: str) -> Role:
"""Create a new role"""
role_id = urlsafe_short_hash()
role = Role(
id=role_id,
name=data.name,
description=data.description,
is_default=data.is_default,
created_by=created_by,
created_at=datetime.now(),
)
await db.execute(
"""
INSERT INTO roles (id, name, description, is_default, created_by, created_at)
VALUES (:id, :name, :description, :is_default, :created_by, :created_at)
""",
{
"id": role.id,
"name": role.name,
"description": role.description,
"is_default": role.is_default,
"created_by": role.created_by,
"created_at": role.created_at,
},
)
return role
async def get_role(role_id: str) -> Optional[Role]:
"""Get role by ID"""
row = await db.fetchone(
"SELECT * FROM roles WHERE id = :id",
{"id": role_id},
)
if not row:
return None
return Role(
id=row["id"],
name=row["name"],
description=row["description"],
is_default=row["is_default"],
created_by=row["created_by"],
created_at=row["created_at"],
)
async def get_role_by_name(name: str) -> Optional[Role]:
"""Get role by name"""
row = await db.fetchone(
"SELECT * FROM roles WHERE name = :name",
{"name": name},
)
if not row:
return None
return Role(
id=row["id"],
name=row["name"],
description=row["description"],
is_default=row["is_default"],
created_by=row["created_by"],
created_at=row["created_at"],
)
async def get_all_roles() -> list[Role]:
"""Get all roles"""
rows = await db.fetchall(
"SELECT * FROM roles ORDER BY name",
)
return [
Role(
id=row["id"],
name=row["name"],
description=row["description"],
is_default=row["is_default"],
created_by=row["created_by"],
created_at=row["created_at"],
)
for row in rows
]
async def get_default_role() -> Optional[Role]:
"""Get the default role that is auto-assigned to new users"""
row = await db.fetchone(
"SELECT * FROM roles WHERE is_default = TRUE LIMIT 1",
)
if not row:
return None
return Role(
id=row["id"],
name=row["name"],
description=row["description"],
is_default=row["is_default"],
created_by=row["created_by"],
created_at=row["created_at"],
)
async def update_role(role_id: str, data: UpdateRole) -> Optional[Role]:
"""Update a role"""
# If setting this role as default, unset any other default roles
if data.is_default is True:
await db.execute(
"UPDATE roles SET is_default = FALSE WHERE id != :id",
{"id": role_id},
)
# Build update statement dynamically based on provided fields
updates = []
params = {"id": role_id}
if data.name is not None:
updates.append("name = :name")
params["name"] = data.name
if data.description is not None:
updates.append("description = :description")
params["description"] = data.description
if data.is_default is not None:
updates.append("is_default = :is_default")
params["is_default"] = data.is_default
if not updates:
return await get_role(role_id)
await db.execute(
f"UPDATE roles SET {', '.join(updates)} WHERE id = :id",
params,
)
return await get_role(role_id)
async def delete_role(role_id: str) -> None:
"""Delete a role (cascade deletes role_permissions and user_roles)"""
await db.execute(
"DELETE FROM roles WHERE id = :id",
{"id": role_id},
)
# ===== ROLE PERMISSION OPERATIONS =====
async def create_role_permission(data: CreateRolePermission) -> RolePermission:
"""Create a permission for a role"""
permission_id = urlsafe_short_hash()
permission = RolePermission(
id=permission_id,
role_id=data.role_id,
account_id=data.account_id,
permission_type=data.permission_type,
notes=data.notes,
created_at=datetime.now(),
)
await db.execute(
"""
INSERT INTO role_permissions (id, role_id, account_id, permission_type, notes, created_at)
VALUES (:id, :role_id, :account_id, :permission_type, :notes, :created_at)
""",
{
"id": permission.id,
"role_id": permission.role_id,
"account_id": permission.account_id,
"permission_type": permission.permission_type.value,
"notes": permission.notes,
"created_at": permission.created_at,
},
)
return permission
async def get_role_permissions(role_id: str) -> list[RolePermission]:
"""Get all permissions for a specific role"""
rows = await db.fetchall(
"""
SELECT * FROM role_permissions
WHERE role_id = :role_id
ORDER BY created_at DESC
""",
{"role_id": role_id},
)
return [
RolePermission(
id=row["id"],
role_id=row["role_id"],
account_id=row["account_id"],
permission_type=PermissionType(row["permission_type"]),
notes=row["notes"],
created_at=row["created_at"],
)
for row in rows
]
async def delete_role_permission(permission_id: str) -> None:
"""Delete a role permission"""
await db.execute(
"DELETE FROM role_permissions WHERE id = :id",
{"id": permission_id},
)
# ===== USER ROLE OPERATIONS =====
async def assign_user_role(data: AssignUserRole, granted_by: str) -> UserRole:
"""Assign a user to a role"""
user_role_id = urlsafe_short_hash()
user_role = UserRole(
id=user_role_id,
user_id=data.user_id,
role_id=data.role_id,
granted_by=granted_by,
granted_at=datetime.now(),
expires_at=data.expires_at,
notes=data.notes,
)
await db.execute(
"""
INSERT INTO user_roles (id, user_id, role_id, granted_by, granted_at, expires_at, notes)
VALUES (:id, :user_id, :role_id, :granted_by, :granted_at, :expires_at, :notes)
""",
{
"id": user_role.id,
"user_id": user_role.user_id,
"role_id": user_role.role_id,
"granted_by": user_role.granted_by,
"granted_at": user_role.granted_at,
"expires_at": user_role.expires_at,
"notes": user_role.notes,
},
)
return user_role
async def get_user_roles(user_id: str) -> list[UserRole]:
"""Get all active roles for a user"""
rows = await db.fetchall(
"""
SELECT * FROM user_roles
WHERE user_id = :user_id
AND (expires_at IS NULL OR expires_at > :now)
ORDER BY granted_at DESC
""",
{"user_id": user_id, "now": datetime.now()},
)
return [
UserRole(
id=row["id"],
user_id=row["user_id"],
role_id=row["role_id"],
granted_by=row["granted_by"],
granted_at=row["granted_at"],
expires_at=row["expires_at"],
notes=row["notes"],
)
for row in rows
]
async def get_role_users(role_id: str) -> list[UserRole]:
"""Get all users assigned to a role"""
rows = await db.fetchall(
"""
SELECT * FROM user_roles
WHERE role_id = :role_id
AND (expires_at IS NULL OR expires_at > :now)
ORDER BY granted_at DESC
""",
{"role_id": role_id, "now": datetime.now()},
)
return [
UserRole(
id=row["id"],
user_id=row["user_id"],
role_id=row["role_id"],
granted_by=row["granted_by"],
granted_at=row["granted_at"],
expires_at=row["expires_at"],
notes=row["notes"],
)
for row in rows
]
async def revoke_user_role(user_role_id: str) -> None:
"""Revoke a user's role assignment"""
await db.execute(
"DELETE FROM user_roles WHERE id = :id",
{"id": user_role_id},
)
async def get_role_count_for_user(user_id: str) -> int:
"""Get count of active roles for a user"""
row = await db.fetchone(
"""
SELECT COUNT(*) as count FROM user_roles
WHERE user_id = :user_id
AND (expires_at IS NULL OR expires_at > :now)
""",
{"user_id": user_id, "now": datetime.now()},
)
return row["count"] if row else 0
async def get_user_count_for_role(role_id: str) -> int:
"""Get count of users assigned to a role"""
row = await db.fetchone(
"""
SELECT COUNT(*) as count FROM user_roles
WHERE role_id = :role_id
AND (expires_at IS NULL OR expires_at > :now)
""",
{"role_id": role_id, "now": datetime.now()},
)
return row["count"] if row else 0
# ===== RBAC HELPER FUNCTIONS =====
async def get_user_permissions_from_roles(
user_id: str,
) -> list[tuple[Role, list[RolePermission]]]:
"""
Get all permissions a user has through their role assignments.
Returns list of tuples: (role, list of permissions from that role)
"""
# Get user's active roles
user_roles = await get_user_roles(user_id)
result = []
for user_role in user_roles:
role = await get_role(user_role.role_id)
if role:
permissions = await get_role_permissions(role.id)
result.append((role, permissions))
return result
async def check_user_has_role_permission(
user_id: str, account_id: str, permission_type: PermissionType
) -> bool:
"""Check if user has a specific permission through any of their roles"""
# Get all permissions from user's roles
role_permissions = await get_user_permissions_from_roles(user_id)
# Check if any role grants the required permission on this account
for role, permissions in role_permissions:
for perm in permissions:
if perm.account_id == account_id and perm.permission_type == permission_type:
return True
return False
async def auto_assign_default_role(user_id: str) -> Optional[UserRole]:
"""
Auto-assign the default role to a new user.
Returns the UserRole if a default role exists and was assigned, None otherwise.
"""
default_role = await get_default_role()
if not default_role:
return None
# Check if user already has this role
user_roles = await get_user_roles(user_id)
if any(ur.role_id == default_role.id for ur in user_roles):
return None
# Assign the default role
return await assign_user_role(
AssignUserRole(
user_id=user_id,
role_id=default_role.id,
notes="Auto-assigned default role",
),
granted_by="system",
)