nostrmarket/nostr/nostr_client.py
padreug 429522adba Improves Nostr message handling and error logging
Enhances the processing of Nostr messages by adding more robust error handling and logging, providing better insights into potential issues.

Specifically:
- Improves the checks on the websocket connection to log errors and debug information.
- Implements more comprehensive error logging for failed product quantity checks.
- Enhances logging and validation of EVENT messages to prevent potential errors.
- Implements a more robust merchant lookup logic to avoid double processing of events.
- Implements a more lenient time window for direct message subscriptions.
2025-11-04 00:52:36 +01:00

223 lines
8 KiB
Python

import asyncio
import json
from asyncio import Queue
from threading import Thread
from typing import Callable, List, Optional
from loguru import logger
from websocket import WebSocketApp
from lnbits.settings import settings
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
from .event import NostrEvent
class NostrClient:
def __init__(self):
self.recieve_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue()
self.ws: Optional[WebSocketApp] = None
self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
self.running = False
@property
def is_websocket_connected(self):
if not self.ws:
return False
return self.ws.keep_running
async def connect_to_nostrclient_ws(self) -> WebSocketApp:
logger.debug(f"Connecting to websockets for 'nostrclient' extension...")
relay_endpoint = encrypt_internal_message("relay", urlsafe=True)
ws_url = f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}"
on_open, on_message, on_error, on_close = self._ws_handlers()
ws = WebSocketApp(
ws_url,
on_message=on_message,
on_open=on_open,
on_close=on_close,
on_error=on_error,
)
wst = Thread(target=ws.run_forever)
wst.daemon = True
wst.start()
return ws
async def run_forever(self):
self.running = True
while self.running:
try:
if not self.is_websocket_connected:
self.ws = await self.connect_to_nostrclient_ws()
# be sure the connection is open
await asyncio.sleep(5)
req = await self.send_req_queue.get()
assert self.ws
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
await asyncio.sleep(60)
async def get_event(self):
value = await self.recieve_event_queue.get()
if isinstance(value, ValueError):
logger.error(f"[NOSTRMARKET] ❌ Queue returned error: {value}")
raise value
return value
async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()])
async def subscribe_merchants(
self,
public_keys: List[str],
dm_time=0,
stall_time=0,
product_time=0,
profile_time=0,
):
dm_filters = self._filters_for_direct_messages(public_keys, dm_time)
stall_filters = self._filters_for_stall_events(public_keys, stall_time)
product_filters = self._filters_for_product_events(public_keys, product_time)
profile_filters = self._filters_for_user_profile(public_keys, profile_time)
merchant_filters = (
dm_filters + stall_filters + product_filters + profile_filters
)
self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)
async def merchant_temp_subscription(self, pk, duration=10):
dm_filters = self._filters_for_direct_messages([pk], 0)
stall_filters = self._filters_for_stall_events([pk], 0)
product_filters = self._filters_for_product_events([pk], 0)
profile_filters = self._filters_for_user_profile([pk], 0)
merchant_filters = (
dm_filters + stall_filters + product_filters + profile_filters
)
subscription_id = "merchant-" + urlsafe_short_hash()[:32]
logger.debug(
f"New merchant temp subscription ({duration} sec). Subscription id: {subscription_id}"
)
await self.send_req_queue.put(["REQ", subscription_id] + merchant_filters)
async def unsubscribe_with_delay(sub_id, d):
await asyncio.sleep(d)
await self.unsubscribe(sub_id)
asyncio.create_task(unsubscribe_with_delay(subscription_id, duration))
async def user_profile_temp_subscribe(self, public_key: str, duration=5):
try:
profile_filter = [{"kinds": [0], "authors": [public_key]}]
subscription_id = "profile-" + urlsafe_short_hash()[:32]
logger.debug(
f"New user temp subscription ({duration} sec). Subscription id: {subscription_id}"
)
await self.send_req_queue.put(["REQ", subscription_id] + profile_filter)
async def unsubscribe_with_delay(sub_id, d):
await asyncio.sleep(d)
await self.unsubscribe(sub_id)
asyncio.create_task(unsubscribe_with_delay(subscription_id, duration))
except Exception as ex:
logger.debug(ex)
def _filters_for_direct_messages(self, public_keys: List[str], since: int) -> List:
in_messages_filter = {"kinds": [4], "#p": public_keys}
out_messages_filter = {"kinds": [4], "authors": public_keys}
if since and since != 0:
in_messages_filter["since"] = since
out_messages_filter["since"] = since
return [in_messages_filter, out_messages_filter]
def _filters_for_stall_events(self, public_keys: List[str], since: int) -> List:
stall_filter = {"kinds": [30017], "authors": public_keys}
if since and since != 0:
stall_filter["since"] = since
return [stall_filter]
def _filters_for_product_events(self, public_keys: List[str], since: int) -> List:
product_filter = {"kinds": [30018], "authors": public_keys}
if since and since != 0:
product_filter["since"] = since
return [product_filter]
def _filters_for_user_profile(self, public_keys: List[str], since: int) -> List:
profile_filter = {"kinds": [0], "authors": public_keys}
if since and since != 0:
profile_filter["since"] = since
return [profile_filter]
def _safe_ws_stop(self):
if not self.ws:
return
try:
self.ws.close()
except:
pass
self.ws = None
def _ws_handlers(self):
def on_open(_):
logger.debug("[NOSTRMARKET DEBUG] ✅ Connected to 'nostrclient' websocket successfully")
def on_message(_, message):
logger.debug(f"[NOSTRMARKET DEBUG] 📨 Received websocket message: {message[:200]}...")
try:
self.recieve_event_queue.put_nowait(message)
logger.debug(f"[NOSTRMARKET DEBUG] 📤 Message queued successfully")
except Exception as e:
logger.error(f"[NOSTRMARKET] ❌ Failed to queue message: {e}")
def on_error(_, error):
logger.warning(f"[NOSTRMARKET] ❌ Websocket error: {error}")
def on_close(x, status_code, message):
logger.warning(f"[NOSTRMARKET] 🔌 Websocket closed: {x}: '{status_code}' '{message}'")
# force re-subscribe
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))
return on_open, on_message, on_error, on_close
async def restart(self):
await self.unsubscribe_merchants()
# Give some time for the CLOSE events to propagate before restarting
await asyncio.sleep(10)
logger.info("Restarting NostrClient...")
await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))
self._safe_ws_stop()
async def stop(self):
await self.unsubscribe_merchants()
self.running = False
# Give some time for the CLOSE events to propagate before closing the connection
await asyncio.sleep(10)
self._safe_ws_stop()
async def unsubscribe_merchants(self):
await self.send_req_queue.put(["CLOSE", self.subscription_id])
logger.debug(
f"Unsubscribed from all merchants events. Subscription id: {self.subscription_id}"
)
async def unsubscribe(self, subscription_id):
await self.send_req_queue.put(["CLOSE", subscription_id])
logger.debug(f"Unsubscribed from subscription id: {subscription_id}")