From f6b5034e740176c88a385e87322fe86a056abf14 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 16 Mar 2023 16:01:14 +0200 Subject: [PATCH] feat: extract `nostr_client` class --- __init__.py | 18 ++++--- nostr/nostr_client.py | 106 +++++++++++++++++++++++++++--------------- services.py | 8 ++-- tasks.py | 40 ++-------------- views_api.py | 15 ++---- 5 files changed, 90 insertions(+), 97 deletions(-) diff --git a/__init__.py b/__init__.py index 6f753a2..7ae1a3a 100644 --- a/__init__.py +++ b/__init__.py @@ -1,5 +1,5 @@ import asyncio -from asyncio import Queue, Task +from asyncio import Task from typing import List from fastapi import APIRouter @@ -26,16 +26,14 @@ def nostrmarket_renderer(): return template_renderer(["lnbits/extensions/nostrmarket/templates"]) -recieve_event_queue: Queue = Queue() -send_req_queue: Queue = Queue() +from .nostr.nostr_client import NostrClient + +nostr_client = NostrClient() + scheduled_tasks: List[Task] = [] -from .tasks import ( - subscribe_to_nostr_client, - wait_for_nostr_events, - wait_for_paid_invoices, -) +from .tasks import wait_for_nostr_events, wait_for_paid_invoices from .views import * # noqa from .views_api import * # noqa @@ -44,12 +42,12 @@ def nostrmarket_start(): async def _subscribe_to_nostr_client(): # wait for 'nostrclient' extension to initialize await asyncio.sleep(10) - await subscribe_to_nostr_client(recieve_event_queue, send_req_queue) + await nostr_client.subscribe_to_nostr_client() async def _wait_for_nostr_events(): # wait for this extension to initialize await asyncio.sleep(5) - await wait_for_nostr_events(recieve_event_queue) + await wait_for_nostr_events(nostr_client) loop = asyncio.get_event_loop() task1 = loop.create_task(catch_everything_and_restart(wait_for_paid_invoices)) diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index 87682f5..2204ba1 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -1,3 +1,6 @@ +import asyncio +import json +from asyncio import Queue from threading import Thread from typing import Callable @@ -6,53 +9,82 @@ from websocket import WebSocketApp from lnbits.app import settings -from .. import send_req_queue from .event import NostrEvent -async def publish_nostr_event(e: NostrEvent): - print("### publish_nostr_event", e.dict()) - await send_req_queue.put(["EVENT", e.dict()]) +class NostrClient: + def __init__(self): + self.recieve_event_queue: Queue = Queue() + self.send_req_queue: Queue = Queue() + self.ws: WebSocketApp = None + async def get_event(self): + return await self.recieve_event_queue.get() -async def connect_to_nostrclient_ws( - on_open: Callable, on_message: Callable -) -> WebSocketApp: - def on_error(_, error): - logger.warning(error) + async def publish_nostr_event(self, e: NostrEvent): + print("### publish_nostr_event", e.dict()) + await self.send_req_queue.put(["EVENT", e.dict()]) - logger.debug(f"Subscribing to websockets for nostrclient extension") - ws = WebSocketApp( - f"ws://localhost:{settings.port}/nostrclient/api/v1/relay", - on_message=on_message, - on_open=on_open, - on_error=on_error, - ) + async def connect_to_nostrclient_ws( + self, on_open: Callable, on_message: Callable + ) -> WebSocketApp: + def on_error(_, error): + logger.warning(error) - wst = Thread(target=ws.run_forever) - wst.daemon = True - wst.start() + logger.debug(f"Subscribing to websockets for nostrclient extension") + ws = WebSocketApp( + f"ws://localhost:{settings.port}/nostrclient/api/v1/relay", + on_message=on_message, + on_open=on_open, + on_error=on_error, + ) - return ws + wst = Thread(target=ws.run_forever) + wst.daemon = True + wst.start() + return ws -async def subscribe_to_direct_messages(public_key: str, since: int): - in_messages_filter = {"kind": 4, "#p": [public_key]} - out_messages_filter = {"kind": 4, "authors": [public_key]} - if since != 0: - in_messages_filter["since"] = since - out_messages_filter["since"] = since - print("### in_messages_filter", in_messages_filter) - print("### out_messages_filter", out_messages_filter) + async def subscribe_to_direct_messages(self, public_key: str, since: int): + in_messages_filter = {"kind": 4, "#p": [public_key]} + out_messages_filter = {"kind": 4, "authors": [public_key]} + if since != 0: + in_messages_filter["since"] = since + out_messages_filter["since"] = since + print("### in_messages_filter", in_messages_filter) + print("### out_messages_filter", out_messages_filter) - await send_req_queue.put( - ["REQ", f"direct-messages-in:{public_key}", in_messages_filter] - ) - await send_req_queue.put( - ["REQ", f"direct-messages-out:{public_key}", out_messages_filter] - ) + await self.send_req_queue.put( + ["REQ", f"direct-messages-in:{public_key}", in_messages_filter] + ) + await self.send_req_queue.put( + ["REQ", f"direct-messages-out:{public_key}", out_messages_filter] + ) + async def subscribe_to_nostr_client(self): + def on_open(_): + logger.info("Connected to 'nostrclient' websocket") -async def unsubscribe_from_direct_messages(public_key: str): - await send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) - await send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) + def on_message(_, message): + # print("### on_message", message) + self.recieve_event_queue.put_nowait(message) + + while True: + try: + req = None + if not self.ws: + self.ws = await self.connect_to_nostrclient_ws(on_open, on_message) + # be sure the connection is open + await asyncio.sleep(3) + req = await self.send_req_queue.get() + self.ws.send(json.dumps(req)) + except Exception as ex: + logger.warning(ex) + if req: + await self.send_req_queue.put(req) + self.ws = None # todo close + await asyncio.sleep(5) + + async def unsubscribe_from_direct_messages(self, public_key: str): + await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) + await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) diff --git a/services.py b/services.py index d262a51..13f5154 100644 --- a/services.py +++ b/services.py @@ -5,6 +5,7 @@ from loguru import logger from lnbits.core import create_invoice, get_wallet +from . import nostr_client from .crud import ( create_direct_message, create_order, @@ -32,7 +33,6 @@ from .models import ( Product, ) from .nostr.event import NostrEvent -from .nostr.nostr_client import publish_nostr_event async def create_new_order( @@ -97,7 +97,7 @@ async def sign_and_send_to_nostr( else n.to_nostr_event(merchant.public_key) ) event.sig = merchant.sign_hash(bytes.fromhex(event.id)) - await publish_nostr_event(event) + await nostr_client.publish_nostr_event(event) return event @@ -135,7 +135,7 @@ async def notify_client_of_order_status( dm_content = f"Order cannot be fulfilled. Reason: {message}" dm_event = merchant.build_dm_event(dm_content, order.public_key) - await publish_nostr_event(dm_event) + await nostr_client.publish_nostr_event(dm_event) async def update_products_for_order( @@ -226,7 +226,7 @@ async def _handle_incoming_dms( ) if dm_content: dm_event = merchant.build_dm_event(dm_content, event.pubkey) - await publish_nostr_event(dm_event) + await nostr_client.publish_nostr_event(dm_event) async def _handle_outgoing_dms( diff --git a/tasks.py b/tasks.py index bedd683..ad791e3 100644 --- a/tasks.py +++ b/tasks.py @@ -1,11 +1,5 @@ -import asyncio -import json from asyncio import Queue -import websocket -from loguru import logger -from websocket import WebSocketApp - from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener @@ -14,7 +8,7 @@ from .crud import ( get_last_order_time, get_public_keys_for_merchants, ) -from .nostr.nostr_client import connect_to_nostrclient_ws, subscribe_to_direct_messages +from .nostr.nostr_client import NostrClient from .services import handle_order_paid, process_nostr_message @@ -39,41 +33,15 @@ async def on_invoice_paid(payment: Payment) -> None: await handle_order_paid(order_id, merchant_pubkey) -async def subscribe_to_nostr_client(recieve_event_queue: Queue, send_req_queue: Queue): - def on_open(_): - logger.info("Connected to 'nostrclient' websocket") - - def on_message(_, message): - # print("### on_message", message) - recieve_event_queue.put_nowait(message) - - ws: WebSocketApp = None - while True: - try: - req = None - if not ws: - ws = await connect_to_nostrclient_ws(on_open, on_message) - # be sure the connection is open - await asyncio.sleep(3) - req = await send_req_queue.get() - ws.send(json.dumps(req)) - except Exception as ex: - logger.warning(ex) - if req: - await send_req_queue.put(req) - ws = None # todo close - await asyncio.sleep(5) - - -async def wait_for_nostr_events(recieve_event_queue: Queue): +async def wait_for_nostr_events(nostr_client: NostrClient): public_keys = await get_public_keys_for_merchants() for p in public_keys: last_order_time = await get_last_order_time(p) last_dm_time = await get_last_direct_messages_time(p) since = max(last_order_time, last_dm_time) - await subscribe_to_direct_messages(p, since) + await nostr_client.subscribe_to_direct_messages(p, since) while True: - message = await recieve_event_queue.get() + message = await nostr_client.get_event() await process_nostr_message(message) diff --git a/views_api.py b/views_api.py index 7728e1b..544bf9b 100644 --- a/views_api.py +++ b/views_api.py @@ -15,7 +15,7 @@ from lnbits.decorators import ( ) from lnbits.utils.exchange_rates import currencies -from . import nostrmarket_ext, scheduled_tasks +from . import nostr_client, nostrmarket_ext, scheduled_tasks from .crud import ( create_direct_message, create_merchant, @@ -62,11 +62,6 @@ from .models import ( Stall, Zone, ) -from .nostr.nostr_client import ( - publish_nostr_event, - subscribe_to_direct_messages, - unsubscribe_from_direct_messages, -) from .services import sign_and_send_to_nostr ######################################## MERCHANT ######################################## @@ -86,7 +81,7 @@ async def api_create_merchant( assert merchant == None, "A merchant already exists for this user" merchant = await create_merchant(wallet.wallet.user, data) - await subscribe_to_direct_messages(data.public_key, 0) + await nostr_client.subscribe_to_direct_messages(data.public_key, 0) return merchant except AssertionError as ex: @@ -135,7 +130,7 @@ async def api_delete_merchant( await delete_merchant_direct_messages(merchant.id) await delete_merchant_zones(merchant.id) - await unsubscribe_from_direct_messages(merchant.public_key) + await nostr_client.unsubscribe_from_direct_messages(merchant.public_key) await delete_merchant(merchant.id) except AssertionError as ex: raise HTTPException( @@ -660,7 +655,7 @@ async def api_update_order_status( dm_content = json.dumps(data.dict(), separators=(",", ":"), ensure_ascii=False) dm_event = merchant.build_dm_event(dm_content, order.public_key) - await publish_nostr_event(dm_event) + await nostr_client.publish_nostr_event(dm_event) return order @@ -716,7 +711,7 @@ async def api_create_message( data.event_created_at = dm_event.created_at dm = await create_direct_message(merchant.id, data) - await publish_nostr_event(dm_event) + await nostr_client.publish_nostr_event(dm_event) return dm except AssertionError as ex: