179 lines
6.6 KiB
Python
179 lines
6.6 KiB
Python
from http import HTTPStatus
|
|
import asyncio
|
|
import json
|
|
from typing import List, Union
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
from fastapi.param_functions import Query
|
|
from fastapi.params import Depends
|
|
from loguru import logger
|
|
|
|
from . import nostrclient_ext
|
|
|
|
from .tasks import client, received_event_queue, received_subscription_events
|
|
|
|
from .crud import get_relays, add_relay, delete_relay
|
|
from .models import RelayList, Relay, Event, Filter, Filters
|
|
|
|
from .nostr.event import Event as NostrEvent
|
|
from .nostr.filter import Filter as NostrFilter
|
|
from .nostr.filter import Filters as NostrFilters
|
|
|
|
from lnbits.decorators import (
|
|
WalletTypeInfo,
|
|
get_key_type,
|
|
require_admin_key,
|
|
check_admin,
|
|
)
|
|
|
|
from lnbits.helpers import urlsafe_short_hash
|
|
from .tasks import init_relays
|
|
|
|
|
|
@nostrclient_ext.get("/api/v1/relays")
|
|
async def api_get_relays(): # type: ignore
|
|
relays = RelayList(__root__=[])
|
|
for url, r in client.relay_manager.relays.items():
|
|
status_text = (
|
|
f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}"
|
|
)
|
|
connected_text = "🟢" if r.connected else "🔴"
|
|
relay_id = urlsafe_short_hash()
|
|
relays.__root__.append(
|
|
Relay(
|
|
id=relay_id,
|
|
url=url,
|
|
connected_string=connected_text,
|
|
status=status_text,
|
|
ping=r.ping,
|
|
connected=True,
|
|
active=True,
|
|
)
|
|
)
|
|
return relays
|
|
|
|
|
|
@nostrclient_ext.post(
|
|
"/api/v1/relay", status_code=HTTPStatus.OK, dependencies=[Depends(check_admin)]
|
|
)
|
|
async def api_add_relay(relay: Relay): # type: ignore
|
|
assert relay.url, "no URL"
|
|
if relay.url in client.relay_manager.relays:
|
|
return
|
|
relay.id = urlsafe_short_hash()
|
|
await add_relay(relay)
|
|
await init_relays()
|
|
|
|
|
|
@nostrclient_ext.delete(
|
|
"/api/v1/relay", status_code=HTTPStatus.OK, dependencies=[Depends(check_admin)]
|
|
)
|
|
async def api_delete_relay(relay: Relay): # type: ignore
|
|
assert relay.url
|
|
client.relay_manager.remove_relay(relay.url)
|
|
await delete_relay(relay)
|
|
|
|
|
|
def marshall_nostr_filters(data: Union[dict, list]):
|
|
filters = data if isinstance(data, list) else [data]
|
|
filters = [Filter.parse_obj(f) for f in filters]
|
|
filter_list: list[NostrFilter] = []
|
|
for filter in filters:
|
|
filter_list.append(
|
|
NostrFilter(
|
|
event_ids=filter.ids, # type: ignore
|
|
kinds=filter.kinds, # type: ignore
|
|
authors=filter.authors, # type: ignore
|
|
since=filter.since, # type: ignore
|
|
until=filter.until, # type: ignore
|
|
event_refs=filter.e, # type: ignore
|
|
pubkey_refs=filter.p, # type: ignore
|
|
limit=filter.limit, # type: ignore
|
|
)
|
|
)
|
|
return NostrFilters(filter_list)
|
|
|
|
|
|
async def add_nostr_subscription(json_str):
|
|
"""Parses a (string) request from a client. If it is a subscription (REQ), it will
|
|
register the subscription in the nostr client library that we're using so we can
|
|
receive the callbacks on it later"""
|
|
json_data = json.loads(json_str)
|
|
assert len(json_data)
|
|
if json_data[0] == "REQ":
|
|
subscription_id = json_data[1]
|
|
fltr = json_data[2]
|
|
filters = marshall_nostr_filters(fltr)
|
|
client.relay_manager.add_subscription(subscription_id, filters)
|
|
return subscription_id
|
|
|
|
|
|
@nostrclient_ext.websocket("/api/v1/relay")
|
|
async def ws_relay(websocket: WebSocket):
|
|
"""Relay multiplexer: one client (per endpoint) <-> multiple relays"""
|
|
await websocket.accept()
|
|
my_subscriptions: List[str] = []
|
|
connected: bool = True
|
|
|
|
async def client_to_nostr(websocket):
|
|
"""Receives requests / data from the client and forwards it to relays. If the
|
|
request was a subscription/filter, registers it with the nostr client lib.
|
|
Remembers the subscription id so we can send back responses from the relay to this
|
|
client in `nostr_to_client`"""
|
|
nonlocal my_subscriptions
|
|
nonlocal connected
|
|
while True:
|
|
try:
|
|
json_str = await websocket.receive_text()
|
|
except WebSocketDisconnect:
|
|
connected = False
|
|
break
|
|
# print(json_str)
|
|
|
|
# registers a subscription if the input was a REQ request
|
|
subscription_id = await add_nostr_subscription(json_str)
|
|
if subscription_id:
|
|
my_subscriptions.append(subscription_id)
|
|
|
|
# publish data
|
|
client.relay_manager.publish_message(json_str)
|
|
|
|
async def nostr_to_client(websocket):
|
|
"""Sends responses from relays back to the client. Polls the subscriptions of this client
|
|
stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which
|
|
is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs
|
|
the reponse manually because the nostr client lib we're using can't do it."""
|
|
nonlocal connected
|
|
while True and connected:
|
|
for s in my_subscriptions:
|
|
if s in received_subscription_events:
|
|
while len(received_subscription_events[s]):
|
|
my_event = received_subscription_events[s].pop(0)
|
|
# event.to_message() does not include the subscription ID, we have to add it manually
|
|
event_json = {
|
|
"id": my_event.id,
|
|
"pubkey": my_event.public_key,
|
|
"created_at": my_event.created_at,
|
|
"kind": my_event.kind,
|
|
"tags": my_event.tags,
|
|
"content": my_event.content,
|
|
"sig": my_event.signature,
|
|
}
|
|
|
|
# this reconstructs the original response from the relay
|
|
event_to_forward = ["EVENT", s, event_json]
|
|
# print(json.dumps(event_to_forward))
|
|
|
|
# send data back to client
|
|
await websocket.send_text(json.dumps(event_to_forward))
|
|
await asyncio.sleep(0.1)
|
|
|
|
asyncio.create_task(client_to_nostr(websocket))
|
|
asyncio.create_task(nostr_to_client(websocket))
|
|
|
|
# we kill this websocket and the subscriptions if the user disconnects and thus `connected==False`
|
|
while True:
|
|
await asyncio.sleep(10)
|
|
if not connected:
|
|
for s in my_subscriptions:
|
|
client.relay_manager.close_subscription(s)
|
|
break
|