Merge pull request #7 from lnbits/fix/adding_removing

refactor nostrclient
This commit is contained in:
calle 2023-03-21 16:31:42 +01:00 committed by GitHub
commit 0ffb158769
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 103 deletions

View file

@ -29,6 +29,4 @@ from .views_api import * # noqa
def nostrclient_start(): def nostrclient_start():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.create_task(catch_everything_and_restart(init_relays)) loop.create_task(catch_everything_and_restart(init_relays))
# loop.create_task(catch_everything_and_restart(send_data))
# loop.create_task(catch_everything_and_restart(receive_data))
loop.create_task(catch_everything_and_restart(subscribe_events)) loop.create_task(catch_everything_and_restart(subscribe_events))

View file

@ -4,14 +4,26 @@ from typing import List, Union
from fastapi import WebSocket, WebSocketDisconnect from fastapi import WebSocket, WebSocketDisconnect
from lnbits.helpers import urlsafe_short_hash from lnbits.helpers import urlsafe_short_hash
from .nostr.client.client import NostrClient as NostrClientLib
from .models import Event, Filter, Filters, Relay, RelayList from .models import Event, Filter, Filters, Relay, RelayList
from .nostr.event import Event as NostrEvent from .nostr.event import Event as NostrEvent
from .nostr.filter import Filter as NostrFilter from .nostr.filter import Filter as NostrFilter
from .nostr.filter import Filters as NostrFilters from .nostr.filter import Filters as NostrFilters
from .tasks import (client, received_event_queue, from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage
received_subscription_eosenotices,
received_subscription_events)
received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
class NostrClient:
def __init__(self):
self.client: NostrClientLib = NostrClientLib(connect=False)
nostr = NostrClient()
class NostrRouter: class NostrRouter:
@ -44,7 +56,7 @@ class NostrRouter:
json_str = json_str_rewritten json_str = json_str_rewritten
# publish data # publish data
client.relay_manager.publish_message(json_str) nostr.client.relay_manager.publish_message(json_str)
async def nostr_to_client(self): async def nostr_to_client(self):
"""Sends responses from relays back to the client. Polls the subscriptions of this client """Sends responses from relays back to the client. Polls the subscriptions of this client
@ -126,7 +138,9 @@ class NostrRouter:
) )
fltr = json_data[2] fltr = json_data[2]
filters = self._marshall_nostr_filters(fltr) filters = self._marshall_nostr_filters(fltr)
client.relay_manager.add_subscription(subscription_id_rewritten, filters) nostr.client.relay_manager.add_subscription(
subscription_id_rewritten, filters
)
request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr]) request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr])
return subscription_id_rewritten, request_rewritten return subscription_id_rewritten, request_rewritten
return None, None return None, None

View file

@ -2,34 +2,33 @@ import asyncio
import ssl import ssl
import threading import threading
from .nostr.client.client import NostrClient
from .nostr.event import Event from .nostr.event import Event
from .nostr.key import PublicKey from .nostr.key import PublicKey
from .nostr.message_pool import (EndOfStoredEventsMessage, EventMessage, from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage
NoticeMessage)
from .nostr.relay_manager import RelayManager from .nostr.relay_manager import RelayManager
from .services import (
client = NostrClient( nostr,
connect=False, received_subscription_events,
received_subscription_eosenotices,
) )
received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0)
received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
from .crud import get_relays from .crud import get_relays
async def init_relays(): async def init_relays():
# reinitialize the entire client
nostr.__init__()
# get relays from db
relays = await get_relays() relays = await get_relays()
client.relays = list(set([r.url for r in relays.__root__ if r.url])) # set relays and connect to them
client.connect() nostr.client.relays = list(set([r.url for r in relays.__root__ if r.url]))
nostr.client.connect()
return return
async def subscribe_events(): async def subscribe_events():
while not any([r.connected for r in client.relay_manager.relays.values()]): while not any([r.connected for r in nostr.client.relay_manager.relays.values()]):
await asyncio.sleep(2) await asyncio.sleep(2)
def callback_events(eventMessage: EventMessage): def callback_events(eventMessage: EventMessage):
@ -65,7 +64,7 @@ async def subscribe_events():
return return
t = threading.Thread( t = threading.Thread(
target=client.subscribe, target=nostr.client.subscribe,
args=( args=(
callback_events, callback_events,
callback_notices, callback_notices,

View file

@ -22,77 +22,3 @@ async def index(request: Request, user: User = Depends(check_admin)):
return nostr_renderer().TemplateResponse( return nostr_renderer().TemplateResponse(
"nostrclient/index.html", {"request": request, "user": user.dict()} "nostrclient/index.html", {"request": request, "user": user.dict()}
) )
# #####################################################################
# #################### NOSTR WEBSOCKET THREAD #########################
# ##### THE QUEUE LOOP THREAD THING THAT LISTENS TO BUNCH OF ##########
# ### WEBSOCKET CONNECTIONS, STORING DATA IN DB/PUSHING TO FRONTEND ###
# ################### VIA updater() FUNCTION ##########################
# #####################################################################
# websocket_queue = asyncio.Queue(1000)
# # while True:
# async def nostr_subscribe():
# return
# # for the relays:
# # async with websockets.connect("ws://localhost:8765") as websocket:
# # for the public keys:
# # await websocket.send("subscribe to events")
# # await websocket.recv()
# #####################################################################
# ################### LNBITS WEBSOCKET ROUTES #########################
# #### HERE IS WHERE LNBITS FRONTEND CAN RECEIVE AND SEND MESSAGES ####
# #####################################################################
# class ConnectionManager:
# def __init__(self):
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket, nostr_id: str):
# await websocket.accept()
# websocket.id = nostr_id
# self.active_connections.append(websocket)
# def disconnect(self, websocket: WebSocket):
# self.active_connections.remove(websocket)
# async def send_personal_message(self, message: str, nostr_id: str):
# for connection in self.active_connections:
# if connection.id == nostr_id:
# await connection.send_text(message)
# async def broadcast(self, message: str):
# for connection in self.active_connections:
# await connection.send_text(message)
# manager = ConnectionManager()
# @nostrclient_ext.websocket("/nostrclient/ws/relayevents/{nostr_id}", name="nostr_id.websocket_by_id")
# async def websocket_endpoint(websocket: WebSocket, nostr_id: str):
# await manager.connect(websocket, nostr_id)
# try:
# while True:
# data = await websocket.receive_text()
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# async def updater(nostr_id, message):
# copilot = await get_copilot(nostr_id)
# if not copilot:
# return
# await manager.send_personal_message(f"{message}", nostr_id)
# async def relay_check(relay: str):
# async with websockets.connect(relay) as websocket:
# if str(websocket.state) == "State.OPEN":
# print(str(websocket.state))
# return True
# else:
# return False

View file

@ -11,8 +11,8 @@ from starlette.exceptions import HTTPException
from . import nostrclient_ext from . import nostrclient_ext
from .crud import add_relay, delete_relay, get_relays from .crud import add_relay, delete_relay, get_relays
from .models import Relay, RelayList from .models import Relay, RelayList
from .services import NostrRouter from .services import NostrRouter, nostr
from .tasks import client, init_relays from .tasks import init_relays
# we keep this in # we keep this in
all_routers: list[NostrRouter] = [] all_routers: list[NostrRouter] = []
@ -21,7 +21,7 @@ all_routers: list[NostrRouter] = []
@nostrclient_ext.get("/api/v1/relays") @nostrclient_ext.get("/api/v1/relays")
async def api_get_relays() -> RelayList: async def api_get_relays() -> RelayList:
relays = RelayList(__root__=[]) relays = RelayList(__root__=[])
for url, r in client.relay_manager.relays.items(): for url, r in nostr.client.relay_manager.relays.items():
status_text = ( status_text = (
f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}" f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}"
) )
@ -49,13 +49,14 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]:
raise HTTPException( raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided." status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided."
) )
if relay.url in client.relay_manager.relays: if relay.url in nostr.client.relay_manager.relays:
raise HTTPException( raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, status_code=HTTPStatus.BAD_REQUEST,
detail=f"Relay: {relay.url} already exists.", detail=f"Relay: {relay.url} already exists.",
) )
relay.id = urlsafe_short_hash() relay.id = urlsafe_short_hash()
await add_relay(relay) await add_relay(relay)
# we can't add relays during runtime yet
await init_relays() await init_relays()
return await get_relays() return await get_relays()
@ -68,7 +69,8 @@ async def api_delete_relay(relay: Relay) -> None:
raise HTTPException( raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided." status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided."
) )
client.relay_manager.remove_relay(relay.url) # we can remove relays during runtime
nostr.client.relay_manager.remove_relay(relay.url)
await delete_relay(relay) await delete_relay(relay)
@ -79,13 +81,13 @@ async def api_stop():
for router in all_routers: for router in all_routers:
try: try:
for s in router.subscriptions: for s in router.subscriptions:
client.relay_manager.close_subscription(s) nostr.client.relay_manager.close_subscription(s)
await router.stop() await router.stop()
all_routers.remove(router) all_routers.remove(router)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
try: try:
client.relay_manager.close_connections() nostr.client.relay_manager.close_connections()
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
@ -106,7 +108,7 @@ async def ws_relay(websocket: WebSocket) -> None:
if not router.connected: if not router.connected:
for s in router.subscriptions: for s in router.subscriptions:
try: try:
client.relay_manager.close_subscription(s) nostr.client.relay_manager.close_subscription(s)
except: except:
pass pass
await router.stop() await router.stop()