Merge pull request #4 from lnbits/router-and-rewrite-subs

Router and rewrite subs
This commit is contained in:
calle 2023-03-09 14:44:58 +01:00 committed by GitHub
commit 6ad9a83db1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 214 additions and 165 deletions

View file

@ -1,3 +1,3 @@
# Nostr
# nostrclient
Opens a Nostr daemon
`nostrclient` can open multiple connections to nostr relays and act as a multiplexer for other clients: A client can open a single websocket to `nostrclient` which then sends the data to multiple relays. The responses from these relays are then sent back to the client.

View file

@ -129,10 +129,24 @@ class NostrClient:
break
time.sleep(0.1)
def subscribe(self, callback_func=None):
def subscribe(
self,
callback_events_func=None,
callback_notices_func=None,
callback_eosenotices_func=None,
):
while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
if callback_func:
callback_func(event_msg)
if callback_events_func:
callback_events_func(event_msg)
while self.relay_manager.message_pool.has_notices():
event_msg = self.relay_manager.message_pool.has_notices()
if callback_notices_func:
callback_notices_func(event_msg)
while self.relay_manager.message_pool.has_eose_notices():
event_msg = self.relay_manager.message_pool.get_eose_notice()
if callback_eosenotices_func:
callback_eosenotices_func(event_msg)
time.sleep(0.1)

134
services.py Normal file
View file

@ -0,0 +1,134 @@
import asyncio
import json
from typing import List, Union
from .models import RelayList, Relay, Event, Filter, Filters
from .nostr.event import Event as NostrEvent
from .nostr.filter import Filter as NostrFilter
from .nostr.filter import Filters as NostrFilters
from .tasks import (
client,
received_event_queue,
received_subscription_events,
received_subscription_eosenotices,
)
from fastapi import WebSocket, WebSocketDisconnect
from lnbits.helpers import urlsafe_short_hash
class NostrRouter:
def __init__(self, websocket):
self.subscriptions: List[str] = []
self.connected: bool = True
self.websocket = websocket
self.tasks: List[asyncio.Task] = []
self.subscription_id_rewrite: str = urlsafe_short_hash()
async def client_to_nostr(self):
"""Receives requests / data from the client and forwards it to relays. If the
request was a subscription/filter, registers it with the nostr client lib.
Remembers the subscription id so we can send back responses from the relay to this
client in `nostr_to_client`"""
while True:
try:
json_str = await self.websocket.receive_text()
except WebSocketDisconnect:
self.connected = False
break
# print(json_str)
# registers a subscription if the input was a REQ request
subscription_id, json_str_rewritten = await self._add_nostr_subscription(
json_str
)
if subscription_id and json_str_rewritten:
self.subscriptions.append(subscription_id)
json_str = json_str_rewritten
# publish data
client.relay_manager.publish_message(json_str)
async def nostr_to_client(self):
"""Sends responses from relays back to the client. Polls the subscriptions of this client
stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which
is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs
the reponse manually because the nostr client lib we're using can't do it. Reconstructs the original subscription id
that we had previously rewritten in order to avoid collisions when multiple clients use the same id."""
while True and self.connected:
for s in self.subscriptions:
if s in received_subscription_events:
while len(received_subscription_events[s]):
my_event = received_subscription_events[s].pop(0)
# event.to_message() does not include the subscription ID, we have to add it manually
event_json = {
"id": my_event.id,
"pubkey": my_event.public_key,
"created_at": my_event.created_at,
"kind": my_event.kind,
"tags": my_event.tags,
"content": my_event.content,
"sig": my_event.signature,
}
# this reconstructs the original response from the relay
# reconstruct oriiginal subscription id
s_original = s[len(f"{self.subscription_id_rewrite}_") :]
event_to_forward = ["EVENT", s_original, event_json]
# print(json.dumps(event_to_forward))
# send data back to client
await self.websocket.send_text(json.dumps(event_to_forward))
if s in received_subscription_eosenotices:
my_event = received_subscription_eosenotices[s]
s_original = s[len(f"{self.subscription_id_rewrite}_") :]
event_to_forward = ["EOSE", s_original]
del received_subscription_eosenotices[s]
# send data back to client
await self.websocket.send_text(json.dumps(event_to_forward))
await asyncio.sleep(0.1)
async def start(self):
self.tasks.append(asyncio.create_task(self.client_to_nostr()))
self.tasks.append(asyncio.create_task(self.nostr_to_client()))
async def stop(self):
for t in self.tasks:
t.cancel()
def _marshall_nostr_filters(self, data: Union[dict, list]):
filters = data if isinstance(data, list) else [data]
filters = [Filter.parse_obj(f) for f in filters]
filter_list: list[NostrFilter] = []
for filter in filters:
filter_list.append(
NostrFilter(
event_ids=filter.ids, # type: ignore
kinds=filter.kinds, # type: ignore
authors=filter.authors, # type: ignore
since=filter.since, # type: ignore
until=filter.until, # type: ignore
event_refs=filter.e, # type: ignore
pubkey_refs=filter.p, # type: ignore
limit=filter.limit, # type: ignore
)
)
return NostrFilters(filter_list)
async def _add_nostr_subscription(self, json_str):
"""Parses a (string) request from a client. If it is a subscription (REQ), it will
register the subscription in the nostr client library that we're using so we can
receive the callbacks on it later. Will rewrite the subscription id since we expect
multiple clients to use the router and want to avoid subscription id collisions"""
json_data = json.loads(json_str)
assert len(json_data)
if json_data[0] == "REQ":
subscription_id = json_data[1]
subscription_id_rewritten = (
f"{self.subscription_id_rewrite}_{subscription_id}"
)
fltr = json_data[2]
filters = self._marshall_nostr_filters(fltr)
client.relay_manager.add_subscription(subscription_id_rewritten, filters)
request_rewritten = json.dumps(["REQ", subscription_id_rewritten, fltr])
return subscription_id_rewritten, request_rewritten
return None, None

View file

@ -4,87 +4,41 @@ import threading
from .nostr.client.client import NostrClient
from .nostr.event import Event
from .nostr.message_pool import EventMessage
from .nostr.message_pool import EventMessage, NoticeMessage, EndOfStoredEventsMessage
from .nostr.key import PublicKey
from .nostr.relay_manager import RelayManager
# relays = [
# "wss://nostr.mom",
# "wss://nostr-pub.wellorder.net",
# "wss://nostr.zebedee.cloud",
# "wss://relay.damus.io",
# "wss://relay.nostr.info",
# "wss://nostr.onsats.org",
# "wss://nostr-relay.untethr.me",
# "wss://relay.snort.social",
# "wss://lnbits.link/nostrrelay/client",
# ]
client = NostrClient(
connect=False,
)
# client = NostrClient(
# connect=False,
# privatekey_hex="211aac75a687ad96cca402406f8147a2726e31c5fc838e22ce30640ca1f3a6fe",
# )
received_event_queue: asyncio.Queue[EventMessage] = asyncio.Queue(0)
received_subscription_events: dict[str, list[Event]] = {}
received_subscription_notices: dict[str, list[NoticeMessage]] = {}
received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {}
from .crud import get_relays
async def init_relays():
relays = await get_relays()
client.relays = set([r.url for r in relays.__root__])
client.relays = list(set([r.url for r in relays.__root__ if r.url]))
client.connect()
return
# async def send_data():
# while not any([r.connected for r in client.relay_manager.relays.values()]):
# print("no relays connected yet")
# await asyncio.sleep(0.5)
# while True:
# client.dm("test", PublicKey(bytes.fromhex(client.public_key.hex())))
# print("sent DM")
# await asyncio.sleep(5)
# return
# async def receive_data():
# while not any([r.connected for r in client.relay_manager.relays.values()]):
# print("no relays connected yet")
# await asyncio.sleep(0.5)
# def callback(event: Event, decrypted_content=None):
# print(
# f"From {event.public_key[:3]}..{event.public_key[-3:]}: {decrypted_content or event.content}"
# )
# t = threading.Thread(
# target=client.get_dm,
# args=(
# client.public_key,
# callback,
# ),
# name="Nostr DM",
# )
# t.start()
async def subscribe_events():
while not any([r.connected for r in client.relay_manager.relays.values()]):
await asyncio.sleep(2)
def callback(eventMessage: EventMessage):
def callback_events(eventMessage: EventMessage):
# print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}")
if eventMessage.subscription_id in received_subscription_events:
# do not add duplicate events (by signature)
if eventMessage.event.signature in set(
# do not add duplicate events (by event id)
if eventMessage.event.id in set(
[
e.signature
e.id
for e in received_subscription_events[eventMessage.subscription_id]
]
):
@ -97,12 +51,26 @@ async def subscribe_events():
received_subscription_events[eventMessage.subscription_id] = [
eventMessage.event
]
return
asyncio.run(received_event_queue.put(eventMessage))
def callback_notices(eventMessage: NoticeMessage):
return
def callback_eose_notices(eventMessage: EndOfStoredEventsMessage):
if eventMessage.subscription_id not in received_subscription_eosenotices:
received_subscription_eosenotices[
eventMessage.subscription_id
] = eventMessage
return
t = threading.Thread(
target=client.subscribe,
args=(callback,),
args=(
callback_events,
callback_notices,
callback_eose_notices,
),
name="Nostr-event-subscription",
daemon=True,
)

View file

@ -1,22 +1,16 @@
from http import HTTPStatus
import asyncio
import json
from typing import List, Union
from fastapi import WebSocket, WebSocketDisconnect
from fastapi.param_functions import Query
from fastapi import WebSocket
from fastapi.params import Depends
from loguru import logger
from . import nostrclient_ext
from .tasks import client, received_event_queue, received_subscription_events
from .tasks import client
from loguru import logger
from .crud import get_relays, add_relay, delete_relay
from .models import RelayList, Relay, Event, Filter, Filters
from .models import RelayList, Relay
from .nostr.event import Event as NostrEvent
from .nostr.filter import Filter as NostrFilter
from .nostr.filter import Filters as NostrFilters
from .services import NostrRouter
from lnbits.decorators import (
WalletTypeInfo,
@ -28,6 +22,9 @@ from lnbits.decorators import (
from lnbits.helpers import urlsafe_short_hash
from .tasks import init_relays
# we keep this in
all_routers: list[NostrRouter] = []
@nostrclient_ext.get("/api/v1/relays")
async def api_get_relays(): # type: ignore
@ -73,107 +70,43 @@ async def api_delete_relay(relay: Relay): # type: ignore
await delete_relay(relay)
def marshall_nostr_filters(data: Union[dict, list]):
filters = data if isinstance(data, list) else [data]
filters = [Filter.parse_obj(f) for f in filters]
filter_list: list[NostrFilter] = []
for filter in filters:
filter_list.append(
NostrFilter(
event_ids=filter.ids, # type: ignore
kinds=filter.kinds, # type: ignore
authors=filter.authors, # type: ignore
since=filter.since, # type: ignore
until=filter.until, # type: ignore
event_refs=filter.e, # type: ignore
pubkey_refs=filter.p, # type: ignore
limit=filter.limit, # type: ignore
)
)
return NostrFilters(filter_list)
@nostrclient_ext.delete(
"/api/v1", status_code=HTTPStatus.OK, dependencies=[Depends(check_admin)]
)
async def api_stop():
for router in all_routers:
try:
for s in router.subscriptions:
client.relay_manager.close_subscription(s)
await router.stop()
all_routers.remove(router)
except Exception as e:
logger.error(e)
try:
client.relay_manager.close_connections()
except Exception as e:
logger.error(e)
async def add_nostr_subscription(json_str):
"""Parses a (string) request from a client. If it is a subscription (REQ), it will
register the subscription in the nostr client library that we're using so we can
receive the callbacks on it later"""
json_data = json.loads(json_str)
assert len(json_data)
if json_data[0] == "REQ":
subscription_id = json_data[1]
fltr = json_data[2]
filters = marshall_nostr_filters(fltr)
client.relay_manager.add_subscription(subscription_id, filters)
return subscription_id
return {"success": True}
@nostrclient_ext.websocket("/api/v1/relay")
async def ws_relay(websocket: WebSocket):
"""Relay multiplexer: one client (per endpoint) <-> multiple relays"""
await websocket.accept()
my_subscriptions: List[str] = []
connected: bool = True
async def client_to_nostr(websocket):
"""Receives requests / data from the client and forwards it to relays. If the
request was a subscription/filter, registers it with the nostr client lib.
Remembers the subscription id so we can send back responses from the relay to this
client in `nostr_to_client`"""
nonlocal my_subscriptions
nonlocal connected
while True:
try:
json_str = await websocket.receive_text()
except WebSocketDisconnect:
connected = False
break
# print(json_str)
# registers a subscription if the input was a REQ request
subscription_id = await add_nostr_subscription(json_str)
if subscription_id:
my_subscriptions.append(subscription_id)
# publish data
client.relay_manager.publish_message(json_str)
async def nostr_to_client(websocket):
"""Sends responses from relays back to the client. Polls the subscriptions of this client
stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which
is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs
the reponse manually because the nostr client lib we're using can't do it."""
nonlocal connected
while True and connected:
for s in my_subscriptions:
if s in received_subscription_events:
while len(received_subscription_events[s]):
my_event = received_subscription_events[s].pop(0)
# event.to_message() does not include the subscription ID, we have to add it manually
event_json = {
"id": my_event.id,
"pubkey": my_event.public_key,
"created_at": my_event.created_at,
"kind": my_event.kind,
"tags": my_event.tags,
"content": my_event.content,
"sig": my_event.signature,
}
# this reconstructs the original response from the relay
event_to_forward = ["EVENT", s, event_json]
# print(json.dumps(event_to_forward))
# send data back to client
await websocket.send_text(json.dumps(event_to_forward))
await asyncio.sleep(0.1)
asyncio.create_task(client_to_nostr(websocket))
asyncio.create_task(nostr_to_client(websocket))
router = NostrRouter(websocket)
await router.start()
all_routers.append(router)
# we kill this websocket and the subscriptions if the user disconnects and thus `connected==False`
while True:
await asyncio.sleep(10)
if not connected:
for s in my_subscriptions:
client.relay_manager.close_subscription(s)
if not router.connected:
for s in router.subscriptions:
try:
client.relay_manager.close_subscription(s)
except:
pass
await router.stop()
all_routers.remove(router)
break