feat: extract nostr_client class

This commit is contained in:
Vlad Stan 2023-03-16 16:01:14 +02:00
parent 45c7744282
commit f6b5034e74
5 changed files with 90 additions and 97 deletions

View file

@ -1,5 +1,5 @@
import asyncio import asyncio
from asyncio import Queue, Task from asyncio import Task
from typing import List from typing import List
from fastapi import APIRouter from fastapi import APIRouter
@ -26,16 +26,14 @@ def nostrmarket_renderer():
return template_renderer(["lnbits/extensions/nostrmarket/templates"]) return template_renderer(["lnbits/extensions/nostrmarket/templates"])
recieve_event_queue: Queue = Queue() from .nostr.nostr_client import NostrClient
send_req_queue: Queue = Queue()
nostr_client = NostrClient()
scheduled_tasks: List[Task] = [] scheduled_tasks: List[Task] = []
from .tasks import ( from .tasks import wait_for_nostr_events, wait_for_paid_invoices
subscribe_to_nostr_client,
wait_for_nostr_events,
wait_for_paid_invoices,
)
from .views import * # noqa from .views import * # noqa
from .views_api import * # noqa from .views_api import * # noqa
@ -44,12 +42,12 @@ def nostrmarket_start():
async def _subscribe_to_nostr_client(): async def _subscribe_to_nostr_client():
# wait for 'nostrclient' extension to initialize # wait for 'nostrclient' extension to initialize
await asyncio.sleep(10) await asyncio.sleep(10)
await subscribe_to_nostr_client(recieve_event_queue, send_req_queue) await nostr_client.subscribe_to_nostr_client()
async def _wait_for_nostr_events(): async def _wait_for_nostr_events():
# wait for this extension to initialize # wait for this extension to initialize
await asyncio.sleep(5) await asyncio.sleep(5)
await wait_for_nostr_events(recieve_event_queue) await wait_for_nostr_events(nostr_client)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
task1 = loop.create_task(catch_everything_and_restart(wait_for_paid_invoices)) task1 = loop.create_task(catch_everything_and_restart(wait_for_paid_invoices))

View file

@ -1,3 +1,6 @@
import asyncio
import json
from asyncio import Queue
from threading import Thread from threading import Thread
from typing import Callable from typing import Callable
@ -6,18 +9,25 @@ from websocket import WebSocketApp
from lnbits.app import settings from lnbits.app import settings
from .. import send_req_queue
from .event import NostrEvent from .event import NostrEvent
async def publish_nostr_event(e: NostrEvent): class NostrClient:
def __init__(self):
self.recieve_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue()
self.ws: WebSocketApp = None
async def get_event(self):
return await self.recieve_event_queue.get()
async def publish_nostr_event(self, e: NostrEvent):
print("### publish_nostr_event", e.dict()) print("### publish_nostr_event", e.dict())
await send_req_queue.put(["EVENT", e.dict()]) await self.send_req_queue.put(["EVENT", e.dict()])
async def connect_to_nostrclient_ws(
async def connect_to_nostrclient_ws( self, on_open: Callable, on_message: Callable
on_open: Callable, on_message: Callable ) -> WebSocketApp:
) -> WebSocketApp:
def on_error(_, error): def on_error(_, error):
logger.warning(error) logger.warning(error)
@ -35,8 +45,7 @@ async def connect_to_nostrclient_ws(
return ws return ws
async def subscribe_to_direct_messages(self, public_key: str, since: int):
async def subscribe_to_direct_messages(public_key: str, since: int):
in_messages_filter = {"kind": 4, "#p": [public_key]} in_messages_filter = {"kind": 4, "#p": [public_key]}
out_messages_filter = {"kind": 4, "authors": [public_key]} out_messages_filter = {"kind": 4, "authors": [public_key]}
if since != 0: if since != 0:
@ -45,14 +54,37 @@ async def subscribe_to_direct_messages(public_key: str, since: int):
print("### in_messages_filter", in_messages_filter) print("### in_messages_filter", in_messages_filter)
print("### out_messages_filter", out_messages_filter) print("### out_messages_filter", out_messages_filter)
await send_req_queue.put( await self.send_req_queue.put(
["REQ", f"direct-messages-in:{public_key}", in_messages_filter] ["REQ", f"direct-messages-in:{public_key}", in_messages_filter]
) )
await send_req_queue.put( await self.send_req_queue.put(
["REQ", f"direct-messages-out:{public_key}", out_messages_filter] ["REQ", f"direct-messages-out:{public_key}", out_messages_filter]
) )
async def subscribe_to_nostr_client(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")
async def unsubscribe_from_direct_messages(public_key: str): def on_message(_, message):
await send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) # print("### on_message", message)
await send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) self.recieve_event_queue.put_nowait(message)
while True:
try:
req = None
if not self.ws:
self.ws = await self.connect_to_nostrclient_ws(on_open, on_message)
# be sure the connection is open
await asyncio.sleep(3)
req = await self.send_req_queue.get()
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
if req:
await self.send_req_queue.put(req)
self.ws = None # todo close
await asyncio.sleep(5)
async def unsubscribe_from_direct_messages(self, public_key: str):
await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"])
await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"])

View file

@ -5,6 +5,7 @@ from loguru import logger
from lnbits.core import create_invoice, get_wallet from lnbits.core import create_invoice, get_wallet
from . import nostr_client
from .crud import ( from .crud import (
create_direct_message, create_direct_message,
create_order, create_order,
@ -32,7 +33,6 @@ from .models import (
Product, Product,
) )
from .nostr.event import NostrEvent from .nostr.event import NostrEvent
from .nostr.nostr_client import publish_nostr_event
async def create_new_order( async def create_new_order(
@ -97,7 +97,7 @@ async def sign_and_send_to_nostr(
else n.to_nostr_event(merchant.public_key) else n.to_nostr_event(merchant.public_key)
) )
event.sig = merchant.sign_hash(bytes.fromhex(event.id)) event.sig = merchant.sign_hash(bytes.fromhex(event.id))
await publish_nostr_event(event) await nostr_client.publish_nostr_event(event)
return event return event
@ -135,7 +135,7 @@ async def notify_client_of_order_status(
dm_content = f"Order cannot be fulfilled. Reason: {message}" dm_content = f"Order cannot be fulfilled. Reason: {message}"
dm_event = merchant.build_dm_event(dm_content, order.public_key) dm_event = merchant.build_dm_event(dm_content, order.public_key)
await publish_nostr_event(dm_event) await nostr_client.publish_nostr_event(dm_event)
async def update_products_for_order( async def update_products_for_order(
@ -226,7 +226,7 @@ async def _handle_incoming_dms(
) )
if dm_content: if dm_content:
dm_event = merchant.build_dm_event(dm_content, event.pubkey) dm_event = merchant.build_dm_event(dm_content, event.pubkey)
await publish_nostr_event(dm_event) await nostr_client.publish_nostr_event(dm_event)
async def _handle_outgoing_dms( async def _handle_outgoing_dms(

View file

@ -1,11 +1,5 @@
import asyncio
import json
from asyncio import Queue from asyncio import Queue
import websocket
from loguru import logger
from websocket import WebSocketApp
from lnbits.core.models import Payment from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener from lnbits.tasks import register_invoice_listener
@ -14,7 +8,7 @@ from .crud import (
get_last_order_time, get_last_order_time,
get_public_keys_for_merchants, get_public_keys_for_merchants,
) )
from .nostr.nostr_client import connect_to_nostrclient_ws, subscribe_to_direct_messages from .nostr.nostr_client import NostrClient
from .services import handle_order_paid, process_nostr_message from .services import handle_order_paid, process_nostr_message
@ -39,41 +33,15 @@ async def on_invoice_paid(payment: Payment) -> None:
await handle_order_paid(order_id, merchant_pubkey) await handle_order_paid(order_id, merchant_pubkey)
async def subscribe_to_nostr_client(recieve_event_queue: Queue, send_req_queue: Queue): async def wait_for_nostr_events(nostr_client: NostrClient):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")
def on_message(_, message):
# print("### on_message", message)
recieve_event_queue.put_nowait(message)
ws: WebSocketApp = None
while True:
try:
req = None
if not ws:
ws = await connect_to_nostrclient_ws(on_open, on_message)
# be sure the connection is open
await asyncio.sleep(3)
req = await send_req_queue.get()
ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
if req:
await send_req_queue.put(req)
ws = None # todo close
await asyncio.sleep(5)
async def wait_for_nostr_events(recieve_event_queue: Queue):
public_keys = await get_public_keys_for_merchants() public_keys = await get_public_keys_for_merchants()
for p in public_keys: for p in public_keys:
last_order_time = await get_last_order_time(p) last_order_time = await get_last_order_time(p)
last_dm_time = await get_last_direct_messages_time(p) last_dm_time = await get_last_direct_messages_time(p)
since = max(last_order_time, last_dm_time) since = max(last_order_time, last_dm_time)
await subscribe_to_direct_messages(p, since) await nostr_client.subscribe_to_direct_messages(p, since)
while True: while True:
message = await recieve_event_queue.get() message = await nostr_client.get_event()
await process_nostr_message(message) await process_nostr_message(message)

View file

@ -15,7 +15,7 @@ from lnbits.decorators import (
) )
from lnbits.utils.exchange_rates import currencies from lnbits.utils.exchange_rates import currencies
from . import nostrmarket_ext, scheduled_tasks from . import nostr_client, nostrmarket_ext, scheduled_tasks
from .crud import ( from .crud import (
create_direct_message, create_direct_message,
create_merchant, create_merchant,
@ -62,11 +62,6 @@ from .models import (
Stall, Stall,
Zone, Zone,
) )
from .nostr.nostr_client import (
publish_nostr_event,
subscribe_to_direct_messages,
unsubscribe_from_direct_messages,
)
from .services import sign_and_send_to_nostr from .services import sign_and_send_to_nostr
######################################## MERCHANT ######################################## ######################################## MERCHANT ########################################
@ -86,7 +81,7 @@ async def api_create_merchant(
assert merchant == None, "A merchant already exists for this user" assert merchant == None, "A merchant already exists for this user"
merchant = await create_merchant(wallet.wallet.user, data) merchant = await create_merchant(wallet.wallet.user, data)
await subscribe_to_direct_messages(data.public_key, 0) await nostr_client.subscribe_to_direct_messages(data.public_key, 0)
return merchant return merchant
except AssertionError as ex: except AssertionError as ex:
@ -135,7 +130,7 @@ async def api_delete_merchant(
await delete_merchant_direct_messages(merchant.id) await delete_merchant_direct_messages(merchant.id)
await delete_merchant_zones(merchant.id) await delete_merchant_zones(merchant.id)
await unsubscribe_from_direct_messages(merchant.public_key) await nostr_client.unsubscribe_from_direct_messages(merchant.public_key)
await delete_merchant(merchant.id) await delete_merchant(merchant.id)
except AssertionError as ex: except AssertionError as ex:
raise HTTPException( raise HTTPException(
@ -660,7 +655,7 @@ async def api_update_order_status(
dm_content = json.dumps(data.dict(), separators=(",", ":"), ensure_ascii=False) dm_content = json.dumps(data.dict(), separators=(",", ":"), ensure_ascii=False)
dm_event = merchant.build_dm_event(dm_content, order.public_key) dm_event = merchant.build_dm_event(dm_content, order.public_key)
await publish_nostr_event(dm_event) await nostr_client.publish_nostr_event(dm_event)
return order return order
@ -716,7 +711,7 @@ async def api_create_message(
data.event_created_at = dm_event.created_at data.event_created_at = dm_event.created_at
dm = await create_direct_message(merchant.id, data) dm = await create_direct_message(merchant.id, data)
await publish_nostr_event(dm_event) await nostr_client.publish_nostr_event(dm_event)
return dm return dm
except AssertionError as ex: except AssertionError as ex: