diff --git a/services.py b/services.py index ca32d40..b499900 100644 --- a/services.py +++ b/services.py @@ -1,10 +1,14 @@ import json from typing import Optional + +import httpx from loguru import logger -from lnbits.core import create_invoice +from lnbits.core import create_invoice, get_wallet, url_for from .crud import ( + create_direct_message, + create_order, get_merchant_by_pubkey, get_merchant_for_user, get_order, @@ -13,11 +17,14 @@ from .crud import ( get_wallet_for_product, update_order_paid_status, ) +from .helpers import order_from_json from .models import ( + Merchant, Nostrable, Order, OrderExtra, OrderStatusUpdate, + PartialDirectMessage, PartialOrder, PaymentOption, PaymentRequest, @@ -26,7 +33,9 @@ from .nostr.event import NostrEvent from .nostr.nostr_client import publish_nostr_event -async def create_order(user_id: str, data: PartialOrder) -> Optional[PaymentRequest]: +async def create_new_order( + user_id: str, data: PartialOrder +) -> Optional[PaymentRequest]: if await get_order(user_id, data.id): return None if data.event_id and await get_order_by_event_id(user_id, data.event_id): @@ -102,4 +111,111 @@ async def handle_order_paid(order_id: str, merchant_pubkey: str): dm_event = merchant.build_dm_event(dm_content, order.pubkey) await publish_nostr_event(dm_event) except Exception as ex: - logger.warning(ex) \ No newline at end of file + logger.warning(ex) + + +async def process_nostr_message(msg: str): + try: + type, subscription_id, event = json.loads(msg) + subscription_name, public_key = subscription_id.split(":") + if type.upper() == "EVENT": + event = NostrEvent(**event) + if event.kind == 4: + await _handle_nip04_message(subscription_name, public_key, event) + + except Exception as ex: + logger.warning(ex) + + +async def _handle_nip04_message( + subscription_name: str, public_key: str, event: NostrEvent +): + merchant = await get_merchant_by_pubkey(public_key) + assert merchant, f"Merchant not found for public key '{public_key}'" + + clear_text_msg = merchant.decrypt_message(event.content, event.pubkey) + if subscription_name == "direct-messages-in": + await _handle_incoming_dms(event, merchant, clear_text_msg) + else: + await _handle_outgoing_dms(event, merchant, clear_text_msg) + + +async def _handle_incoming_dms( + event: NostrEvent, merchant: Merchant, clear_text_msg: str +): + dm_content = await _handle_dirrect_message( + merchant.id, event.pubkey, event.id, event.created_at, clear_text_msg + ) + if dm_content: + dm_event = merchant.build_dm_event(dm_content, event.pubkey) + await publish_nostr_event(dm_event) + + +async def _handle_outgoing_dms( + event: NostrEvent, merchant: Merchant, clear_text_msg: str +): + sent_to = event.tag_values("p") + if len(sent_to) != 0: + dm = PartialDirectMessage( + event_id=event.id, + event_created_at=event.created_at, + message=clear_text_msg, # exclude if json + public_key=sent_to[0], + incoming=True, + ) + await create_direct_message(merchant.id, dm) + + +async def _handle_dirrect_message( + merchant_id: str, from_pubkey: str, event_id: str, event_created_at: int, msg: str +) -> Optional[str]: + order, text_msg = order_from_json(msg) + try: + if order: + order["pubkey"] = from_pubkey + order["event_id"] = event_id + order["event_created_at"] = event_created_at + return await _handle_new_order(PartialOrder(**order)) + else: + print("### text_msg", text_msg) + dm = PartialDirectMessage( + event_id=event_id, + event_created_at=event_created_at, + message=text_msg, + public_key=from_pubkey, + incoming=True, + ) + await create_direct_message(merchant_id, dm) + return None + except Exception as ex: + logger.warning(ex) + return None + + +async def _handle_new_order(order: PartialOrder) -> Optional[str]: + ### todo: check that event_id not parsed already + + order.validate_order() + + first_product_id = order.items[0].product_id + wallet_id = await get_wallet_for_product(first_product_id) + assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}" + + wallet = await get_wallet(wallet_id) + assert wallet, f"Cannot find wallet for product id: {first_product_id}" + + market_url = url_for(f"/nostrmarket/api/v1/order", external=True) + async with httpx.AsyncClient() as client: + resp = await client.post( + url=market_url, + headers={ + "X-Api-Key": wallet.adminkey, + }, + json=order.dict(), + ) + resp.raise_for_status() + data = resp.json() + if data: + return json.dumps(data, separators=(",", ":"), ensure_ascii=False) + + return None diff --git a/tasks.py b/tasks.py index 1b961f6..c1a20ba 100644 --- a/tasks.py +++ b/tasks.py @@ -2,28 +2,17 @@ import asyncio import json from asyncio import Queue -import httpx - import websocket from loguru import logger from websocket import WebSocketApp -from lnbits.core import get_wallet from lnbits.core.models import Payment -from lnbits.helpers import Optional, url_for from lnbits.tasks import register_invoice_listener -from .crud import ( - create_direct_message, - get_merchant_by_pubkey, - get_public_keys_for_merchants, - get_wallet_for_product, -) -from .helpers import order_from_json -from .models import PartialDirectMessage, PartialOrder -from .nostr.event import NostrEvent -from .nostr.nostr_client import connect_to_nostrclient_ws, publish_nostr_event -from .services import handle_order_paid +from .crud import get_public_keys_for_merchants +from .nostr.nostr_client import connect_to_nostrclient_ws +from .services import handle_order_paid, process_nostr_message + async def wait_for_paid_invoices(): invoice_queue = Queue() @@ -46,9 +35,6 @@ async def on_invoice_paid(payment: Payment) -> None: await handle_order_paid(order_id, merchant_pubkey) - - - async def subscribe_to_nostr_client(recieve_event_queue: Queue, send_req_queue: Queue): print("### subscribe_nostrclient_ws") @@ -91,107 +77,4 @@ async def wait_for_nostr_events(recieve_event_queue: Queue, send_req_queue: Queu while True: message = await recieve_event_queue.get() - await handle_message(message) - - -async def handle_message(msg: str): - try: - type, subscription_id, event = json.loads(msg) - subscription_name, public_key = subscription_id.split(":") - if type.upper() == "EVENT": - event = NostrEvent(**event) - if event.kind == 4: - await handle_nip04_message(subscription_name, public_key, event) - - except Exception as ex: - logger.warning(ex) - - -async def handle_nip04_message( - subscription_name: str, public_key: str, event: NostrEvent -): - merchant = await get_merchant_by_pubkey(public_key) - assert merchant, f"Merchant not found for public key '{public_key}'" - - clear_text_msg = merchant.decrypt_message(event.content, event.pubkey) - if subscription_name == "direct-messages-in": - await handle_incoming_dms(event, merchant, clear_text_msg) - else: - await handle_outgoing_dms(event, merchant, clear_text_msg) - - -async def handle_incoming_dms(event, merchant, clear_text_msg): - dm_content = await handle_dirrect_message( - merchant.id, event.pubkey, event.id, event.created_at, clear_text_msg - ) - if dm_content: - dm_event = merchant.build_dm_event(dm_content, event.pubkey) - await publish_nostr_event(dm_event) - - -async def handle_outgoing_dms(event, merchant, clear_text_msg): - sent_to = event.tag_values("p") - if len(sent_to) != 0: - dm = PartialDirectMessage( - event_id=event.id, - event_created_at=event.created_at, - message=clear_text_msg, # exclude if json - public_key=sent_to[0], - incoming=True, - ) - await create_direct_message(merchant.id, dm) - - -async def handle_dirrect_message( - merchant_id: str, from_pubkey: str, event_id: str, event_created_at: int, msg: str -) -> Optional[str]: - order, text_msg = order_from_json(msg) - try: - if order: - order["pubkey"] = from_pubkey - order["event_id"] = event_id - order["event_created_at"] = event_created_at - return await handle_new_order(PartialOrder(**order)) - else: - print("### text_msg", text_msg) - dm = PartialDirectMessage( - event_id=event_id, - event_created_at=event_created_at, - message=text_msg, - public_key=from_pubkey, - incoming=True, - ) - await create_direct_message(merchant_id, dm) - return None - except Exception as ex: - logger.warning(ex) - return None - - -async def handle_new_order(order: PartialOrder) -> Optional[str]: - ### todo: check that event_id not parsed already - - order.validate_order() - - first_product_id = order.items[0].product_id - wallet_id = await get_wallet_for_product(first_product_id) - assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}" - - wallet = await get_wallet(wallet_id) - assert wallet, f"Cannot find wallet for product id: {first_product_id}" - - market_url = url_for(f"/nostrmarket/api/v1/order", external=True) - async with httpx.AsyncClient() as client: - resp = await client.post( - url=market_url, - headers={ - "X-Api-Key": wallet.adminkey, - }, - json=order.dict(), - ) - resp.raise_for_status() - data = resp.json() - if data: - return json.dumps(data, separators=(",", ":"), ensure_ascii=False) - - return None + await process_nostr_message(message) diff --git a/views_api.py b/views_api.py index f1fd193..05f077f 100644 --- a/views_api.py +++ b/views_api.py @@ -58,7 +58,7 @@ from .models import ( Zone, ) from .nostr.nostr_client import publish_nostr_event -from .services import create_order +from .services import create_new_order, sign_and_send_to_nostr ######################################## MERCHANT ######################################## @@ -455,7 +455,7 @@ async def api_create_order( ) -> Optional[PaymentRequest]: try: # print("### new order: ", json.dumps(data.dict())) - return await create_order(wallet.wallet.user, data) + return await create_new_order(wallet.wallet.user, data) except Exception as ex: logger.warning(ex) raise HTTPException(