From cba039d04113113961802fdfcbc751a556d89032 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Thu, 9 Mar 2023 14:42:31 +0100 Subject: [PATCH] EOSE works --- nostr/client/client.py | 20 +++++++++++++++++--- services.py | 14 +++++++++++++- tasks.py | 25 ++++++++++++++++++++----- views_api.py | 5 ++++- 4 files changed, 54 insertions(+), 10 deletions(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index a574f20..6fb885f 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -129,10 +129,24 @@ class NostrClient: break time.sleep(0.1) - def subscribe(self, callback_func=None): + def subscribe( + self, + callback_events_func=None, + callback_notices_func=None, + callback_eosenotices_func=None, + ): while True: while self.relay_manager.message_pool.has_events(): event_msg = self.relay_manager.message_pool.get_event() - if callback_func: - callback_func(event_msg) + if callback_events_func: + callback_events_func(event_msg) + while self.relay_manager.message_pool.has_notices(): + event_msg = self.relay_manager.message_pool.has_notices() + if callback_notices_func: + callback_notices_func(event_msg) + while self.relay_manager.message_pool.has_eose_notices(): + event_msg = self.relay_manager.message_pool.get_eose_notice() + if callback_eosenotices_func: + callback_eosenotices_func(event_msg) + time.sleep(0.1) diff --git a/services.py b/services.py index 2b5b974..fa548bf 100644 --- a/services.py +++ b/services.py @@ -6,7 +6,12 @@ from .models import RelayList, Relay, Event, Filter, Filters from .nostr.event import Event as NostrEvent from .nostr.filter import Filter as NostrFilter from .nostr.filter import Filters as NostrFilters -from .tasks import client, received_event_queue, received_subscription_events +from .tasks import ( + client, + received_event_queue, + received_subscription_events, + received_subscription_eosenotices, +) from fastapi import WebSocket, WebSocketDisconnect from lnbits.helpers import urlsafe_short_hash @@ -73,6 +78,13 @@ class NostrRouter: # send data back to client await self.websocket.send_text(json.dumps(event_to_forward)) + if s in received_subscription_eosenotices: + my_event = received_subscription_eosenotices[s] + s_original = s[len(f"{self.subscription_id_rewrite}_") :] + event_to_forward = ["EOSE", s_original] + del received_subscription_eosenotices[s] + # send data back to client + await self.websocket.send_text(json.dumps(event_to_forward)) await asyncio.sleep(0.1) async def start(self): diff --git a/tasks.py b/tasks.py index 5f84a57..40d8aec 100644 --- a/tasks.py +++ b/tasks.py @@ -4,7 +4,7 @@ import threading from .nostr.client.client import NostrClient from .nostr.event import Event -from .nostr.message_pool import EventMessage +from .nostr.message_pool import EventMessage, NoticeMessage, EndOfStoredEventsMessage from .nostr.key import PublicKey from .nostr.relay_manager import RelayManager @@ -15,6 +15,8 @@ client = NostrClient( received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0) received_subscription_events: dict[str, list[Event]] = {} +received_subscription_notices: dict[str, list[NoticeMessage]] = {} +received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {} from .crud import get_relays @@ -30,9 +32,8 @@ async def subscribe_events(): while not any([r.connected for r in client.relay_manager.relays.values()]): await asyncio.sleep(2) - def callback(eventMessage: EventMessage): + def callback_events(eventMessage: EventMessage): # print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") - if eventMessage.subscription_id in received_subscription_events: # do not add duplicate events (by event id) if eventMessage.event.id in set( @@ -50,12 +51,26 @@ async def subscribe_events(): received_subscription_events[eventMessage.subscription_id] = [ eventMessage.event ] + return - asyncio.run(received_event_queue.put(eventMessage)) + def callback_notices(eventMessage: NoticeMessage): + return + + def callback_eose_notices(eventMessage: EndOfStoredEventsMessage): + if eventMessage.subscription_id not in received_subscription_eosenotices: + received_subscription_eosenotices[ + eventMessage.subscription_id + ] = eventMessage + + return t = threading.Thread( target=client.subscribe, - args=(callback,), + args=( + callback_events, + callback_notices, + callback_eose_notices, + ), name="Nostr-event-subscription", daemon=True, ) diff --git a/views_api.py b/views_api.py index c470f67..d2c74fe 100644 --- a/views_api.py +++ b/views_api.py @@ -103,7 +103,10 @@ async def ws_relay(websocket: WebSocket): await asyncio.sleep(10) if not router.connected: for s in router.subscriptions: - client.relay_manager.close_subscription(s) + try: + client.relay_manager.close_subscription(s) + except: + pass await router.stop() all_routers.remove(router) break