From 46e910ba2590b14bae09c124a852154adbf059a1 Mon Sep 17 00:00:00 2001 From: padreug Date: Tue, 11 Nov 2025 23:34:28 +0100 Subject: [PATCH] Add RBAC (Role-Based Access Control) system - Phase 1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented comprehensive role-based permission management system: Database: - Added m004_add_rbac_tables migration - roles table: Define named permission bundles (Employee, Contractor, etc.) - role_permissions table: Map roles to account permissions - user_roles table: Assign users to roles with optional expiration - Created 4 default roles: Employee (default), Contractor, Accountant, Manager Models (models.py): - Role, CreateRole, UpdateRole - RolePermission, CreateRolePermission - UserRole, AssignUserRole - RoleWithPermissions, UserWithRoles CRUD Operations (crud.py): - Role management: create_role, get_role, get_all_roles, update_role, delete_role - get_default_role() - get auto-assigned role for new users - Role permissions: create_role_permission, get_role_permissions, delete_role_permission - User role assignment: assign_user_role, get_user_roles, revoke_user_role - Helper functions: - get_user_permissions_from_roles() - resolve user permissions via roles - check_user_has_role_permission() - check role-based access - auto_assign_default_role() - auto-assign default role to new users Permission Resolution Order: 1. Individual account_permissions (direct grants/exceptions) 2. Role-based permissions (via user_roles → role_permissions) 3. Inherited permissions (hierarchical account names) 4. Deny by default Next: API endpoints, UI, and permission resolution logic integration 🤖 Generated with Claude Code --- crud.py | 416 ++++++++++++++++++++++++++++++++++++++++++++++++++ migrations.py | 185 ++++++++++++++++++++++ models.py | 78 ++++++++++ 3 files changed, 679 insertions(+) diff --git a/crud.py b/crud.py index caa6c06..1f9dbf4 100644 --- a/crud.py +++ b/crud.py @@ -12,6 +12,7 @@ from .models import ( AccountPermission, AccountType, AssertionStatus, + AssignUserRole, BalanceAssertion, CastleSettings, CreateAccount, @@ -19,15 +20,23 @@ from .models import ( CreateBalanceAssertion, CreateEntryLine, CreateJournalEntry, + CreateRole, + CreateRolePermission, CreateUserEquityStatus, EntryLine, JournalEntry, PermissionType, + Role, + RolePermission, + RoleWithPermissions, StoredUserWalletSettings, + UpdateRole, UserBalance, UserCastleSettings, UserEquityStatus, + UserRole, UserWalletSettings, + UserWithRoles, ) # Import core accounting logic @@ -1210,3 +1219,410 @@ async def get_user_permissions_with_inheritance( applicable_permissions.append((perm, account.name)) return applicable_permissions + + +# ===== ROLE-BASED ACCESS CONTROL (RBAC) OPERATIONS ===== + + +async def create_role(data: CreateRole, created_by: str) -> Role: + """Create a new role""" + role_id = urlsafe_short_hash() + role = Role( + id=role_id, + name=data.name, + description=data.description, + is_default=data.is_default, + created_by=created_by, + created_at=datetime.now(), + ) + + await db.execute( + """ + INSERT INTO roles (id, name, description, is_default, created_by, created_at) + VALUES (:id, :name, :description, :is_default, :created_by, :created_at) + """, + { + "id": role.id, + "name": role.name, + "description": role.description, + "is_default": role.is_default, + "created_by": role.created_by, + "created_at": role.created_at, + }, + ) + + return role + + +async def get_role(role_id: str) -> Optional[Role]: + """Get role by ID""" + row = await db.fetchone( + "SELECT * FROM roles WHERE id = :id", + {"id": role_id}, + ) + + if not row: + return None + + return Role( + id=row["id"], + name=row["name"], + description=row["description"], + is_default=row["is_default"], + created_by=row["created_by"], + created_at=row["created_at"], + ) + + +async def get_role_by_name(name: str) -> Optional[Role]: + """Get role by name""" + row = await db.fetchone( + "SELECT * FROM roles WHERE name = :name", + {"name": name}, + ) + + if not row: + return None + + return Role( + id=row["id"], + name=row["name"], + description=row["description"], + is_default=row["is_default"], + created_by=row["created_by"], + created_at=row["created_at"], + ) + + +async def get_all_roles() -> list[Role]: + """Get all roles""" + rows = await db.fetchall( + "SELECT * FROM roles ORDER BY name", + ) + + return [ + Role( + id=row["id"], + name=row["name"], + description=row["description"], + is_default=row["is_default"], + created_by=row["created_by"], + created_at=row["created_at"], + ) + for row in rows + ] + + +async def get_default_role() -> Optional[Role]: + """Get the default role that is auto-assigned to new users""" + row = await db.fetchone( + "SELECT * FROM roles WHERE is_default = TRUE LIMIT 1", + ) + + if not row: + return None + + return Role( + id=row["id"], + name=row["name"], + description=row["description"], + is_default=row["is_default"], + created_by=row["created_by"], + created_at=row["created_at"], + ) + + +async def update_role(role_id: str, data: UpdateRole) -> Optional[Role]: + """Update a role""" + # If setting this role as default, unset any other default roles + if data.is_default is True: + await db.execute( + "UPDATE roles SET is_default = FALSE WHERE id != :id", + {"id": role_id}, + ) + + # Build update statement dynamically based on provided fields + updates = [] + params = {"id": role_id} + + if data.name is not None: + updates.append("name = :name") + params["name"] = data.name + + if data.description is not None: + updates.append("description = :description") + params["description"] = data.description + + if data.is_default is not None: + updates.append("is_default = :is_default") + params["is_default"] = data.is_default + + if not updates: + return await get_role(role_id) + + await db.execute( + f"UPDATE roles SET {', '.join(updates)} WHERE id = :id", + params, + ) + + return await get_role(role_id) + + +async def delete_role(role_id: str) -> None: + """Delete a role (cascade deletes role_permissions and user_roles)""" + await db.execute( + "DELETE FROM roles WHERE id = :id", + {"id": role_id}, + ) + + +# ===== ROLE PERMISSION OPERATIONS ===== + + +async def create_role_permission(data: CreateRolePermission) -> RolePermission: + """Create a permission for a role""" + permission_id = urlsafe_short_hash() + permission = RolePermission( + id=permission_id, + role_id=data.role_id, + account_id=data.account_id, + permission_type=data.permission_type, + notes=data.notes, + created_at=datetime.now(), + ) + + await db.execute( + """ + INSERT INTO role_permissions (id, role_id, account_id, permission_type, notes, created_at) + VALUES (:id, :role_id, :account_id, :permission_type, :notes, :created_at) + """, + { + "id": permission.id, + "role_id": permission.role_id, + "account_id": permission.account_id, + "permission_type": permission.permission_type.value, + "notes": permission.notes, + "created_at": permission.created_at, + }, + ) + + return permission + + +async def get_role_permissions(role_id: str) -> list[RolePermission]: + """Get all permissions for a specific role""" + rows = await db.fetchall( + """ + SELECT * FROM role_permissions + WHERE role_id = :role_id + ORDER BY created_at DESC + """, + {"role_id": role_id}, + ) + + return [ + RolePermission( + id=row["id"], + role_id=row["role_id"], + account_id=row["account_id"], + permission_type=PermissionType(row["permission_type"]), + notes=row["notes"], + created_at=row["created_at"], + ) + for row in rows + ] + + +async def delete_role_permission(permission_id: str) -> None: + """Delete a role permission""" + await db.execute( + "DELETE FROM role_permissions WHERE id = :id", + {"id": permission_id}, + ) + + +# ===== USER ROLE OPERATIONS ===== + + +async def assign_user_role(data: AssignUserRole, granted_by: str) -> UserRole: + """Assign a user to a role""" + user_role_id = urlsafe_short_hash() + user_role = UserRole( + id=user_role_id, + user_id=data.user_id, + role_id=data.role_id, + granted_by=granted_by, + granted_at=datetime.now(), + expires_at=data.expires_at, + notes=data.notes, + ) + + await db.execute( + """ + INSERT INTO user_roles (id, user_id, role_id, granted_by, granted_at, expires_at, notes) + VALUES (:id, :user_id, :role_id, :granted_by, :granted_at, :expires_at, :notes) + """, + { + "id": user_role.id, + "user_id": user_role.user_id, + "role_id": user_role.role_id, + "granted_by": user_role.granted_by, + "granted_at": user_role.granted_at, + "expires_at": user_role.expires_at, + "notes": user_role.notes, + }, + ) + + return user_role + + +async def get_user_roles(user_id: str) -> list[UserRole]: + """Get all active roles for a user""" + rows = await db.fetchall( + """ + SELECT * FROM user_roles + WHERE user_id = :user_id + AND (expires_at IS NULL OR expires_at > :now) + ORDER BY granted_at DESC + """, + {"user_id": user_id, "now": datetime.now()}, + ) + + return [ + UserRole( + id=row["id"], + user_id=row["user_id"], + role_id=row["role_id"], + granted_by=row["granted_by"], + granted_at=row["granted_at"], + expires_at=row["expires_at"], + notes=row["notes"], + ) + for row in rows + ] + + +async def get_role_users(role_id: str) -> list[UserRole]: + """Get all users assigned to a role""" + rows = await db.fetchall( + """ + SELECT * FROM user_roles + WHERE role_id = :role_id + AND (expires_at IS NULL OR expires_at > :now) + ORDER BY granted_at DESC + """, + {"role_id": role_id, "now": datetime.now()}, + ) + + return [ + UserRole( + id=row["id"], + user_id=row["user_id"], + role_id=row["role_id"], + granted_by=row["granted_by"], + granted_at=row["granted_at"], + expires_at=row["expires_at"], + notes=row["notes"], + ) + for row in rows + ] + + +async def revoke_user_role(user_role_id: str) -> None: + """Revoke a user's role assignment""" + await db.execute( + "DELETE FROM user_roles WHERE id = :id", + {"id": user_role_id}, + ) + + +async def get_role_count_for_user(user_id: str) -> int: + """Get count of active roles for a user""" + row = await db.fetchone( + """ + SELECT COUNT(*) as count FROM user_roles + WHERE user_id = :user_id + AND (expires_at IS NULL OR expires_at > :now) + """, + {"user_id": user_id, "now": datetime.now()}, + ) + + return row["count"] if row else 0 + + +async def get_user_count_for_role(role_id: str) -> int: + """Get count of users assigned to a role""" + row = await db.fetchone( + """ + SELECT COUNT(*) as count FROM user_roles + WHERE role_id = :role_id + AND (expires_at IS NULL OR expires_at > :now) + """, + {"role_id": role_id, "now": datetime.now()}, + ) + + return row["count"] if row else 0 + + +# ===== RBAC HELPER FUNCTIONS ===== + + +async def get_user_permissions_from_roles( + user_id: str, +) -> list[tuple[Role, list[RolePermission]]]: + """ + Get all permissions a user has through their role assignments. + Returns list of tuples: (role, list of permissions from that role) + """ + # Get user's active roles + user_roles = await get_user_roles(user_id) + + result = [] + for user_role in user_roles: + role = await get_role(user_role.role_id) + if role: + permissions = await get_role_permissions(role.id) + result.append((role, permissions)) + + return result + + +async def check_user_has_role_permission( + user_id: str, account_id: str, permission_type: PermissionType +) -> bool: + """Check if user has a specific permission through any of their roles""" + # Get all permissions from user's roles + role_permissions = await get_user_permissions_from_roles(user_id) + + # Check if any role grants the required permission on this account + for role, permissions in role_permissions: + for perm in permissions: + if perm.account_id == account_id and perm.permission_type == permission_type: + return True + + return False + + +async def auto_assign_default_role(user_id: str) -> Optional[UserRole]: + """ + Auto-assign the default role to a new user. + Returns the UserRole if a default role exists and was assigned, None otherwise. + """ + default_role = await get_default_role() + if not default_role: + return None + + # Check if user already has this role + user_roles = await get_user_roles(user_id) + if any(ur.role_id == default_role.id for ur in user_roles): + return None + + # Assign the default role + return await assign_user_role( + AssignUserRole( + user_id=user_id, + role_id=default_role.id, + notes="Auto-assigned default role", + ), + granted_by="system", + ) diff --git a/migrations.py b/migrations.py index e6522f7..c9a7e30 100644 --- a/migrations.py +++ b/migrations.py @@ -410,3 +410,188 @@ async def m003_add_account_is_virtual(db): "description": description, }, ) + + +async def m004_add_rbac_tables(db): + """ + Add Role-Based Access Control (RBAC) tables. + + This migration introduces a flexible RBAC system that complements + the existing individual permission grants: + + - Roles: Named bundles of permissions (Employee, Contractor, Admin, etc.) + - Role Permissions: Define what accounts each role can access + - User Roles: Assign users to roles + - Default Role: Auto-assign new users to a default role + + Permission Resolution Order: + 1. Individual account_permissions (exceptions/overrides) + 2. Role-based permissions via user_roles + 3. Inherited permissions (hierarchical account names) + 4. Deny by default + """ + + # ========================================================================= + # ROLES TABLE + # ========================================================================= + # Define named roles (Employee, Contractor, Admin, etc.) + + await db.execute( + f""" + CREATE TABLE roles ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + description TEXT, + is_default BOOLEAN NOT NULL DEFAULT FALSE, + created_by TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} + ); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_roles_name ON roles (name); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_roles_is_default ON roles (is_default) + WHERE is_default = TRUE; + """ + ) + + # ========================================================================= + # ROLE PERMISSIONS TABLE + # ========================================================================= + # Define which accounts each role can access and with what permission type + + await db.execute( + f""" + CREATE TABLE role_permissions ( + id TEXT PRIMARY KEY, + role_id TEXT NOT NULL, + account_id TEXT NOT NULL, + permission_type TEXT NOT NULL, + notes TEXT, + created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + FOREIGN KEY (role_id) REFERENCES roles (id) ON DELETE CASCADE, + FOREIGN KEY (account_id) REFERENCES accounts (id) ON DELETE CASCADE + ); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_role_permissions_role_id ON role_permissions (role_id); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_role_permissions_account_id ON role_permissions (account_id); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_role_permissions_type ON role_permissions (permission_type); + """ + ) + + # ========================================================================= + # USER ROLES TABLE + # ========================================================================= + # Assign users to roles + + await db.execute( + f""" + CREATE TABLE user_roles ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + role_id TEXT NOT NULL, + granted_by TEXT NOT NULL, + granted_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + expires_at TIMESTAMP, + notes TEXT, + FOREIGN KEY (role_id) REFERENCES roles (id) ON DELETE CASCADE + ); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_user_roles_user_id ON user_roles (user_id); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_user_roles_role_id ON user_roles (role_id); + """ + ) + + await db.execute( + """ + CREATE INDEX idx_user_roles_expires ON user_roles (expires_at) + WHERE expires_at IS NOT NULL; + """ + ) + + # Composite index for checking specific user+role assignments + await db.execute( + """ + CREATE INDEX idx_user_roles_user_role ON user_roles (user_id, role_id); + """ + ) + + # ========================================================================= + # CREATE DEFAULT ROLES + # ========================================================================= + # Insert standard roles that most organizations will use + + import uuid + + # Define default roles and their descriptions + default_roles = [ + ( + "employee", + "Employee", + "Standard employee role with access to common expense accounts", + True, # This is the default role for new users + ), + ( + "contractor", + "Contractor", + "External contractor with limited expense account access", + False, + ), + ( + "accountant", + "Accountant", + "Accounting staff with read access to financial accounts", + False, + ), + ( + "manager", + "Manager", + "Management role with broader expense approval and account access", + False, + ), + ] + + for slug, name, description, is_default in default_roles: + await db.execute( + f""" + INSERT INTO roles (id, name, description, is_default, created_by, created_at) + VALUES (:id, :name, :description, :is_default, :created_by, {db.timestamp_now}) + """, + { + "id": str(uuid.uuid4()), + "name": name, + "description": description, + "is_default": is_default, + "created_by": "system", # System-created default roles + }, + ) diff --git a/models.py b/models.py index 83bad04..5199b6d 100644 --- a/models.py +++ b/models.py @@ -352,3 +352,81 @@ class AccountWithPermissions(BaseModel): parent_account: Optional[str] = None # Parent account name level: Optional[int] = None # Depth in hierarchy (0 = top level) has_children: Optional[bool] = None # Whether this account has sub-accounts + + +# ===== ROLE-BASED ACCESS CONTROL (RBAC) MODELS ===== + + +class Role(BaseModel): + """Role definition for RBAC system""" + id: str + name: str # Display name (e.g., "Employee", "Contractor") + description: Optional[str] = None + is_default: bool = False # Auto-assign this role to new users + created_by: str # User ID who created the role + created_at: datetime + + +class CreateRole(BaseModel): + """Create a new role""" + name: str + description: Optional[str] = None + is_default: bool = False + + +class UpdateRole(BaseModel): + """Update an existing role""" + name: Optional[str] = None + description: Optional[str] = None + is_default: Optional[bool] = None + + +class RolePermission(BaseModel): + """Permission granted to a role for a specific account""" + id: str + role_id: str + account_id: str + permission_type: PermissionType + notes: Optional[str] = None + created_at: datetime + + +class CreateRolePermission(BaseModel): + """Create a permission for a role""" + role_id: str + account_id: str + permission_type: PermissionType + notes: Optional[str] = None + + +class UserRole(BaseModel): + """Assignment of a user to a role""" + id: str + user_id: str # User's wallet ID + role_id: str + granted_by: str # Admin who assigned the role + granted_at: datetime + expires_at: Optional[datetime] = None + notes: Optional[str] = None + + +class AssignUserRole(BaseModel): + """Assign a user to a role""" + user_id: str + role_id: str + expires_at: Optional[datetime] = None + notes: Optional[str] = None + + +class RoleWithPermissions(BaseModel): + """Role with its associated permissions and user count""" + role: Role + permissions: list[RolePermission] + user_count: int # Number of users assigned to this role + + +class UserWithRoles(BaseModel): + """User information with their assigned roles""" + user_id: str + roles: list[Role] + direct_permissions: list[AccountPermission] # Individual permissions not from roles