refactor: clean-up tasks.py

This commit is contained in:
Vlad Stan 2023-03-08 15:17:24 +02:00
parent 6c6cd861ce
commit 7f3438d07f
3 changed files with 126 additions and 127 deletions

View file

@ -1,10 +1,14 @@
import json import json
from typing import Optional from typing import Optional
import httpx
from loguru import logger from loguru import logger
from lnbits.core import create_invoice from lnbits.core import create_invoice, get_wallet, url_for
from .crud import ( from .crud import (
create_direct_message,
create_order,
get_merchant_by_pubkey, get_merchant_by_pubkey,
get_merchant_for_user, get_merchant_for_user,
get_order, get_order,
@ -13,11 +17,14 @@ from .crud import (
get_wallet_for_product, get_wallet_for_product,
update_order_paid_status, update_order_paid_status,
) )
from .helpers import order_from_json
from .models import ( from .models import (
Merchant,
Nostrable, Nostrable,
Order, Order,
OrderExtra, OrderExtra,
OrderStatusUpdate, OrderStatusUpdate,
PartialDirectMessage,
PartialOrder, PartialOrder,
PaymentOption, PaymentOption,
PaymentRequest, PaymentRequest,
@ -26,7 +33,9 @@ from .nostr.event import NostrEvent
from .nostr.nostr_client import publish_nostr_event from .nostr.nostr_client import publish_nostr_event
async def create_order(user_id: str, data: PartialOrder) -> Optional[PaymentRequest]: async def create_new_order(
user_id: str, data: PartialOrder
) -> Optional[PaymentRequest]:
if await get_order(user_id, data.id): if await get_order(user_id, data.id):
return None return None
if data.event_id and await get_order_by_event_id(user_id, data.event_id): if data.event_id and await get_order_by_event_id(user_id, data.event_id):
@ -102,4 +111,111 @@ async def handle_order_paid(order_id: str, merchant_pubkey: str):
dm_event = merchant.build_dm_event(dm_content, order.pubkey) dm_event = merchant.build_dm_event(dm_content, order.pubkey)
await publish_nostr_event(dm_event) await publish_nostr_event(dm_event)
except Exception as ex: except Exception as ex:
logger.warning(ex) logger.warning(ex)
async def process_nostr_message(msg: str):
try:
type, subscription_id, event = json.loads(msg)
subscription_name, public_key = subscription_id.split(":")
if type.upper() == "EVENT":
event = NostrEvent(**event)
if event.kind == 4:
await _handle_nip04_message(subscription_name, public_key, event)
except Exception as ex:
logger.warning(ex)
async def _handle_nip04_message(
subscription_name: str, public_key: str, event: NostrEvent
):
merchant = await get_merchant_by_pubkey(public_key)
assert merchant, f"Merchant not found for public key '{public_key}'"
clear_text_msg = merchant.decrypt_message(event.content, event.pubkey)
if subscription_name == "direct-messages-in":
await _handle_incoming_dms(event, merchant, clear_text_msg)
else:
await _handle_outgoing_dms(event, merchant, clear_text_msg)
async def _handle_incoming_dms(
event: NostrEvent, merchant: Merchant, clear_text_msg: str
):
dm_content = await _handle_dirrect_message(
merchant.id, event.pubkey, event.id, event.created_at, clear_text_msg
)
if dm_content:
dm_event = merchant.build_dm_event(dm_content, event.pubkey)
await publish_nostr_event(dm_event)
async def _handle_outgoing_dms(
event: NostrEvent, merchant: Merchant, clear_text_msg: str
):
sent_to = event.tag_values("p")
if len(sent_to) != 0:
dm = PartialDirectMessage(
event_id=event.id,
event_created_at=event.created_at,
message=clear_text_msg, # exclude if json
public_key=sent_to[0],
incoming=True,
)
await create_direct_message(merchant.id, dm)
async def _handle_dirrect_message(
merchant_id: str, from_pubkey: str, event_id: str, event_created_at: int, msg: str
) -> Optional[str]:
order, text_msg = order_from_json(msg)
try:
if order:
order["pubkey"] = from_pubkey
order["event_id"] = event_id
order["event_created_at"] = event_created_at
return await _handle_new_order(PartialOrder(**order))
else:
print("### text_msg", text_msg)
dm = PartialDirectMessage(
event_id=event_id,
event_created_at=event_created_at,
message=text_msg,
public_key=from_pubkey,
incoming=True,
)
await create_direct_message(merchant_id, dm)
return None
except Exception as ex:
logger.warning(ex)
return None
async def _handle_new_order(order: PartialOrder) -> Optional[str]:
### todo: check that event_id not parsed already
order.validate_order()
first_product_id = order.items[0].product_id
wallet_id = await get_wallet_for_product(first_product_id)
assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}"
wallet = await get_wallet(wallet_id)
assert wallet, f"Cannot find wallet for product id: {first_product_id}"
market_url = url_for(f"/nostrmarket/api/v1/order", external=True)
async with httpx.AsyncClient() as client:
resp = await client.post(
url=market_url,
headers={
"X-Api-Key": wallet.adminkey,
},
json=order.dict(),
)
resp.raise_for_status()
data = resp.json()
if data:
return json.dumps(data, separators=(",", ":"), ensure_ascii=False)
return None

127
tasks.py
View file

@ -2,28 +2,17 @@ import asyncio
import json import json
from asyncio import Queue from asyncio import Queue
import httpx
import websocket import websocket
from loguru import logger from loguru import logger
from websocket import WebSocketApp from websocket import WebSocketApp
from lnbits.core import get_wallet
from lnbits.core.models import Payment from lnbits.core.models import Payment
from lnbits.helpers import Optional, url_for
from lnbits.tasks import register_invoice_listener from lnbits.tasks import register_invoice_listener
from .crud import ( from .crud import get_public_keys_for_merchants
create_direct_message, from .nostr.nostr_client import connect_to_nostrclient_ws
get_merchant_by_pubkey, from .services import handle_order_paid, process_nostr_message
get_public_keys_for_merchants,
get_wallet_for_product,
)
from .helpers import order_from_json
from .models import PartialDirectMessage, PartialOrder
from .nostr.event import NostrEvent
from .nostr.nostr_client import connect_to_nostrclient_ws, publish_nostr_event
from .services import handle_order_paid
async def wait_for_paid_invoices(): async def wait_for_paid_invoices():
invoice_queue = Queue() invoice_queue = Queue()
@ -46,9 +35,6 @@ async def on_invoice_paid(payment: Payment) -> None:
await handle_order_paid(order_id, merchant_pubkey) await handle_order_paid(order_id, merchant_pubkey)
async def subscribe_to_nostr_client(recieve_event_queue: Queue, send_req_queue: Queue): async def subscribe_to_nostr_client(recieve_event_queue: Queue, send_req_queue: Queue):
print("### subscribe_nostrclient_ws") print("### subscribe_nostrclient_ws")
@ -91,107 +77,4 @@ async def wait_for_nostr_events(recieve_event_queue: Queue, send_req_queue: Queu
while True: while True:
message = await recieve_event_queue.get() message = await recieve_event_queue.get()
await handle_message(message) await process_nostr_message(message)
async def handle_message(msg: str):
try:
type, subscription_id, event = json.loads(msg)
subscription_name, public_key = subscription_id.split(":")
if type.upper() == "EVENT":
event = NostrEvent(**event)
if event.kind == 4:
await handle_nip04_message(subscription_name, public_key, event)
except Exception as ex:
logger.warning(ex)
async def handle_nip04_message(
subscription_name: str, public_key: str, event: NostrEvent
):
merchant = await get_merchant_by_pubkey(public_key)
assert merchant, f"Merchant not found for public key '{public_key}'"
clear_text_msg = merchant.decrypt_message(event.content, event.pubkey)
if subscription_name == "direct-messages-in":
await handle_incoming_dms(event, merchant, clear_text_msg)
else:
await handle_outgoing_dms(event, merchant, clear_text_msg)
async def handle_incoming_dms(event, merchant, clear_text_msg):
dm_content = await handle_dirrect_message(
merchant.id, event.pubkey, event.id, event.created_at, clear_text_msg
)
if dm_content:
dm_event = merchant.build_dm_event(dm_content, event.pubkey)
await publish_nostr_event(dm_event)
async def handle_outgoing_dms(event, merchant, clear_text_msg):
sent_to = event.tag_values("p")
if len(sent_to) != 0:
dm = PartialDirectMessage(
event_id=event.id,
event_created_at=event.created_at,
message=clear_text_msg, # exclude if json
public_key=sent_to[0],
incoming=True,
)
await create_direct_message(merchant.id, dm)
async def handle_dirrect_message(
merchant_id: str, from_pubkey: str, event_id: str, event_created_at: int, msg: str
) -> Optional[str]:
order, text_msg = order_from_json(msg)
try:
if order:
order["pubkey"] = from_pubkey
order["event_id"] = event_id
order["event_created_at"] = event_created_at
return await handle_new_order(PartialOrder(**order))
else:
print("### text_msg", text_msg)
dm = PartialDirectMessage(
event_id=event_id,
event_created_at=event_created_at,
message=text_msg,
public_key=from_pubkey,
incoming=True,
)
await create_direct_message(merchant_id, dm)
return None
except Exception as ex:
logger.warning(ex)
return None
async def handle_new_order(order: PartialOrder) -> Optional[str]:
### todo: check that event_id not parsed already
order.validate_order()
first_product_id = order.items[0].product_id
wallet_id = await get_wallet_for_product(first_product_id)
assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}"
wallet = await get_wallet(wallet_id)
assert wallet, f"Cannot find wallet for product id: {first_product_id}"
market_url = url_for(f"/nostrmarket/api/v1/order", external=True)
async with httpx.AsyncClient() as client:
resp = await client.post(
url=market_url,
headers={
"X-Api-Key": wallet.adminkey,
},
json=order.dict(),
)
resp.raise_for_status()
data = resp.json()
if data:
return json.dumps(data, separators=(",", ":"), ensure_ascii=False)
return None

View file

@ -58,7 +58,7 @@ from .models import (
Zone, Zone,
) )
from .nostr.nostr_client import publish_nostr_event from .nostr.nostr_client import publish_nostr_event
from .services import create_order from .services import create_new_order, sign_and_send_to_nostr
######################################## MERCHANT ######################################## ######################################## MERCHANT ########################################
@ -455,7 +455,7 @@ async def api_create_order(
) -> Optional[PaymentRequest]: ) -> Optional[PaymentRequest]:
try: try:
# print("### new order: ", json.dumps(data.dict())) # print("### new order: ", json.dumps(data.dict()))
return await create_order(wallet.wallet.user, data) return await create_new_order(wallet.wallet.user, data)
except Exception as ex: except Exception as ex:
logger.warning(ex) logger.warning(ex)
raise HTTPException( raise HTTPException(