diff --git a/README.md b/README.md index 596cce9..0b79c3b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -# Nostr +# nostrclient -Opens a Nostr daemon +`nostrclient` can open multiple connections to nostr relays and act as a multiplexer for other clients: A client can open a single websocket to `nostrclient` which then sends the data to multiple relays. The responses from these relays are then sent back to the client. diff --git a/services.py b/services.py new file mode 100644 index 0000000..2b5b974 --- /dev/null +++ b/services.py @@ -0,0 +1,122 @@ +import asyncio +import json +from typing import List, Union +from .models import RelayList, Relay, Event, Filter, Filters + +from .nostr.event import Event as NostrEvent +from .nostr.filter import Filter as NostrFilter +from .nostr.filter import Filters as NostrFilters +from .tasks import client, received_event_queue, received_subscription_events +from fastapi import WebSocket, WebSocketDisconnect +from lnbits.helpers import urlsafe_short_hash + + +class NostrRouter: + def __init__(self, websocket): + self.subscriptions: List[str] = [] + self.connected: bool = True + self.websocket = websocket + self.tasks: List[asyncio.Task] = [] + self.subscription_id_rewrite: str = urlsafe_short_hash() + + async def client_to_nostr(self): + """Receives requests / data from the client and forwards it to relays. If the + request was a subscription/filter, registers it with the nostr client lib. + Remembers the subscription id so we can send back responses from the relay to this + client in `nostr_to_client`""" + while True: + try: + json_str = await self.websocket.receive_text() + except WebSocketDisconnect: + self.connected = False + break + # print(json_str) + + # registers a subscription if the input was a REQ request + subscription_id, json_str_rewritten = await self._add_nostr_subscription( + json_str + ) + if subscription_id and json_str_rewritten: + self.subscriptions.append(subscription_id) + json_str = json_str_rewritten + + # publish data + client.relay_manager.publish_message(json_str) + + async def nostr_to_client(self): + """Sends responses from relays back to the client. Polls the subscriptions of this client + stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which + is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs + the reponse manually because the nostr client lib we're using can't do it. Reconstructs the original subscription id + that we had previously rewritten in order to avoid collisions when multiple clients use the same id.""" + while True and self.connected: + for s in self.subscriptions: + if s in received_subscription_events: + while len(received_subscription_events[s]): + my_event = received_subscription_events[s].pop(0) + # event.to_message() does not include the subscription ID, we have to add it manually + event_json = { + "id": my_event.id, + "pubkey": my_event.public_key, + "created_at": my_event.created_at, + "kind": my_event.kind, + "tags": my_event.tags, + "content": my_event.content, + "sig": my_event.signature, + } + + # this reconstructs the original response from the relay + # reconstruct oriiginal subscription id + s_original = s[len(f"{self.subscription_id_rewrite}_") :] + event_to_forward = ["EVENT", s_original, event_json] + # print(json.dumps(event_to_forward)) + + # send data back to client + await self.websocket.send_text(json.dumps(event_to_forward)) + await asyncio.sleep(0.1) + + async def start(self): + self.tasks.append(asyncio.create_task(self.client_to_nostr())) + self.tasks.append(asyncio.create_task(self.nostr_to_client())) + + async def stop(self): + for t in self.tasks: + t.cancel() + + def _marshall_nostr_filters(self, data: Union[dict, list]): + filters = data if isinstance(data, list) else [data] + filters = [Filter.parse_obj(f) for f in filters] + filter_list: list[NostrFilter] = [] + for filter in filters: + filter_list.append( + NostrFilter( + event_ids=filter.ids, # type: ignore + kinds=filter.kinds, # type: ignore + authors=filter.authors, # type: ignore + since=filter.since, # type: ignore + until=filter.until, # type: ignore + event_refs=filter.e, # type: ignore + pubkey_refs=filter.p, # type: ignore + limit=filter.limit, # type: ignore + ) + ) + return NostrFilters(filter_list) + + async def _add_nostr_subscription(self, json_str): + """Parses a (string) request from a client. If it is a subscription (REQ), it will + register the subscription in the nostr client library that we're using so we can + receive the callbacks on it later. Will rewrite the subscription id since we expect + multiple clients to use the router and want to avoid subscription id collisions""" + json_data = json.loads(json_str) + assert len(json_data) + if json_data[0] == "REQ": + subscription_id = json_data[1] + subscription_id_rewritten = ( + f"{self.subscription_id_rewrite}_{subscription_id}" + ) + fltr = json_data[2] + filters = self._marshall_nostr_filters(fltr) + client.relay_manager.add_subscription(subscription_id_rewritten, filters) + request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr]) + return subscription_id_rewritten, request_rewritten + return None, None diff --git a/tasks.py b/tasks.py index fc6684f..5f84a57 100644 --- a/tasks.py +++ b/tasks.py @@ -8,26 +8,11 @@ from .nostr.message_pool import EventMessage from .nostr.key import PublicKey from .nostr.relay_manager import RelayManager -# relays = [ -# "wss://nostr.mom", -# "wss://nostr-pub.wellorder.net", -# "wss://nostr.zebedee.cloud", -# "wss://relay.damus.io", -# "wss://relay.nostr.info", -# "wss://nostr.onsats.org", -# "wss://nostr-relay.untethr.me", -# "wss://relay.snort.social", -# "wss://lnbits.link/nostrrelay/client", -# ] + client = NostrClient( connect=False, ) -# client = NostrClient( -# connect=False, -# privatekey_hex="211aac75a687ad96cca402406f8147a2726e31c5fc838e22ce30640ca1f3a6fe", -# ) - received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0) received_subscription_events: dict[str, list[Event]] = {} @@ -36,43 +21,11 @@ from .crud import get_relays async def init_relays(): relays = await get_relays() - client.relays = set([r.url for r in relays.__root__]) + client.relays = list(set([r.url for r in relays.__root__ if r.url])) client.connect() return -# async def send_data(): -# while not any([r.connected for r in client.relay_manager.relays.values()]): -# print("no relays connected yet") -# await asyncio.sleep(0.5) -# while True: -# client.dm("test", PublicKey(bytes.fromhex(client.public_key.hex()))) -# print("sent DM") -# await asyncio.sleep(5) -# return - - -# async def receive_data(): -# while not any([r.connected for r in client.relay_manager.relays.values()]): -# print("no relays connected yet") -# await asyncio.sleep(0.5) - -# def callback(event: Event, decrypted_content=None): -# print( -# f"From {event.public_key[:3]}..{event.public_key[-3:]}: {decrypted_content or event.content}" -# ) - -# t = threading.Thread( -# target=client.get_dm, -# args=( -# client.public_key, -# callback, -# ), -# name="Nostr DM", -# ) -# t.start() - - async def subscribe_events(): while not any([r.connected for r in client.relay_manager.relays.values()]): await asyncio.sleep(2) @@ -81,10 +34,10 @@ async def subscribe_events(): # print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") if eventMessage.subscription_id in received_subscription_events: - # do not add duplicate events (by signature) - if eventMessage.event.signature in set( + # do not add duplicate events (by event id) + if eventMessage.event.id in set( [ - e.signature + e.id for e in received_subscription_events[eventMessage.subscription_id] ] ): diff --git a/views_api.py b/views_api.py index d41fb60..c470f67 100644 --- a/views_api.py +++ b/views_api.py @@ -1,22 +1,16 @@ from http import HTTPStatus import asyncio -import json -from typing import List, Union -from fastapi import WebSocket, WebSocketDisconnect -from fastapi.param_functions import Query +from fastapi import WebSocket from fastapi.params import Depends -from loguru import logger from . import nostrclient_ext - -from .tasks import client, received_event_queue, received_subscription_events +from .tasks import client +from loguru import logger from .crud import get_relays, add_relay, delete_relay -from .models import RelayList, Relay, Event, Filter, Filters +from .models import RelayList, Relay -from .nostr.event import Event as NostrEvent -from .nostr.filter import Filter as NostrFilter -from .nostr.filter import Filters as NostrFilters +from .services import NostrRouter from lnbits.decorators import ( WalletTypeInfo, @@ -28,6 +22,9 @@ from lnbits.decorators import ( from lnbits.helpers import urlsafe_short_hash from .tasks import init_relays +# we keep this in +all_routers: list[NostrRouter] = [] + @nostrclient_ext.get("/api/v1/relays") async def api_get_relays(): # type: ignore @@ -73,107 +70,40 @@ async def api_delete_relay(relay: Relay): # type: ignore await delete_relay(relay) -def marshall_nostr_filters(data: Union[dict, list]): - filters = data if isinstance(data, list) else [data] - filters = [Filter.parse_obj(f) for f in filters] - filter_list: list[NostrFilter] = [] - for filter in filters: - filter_list.append( - NostrFilter( - event_ids=filter.ids, # type: ignore - kinds=filter.kinds, # type: ignore - authors=filter.authors, # type: ignore - since=filter.since, # type: ignore - until=filter.until, # type: ignore - event_refs=filter.e, # type: ignore - pubkey_refs=filter.p, # type: ignore - limit=filter.limit, # type: ignore - ) - ) - return NostrFilters(filter_list) +@nostrclient_ext.delete( + "/api/v1", status_code=HTTPStatus.OK, dependencies=[Depends(check_admin)] +) +async def api_stop(): + for router in all_routers: + try: + for s in router.subscriptions: + client.relay_manager.close_subscription(s) + await router.stop() + all_routers.remove(router) + except Exception as e: + logger.error(e) + try: + client.relay_manager.close_connections() + except Exception as e: + logger.error(e) - -async def add_nostr_subscription(json_str): - """Parses a (string) request from a client. If it is a subscription (REQ), it will - register the subscription in the nostr client library that we're using so we can - receive the callbacks on it later""" - json_data = json.loads(json_str) - assert len(json_data) - if json_data[0] == "REQ": - subscription_id = json_data[1] - fltr = json_data[2] - filters = marshall_nostr_filters(fltr) - client.relay_manager.add_subscription(subscription_id, filters) - return subscription_id + return {"success": True} @nostrclient_ext.websocket("/api/v1/relay") async def ws_relay(websocket: WebSocket): """Relay multiplexer: one client (per endpoint) <-> multiple relays""" await websocket.accept() - my_subscriptions: List[str] = [] - connected: bool = True - - async def client_to_nostr(websocket): - """Receives requests / data from the client and forwards it to relays. If the - request was a subscription/filter, registers it with the nostr client lib. - Remembers the subscription id so we can send back responses from the relay to this - client in `nostr_to_client`""" - nonlocal my_subscriptions - nonlocal connected - while True: - try: - json_str = await websocket.receive_text() - except WebSocketDisconnect: - connected = False - break - # print(json_str) - - # registers a subscription if the input was a REQ request - subscription_id = await add_nostr_subscription(json_str) - if subscription_id: - my_subscriptions.append(subscription_id) - - # publish data - client.relay_manager.publish_message(json_str) - - async def nostr_to_client(websocket): - """Sends responses from relays back to the client. Polls the subscriptions of this client - stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which - is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs - the reponse manually because the nostr client lib we're using can't do it.""" - nonlocal connected - while True and connected: - for s in my_subscriptions: - if s in received_subscription_events: - while len(received_subscription_events[s]): - my_event = received_subscription_events[s].pop(0) - # event.to_message() does not include the subscription ID, we have to add it manually - event_json = { - "id": my_event.id, - "pubkey": my_event.public_key, - "created_at": my_event.created_at, - "kind": my_event.kind, - "tags": my_event.tags, - "content": my_event.content, - "sig": my_event.signature, - } - - # this reconstructs the original response from the relay - event_to_forward = ["EVENT", s, event_json] - # print(json.dumps(event_to_forward)) - - # send data back to client - await websocket.send_text(json.dumps(event_to_forward)) - await asyncio.sleep(0.1) - - asyncio.create_task(client_to_nostr(websocket)) - asyncio.create_task(nostr_to_client(websocket)) + router = NostrRouter(websocket) + await router.start() + all_routers.append(router) # we kill this websocket and the subscriptions if the user disconnects and thus `connected==False` while True: await asyncio.sleep(10) - if not connected: - for s in my_subscriptions: + if not router.connected: + for s in router.subscriptions: client.relay_manager.close_subscription(s) + await router.stop() + all_routers.remove(router) break