refactor nostrclient

This commit is contained in:
callebtc 2023-03-21 16:17:04 +01:00
parent 38f28ac6aa
commit d67133ae61
5 changed files with 43 additions and 105 deletions

View file

@ -1,3 +1,4 @@
import asyncio
from fastapi import APIRouter
from lnbits.db import Database
from lnbits.helpers import template_renderer
@ -22,13 +23,9 @@ def nostr_renderer():
from .tasks import init_relays, subscribe_events
from .views import * # noqa
from .views_api import * # noqa
def nostrclient_start():
loop = asyncio.get_event_loop()
loop.create_task(catch_everything_and_restart(init_relays))
# loop.create_task(catch_everything_and_restart(send_data))
# loop.create_task(catch_everything_and_restart(receive_data))
loop.create_task(catch_everything_and_restart(subscribe_events))

View file

@ -4,14 +4,26 @@ from typing import List, Union
from fastapi import WebSocket, WebSocketDisconnect
from lnbits.helpers import urlsafe_short_hash
from .nostr.client.client import NostrClient as NostrClientLib
from .models import Event, Filter, Filters, Relay, RelayList
from .nostr.event import Event as NostrEvent
from .nostr.filter import Filter as NostrFilter
from .nostr.filter import Filters as NostrFilters
from .tasks import (client, received_event_queue,
received_subscription_eosenotices,
received_subscription_events)
from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage
received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
class NostrClient:
def __init__(self):
self.client: NostrClientLib = NostrClientLib(connect=False)
nostr = NostrClient()
class NostrRouter:
@ -44,7 +56,7 @@ class NostrRouter:
json_str = json_str_rewritten
# publish data
client.relay_manager.publish_message(json_str)
nostr.client.relay_manager.publish_message(json_str)
async def nostr_to_client(self):
"""Sends responses from relays back to the client. Polls the subscriptions of this client
@ -126,7 +138,9 @@ class NostrRouter:
)
fltr = json_data[2]
filters = self._marshall_nostr_filters(fltr)
client.relay_manager.add_subscription(subscription_id_rewritten, filters)
nostr.client.relay_manager.add_subscription(
subscription_id_rewritten, filters
)
request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr])
return subscription_id_rewritten, request_rewritten
return None, None

View file

@ -2,34 +2,33 @@ import asyncio
import ssl
import threading
from .nostr.client.client import NostrClient
from .nostr.event import Event
from .nostr.key import PublicKey
from .nostr.message_pool import (EndOfStoredEventsMessage, EventMessage,
NoticeMessage)
from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage
from .nostr.relay_manager import RelayManager
client = NostrClient(
connect=False,
from .services import (
nostr,
received_subscription_events,
received_subscription_eosenotices,
)
received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0)
received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
from .crud import get_relays
async def init_relays():
# reinitialize the entire client
nostr.__init__()
# get relays from db
relays = await get_relays()
client.relays = list(set([r.url for r in relays.__root__ if r.url]))
client.connect()
# set relays and connect to them
nostr.client.relays = list(set([r.url for r in relays.__root__ if r.url]))
nostr.client.connect()
return
async def subscribe_events():
while not any([r.connected for r in client.relay_manager.relays.values()]):
while not any([r.connected for r in nostr.client.relay_manager.relays.values()]):
await asyncio.sleep(2)
def callback_events(eventMessage: EventMessage):
@ -65,7 +64,7 @@ async def subscribe_events():
return
t = threading.Thread(
target=client.subscribe,
target=nostr.client.subscribe,
args=(
callback_events,
callback_notices,

View file

@ -22,77 +22,3 @@ async def index(request: Request, user: User = Depends(check_admin)):
return nostr_renderer().TemplateResponse(
"nostrclient/index.html", {"request": request, "user": user.dict()}
)
# #####################################################################
# #################### NOSTR WEBSOCKET THREAD #########################
# ##### THE QUEUE LOOP THREAD THING THAT LISTENS TO BUNCH OF ##########
# ### WEBSOCKET CONNECTIONS, STORING DATA IN DB/PUSHING TO FRONTEND ###
# ################### VIA updater() FUNCTION ##########################
# #####################################################################
# websocket_queue = asyncio.Queue(1000)
# # while True:
# async def nostr_subscribe():
# return
# # for the relays:
# # async with websockets.connect("ws://localhost:8765") as websocket:
# # for the public keys:
# # await websocket.send("subscribe to events")
# # await websocket.recv()
# #####################################################################
# ################### LNBITS WEBSOCKET ROUTES #########################
# #### HERE IS WHERE LNBITS FRONTEND CAN RECEIVE AND SEND MESSAGES ####
# #####################################################################
# class ConnectionManager:
# def __init__(self):
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket, nostr_id: str):
# await websocket.accept()
# websocket.id = nostr_id
# self.active_connections.append(websocket)
# def disconnect(self, websocket: WebSocket):
# self.active_connections.remove(websocket)
# async def send_personal_message(self, message: str, nostr_id: str):
# for connection in self.active_connections:
# if connection.id == nostr_id:
# await connection.send_text(message)
# async def broadcast(self, message: str):
# for connection in self.active_connections:
# await connection.send_text(message)
# manager = ConnectionManager()
# @nostrclient_ext.websocket("/nostrclient/ws/relayevents/{nostr_id}", name="nostr_id.websocket_by_id")
# async def websocket_endpoint(websocket: WebSocket, nostr_id: str):
# await manager.connect(websocket, nostr_id)
# try:
# while True:
# data = await websocket.receive_text()
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# async def updater(nostr_id, message):
# copilot = await get_copilot(nostr_id)
# if not copilot:
# return
# await manager.send_personal_message(f"{message}", nostr_id)
# async def relay_check(relay: str):
# async with websockets.connect(relay) as websocket:
# if str(websocket.state) == "State.OPEN":
# print(str(websocket.state))
# return True
# else:
# return False

View file

@ -11,8 +11,8 @@ from starlette.exceptions import HTTPException
from . import nostrclient_ext
from .crud import add_relay, delete_relay, get_relays
from .models import Relay, RelayList
from .services import NostrRouter
from .tasks import client, init_relays
from .services import NostrRouter, nostr
from .tasks import init_relays
# we keep this in
all_routers: list[NostrRouter] = []
@ -21,7 +21,7 @@ all_routers: list[NostrRouter] = []
@nostrclient_ext.get("/api/v1/relays")
async def api_get_relays() -> RelayList:
relays = RelayList(__root__=[])
for url, r in client.relay_manager.relays.items():
for url, r in nostr.client.relay_manager.relays.items():
status_text = (
f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}"
)
@ -49,13 +49,14 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided."
)
if relay.url in client.relay_manager.relays:
if relay.url in nostr.client.relay_manager.relays:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=f"Relay: {relay.url} already exists.",
)
relay.id = urlsafe_short_hash()
await add_relay(relay)
# we can't add relays during runtime yet
await init_relays()
return await get_relays()
@ -68,7 +69,8 @@ async def api_delete_relay(relay: Relay) -> None:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail=f"Relay url not provided."
)
client.relay_manager.remove_relay(relay.url)
# we can remove relays during runtime
nostr.client.relay_manager.remove_relay(relay.url)
await delete_relay(relay)
@ -79,13 +81,13 @@ async def api_stop():
for router in all_routers:
try:
for s in router.subscriptions:
client.relay_manager.close_subscription(s)
nostr.client.relay_manager.close_subscription(s)
await router.stop()
all_routers.remove(router)
except Exception as e:
logger.error(e)
try:
client.relay_manager.close_connections()
nostr.client.relay_manager.close_connections()
except Exception as e:
logger.error(e)
@ -106,7 +108,7 @@ async def ws_relay(websocket: WebSocket) -> None:
if not router.connected:
for s in router.subscriptions:
try:
client.relay_manager.close_subscription(s)
nostr.client.relay_manager.close_subscription(s)
except:
pass
await router.stop()