From d67133ae61ef08d1f27c2987d56423312444cb32 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 21 Mar 2023 16:17:04 +0100 Subject: [PATCH 1/2] refactor nostrclient --- __init__.py | 5 +--- services.py | 24 +++++++++++++---- tasks.py | 27 +++++++++---------- views.py | 74 ---------------------------------------------------- views_api.py | 18 +++++++------ 5 files changed, 43 insertions(+), 105 deletions(-) diff --git a/__init__.py b/__init__.py index ace3254..29f5658 100644 --- a/__init__.py +++ b/__init__.py @@ -1,3 +1,4 @@ +import asyncio from fastapi import APIRouter from lnbits.db import Database from lnbits.helpers import template_renderer @@ -22,13 +23,9 @@ def nostr_renderer(): from .tasks import init_relays, subscribe_events -from .views import * # noqa -from .views_api import * # noqa def nostrclient_start(): loop = asyncio.get_event_loop() loop.create_task(catch_everything_and_restart(init_relays)) - # loop.create_task(catch_everything_and_restart(send_data)) - # loop.create_task(catch_everything_and_restart(receive_data)) loop.create_task(catch_everything_and_restart(subscribe_events)) diff --git a/services.py b/services.py index 243af58..09e9235 100644 --- a/services.py +++ b/services.py @@ -4,14 +4,26 @@ from typing import List, Union from fastapi import WebSocket, WebSocketDisconnect from lnbits.helpers import urlsafe_short_hash +from .nostr.client.client import NostrClient as NostrClientLib from .models import Event, Filter, Filters, Relay, RelayList from .nostr.event import Event as NostrEvent from .nostr.filter import Filter as NostrFilter from .nostr.filter import Filters as NostrFilters -from .tasks import (client, received_event_queue, - received_subscription_eosenotices, - received_subscription_events) +from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage + + +received_subscription_events: dict[str, list[Event]] = {} +received_subscription_notices: dict[str, list[NoticeMessage]] = {} +received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {} + + +class NostrClient: + def __init__(self): + self.client: NostrClientLib = NostrClientLib(connect=False) + + +nostr = NostrClient() class NostrRouter: @@ -44,7 +56,7 @@ class NostrRouter: json_str = json_str_rewritten # publish data - client.relay_manager.publish_message(json_str) + nostr.client.relay_manager.publish_message(json_str) async def nostr_to_client(self): """Sends responses from relays back to the client. Polls the subscriptions of this client @@ -126,7 +138,9 @@ class NostrRouter: ) fltr = json_data[2] filters = self._marshall_nostr_filters(fltr) - client.relay_manager.add_subscription(subscription_id_rewritten, filters) + nostr.client.relay_manager.add_subscription( + subscription_id_rewritten, filters + ) request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr]) return subscription_id_rewritten, request_rewritten return None, None diff --git a/tasks.py b/tasks.py index 8bbf8aa..7894e40 100644 --- a/tasks.py +++ b/tasks.py @@ -2,34 +2,33 @@ import asyncio import ssl import threading -from .nostr.client.client import NostrClient from .nostr.event import Event from .nostr.key import PublicKey -from .nostr.message_pool import (EndOfStoredEventsMessage, EventMessage, - NoticeMessage) +from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage from .nostr.relay_manager import RelayManager - -client = NostrClient( - connect=False, +from .services import ( + nostr, + received_subscription_events, + received_subscription_eosenotices, ) -received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0) -received_subscription_events: dict[str, list[Event]] = {} -received_subscription_notices: dict[str, list[NoticeMessage]] = {} -received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {} from .crud import get_relays async def init_relays(): + # reinitialize the entire client + nostr.__init__() + # get relays from db relays = await get_relays() - client.relays = list(set([r.url for r in relays.__root__ if r.url])) - client.connect() + # set relays and connect to them + nostr.client.relays = list(set([r.url for r in relays.__root__ if r.url])) + nostr.client.connect() return async def subscribe_events(): - while not any([r.connected for r in client.relay_manager.relays.values()]): + while not any([r.connected for r in nostr.client.relay_manager.relays.values()]): await asyncio.sleep(2) def callback_events(eventMessage: EventMessage): @@ -65,7 +64,7 @@ async def subscribe_events(): return t = threading.Thread( - target=client.subscribe, + target=nostr.client.subscribe, args=( callback_events, callback_notices, diff --git a/views.py b/views.py index 7f07f5d..ce30b3b 100644 --- a/views.py +++ b/views.py @@ -22,77 +22,3 @@ async def index(request: Request, user: User = Depends(check_admin)): return nostr_renderer().TemplateResponse( "nostrclient/index.html", {"request": request, "user": user.dict()} ) - - -# ##################################################################### -# #################### NOSTR WEBSOCKET THREAD ######################### -# ##### THE QUEUE LOOP THREAD THING THAT LISTENS TO BUNCH OF ########## -# ### WEBSOCKET CONNECTIONS, STORING DATA IN DB/PUSHING TO FRONTEND ### -# ################### VIA updater() FUNCTION ########################## -# ##################################################################### - -# websocket_queue = asyncio.Queue(1000) - -# # while True: -# async def nostr_subscribe(): -# return -# # for the relays: -# # async with websockets.connect("ws://localhost:8765") as websocket: -# # for the public keys: -# # await websocket.send("subscribe to events") -# # await websocket.recv() - -# ##################################################################### -# ################### LNBITS WEBSOCKET ROUTES ######################### -# #### HERE IS WHERE LNBITS FRONTEND CAN RECEIVE AND SEND MESSAGES #### -# ##################################################################### - -# class ConnectionManager: -# def __init__(self): -# self.active_connections: List[WebSocket] = [] - -# async def connect(self, websocket: WebSocket, nostr_id: str): -# await websocket.accept() -# websocket.id = nostr_id -# self.active_connections.append(websocket) - -# def disconnect(self, websocket: WebSocket): -# self.active_connections.remove(websocket) - -# async def send_personal_message(self, message: str, nostr_id: str): -# for connection in self.active_connections: -# if connection.id == nostr_id: -# await connection.send_text(message) - -# async def broadcast(self, message: str): -# for connection in self.active_connections: -# await connection.send_text(message) - - -# manager = ConnectionManager() - - -# @nostrclient_ext.websocket("/nostrclient/ws/relayevents/{nostr_id}", name="nostr_id.websocket_by_id") -# async def websocket_endpoint(websocket: WebSocket, nostr_id: str): -# await manager.connect(websocket, nostr_id) -# try: -# while True: -# data = await websocket.receive_text() -# except WebSocketDisconnect: -# manager.disconnect(websocket) - - -# async def updater(nostr_id, message): -# copilot = await get_copilot(nostr_id) -# if not copilot: -# return -# await manager.send_personal_message(f"{message}", nostr_id) - - -# async def relay_check(relay: str): -# async with websockets.connect(relay) as websocket: -# if str(websocket.state) == "State.OPEN": -# print(str(websocket.state)) -# return True -# else: -# return False diff --git a/views_api.py b/views_api.py index 1cf7a19..c3da200 100644 --- a/views_api.py +++ b/views_api.py @@ -11,8 +11,8 @@ from starlette.exceptions import HTTPException from . import nostrclient_ext from .crud import add_relay, delete_relay, get_relays from .models import Relay, RelayList -from .services import NostrRouter -from .tasks import client, init_relays +from .services import NostrRouter, nostr +from .tasks import init_relays # we keep this in all_routers: list[NostrRouter] = [] @@ -21,7 +21,7 @@ all_routers: list[NostrRouter] = [] @nostrclient_ext.get("/api/v1/relays") async def api_get_relays() -> RelayList: relays = RelayList(__root__=[]) - for url, r in client.relay_manager.relays.items(): + for url, r in nostr.client.relay_manager.relays.items(): status_text = ( f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}" ) @@ -49,13 +49,14 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided." ) - if relay.url in client.relay_manager.relays: + if relay.url in nostr.client.relay_manager.relays: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay: {relay.url} already exists.", ) relay.id = urlsafe_short_hash() await add_relay(relay) + # we can't add relays during runtime yet await init_relays() return await get_relays() @@ -68,7 +69,8 @@ async def api_delete_relay(relay: Relay) -> None: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided." ) - client.relay_manager.remove_relay(relay.url) + # we can remove relays during runtime + nostr.client.relay_manager.remove_relay(relay.url) await delete_relay(relay) @@ -79,13 +81,13 @@ async def api_stop(): for router in all_routers: try: for s in router.subscriptions: - client.relay_manager.close_subscription(s) + nostr.client.relay_manager.close_subscription(s) await router.stop() all_routers.remove(router) except Exception as e: logger.error(e) try: - client.relay_manager.close_connections() + nostr.client.relay_manager.close_connections() except Exception as e: logger.error(e) @@ -106,7 +108,7 @@ async def ws_relay(websocket: WebSocket) -> None: if not router.connected: for s in router.subscriptions: try: - client.relay_manager.close_subscription(s) + nostr.client.relay_manager.close_subscription(s) except: pass await router.stop() From 2ff67b65e888f534a0a3d9e86e163bd56540aecd Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 21 Mar 2023 16:31:32 +0100 Subject: [PATCH 2/2] fix it --- __init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/__init__.py b/__init__.py index 29f5658..90da4ec 100644 --- a/__init__.py +++ b/__init__.py @@ -1,4 +1,3 @@ -import asyncio from fastapi import APIRouter from lnbits.db import Database from lnbits.helpers import template_renderer @@ -23,6 +22,8 @@ def nostr_renderer(): from .tasks import init_relays, subscribe_events +from .views import * # noqa +from .views_api import * # noqa def nostrclient_start():