Adds account permissioning system

Adds an account permissioning system to allow granular control over account access.

Introduces the ability to grant users specific permissions (read, submit_expense, manage) on individual accounts.  This includes support for hierarchical permission inheritance, where permissions on parent accounts cascade to child accounts.

Adds new API endpoints for managing account permissions, including granting, listing, and revoking permissions.

Integrates permission checks into existing endpoints, such as creating expense entries, to ensure that users only have access to the accounts they are authorized to use.

Fixes #33 - Implements role based access control
This commit is contained in:
padreug 2025-11-07 17:55:59 +01:00
parent 7f9cecefa1
commit 92c1649f3b
4 changed files with 617 additions and 3 deletions

221
crud.py
View file

@ -7,19 +7,24 @@ from lnbits.helpers import urlsafe_short_hash
from .models import (
Account,
AccountPermission,
AccountType,
AssertionStatus,
BalanceAssertion,
CastleSettings,
CreateAccount,
CreateAccountPermission,
CreateBalanceAssertion,
CreateEntryLine,
CreateJournalEntry,
CreateUserEquityStatus,
EntryLine,
JournalEntry,
PermissionType,
StoredUserWalletSettings,
UserBalance,
UserCastleSettings,
UserEquityStatus,
UserWalletSettings,
)
@ -1062,3 +1067,219 @@ async def get_all_equity_eligible_users() -> list["UserEquityStatus"]:
)
return [UserEquityStatus(**row) for row in rows]
# ===== ACCOUNT PERMISSION OPERATIONS =====
async def create_account_permission(
data: "CreateAccountPermission", granted_by: str
) -> "AccountPermission":
"""Create a new account permission"""
from .models import AccountPermission
permission_id = urlsafe_short_hash()
permission = AccountPermission(
id=permission_id,
user_id=data.user_id,
account_id=data.account_id,
permission_type=data.permission_type,
granted_by=granted_by,
granted_at=datetime.now(),
expires_at=data.expires_at,
notes=data.notes,
)
await db.execute(
"""
INSERT INTO account_permissions (
id, user_id, account_id, permission_type, granted_by,
granted_at, expires_at, notes
)
VALUES (
:id, :user_id, :account_id, :permission_type, :granted_by,
:granted_at, :expires_at, :notes
)
""",
{
"id": permission.id,
"user_id": permission.user_id,
"account_id": permission.account_id,
"permission_type": permission.permission_type.value,
"granted_by": permission.granted_by,
"granted_at": permission.granted_at,
"expires_at": permission.expires_at,
"notes": permission.notes,
},
)
return permission
async def get_account_permission(permission_id: str) -> Optional["AccountPermission"]:
"""Get account permission by ID"""
from .models import AccountPermission, PermissionType
row = await db.fetchone(
"SELECT * FROM account_permissions WHERE id = :id",
{"id": permission_id},
)
if not row:
return None
return AccountPermission(
id=row["id"],
user_id=row["user_id"],
account_id=row["account_id"],
permission_type=PermissionType(row["permission_type"]),
granted_by=row["granted_by"],
granted_at=row["granted_at"],
expires_at=row["expires_at"],
notes=row["notes"],
)
async def get_user_permissions(
user_id: str, permission_type: Optional["PermissionType"] = None
) -> list["AccountPermission"]:
"""Get all permissions for a specific user"""
from .models import AccountPermission, PermissionType
if permission_type:
rows = await db.fetchall(
"""
SELECT * FROM account_permissions
WHERE user_id = :user_id
AND permission_type = :permission_type
AND (expires_at IS NULL OR expires_at > :now)
ORDER BY granted_at DESC
""",
{
"user_id": user_id,
"permission_type": permission_type.value,
"now": datetime.now(),
},
)
else:
rows = await db.fetchall(
"""
SELECT * FROM account_permissions
WHERE user_id = :user_id
AND (expires_at IS NULL OR expires_at > :now)
ORDER BY granted_at DESC
""",
{"user_id": user_id, "now": datetime.now()},
)
return [
AccountPermission(
id=row["id"],
user_id=row["user_id"],
account_id=row["account_id"],
permission_type=PermissionType(row["permission_type"]),
granted_by=row["granted_by"],
granted_at=row["granted_at"],
expires_at=row["expires_at"],
notes=row["notes"],
)
for row in rows
]
async def get_account_permissions(account_id: str) -> list["AccountPermission"]:
"""Get all permissions for a specific account"""
from .models import AccountPermission, PermissionType
rows = await db.fetchall(
"""
SELECT * FROM account_permissions
WHERE account_id = :account_id
AND (expires_at IS NULL OR expires_at > :now)
ORDER BY granted_at DESC
""",
{"account_id": account_id, "now": datetime.now()},
)
return [
AccountPermission(
id=row["id"],
user_id=row["user_id"],
account_id=row["account_id"],
permission_type=PermissionType(row["permission_type"]),
granted_by=row["granted_by"],
granted_at=row["granted_at"],
expires_at=row["expires_at"],
notes=row["notes"],
)
for row in rows
]
async def delete_account_permission(permission_id: str) -> None:
"""Delete (revoke) an account permission"""
await db.execute(
"DELETE FROM account_permissions WHERE id = :id",
{"id": permission_id},
)
async def check_user_has_permission(
user_id: str, account_id: str, permission_type: "PermissionType"
) -> bool:
"""Check if user has a specific permission on an account (direct permission only, no inheritance)"""
row = await db.fetchone(
"""
SELECT id FROM account_permissions
WHERE user_id = :user_id
AND account_id = :account_id
AND permission_type = :permission_type
AND (expires_at IS NULL OR expires_at > :now)
""",
{
"user_id": user_id,
"account_id": account_id,
"permission_type": permission_type.value,
"now": datetime.now(),
},
)
return row is not None
async def get_user_permissions_with_inheritance(
user_id: str, account_name: str, permission_type: "PermissionType"
) -> list[tuple["AccountPermission", Optional[str]]]:
"""
Get all permissions for a user on an account, including inherited permissions from parent accounts.
Returns list of tuples: (permission, parent_account_name or None)
Example:
If user has permission on "Expenses:Food", they also have permission on "Expenses:Food:Groceries"
Returns: [(permission_on_food, "Expenses:Food")]
"""
from .models import AccountPermission, PermissionType
# Get all user's permissions of this type
user_permissions = await get_user_permissions(user_id, permission_type)
# Find which permissions apply to this account (direct or inherited)
applicable_permissions = []
for perm in user_permissions:
# Get the account for this permission
account = await get_account(perm.account_id)
if not account:
continue
# Check if this account is a parent of the target account
# Parent accounts are indicated by hierarchical names (colon-separated)
# e.g., "Expenses:Food" is parent of "Expenses:Food:Groceries"
if account_name == account.name:
# Direct permission
applicable_permissions.append((perm, None))
elif account_name.startswith(account.name + ":"):
# Inherited permission from parent account
applicable_permissions.append((perm, account.name))
return applicable_permissions