nostrmarket/nostr/nostr_client.py
Vlad Stan 889152a80b Fix subscription errors (#81)
* pref: merge filters in one

* chore: load nit

* feat: restore individual order
2023-09-12 15:03:37 +03:00

174 lines
5.9 KiB
Python

import asyncio
import json
from asyncio import Queue
from threading import Thread
from typing import Callable, List
from loguru import logger
from websocket import WebSocketApp
from lnbits.app import settings
from .event import NostrEvent
class NostrClient:
def __init__(self):
self.recieve_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue()
self.ws: WebSocketApp = None
async def connect_to_nostrclient_ws(
self, on_open: Callable, on_message: Callable
) -> WebSocketApp:
def on_error(_, error):
logger.warning(error)
logger.debug(f"Subscribing to websockets for nostrclient extension")
ws = WebSocketApp(
f"ws://localhost:{settings.port}/nostrclient/api/v1/relay",
on_message=on_message,
on_open=on_open,
on_error=on_error,
)
wst = Thread(target=ws.run_forever)
wst.daemon = True
wst.start()
return ws
async def get_event(self):
value = await self.recieve_event_queue.get()
if isinstance(value, ValueError):
raise value
return value
async def run_forever(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")
def on_message(_, message):
self.recieve_event_queue.put_nowait(message)
running = True
while running:
try:
req = None
if not self.ws:
self.ws = await self.connect_to_nostrclient_ws(on_open, on_message)
# be sure the connection is open
await asyncio.sleep(3)
req = await self.send_req_queue.get()
if isinstance(req, ValueError):
running = False
logger.warning(str(req))
else:
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
if req:
await self.send_req_queue.put(req)
self.ws = None # todo close
await asyncio.sleep(5)
async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()])
async def subscribe_merchant(self, public_key: str, since = 0):
dm_filters = self._filters_for_direct_messages(public_key, since)
stall_filters = self._filters_for_stall_events(public_key, since)
product_filters = self._filters_for_product_events(public_key, since)
profile_filters = self._filters_for_user_profile(public_key, since)
# merchant_filters = [json.dumps(f) for f in dm_filters + stall_filters + product_filters + profile_filters]
merchant_filters = dm_filters + stall_filters + product_filters + profile_filters
await self.send_req_queue.put(
["REQ", f"merchant:{public_key}"] + merchant_filters
)
logger.debug(f"Subscribed to events for: '{public_key}'.")
# print("### merchant_filters", merchant_filters)
def _filters_for_direct_messages(self, public_key: str, since: int) -> List:
in_messages_filter = {"kinds": [4], "#p": [public_key]}
out_messages_filter = {"kinds": [4], "authors": [public_key]}
if since and since != 0:
in_messages_filter["since"] = since
out_messages_filter["since"] = since
return [in_messages_filter, out_messages_filter]
def _filters_for_stall_events(self, public_key: str, since: int) -> List:
stall_filter = {"kinds": [30017], "authors": [public_key]}
if since and since != 0:
stall_filter["since"] = since
return [stall_filter]
def _filters_for_product_events(self, public_key: str, since: int) -> List:
product_filter = {"kinds": [30018], "authors": [public_key]}
if since and since != 0:
product_filter["since"] = since
return [product_filter]
def _filters_for_user_profile(self, public_key: str, since: int) -> List:
profile_filter = {"kinds": [0], "authors": [public_key]}
if since and since != 0:
profile_filter["since"] = since + 1
return [profile_filter]
def subscribe_to_user_profile(self, public_key: str, since: int) -> List:
profile_filter = {"kinds": [0], "authors": [public_key]}
if since and since != 0:
profile_filter["since"] = since + 1
# Disabled for now. The number of clients can grow large.
# Some relays only allow a small number of subscriptions.
# There is the risk that more important subscriptions will be blocked.
# await self.send_req_queue.put(
# ["REQ", f"user-profile-events:{public_key}", profile_filter]
# )
async def restart(self, public_keys: List[str]):
await self.unsubscribe_merchants(public_keys)
# Give some time for the CLOSE events to propagate before restarting
await asyncio.sleep(10)
logger.info("Restating NostrClient...")
await self.send_req_queue.put(ValueError("Restarting NostrClient..."))
await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))
self.ws.close()
self.ws = None
async def stop(self, public_keys: List[str]):
await self.unsubscribe_merchants(public_keys)
# Give some time for the CLOSE events to propagate before closing the connection
await asyncio.sleep(10)
self.ws.close()
self.ws = None
async def unsubscribe_merchant(self, public_key: str):
await self.send_req_queue.put(["CLOSE", public_key])
logger.debug(f"Unsubscribed from merchant events: '{public_key}'.")
async def unsubscribe_merchants(self, public_keys: List[str]):
for pk in public_keys:
try:
await self.unsubscribe_merchant(pk)
except Exception as ex:
logger.warning(ex)