""" Bulk Permission Management Module Provides convenience functions for managing permissions at scale. Features: - Bulk grant to multiple users - Bulk revoke operations - Permission templates/copying - User offboarding - Permission analytics Related: PERMISSIONS-SYSTEM.md - Improvement Opportunity #3 """ from datetime import datetime from typing import Optional from loguru import logger from .crud import ( create_account_permission, delete_account_permission, get_account_permissions, get_user_permissions, get_account, ) from .models import ( AccountPermission, CreateAccountPermission, PermissionType, ) async def bulk_grant_permission( user_ids: list[str], account_id: str, permission_type: PermissionType, granted_by: str, expires_at: Optional[datetime] = None, notes: Optional[str] = None, ) -> dict: """ Grant the same permission to multiple users. Args: user_ids: List of user IDs to grant permission to account_id: Account to grant permission on permission_type: Type of permission (READ, SUBMIT_EXPENSE, MANAGE) granted_by: Admin user ID granting the permission expires_at: Optional expiration date notes: Optional notes about this bulk grant Returns: dict with results: { "granted": 15, "failed": 2, "errors": ["user123: Already has permission", ...], "permissions": [permission_obj, ...] } Example: # Grant submit_expense to all food team members await bulk_grant_permission( user_ids=["alice", "bob", "charlie"], account_id="expenses_food_id", permission_type=PermissionType.SUBMIT_EXPENSE, granted_by="admin", expires_at=datetime(2025, 12, 31), notes="Q4 food team members" ) """ logger.info( f"Bulk granting {permission_type.value} permission to {len(user_ids)} users on account {account_id}" ) # Verify account exists account = await get_account(account_id) if not account: return { "granted": 0, "failed": len(user_ids), "errors": [f"Account {account_id} not found"], "permissions": [], } granted = 0 failed = 0 errors = [] permissions = [] for user_id in user_ids: try: permission = await create_account_permission( data=CreateAccountPermission( user_id=user_id, account_id=account_id, permission_type=permission_type, expires_at=expires_at, notes=notes, ), granted_by=granted_by, ) permissions.append(permission) granted += 1 logger.debug(f"Granted {permission_type.value} to {user_id} on {account.name}") except Exception as e: failed += 1 error_msg = f"{user_id}: {str(e)}" errors.append(error_msg) logger.warning(f"Failed to grant permission to {user_id}: {e}") logger.info( f"Bulk grant complete: {granted} granted, {failed} failed on account {account.name}" ) return { "granted": granted, "failed": failed, "errors": errors, "permissions": permissions, } async def revoke_all_user_permissions(user_id: str) -> dict: """ Revoke ALL permissions for a user (offboarding). Args: user_id: User ID to revoke all permissions from Returns: dict with results: { "revoked": 5, "failed": 0, "errors": [], "permission_types_removed": ["read", "submit_expense"] } Example: # Remove all access when user leaves await revoke_all_user_permissions("departed_user") """ logger.info(f"Revoking ALL permissions for user {user_id}") permissions = await get_user_permissions(user_id) revoked = 0 failed = 0 errors = [] permission_types = set() for perm in permissions: try: await delete_account_permission(perm.id) revoked += 1 permission_types.add(perm.permission_type.value) logger.debug(f"Revoked {perm.permission_type.value} from {user_id}") except Exception as e: failed += 1 error_msg = f"{perm.id}: {str(e)}" errors.append(error_msg) logger.warning(f"Failed to revoke permission {perm.id}: {e}") logger.info(f"User offboarding complete: {revoked} permissions revoked for {user_id}") return { "revoked": revoked, "failed": failed, "errors": errors, "permission_types_removed": sorted(list(permission_types)), } async def revoke_all_permissions_on_account(account_id: str) -> dict: """ Revoke ALL permissions on an account (account closure). Args: account_id: Account ID to revoke all permissions from Returns: dict with results: { "revoked": 8, "failed": 0, "errors": [], "users_affected": ["alice", "bob", "charlie"] } Example: # Close project and remove all access await revoke_all_permissions_on_account("old_project_id") """ logger.info(f"Revoking ALL permissions on account {account_id}") permissions = await get_account_permissions(account_id) revoked = 0 failed = 0 errors = [] users_affected = set() for perm in permissions: try: await delete_account_permission(perm.id) revoked += 1 users_affected.add(perm.user_id) logger.debug(f"Revoked permission from {perm.user_id} on account") except Exception as e: failed += 1 error_msg = f"{perm.id}: {str(e)}" errors.append(error_msg) logger.warning(f"Failed to revoke permission {perm.id}: {e}") logger.info(f"Account closure complete: {revoked} permissions revoked") return { "revoked": revoked, "failed": failed, "errors": errors, "users_affected": sorted(list(users_affected)), } async def copy_permissions( from_user_id: str, to_user_id: str, granted_by: str, permission_types: Optional[list[PermissionType]] = None, notes: Optional[str] = None, ) -> dict: """ Copy all permissions from one user to another (permission template). Args: from_user_id: User to copy permissions from to_user_id: User to copy permissions to granted_by: Admin granting the new permissions permission_types: Optional filter - only copy specific permission types notes: Optional notes for the copied permissions Returns: dict with results: { "copied": 5, "failed": 0, "errors": [], "permissions": [permission_obj, ...] } Example: # Copy all submit_expense permissions from experienced user await copy_permissions( from_user_id="alice", to_user_id="bob", granted_by="admin", permission_types=[PermissionType.SUBMIT_EXPENSE], notes="Copied from Alice - new food coordinator" ) """ logger.info(f"Copying permissions from {from_user_id} to {to_user_id}") # Get source user's permissions source_permissions = await get_user_permissions(from_user_id) # Filter by permission type if specified if permission_types: source_permissions = [ p for p in source_permissions if p.permission_type in permission_types ] copied = 0 failed = 0 errors = [] permissions = [] for source_perm in source_permissions: try: # Create new permission for target user new_permission = await create_account_permission( data=CreateAccountPermission( user_id=to_user_id, account_id=source_perm.account_id, permission_type=source_perm.permission_type, expires_at=source_perm.expires_at, # Copy expiration notes=notes or f"Copied from {from_user_id}", ), granted_by=granted_by, ) permissions.append(new_permission) copied += 1 logger.debug( f"Copied {source_perm.permission_type.value} permission to {to_user_id}" ) except Exception as e: failed += 1 error_msg = f"{source_perm.id}: {str(e)}" errors.append(error_msg) logger.warning(f"Failed to copy permission {source_perm.id}: {e}") logger.info(f"Permission copy complete: {copied} copied, {failed} failed") return { "copied": copied, "failed": failed, "errors": errors, "permissions": permissions, } async def get_permission_analytics() -> dict: """ Get analytics about permission usage (for admin dashboard). Returns: dict with analytics: { "total_permissions": 150, "by_type": {"read": 50, "submit_expense": 80, "manage": 20}, "expiring_soon": [...], # Expire in next 7 days "expired": [...], # Already expired but not cleaned up "users_with_permissions": 45, "users_without_permissions": ["bob", ...], "most_permissioned_accounts": [...] } Example: stats = await get_permission_analytics() print(f"Total permissions: {stats['total_permissions']}") """ from datetime import timedelta from . import db logger.debug("Gathering permission analytics") # Total permissions total_result = await db.fetchone("SELECT COUNT(*) as count FROM account_permissions") total_permissions = total_result["count"] if total_result else 0 # By type type_result = await db.fetchall( """ SELECT permission_type, COUNT(*) as count FROM account_permissions GROUP BY permission_type """ ) by_type = {row["permission_type"]: row["count"] for row in type_result} # Expiring soon (next 7 days) seven_days_from_now = datetime.now() + timedelta(days=7) expiring_result = await db.fetchall( """ SELECT ap.*, a.name as account_name FROM account_permissions ap JOIN castle_accounts a ON ap.account_id = a.id WHERE ap.expires_at IS NOT NULL AND ap.expires_at > :now AND ap.expires_at <= :seven_days ORDER BY ap.expires_at ASC LIMIT 20 """, {"now": datetime.now(), "seven_days": seven_days_from_now}, ) expiring_soon = [ { "user_id": row["user_id"], "account_name": row["account_name"], "permission_type": row["permission_type"], "expires_at": row["expires_at"], } for row in expiring_result ] # Most permissioned accounts top_accounts_result = await db.fetchall( """ SELECT a.name, COUNT(ap.id) as permission_count FROM castle_accounts a LEFT JOIN account_permissions ap ON a.id = ap.account_id GROUP BY a.id, a.name HAVING COUNT(ap.id) > 0 ORDER BY permission_count DESC LIMIT 10 """ ) most_permissioned_accounts = [ {"account": row["name"], "permission_count": row["permission_count"]} for row in top_accounts_result ] # Unique users with permissions users_result = await db.fetchone( "SELECT COUNT(DISTINCT user_id) as count FROM account_permissions" ) users_with_permissions = users_result["count"] if users_result else 0 return { "total_permissions": total_permissions, "by_type": by_type, "expiring_soon": expiring_soon, "users_with_permissions": users_with_permissions, "most_permissioned_accounts": most_permissioned_accounts, } async def cleanup_expired_permissions(days_old: int = 30) -> dict: """ Clean up permissions that expired more than N days ago. Args: days_old: Delete permissions expired this many days ago Returns: dict with results: { "deleted": 15, "errors": [] } Example: # Delete permissions expired more than 30 days ago await cleanup_expired_permissions(days_old=30) """ from datetime import timedelta from . import db logger.info(f"Cleaning up permissions expired more than {days_old} days ago") cutoff_date = datetime.now() - timedelta(days=days_old) try: result = await db.execute( """ DELETE FROM account_permissions WHERE expires_at IS NOT NULL AND expires_at < :cutoff_date """, {"cutoff_date": cutoff_date}, ) # SQLite doesn't return rowcount reliably, so count before delete count_result = await db.fetchone( """ SELECT COUNT(*) as count FROM account_permissions WHERE expires_at IS NOT NULL AND expires_at < :cutoff_date """, {"cutoff_date": cutoff_date}, ) deleted = count_result["count"] if count_result else 0 logger.info(f"Cleaned up {deleted} expired permissions") return { "deleted": deleted, "errors": [], } except Exception as e: logger.error(f"Failed to cleanup expired permissions: {e}") return { "deleted": 0, "errors": [str(e)], }