nostrclient/router.py
PatMulligan e66f997853
Some checks failed
CI / lint (push) Has been cancelled
/ release (push) Has been cancelled
CI / tests (push) Has been cancelled
/ pullrequest (push) Has been cancelled
FIX: Ensure valid json (#39)
* Build EVENT message with json.dumps instead of string interpolation

Ensures outbound Nostr messages are valid JSON and safely escaped by
constructing the payload as Python objects and serializing with
json.dumps

* improve logs

* remove log causing check failure
2025-09-15 13:42:33 +03:00

175 lines
6.3 KiB
Python

import asyncio
import json
from typing import ClassVar
from fastapi import WebSocket, WebSocketDisconnect
from lnbits.helpers import urlsafe_short_hash
from loguru import logger
from .nostr.client.client import NostrClient
# from . import nostr_client
from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage
nostr_client: NostrClient = NostrClient()
all_routers: list["NostrRouter"] = []
class NostrRouter:
received_subscription_events: ClassVar[dict[str, list[EventMessage]]] = {}
received_subscription_notices: ClassVar[list[NoticeMessage]] = []
received_subscription_eosenotices: ClassVar[dict[str, EndOfStoredEventsMessage]] = (
{}
)
def __init__(self, websocket: WebSocket):
self.connected: bool = True
self.websocket: WebSocket = websocket
self.tasks: list[asyncio.Task] = []
self.original_subscription_ids: dict[str, str] = {}
@property
def subscriptions(self) -> list[str]:
return list(self.original_subscription_ids.keys())
def start(self):
self.connected = True
self.tasks.append(asyncio.create_task(self._client_to_nostr()))
self.tasks.append(asyncio.create_task(self._nostr_to_client()))
async def stop(self):
nostr_client.relay_manager.close_subscriptions(self.subscriptions)
self.connected = False
for t in self.tasks:
try:
t.cancel()
except Exception as _:
pass
try:
await self.websocket.close(reason="Websocket connection closed")
except Exception as _:
pass
async def _client_to_nostr(self):
"""
Receives requests / data from the client and forwards it to relays.
"""
while self.connected:
try:
json_str = await self.websocket.receive_text()
except WebSocketDisconnect as e:
logger.debug(e)
await self.stop()
break
try:
await self._handle_client_to_nostr(json_str)
except Exception as e:
logger.debug(f"Failed to handle client message: '{e!s}'.")
async def _nostr_to_client(self):
"""Sends responses from relays back to the client."""
while self.connected:
try:
await self._handle_subscriptions()
self._handle_notices()
except Exception as e:
logger.debug(f"Failed to handle response for client: '{e!s}'.")
await asyncio.sleep(1)
await asyncio.sleep(0.1)
async def _handle_subscriptions(self):
for s in self.subscriptions:
if s in NostrRouter.received_subscription_events:
await self._handle_received_subscription_events(s)
if s in NostrRouter.received_subscription_eosenotices:
await self._handle_received_subscription_eosenotices(s)
async def _handle_received_subscription_eosenotices(self, s):
try:
if s not in self.original_subscription_ids:
return
s_original = self.original_subscription_ids[s]
event_to_forward = ["EOSE", s_original]
del NostrRouter.received_subscription_eosenotices[s]
await self.websocket.send_text(json.dumps(event_to_forward))
except Exception as e:
logger.debug(e)
async def _handle_received_subscription_events(self, s):
try:
if s not in NostrRouter.received_subscription_events:
return
while len(NostrRouter.received_subscription_events[s]):
event_message = NostrRouter.received_subscription_events[s].pop(0)
event_json = event_message.event
# this reconstructs the original response from the relay
# reconstruct original subscription id
s_original = self.original_subscription_ids[s]
event_to_forward = json.dumps(
["EVENT", s_original, json.loads(event_json)]
)
await self.websocket.send_text(event_to_forward)
except Exception as e:
logger.warning(
f"[NOSTRCLIENT] Error in _handle_received_subscription_events: {e}"
)
def _handle_notices(self):
while len(NostrRouter.received_subscription_notices):
my_event = NostrRouter.received_subscription_notices.pop(0)
logger.debug(f"[Relay '{my_event.url}'] Notice: '{my_event.content}']")
# Note: we don't send it to the user because
# we don't know who should receive it
nostr_client.relay_manager.handle_notice(my_event)
async def _handle_client_to_nostr(self, json_str):
json_data = json.loads(json_str)
assert len(json_data), "Bad JSON array"
if json_data[0] == "REQ":
self._handle_client_req(json_data)
return
if json_data[0] == "CLOSE":
self._handle_client_close(json_data[1])
return
if json_data[0] == "EVENT":
nostr_client.relay_manager.publish_message(json_str)
return
def _handle_client_req(self, json_data):
subscription_id = json_data[1]
logger.info(f"New subscription: '{subscription_id}'")
subscription_id_rewritten = urlsafe_short_hash()
self.original_subscription_ids[subscription_id_rewritten] = subscription_id
filters = json_data[2:]
nostr_client.relay_manager.add_subscription(subscription_id_rewritten, filters)
def _handle_client_close(self, subscription_id):
subscription_id_rewritten = next(
(
k
for k, v in self.original_subscription_ids.items()
if v == subscription_id
),
None,
)
if subscription_id_rewritten:
self.original_subscription_ids.pop(subscription_id_rewritten)
nostr_client.relay_manager.close_subscription(subscription_id_rewritten)
logger.info(
f"""
Unsubscribe from '{subscription_id_rewritten}'.
Original id: '{subscription_id}.'
"""
)
else:
logger.info(f"Failed to unsubscribe from '{subscription_id}.'")