EOSE works

This commit is contained in:
callebtc 2023-03-09 14:42:31 +01:00
parent 8a6adb4768
commit cba039d041
4 changed files with 54 additions and 10 deletions

View file

@ -129,10 +129,24 @@ class NostrClient:
break break
time.sleep(0.1) time.sleep(0.1)
def subscribe(self, callback_func=None): def subscribe(
self,
callback_events_func=None,
callback_notices_func=None,
callback_eosenotices_func=None,
):
while True: while True:
while self.relay_manager.message_pool.has_events(): while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event() event_msg = self.relay_manager.message_pool.get_event()
if callback_func: if callback_events_func:
callback_func(event_msg) callback_events_func(event_msg)
while self.relay_manager.message_pool.has_notices():
event_msg = self.relay_manager.message_pool.has_notices()
if callback_notices_func:
callback_notices_func(event_msg)
while self.relay_manager.message_pool.has_eose_notices():
event_msg = self.relay_manager.message_pool.get_eose_notice()
if callback_eosenotices_func:
callback_eosenotices_func(event_msg)
time.sleep(0.1) time.sleep(0.1)

View file

@ -6,7 +6,12 @@ from .models import RelayList, Relay, Event, Filter, Filters
from .nostr.event import Event as NostrEvent from .nostr.event import Event as NostrEvent
from .nostr.filter import Filter as NostrFilter from .nostr.filter import Filter as NostrFilter
from .nostr.filter import Filters as NostrFilters from .nostr.filter import Filters as NostrFilters
from .tasks import client, received_event_queue, received_subscription_events from .tasks import (
client,
received_event_queue,
received_subscription_events,
received_subscription_eosenotices,
)
from fastapi import WebSocket, WebSocketDisconnect from fastapi import WebSocket, WebSocketDisconnect
from lnbits.helpers import urlsafe_short_hash from lnbits.helpers import urlsafe_short_hash
@ -73,6 +78,13 @@ class NostrRouter:
# send data back to client # send data back to client
await self.websocket.send_text(json.dumps(event_to_forward)) await self.websocket.send_text(json.dumps(event_to_forward))
if s in received_subscription_eosenotices:
my_event = received_subscription_eosenotices[s]
s_original = s[len(f"{self.subscription_id_rewrite}_") :]
event_to_forward = ["EOSE", s_original]
del received_subscription_eosenotices[s]
# send data back to client
await self.websocket.send_text(json.dumps(event_to_forward))
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
async def start(self): async def start(self):

View file

@ -4,7 +4,7 @@ import threading
from .nostr.client.client import NostrClient from .nostr.client.client import NostrClient
from .nostr.event import Event from .nostr.event import Event
from .nostr.message_pool import EventMessage from .nostr.message_pool import EventMessage, NoticeMessage, EndOfStoredEventsMessage
from .nostr.key import PublicKey from .nostr.key import PublicKey
from .nostr.relay_manager import RelayManager from .nostr.relay_manager import RelayManager
@ -15,6 +15,8 @@ client = NostrClient(
received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0) received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0)
received_subscription_events: dict[str, list[Event]] = {} received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
from .crud import get_relays from .crud import get_relays
@ -30,9 +32,8 @@ async def subscribe_events():
while not any([r.connected for r in client.relay_manager.relays.values()]): while not any([r.connected for r in client.relay_manager.relays.values()]):
await asyncio.sleep(2) await asyncio.sleep(2)
def callback(eventMessage: EventMessage): def callback_events(eventMessage: EventMessage):
# print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") # print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}")
if eventMessage.subscription_id in received_subscription_events: if eventMessage.subscription_id in received_subscription_events:
# do not add duplicate events (by event id) # do not add duplicate events (by event id)
if eventMessage.event.id in set( if eventMessage.event.id in set(
@ -50,12 +51,26 @@ async def subscribe_events():
received_subscription_events[eventMessage.subscription_id] = [ received_subscription_events[eventMessage.subscription_id] = [
eventMessage.event eventMessage.event
] ]
return
asyncio.run(received_event_queue.put(eventMessage)) def callback_notices(eventMessage: NoticeMessage):
return
def callback_eose_notices(eventMessage: EndOfStoredEventsMessage):
if eventMessage.subscription_id not in received_subscription_eosenotices:
received_subscription_eosenotices[
eventMessage.subscription_id
] = eventMessage
return
t = threading.Thread( t = threading.Thread(
target=client.subscribe, target=client.subscribe,
args=(callback,), args=(
callback_events,
callback_notices,
callback_eose_notices,
),
name="Nostr-event-subscription", name="Nostr-event-subscription",
daemon=True, daemon=True,
) )

View file

@ -103,7 +103,10 @@ async def ws_relay(websocket: WebSocket):
await asyncio.sleep(10) await asyncio.sleep(10)
if not router.connected: if not router.connected:
for s in router.subscriptions: for s in router.subscriptions:
client.relay_manager.close_subscription(s) try:
client.relay_manager.close_subscription(s)
except:
pass
await router.stop() await router.stop()
all_routers.remove(router) all_routers.remove(router)
break break