feat: code quality

format

add ci

fixup ci
This commit is contained in:
dni ⚡ 2024-08-12 14:52:42 +02:00
parent 09bb033f85
commit 42b5edaf5d
26 changed files with 3199 additions and 295 deletions

128
crud.py
View file

@ -1,99 +1,77 @@
from typing import List, Optional, Union
from typing import Optional, Union
from lnbits.helpers import urlsafe_short_hash
from lnbits.lnurl import encode as lnurl_encode
from . import db
from .models import CreateMyExtensionData, MyExtension
from fastapi import Request
from lnurl import encode as lnurl_encode
import shortuuid
from lnbits.db import Database
from lnbits.helpers import insert_query, update_query
from .models import MyExtension
db = Database("ext_myextension")
table_name = "myextension.maintable"
async def create_myextension(
wallet_id: str, data: CreateMyExtensionData, req: Request
) -> MyExtension:
myextension_id = urlsafe_short_hash()
async def create_myextension(data: MyExtension) -> MyExtension:
await db.execute(
"""
INSERT INTO myextension.maintable (id, wallet, name, lnurlpayamount, lnurlwithdrawamount)
VALUES (?, ?, ?, ?, ?)
""",
(
myextension_id,
wallet_id,
data.name,
data.lnurlpayamount,
data.lnurlwithdrawamount,
),
insert_query(table_name, data),
(*data.dict().values(),),
)
myextension = await get_myextension(myextension_id, req)
assert myextension, "Newly created table couldn't be retrieved"
return myextension
return data
# this is how we used to do it
# myextension_id = urlsafe_short_hash()
# await db.execute(
# """
# INSERT INTO myextension.maintable
# (id, wallet, name, lnurlpayamount, lnurlwithdrawamount)
# VALUES (?, ?, ?, ?, ?)
# """,
# (
# myextension_id,
# wallet_id,
# data.name,
# data.lnurlpayamount,
# data.lnurlwithdrawamount,
# ),
# )
# myextension = await get_myextension(myextension_id)
# assert myextension, "Newly created table couldn't be retrieved"
async def get_myextension(
myextension_id: str, req: Optional[Request] = None
) -> Optional[MyExtension]:
async def get_myextension(myextension_id: str) -> Optional[MyExtension]:
row = await db.fetchone(
"SELECT * FROM myextension.maintable WHERE id = ?", (myextension_id,)
f"SELECT * FROM {table_name} WHERE id = ?", (myextension_id,)
)
if not row:
return None
rowAmended = MyExtension(**row)
if req:
rowAmended.lnurlpay = lnurl_encode(
req.url_for("myextension.api_lnurl_pay", myextension_id=row.id)._url
)
rowAmended.lnurlwithdraw = lnurl_encode(
req.url_for(
"myextension.api_lnurl_withdraw",
myextension_id=row.id,
tickerhash=shortuuid.uuid(name=rowAmended.id + str(rowAmended.ticker)),
)._url
)
return rowAmended
return MyExtension(**row) if row else None
async def get_myextensions(
wallet_ids: Union[str, List[str]], req: Optional[Request] = None
) -> List[MyExtension]:
async def get_myextensions(wallet_ids: Union[str, list[str]]) -> list[MyExtension]:
if isinstance(wallet_ids, str):
wallet_ids = [wallet_ids]
q = ",".join(["?"] * len(wallet_ids))
rows = await db.fetchall(
f"SELECT * FROM myextension.maintable WHERE wallet IN ({q})", (*wallet_ids,)
f"SELECT * FROM {table_name} WHERE wallet IN ({q})", (*wallet_ids,)
)
tempRows = [MyExtension(**row) for row in rows]
if req:
for row in tempRows:
row.lnurlpay = lnurl_encode(
req.url_for("myextension.api_lnurl_pay", myextension_id=row.id)._url
)
row.lnurlwithdraw = lnurl_encode(
req.url_for(
"myextension.api_lnurl_withdraw",
myextension_id=row.id,
tickerhash=shortuuid.uuid(name=row.id + str(row.ticker)),
)._url
)
return tempRows
return [MyExtension(**row) for row in rows]
async def update_myextension(
myextension_id: str, req: Optional[Request] = None, **kwargs
) -> MyExtension:
q = ", ".join([f"{field[0]} = ?" for field in kwargs.items()])
async def update_myextension(data: MyExtension) -> MyExtension:
await db.execute(
f"UPDATE myextension.maintable SET {q} WHERE id = ?",
(*kwargs.values(), myextension_id),
update_query(table_name, data),
(
*data.dict().values(),
data.id,
),
)
myextension = await get_myextension(myextension_id, req)
assert myextension, "Newly updated myextension couldn't be retrieved"
return myextension
return data
# this is how we used to do it
# q = ", ".join([f"{field[0]} = ?" for field in kwargs.items()])
# await db.execute(
# f"UPDATE myextension.maintable SET {q} WHERE id = ?",
# (*kwargs.values(), myextension_id),
# )
async def delete_myextension(myextension_id: str) -> None:
await db.execute(
"DELETE FROM myextension.maintable WHERE id = ?", (myextension_id,)
)
await db.execute(f"DELETE FROM {table_name} WHERE id = ?", (myextension_id,))