From 629aa3a6c39de618ecb17fa5ecfe9b98103d8a85 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 31 May 2023 11:54:04 +0300 Subject: [PATCH 01/36] fix: event uniqueness (see comment in code) --- nostr/message_pool.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/nostr/message_pool.py b/nostr/message_pool.py index d364cf2..578d673 100644 --- a/nostr/message_pool.py +++ b/nostr/message_pool.py @@ -1,8 +1,9 @@ import json from queue import Queue from threading import Lock -from .message_type import RelayMessageType + from .event import Event +from .message_type import RelayMessageType class EventMessage: @@ -69,9 +70,18 @@ class MessagePool: ) with self.lock: if not event.id in self._unique_events: - self.events.put(EventMessage(event, subscription_id, url)) - self._unique_events.add(event.id) + self._accept_event(EventMessage(event, subscription_id, url)) elif message_type == RelayMessageType.NOTICE: self.notices.put(NoticeMessage(message_json[1], url)) elif message_type == RelayMessageType.END_OF_STORED_EVENTS: self.eose_notices.put(EndOfStoredEventsMessage(message_json[1], url)) + + def _accept_event(self, event_message: EventMessage): + """ + Event uniqueness is considered per `subscription_id`. + The `subscription_id` is rewritten to be unique and it is the same accross relays. + The same event can come from different subscriptions (from the same client or from different ones). + Clients that have joined later should receive older events. + """ + self.events.put(event_message) + self._unique_events.add(f"{event_message.subscription_id}_{event_message.event.id}") \ No newline at end of file From 6852a9aa5ef25688eff7443b9c9e6f233a79ac27 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 31 May 2023 11:55:41 +0300 Subject: [PATCH 02/36] fix: do not share the `subscriptions` object between relays Change in one relay reflects in others. The RelayManager takes care of updating each relay individually. --- nostr/relay_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index a698a33..fb5839f 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -23,7 +23,7 @@ class RelayManager: self, url: str, read: bool = True, write: bool = True, subscriptions={} ): policy = RelayPolicy(read, write) - relay = Relay(url, policy, self.message_pool, subscriptions) + relay = Relay(url, policy, self.message_pool, subscriptions.copy()) self.relays[url] = relay def remove_relay(self, url: str): From 727f8fc3ce90451c1173fbca2165fa1bb1e2c695 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 31 May 2023 11:55:56 +0300 Subject: [PATCH 03/36] fix: correctly handle `"REQ"` --- services.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/services.py b/services.py index 82f6578..b270539 100644 --- a/services.py +++ b/services.py @@ -48,16 +48,16 @@ class NostrRouter: break # registers a subscription if the input was a REQ request - subscription_id, json_str_rewritten = await self._add_nostr_subscription( + subscription_id, json_str_rewritten = await self._handle_nostr_subscription( json_str ) if subscription_id and json_str_rewritten: self.subscriptions.append(subscription_id) - json_str = json_str_rewritten # publish data - nostr.client.relay_manager.publish_message(json_str) + publish_data = json_str_rewritten or json_str + nostr.client.relay_manager.publish_message(publish_data) async def nostr_to_client(self): """Sends responses from relays back to the client. Polls the subscriptions of this client @@ -139,7 +139,7 @@ class NostrRouter: ) return NostrFilters(filter_list) - async def _add_nostr_subscription(self, json_str): + async def _handle_nostr_subscription(self, json_str): """Parses a (string) request from a client. If it is a subscription (REQ) or a CLOSE, it will register the subscription in the nostr client library that we're using so we can receive the callbacks on it later. Will rewrite the subscription id since we expect @@ -147,7 +147,7 @@ class NostrRouter: """ json_data = json.loads(json_str) assert len(json_data) - if json_data[0] in ["REQ", "CLOSE"]: + if json_data[0] == "REQ": subscription_id = json_data[1] subscription_id_rewritten = urlsafe_short_hash() self.oridinal_subscription_ids[subscription_id_rewritten] = subscription_id @@ -160,4 +160,12 @@ class NostrRouter: [json_data[0], subscription_id_rewritten, fltr] ) return subscription_id_rewritten, request_rewritten + elif json_data[0] == "CLOSE": + subscription_id = json_data[1] + subscription_id_rewritten = next((k for k, v in self.oridinal_subscription_ids.items() if v == subscription_id), None) + if subscription_id_rewritten: + nostr.client.relay_manager.close_subscription(subscription_id_rewritten) + request_rewritten = json.dumps([json_data[0], subscription_id_rewritten]) + return None, request_rewritten + return None, None From 7e2c9033346cb969c674801cda45dc7cfd968eaa Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 31 May 2023 12:13:57 +0300 Subject: [PATCH 04/36] fix: uniqueness check --- nostr/message_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/message_pool.py b/nostr/message_pool.py index 578d673..02f7fd4 100644 --- a/nostr/message_pool.py +++ b/nostr/message_pool.py @@ -69,7 +69,7 @@ class MessagePool: e["sig"], ) with self.lock: - if not event.id in self._unique_events: + if not f"{subscription_id}_{event.id}" in self._unique_events: self._accept_event(EventMessage(event, subscription_id, url)) elif message_type == RelayMessageType.NOTICE: self.notices.put(NoticeMessage(message_json[1], url)) From 322679e7c5fe07b1004f6fb910e304dc81947651 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 21 Jun 2023 14:40:28 +0300 Subject: [PATCH 05/36] fix: do not use lambda in a loop --- nostr/pow.py | 2 ++ nostr/relay.py | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/nostr/pow.py b/nostr/pow.py index e006288..fece484 100644 --- a/nostr/pow.py +++ b/nostr/pow.py @@ -1,7 +1,9 @@ import time + from .event import Event from .key import PrivateKey + def zero_bits(b: int) -> int: n = 0 diff --git a/nostr/relay.py b/nostr/relay.py index 7fb4baa..246b985 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -2,7 +2,9 @@ import json import time from queue import Queue from threading import Lock + from websocket import WebSocketApp + from .event import Event from .filter import Filters from .message_pool import MessagePool @@ -45,6 +47,7 @@ class Relay: self.queue = Queue() def connect(self, ssl_options: dict = None, proxy: dict = None): + print("### relay.connect", self.url) self.ws = WebSocketApp( self.url, on_open=self._on_open, @@ -81,24 +84,29 @@ class Relay: @property def ping(self): + print("### ping: ", self.url) ping_ms = int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000) return ping_ms if self.connected and ping_ms > 0 else 0 def publish(self, message: str): self.queue.put(message) - def queue_worker(self, shutdown): + def queue_worker(self): + print("#### IN !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", self.url) while True: if self.connected: try: message = self.queue.get(timeout=1) + print("#### queue_worker", message) self.num_sent_events += 1 self.ws.send(message) - except: - if shutdown(): + except Exception as e: + if self.shutdown: + print("#### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! e [", e, self.url, self.shutdown," ]###") break else: time.sleep(0.1) + print("#### OUT !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", self.url) def add_subscription(self, id, filters: Filters): with self.lock: From d08e91b2c7b7954386141ee2b242b3f56da55fe0 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Wed, 21 Jun 2023 17:12:26 +0300 Subject: [PATCH 06/36] fix: do not double re-connect --- nostr/relay_manager.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index fb5839f..f6eba36 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -22,6 +22,8 @@ class RelayManager: def add_relay( self, url: str, read: bool = True, write: bool = True, subscriptions={} ): + if url in self.relays: + return policy = RelayPolicy(read, write) relay = Relay(url, policy, self.message_pool, subscriptions.copy()) self.relays[url] = relay @@ -42,21 +44,23 @@ class RelayManager: def open_connections(self, ssl_options: dict = None, proxy: dict = None): for relay in self.relays.values(): - self.threads[relay.url] = threading.Thread( - target=relay.connect, - args=(ssl_options, proxy), - name=f"{relay.url}-thread", - daemon=True, - ) - self.threads[relay.url].start() + if relay.url not in self.threads: + self.threads[relay.url] = threading.Thread( + target=relay.connect, + args=(ssl_options, proxy), + name=f"{relay.url}-thread", + daemon=True, + ) - self.queue_threads[relay.url] = threading.Thread( - target=relay.queue_worker, - args=(lambda: relay.shutdown,), - name=f"{relay.url}-queue", - daemon=True, - ) - self.queue_threads[relay.url].start() + self.threads[relay.url].start() + + if relay.url not in self.queue_threads: + self.queue_threads[relay.url] = threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + daemon=True, + ) + self.queue_threads[relay.url].start() def close_connections(self): for relay in self.relays.values(): From 09d2fc0493d56e407fa4b2c36d7fa0ba7fa6f672 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 10:02:15 +0300 Subject: [PATCH 07/36] refactor: `add_relay` logic --- nostr/client/client.py | 27 +++++++++------------ nostr/relay.py | 6 +++++ nostr/relay_manager.py | 55 ++++++++++++++++++++++++------------------ tasks.py | 20 +-------------- views_api.py | 14 ++++++++--- 5 files changed, 61 insertions(+), 61 deletions(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index 6e70f71..4d10647 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -1,19 +1,15 @@ -from typing import * -import ssl -import time +import base64 import json import os -import base64 - -from ..event import Event -from ..relay_manager import RelayManager -from ..message_type import ClientMessageType -from ..key import PrivateKey, PublicKey +import time +from typing import * +from ..event import EncryptedDirectMessage, Event, EventKind from ..filter import Filter, Filters -from ..event import Event, EventKind, EncryptedDirectMessage -from ..relay_manager import RelayManager +from ..key import PrivateKey, PublicKey from ..message_type import ClientMessageType +from ..relay_manager import RelayManager +from ..subscription import Subscription # from aes import AESCipher from . import cbc @@ -38,12 +34,11 @@ class NostrClient: if connect: self.connect() - def connect(self): + async def connect(self, subscriptions: dict[str, Subscription] = {}): for relay in self.relays: - self.relay_manager.add_relay(relay) - self.relay_manager.open_connections( - {"cert_reqs": ssl.CERT_NONE} - ) # NOTE: This disables ssl certificate verification + self.relay_manager.add_relay(relay, subscriptions) + + def close(self): self.relay_manager.close_connections() diff --git a/nostr/relay.py b/nostr/relay.py index 246b985..b6207b5 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -91,6 +91,12 @@ class Relay: def publish(self, message: str): self.queue.put(message) + def publish_subscriptions(self): + for _, subscription in self.subscriptions.items(): + s = subscription.to_json_object() + json_str = json.dumps(["REQ", s["id"], s["filters"][0]]) + self.publish(json_str) + def queue_worker(self): print("#### IN !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", self.url) while True: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index f6eba36..0ec324a 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,11 +1,12 @@ -import json + +import ssl import threading from .event import Event from .filter import Filters from .message_pool import MessagePool -from .message_type import ClientMessageType from .relay import Relay, RelayPolicy +from .subscription import Subscription class RelayException(Exception): @@ -20,19 +21,30 @@ class RelayManager: self.message_pool = MessagePool() def add_relay( - self, url: str, read: bool = True, write: bool = True, subscriptions={} - ): + self, url: str, read: bool = True, write: bool = True, subscriptions: dict[str, Subscription] = {} + ) -> Relay: if url in self.relays: return + policy = RelayPolicy(read, write) relay = Relay(url, policy, self.message_pool, subscriptions.copy()) self.relays[url] = relay + self.open_connection( + relay, + {"cert_reqs": ssl.CERT_NONE} + ) # NOTE: This disables ssl certificate verification + + relay.publish_subscriptions() + return relay + def remove_relay(self, url: str): - self.relays[url].close() - self.relays.pop(url) self.threads[url].join(timeout=1) self.threads.pop(url) + self.queue_threads[url].join(timeout=1) + self.queue_threads.pop(url) + self.relays[url].close() + self.relays.pop(url) def add_subscription(self, id: str, filters: Filters): for relay in self.relays.values(): @@ -42,25 +54,22 @@ class RelayManager: for relay in self.relays.values(): relay.close_subscription(id) - def open_connections(self, ssl_options: dict = None, proxy: dict = None): - for relay in self.relays.values(): - if relay.url not in self.threads: - self.threads[relay.url] = threading.Thread( - target=relay.connect, - args=(ssl_options, proxy), - name=f"{relay.url}-thread", - daemon=True, - ) - self.threads[relay.url].start() + def open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None): + self.threads[relay.url] = threading.Thread( + target=relay.connect, + args=(ssl_options, proxy), + name=f"{relay.url}-thread", + daemon=True, + ) + self.threads[relay.url].start() - if relay.url not in self.queue_threads: - self.queue_threads[relay.url] = threading.Thread( - target=relay.queue_worker, - name=f"{relay.url}-queue", - daemon=True, - ) - self.queue_threads[relay.url].start() + self.queue_threads[relay.url] = threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + daemon=True, + ) + self.queue_threads[relay.url].start() def close_connections(self): for relay in self.relays.values(): diff --git a/tasks.py b/tasks.py index beff9db..eb5391a 100644 --- a/tasks.py +++ b/tasks.py @@ -17,31 +17,13 @@ from .services import ( async def init_relays(): - # we save any subscriptions teporarily to re-add them after reinitializing the client - subscriptions = {} - for relay in nostr.client.relay_manager.relays.values(): - # relay.add_subscription(id, filters) - for subscription_id, filters in relay.subscriptions.items(): - subscriptions[subscription_id] = filters - # reinitialize the entire client nostr.__init__() # get relays from db relays = await get_relays() # set relays and connect to them nostr.client.relays = list(set([r.url for r in relays.__root__ if r.url])) - nostr.client.connect() - - await asyncio.sleep(2) - # re-add subscriptions - for subscription_id, subscription in subscriptions.items(): - nostr.client.relay_manager.add_subscription( - subscription_id, subscription.filters - ) - s = subscription.to_json_object() - json_str = json.dumps(["REQ", s["id"], s["filters"][0]]) - nostr.client.relay_manager.publish_message(json_str) - return + await nostr.client.connect() async def subscribe_events(): diff --git a/views_api.py b/views_api.py index 12f4f79..88193ad 100644 --- a/views_api.py +++ b/views_api.py @@ -1,7 +1,7 @@ import asyncio import json from http import HTTPStatus -from typing import Optional +from typing import List, Optional from fastapi import Depends, WebSocket from loguru import logger @@ -15,6 +15,7 @@ from .crud import add_relay, delete_relay, get_relays from .helpers import normalize_public_key from .models import Relay, RelayList, TestMessage, TestMessageResponse from .nostr.key import EncryptedDirectMessage, PrivateKey +from .nostr.relay import Relay as NostrRelay from .services import NostrRouter, nostr from .tasks import init_relays @@ -60,8 +61,15 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]: ) relay.id = urlsafe_short_hash() await add_relay(relay) - # we can't add relays during runtime yet - await init_relays() + + all_relays: List[NostrRelay] = nostr.client.relay_manager.relays.values() + if len(all_relays): + subscriptions = all_relays[0].subscriptions + nostr.client.relays.append(relay.url) + nostr.client.relay_manager.add_relay(subscriptions) + + nostr.client.relay_manager.connect_relay(relay.url) + return await get_relays() From 1a58f0fea8a6340bbce2d910a05410efd2b326cb Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 10:02:29 +0300 Subject: [PATCH 08/36] chore: code format --- nostr/bech32.py | 1 + nostr/event.py | 7 ++++--- nostr/key.py | 13 +++++++------ nostr/subscription.py | 1 + 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/nostr/bech32.py b/nostr/bech32.py index b068de7..61a92c4 100644 --- a/nostr/bech32.py +++ b/nostr/bech32.py @@ -23,6 +23,7 @@ from enum import Enum + class Encoding(Enum): """Enumeration type to list the various supported encodings.""" BECH32 = 1 diff --git a/nostr/event.py b/nostr/event.py index b903e0e..65b187d 100644 --- a/nostr/event.py +++ b/nostr/event.py @@ -1,10 +1,11 @@ -import time import json +import time from dataclasses import dataclass, field from enum import IntEnum -from typing import List -from secp256k1 import PublicKey from hashlib import sha256 +from typing import List + +from secp256k1 import PublicKey from .message_type import ClientMessageType diff --git a/nostr/key.py b/nostr/key.py index d34697f..8089e11 100644 --- a/nostr/key.py +++ b/nostr/key.py @@ -1,14 +1,15 @@ -import secrets import base64 -import secp256k1 -from cffi import FFI -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.primitives import padding +import secrets from hashlib import sha256 +import secp256k1 +from cffi import FFI +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from . import bech32 from .delegation import Delegation from .event import EncryptedDirectMessage, Event, EventKind -from . import bech32 class PublicKey: diff --git a/nostr/subscription.py b/nostr/subscription.py index 7afba20..76da0af 100644 --- a/nostr/subscription.py +++ b/nostr/subscription.py @@ -1,5 +1,6 @@ from .filter import Filters + class Subscription: def __init__(self, id: str, filters: Filters=None) -> None: self.id = id From 666009720a6b8189c8a85c49d2a828220f5801ec Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 10:22:13 +0300 Subject: [PATCH 09/36] chore: code clean-up --- nostr/client/client.py | 97 +----------------------------------------- nostr/relay.py | 5 --- tasks.py | 1 - 3 files changed, 1 insertion(+), 102 deletions(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index 4d10647..faf5722 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -1,19 +1,9 @@ -import base64 -import json -import os import time from typing import * -from ..event import EncryptedDirectMessage, Event, EventKind -from ..filter import Filter, Filters -from ..key import PrivateKey, PublicKey -from ..message_type import ClientMessageType from ..relay_manager import RelayManager from ..subscription import Subscription -# from aes import AESCipher -from . import cbc - class NostrClient: relays = [ @@ -23,12 +13,8 @@ class NostrClient: "wss://nostr.oxtr.dev", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() - private_key: PrivateKey - public_key: PublicKey - - def __init__(self, privatekey_hex: str = "", relays: List[str] = [], connect=True): - self.generate_keys(privatekey_hex) + def __init__(self, relays: List[str] = [], connect=True): if len(relays): self.relays = relays if connect: @@ -43,87 +29,6 @@ class NostrClient: def close(self): self.relay_manager.close_connections() - def generate_keys(self, privatekey_hex: str = None): - pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None - self.private_key = PrivateKey(pk) - self.public_key = self.private_key.public_key - - def post(self, message: str): - event = Event(message, self.public_key.hex(), kind=EventKind.TEXT_NOTE) - self.private_key.sign_event(event) - event_json = event.to_message() - # print("Publishing message:") - # print(event_json) - self.relay_manager.publish_message(event_json) - - def get_post( - self, sender_publickey: PublicKey = None, callback_func=None, filter_kwargs={} - ): - filter = Filter( - authors=[sender_publickey.hex()] if sender_publickey else None, - kinds=[EventKind.TEXT_NOTE], - **filter_kwargs, - ) - filters = Filters([filter]) - subscription_id = os.urandom(4).hex() - self.relay_manager.add_subscription(subscription_id, filters) - - request = [ClientMessageType.REQUEST, subscription_id] - request.extend(filters.to_json_array()) - message = json.dumps(request) - self.relay_manager.publish_message(message) - - while True: - while self.relay_manager.message_pool.has_events(): - event_msg = self.relay_manager.message_pool.get_event() - if callback_func: - callback_func(event_msg.event) - time.sleep(0.1) - - def dm(self, message: str, to_pubkey: PublicKey): - dm = EncryptedDirectMessage( - recipient_pubkey=to_pubkey.hex(), cleartext_content=message - ) - self.private_key.sign_event(dm) - self.relay_manager.publish_event(dm) - - def get_dm(self, sender_publickey: PublicKey, callback_func=None): - filters = Filters( - [ - Filter( - kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE], - pubkey_refs=[sender_publickey.hex()], - ) - ] - ) - subscription_id = os.urandom(4).hex() - self.relay_manager.add_subscription(subscription_id, filters) - - request = [ClientMessageType.REQUEST, subscription_id] - request.extend(filters.to_json_array()) - message = json.dumps(request) - self.relay_manager.publish_message(message) - - while True: - while self.relay_manager.message_pool.has_events(): - event_msg = self.relay_manager.message_pool.get_event() - if "?iv=" in event_msg.event.content: - try: - shared_secret = self.private_key.compute_shared_secret( - event_msg.event.public_key - ) - aes = cbc.AESCipher(key=shared_secret) - enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=") - iv = base64.decodebytes(iv_b64.encode("utf-8")) - enc_text = base64.decodebytes(enc_text_b64.encode("utf-8")) - dec_text = aes.decrypt(iv, enc_text) - if callback_func: - callback_func(event_msg.event, dec_text) - except: - pass - break - time.sleep(0.1) - def subscribe( self, callback_events_func=None, diff --git a/nostr/relay.py b/nostr/relay.py index b6207b5..94e532c 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -122,11 +122,6 @@ class Relay: with self.lock: self.subscriptions.pop(id) - def update_subscription(self, id: str, filters: Filters) -> None: - with self.lock: - subscription = self.subscriptions[id] - subscription.filters = filters - def to_json_object(self) -> dict: return { "url": self.url, diff --git a/tasks.py b/tasks.py index eb5391a..069c57d 100644 --- a/tasks.py +++ b/tasks.py @@ -31,7 +31,6 @@ async def subscribe_events(): await asyncio.sleep(2) def callback_events(eventMessage: EventMessage): - # print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") if eventMessage.subscription_id in received_subscription_events: # do not add duplicate events (by event id) if eventMessage.event.id in set( From e2458d43dfdf1028d288810839f45fbc26e462a9 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 13:06:00 +0300 Subject: [PATCH 10/36] chore: code clean-up --- nostr/client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index faf5722..0ff85a8 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -11,7 +11,7 @@ class NostrClient: "wss://nostr.zebedee.cloud", "wss://nodestr.fmt.wiz.biz", "wss://nostr.oxtr.dev", - ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" + ] relay_manager = RelayManager() def __init__(self, relays: List[str] = [], connect=True): From 6c66b71302c85198ff890db04ff2af60e9c2adac Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 13:06:23 +0300 Subject: [PATCH 11/36] chore: code clean-up --- nostr/relay_manager.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 0ec324a..004b197 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -79,14 +79,3 @@ class RelayManager: for relay in self.relays.values(): if relay.policy.should_write: relay.publish(message) - - def publish_event(self, event: Event): - """Verifies that the Event is publishable before submitting it to relays""" - if event.signature is None: - raise RelayException(f"Could not publish {event.id}: must be signed") - - if not event.verify(): - raise RelayException( - f"Could not publish {event.id}: failed to verify signature {event.signature}" - ) - self.publish_message(event.to_message()) From 62641b56a6c7d0283d938b92cd69b437addbcb65 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 13:06:42 +0300 Subject: [PATCH 12/36] chore: code clean-up --- views_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/views_api.py b/views_api.py index 88193ad..26a68e9 100644 --- a/views_api.py +++ b/views_api.py @@ -17,7 +17,6 @@ from .models import Relay, RelayList, TestMessage, TestMessageResponse from .nostr.key import EncryptedDirectMessage, PrivateKey from .nostr.relay import Relay as NostrRelay from .services import NostrRouter, nostr -from .tasks import init_relays # we keep this in all_routers: list[NostrRouter] = [] From 414ae16cb0cbad9d457730a16925d2984e9563a1 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 13:06:57 +0300 Subject: [PATCH 13/36] refactor: split large methods --- nostr/relay.py | 1 + services.py | 162 ++++++++++++++++++++++++++----------------------- 2 files changed, 86 insertions(+), 77 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 94e532c..8d3545b 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -121,6 +121,7 @@ class Relay: def close_subscription(self, id: str) -> None: with self.lock: self.subscriptions.pop(id) + self.publish(json.dumps(["CLOSE", id])) def to_json_object(self) -> dict: return { diff --git a/services.py b/services.py index b270539..42e8c2c 100644 --- a/services.py +++ b/services.py @@ -2,17 +2,16 @@ import asyncio import json from typing import List, Union -from fastapi import WebSocket, WebSocketDisconnect +from fastapi import WebSocketDisconnect from loguru import logger from lnbits.helpers import urlsafe_short_hash -from .models import Event, Filter, Filters, Relay, RelayList +from .models import Event, Filter from .nostr.client.client import NostrClient as NostrClientLib -from .nostr.event import Event as NostrEvent from .nostr.filter import Filter as NostrFilter from .nostr.filter import Filters as NostrFilters -from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage +from .nostr.message_pool import EndOfStoredEventsMessage, NoticeMessage received_subscription_events: dict[str, list[Event]] = {} received_subscription_notices: list[NoticeMessage] = [] @@ -33,7 +32,7 @@ class NostrRouter: self.connected: bool = True self.websocket = websocket self.tasks: List[asyncio.Task] = [] - self.oridinal_subscription_ids = {} + self.original_subscription_ids = {} async def client_to_nostr(self): """Receives requests / data from the client and forwards it to relays. If the @@ -47,17 +46,7 @@ class NostrRouter: self.connected = False break - # registers a subscription if the input was a REQ request - subscription_id, json_str_rewritten = await self._handle_nostr_subscription( - json_str - ) - - if subscription_id and json_str_rewritten: - self.subscriptions.append(subscription_id) - - # publish data - publish_data = json_str_rewritten or json_str - nostr.client.relay_manager.publish_message(publish_data) + await self._handle_client_to_nostr(json_str) async def nostr_to_client(self): """Sends responses from relays back to the client. Polls the subscriptions of this client @@ -67,50 +56,12 @@ class NostrRouter: that we had previously rewritten in order to avoid collisions when multiple clients use the same id. """ while True and self.connected: - for s in self.subscriptions: - if s in received_subscription_events: - while len(received_subscription_events[s]): - my_event = received_subscription_events[s].pop(0) - # event.to_message() does not include the subscription ID, we have to add it manually - event_json = { - "id": my_event.id, - "pubkey": my_event.public_key, - "created_at": my_event.created_at, - "kind": my_event.kind, - "tags": my_event.tags, - "content": my_event.content, - "sig": my_event.signature, - } - - # this reconstructs the original response from the relay - # reconstruct original subscription id - s_original = self.oridinal_subscription_ids[s] - event_to_forward = ["EVENT", s_original, event_json] - - # print("Event to forward") - # print(json.dumps(event_to_forward)) - - # send data back to client - await self.websocket.send_text(json.dumps(event_to_forward)) - if s in received_subscription_eosenotices: - my_event = received_subscription_eosenotices[s] - s_original = self.oridinal_subscription_ids[s] - event_to_forward = ["EOSE", s_original] - del received_subscription_eosenotices[s] - # send data back to client - # print("Sending EOSE", event_to_forward) - await self.websocket.send_text(json.dumps(event_to_forward)) - - # if s in received_subscription_notices: - while len(received_subscription_notices): - my_event = received_subscription_notices.pop(0) - event_to_forward = ["NOTICE", my_event.content] - # send data back to client - logger.debug("Nostrclient: Received notice", event_to_forward[1]) - # note: we don't send it to the user because we don't know who should receive it - # await self.websocket.send_text(json.dumps(event_to_forward)) + await self._handle_subscriptions() + self._handle_notices() + await asyncio.sleep(0.1) + async def start(self): self.tasks.append(asyncio.create_task(self.client_to_nostr())) self.tasks.append(asyncio.create_task(self.nostr_to_client())) @@ -120,6 +71,53 @@ class NostrRouter: t.cancel() self.connected = False + async def _handle_subscriptions(self): + for s in self.subscriptions: + if s in received_subscription_events: + await self._handle_received_subscription_events(s) + if s in received_subscription_eosenotices: + await self._handle_received_subscription_eosenotices(s) + + + + async def _handle_received_subscription_eosenotices(self, s): + my_event = received_subscription_eosenotices[s] + s_original = self.original_subscription_ids[s] + event_to_forward = ["EOSE", s_original] + del received_subscription_eosenotices[s] + # send data back to client + # print("Sending EOSE", event_to_forward) + await self.websocket.send_text(json.dumps(event_to_forward)) + + async def _handle_received_subscription_events(self, s): + while len(received_subscription_events[s]): + my_event = received_subscription_events[s].pop(0) + # event.to_message() does not include the subscription ID, we have to add it manually + event_json = { + "id": my_event.id, + "pubkey": my_event.public_key, + "created_at": my_event.created_at, + "kind": my_event.kind, + "tags": my_event.tags, + "content": my_event.content, + "sig": my_event.signature, + } + + # this reconstructs the original response from the relay + # reconstruct original subscription id + s_original = self.original_subscription_ids[s] + event_to_forward = ["EVENT", s_original, event_json] + await self.websocket.send_text(json.dumps(event_to_forward)) + + def _handle_notices(self): + while len(received_subscription_notices): + my_event = received_subscription_notices.pop(0) + event_to_forward = ["NOTICE", my_event.content] + # note: we don't send it to the user because we don't know who should receive it + logger.debug("Nostrclient: Received notice", event_to_forward[1]) + + + def _marshall_nostr_filters(self, data: Union[dict, list]): filters = data if isinstance(data, list) else [data] filters = [Filter.parse_obj(f) for f in filters] @@ -139,7 +137,7 @@ class NostrRouter: ) return NostrFilters(filter_list) - async def _handle_nostr_subscription(self, json_str): + async def _handle_client_to_nostr(self, json_str): """Parses a (string) request from a client. If it is a subscription (REQ) or a CLOSE, it will register the subscription in the nostr client library that we're using so we can receive the callbacks on it later. Will rewrite the subscription id since we expect @@ -147,25 +145,35 @@ class NostrRouter: """ json_data = json.loads(json_str) assert len(json_data) + if json_data[0] == "REQ": - subscription_id = json_data[1] - subscription_id_rewritten = urlsafe_short_hash() - self.oridinal_subscription_ids[subscription_id_rewritten] = subscription_id - fltr = json_data[2] - filters = self._marshall_nostr_filters(fltr) - nostr.client.relay_manager.add_subscription( + self._handle_client_req(json_data) + return + + if json_data[0] == "CLOSE": + self.handle_client_close(json_data[1]) + return + + if json_data[0] == "EVENT": + nostr.client.relay_manager.publish_message(json_str) + return + + def _handle_client_req(self, json_data): + subscription_id = json_data[1] + subscription_id_rewritten = urlsafe_short_hash() + self.original_subscription_ids[subscription_id_rewritten] = subscription_id + fltr = json_data[2] + filters = self._marshall_nostr_filters(fltr) + + nostr.client.relay_manager.add_subscription( subscription_id_rewritten, filters ) - request_rewritten = json.dumps( - [json_data[0], subscription_id_rewritten, fltr] - ) - return subscription_id_rewritten, request_rewritten - elif json_data[0] == "CLOSE": - subscription_id = json_data[1] - subscription_id_rewritten = next((k for k, v in self.oridinal_subscription_ids.items() if v == subscription_id), None) - if subscription_id_rewritten: - nostr.client.relay_manager.close_subscription(subscription_id_rewritten) - request_rewritten = json.dumps([json_data[0], subscription_id_rewritten]) - return None, request_rewritten + request_rewritten = json.dumps([json_data[0], subscription_id_rewritten, fltr]) + + self.subscriptions.append(subscription_id_rewritten) + nostr.client.relay_manager.publish_message(request_rewritten) - return None, None + def handle_client_close(self, subscription_id): + subscription_id_rewritten = next((k for k, v in self.original_subscription_ids.items() if v == subscription_id), None) + if subscription_id_rewritten: + nostr.client.relay_manager.close_subscription(subscription_id_rewritten) From 4238be498f655693604fa1df79fdc6e9eab56a96 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 13:19:22 +0300 Subject: [PATCH 14/36] chore: code clean-up --- nostr/relay.py | 6 ------ nostr/relay_manager.py | 1 - services.py | 6 ++---- tasks.py | 3 --- views_api.py | 1 - 5 files changed, 2 insertions(+), 15 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 8d3545b..cc992f0 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -47,7 +47,6 @@ class Relay: self.queue = Queue() def connect(self, ssl_options: dict = None, proxy: dict = None): - print("### relay.connect", self.url) self.ws = WebSocketApp( self.url, on_open=self._on_open, @@ -84,7 +83,6 @@ class Relay: @property def ping(self): - print("### ping: ", self.url) ping_ms = int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000) return ping_ms if self.connected and ping_ms > 0 else 0 @@ -98,21 +96,17 @@ class Relay: self.publish(json_str) def queue_worker(self): - print("#### IN !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", self.url) while True: if self.connected: try: message = self.queue.get(timeout=1) - print("#### queue_worker", message) self.num_sent_events += 1 self.ws.send(message) except Exception as e: if self.shutdown: - print("#### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! e [", e, self.url, self.shutdown," ]###") break else: time.sleep(0.1) - print("#### OUT !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", self.url) def add_subscription(self, id, filters: Filters): with self.lock: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 004b197..b2e4fc2 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -2,7 +2,6 @@ import ssl import threading -from .event import Event from .filter import Filters from .message_pool import MessagePool from .relay import Relay, RelayPolicy diff --git a/services.py b/services.py index 42e8c2c..ed87b39 100644 --- a/services.py +++ b/services.py @@ -81,12 +81,10 @@ class NostrRouter: async def _handle_received_subscription_eosenotices(self, s): - my_event = received_subscription_eosenotices[s] s_original = self.original_subscription_ids[s] event_to_forward = ["EOSE", s_original] del received_subscription_eosenotices[s] - # send data back to client - # print("Sending EOSE", event_to_forward) + await self.websocket.send_text(json.dumps(event_to_forward)) async def _handle_received_subscription_events(self, s): @@ -114,7 +112,7 @@ class NostrRouter: my_event = received_subscription_notices.pop(0) event_to_forward = ["NOTICE", my_event.content] # note: we don't send it to the user because we don't know who should receive it - logger.debug("Nostrclient: Received notice", event_to_forward[1]) + logger.debug("Nostrclient: Received notice: ", event_to_forward[1]) diff --git a/tasks.py b/tasks.py index 069c57d..1db0d86 100644 --- a/tasks.py +++ b/tasks.py @@ -4,10 +4,7 @@ import ssl import threading from .crud import get_relays -from .nostr.event import Event -from .nostr.key import PublicKey from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage -from .nostr.relay_manager import RelayManager from .services import ( nostr, received_subscription_eosenotices, diff --git a/views_api.py b/views_api.py index 26a68e9..a16599b 100644 --- a/views_api.py +++ b/views_api.py @@ -1,5 +1,4 @@ import asyncio -import json from http import HTTPStatus from typing import List, Optional From c0632cabe53aa579cd9753fde8170a3a5bed0cbf Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 14:33:26 +0300 Subject: [PATCH 15/36] refactor: move stop logic --- services.py | 11 ++++++++++- views_api.py | 5 ----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/services.py b/services.py index ed87b39..5546d90 100644 --- a/services.py +++ b/services.py @@ -68,7 +68,16 @@ class NostrRouter: async def stop(self): for t in self.tasks: - t.cancel() + try: + t.cancel() + except: + pass + + for s in self.subscriptions: + try: + nostr.client.relay_manager.close_subscription(s) + except: + pass self.connected = False async def _handle_subscriptions(self): diff --git a/views_api.py b/views_api.py index a16599b..1a8d227 100644 --- a/views_api.py +++ b/views_api.py @@ -152,11 +152,6 @@ async def ws_relay(websocket: WebSocket) -> None: while True: await asyncio.sleep(10) if not router.connected: - for s in router.subscriptions: - try: - nostr.client.relay_manager.close_subscription(s) - except: - pass await router.stop() all_routers.remove(router) break From af14e1c47bb6afc426e711af86673f2eb9965c73 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 15:15:00 +0300 Subject: [PATCH 16/36] fix: do not lose subscriptions if no relay --- nostr/relay_manager.py | 9 +++++++++ views_api.py | 4 +--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index b2e4fc2..63eb68f 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,4 +1,5 @@ +from asyncio import Lock import ssl import threading @@ -18,6 +19,8 @@ class RelayManager: self.threads: dict[str, threading.Thread] = {} self.queue_threads: dict[str, threading.Thread] = {} self.message_pool = MessagePool() + self._cached_subscriptions = dict[str, Subscription] = {} + self._subscriptions_lock = Lock() def add_relay( self, url: str, read: bool = True, write: bool = True, subscriptions: dict[str, Subscription] = {} @@ -46,10 +49,16 @@ class RelayManager: self.relays.pop(url) def add_subscription(self, id: str, filters: Filters): + with self._subscriptions_lock: + self._cached_subscriptions[id] = Subscription(id, filters) + for relay in self.relays.values(): relay.add_subscription(id, filters) def close_subscription(self, id: str): + with self._subscriptions_lock: + self._cached_subscriptions.pop(id) + for relay in self.relays.values(): relay.close_subscription(id) diff --git a/views_api.py b/views_api.py index 1a8d227..0036070 100644 --- a/views_api.py +++ b/views_api.py @@ -62,11 +62,9 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]: all_relays: List[NostrRelay] = nostr.client.relay_manager.relays.values() if len(all_relays): - subscriptions = all_relays[0].subscriptions nostr.client.relays.append(relay.url) - nostr.client.relay_manager.add_relay(subscriptions) + nostr.client.relay_manager.add_relay() - nostr.client.relay_manager.connect_relay(relay.url) return await get_relays() From 811bfdc45a707192d0c43ac91738ae76344cbc3f Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 16:55:09 +0300 Subject: [PATCH 17/36] fix: init new relays with previous subscriptions --- nostr/client/client.py | 4 ++-- nostr/relay_manager.py | 16 ++++++++-------- views_api.py | 6 ++---- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index 0ff85a8..7c6063e 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -20,9 +20,9 @@ class NostrClient: if connect: self.connect() - async def connect(self, subscriptions: dict[str, Subscription] = {}): + async def connect(self): for relay in self.relays: - self.relay_manager.add_relay(relay, subscriptions) + self.relay_manager.add_relay(relay) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 63eb68f..ddc833c 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,5 +1,4 @@ -from asyncio import Lock import ssl import threading @@ -19,17 +18,18 @@ class RelayManager: self.threads: dict[str, threading.Thread] = {} self.queue_threads: dict[str, threading.Thread] = {} self.message_pool = MessagePool() - self._cached_subscriptions = dict[str, Subscription] = {} - self._subscriptions_lock = Lock() + self._cached_subscriptions: dict[str, Subscription] = {} + self._subscriptions_lock = threading.Lock() - def add_relay( - self, url: str, read: bool = True, write: bool = True, subscriptions: dict[str, Subscription] = {} - ) -> Relay: + def add_relay(self, url: str, read: bool = True, write: bool = True) -> Relay: if url in self.relays: return - + + with self._subscriptions_lock: + subscriptions = self._cached_subscriptions.copy() + policy = RelayPolicy(read, write) - relay = Relay(url, policy, self.message_pool, subscriptions.copy()) + relay = Relay(url, policy, self.message_pool, subscriptions) self.relays[url] = relay self.open_connection( diff --git a/views_api.py b/views_api.py index 0036070..131da44 100644 --- a/views_api.py +++ b/views_api.py @@ -60,10 +60,8 @@ async def api_add_relay(relay: Relay) -> Optional[RelayList]: relay.id = urlsafe_short_hash() await add_relay(relay) - all_relays: List[NostrRelay] = nostr.client.relay_manager.relays.values() - if len(all_relays): - nostr.client.relays.append(relay.url) - nostr.client.relay_manager.add_relay() + nostr.client.relays.append(relay.url) + nostr.client.relay_manager.add_relay(relay.url) return await get_relays() From defb9b8963efe01f4386c472c4c7d7953ddfe09d Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 17:10:34 +0300 Subject: [PATCH 18/36] chore: code clean-up --- nostr/client/client.py | 9 +-------- tasks.py | 2 -- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index 7c6063e..adc1a6b 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -6,12 +6,7 @@ from ..subscription import Subscription class NostrClient: - relays = [ - "wss://nostr-pub.wellorder.net", - "wss://nostr.zebedee.cloud", - "wss://nodestr.fmt.wiz.biz", - "wss://nostr.oxtr.dev", - ] + relays = [ ] relay_manager = RelayManager() def __init__(self, relays: List[str] = [], connect=True): @@ -24,8 +19,6 @@ class NostrClient: for relay in self.relays: self.relay_manager.add_relay(relay) - - def close(self): self.relay_manager.close_connections() diff --git a/tasks.py b/tasks.py index 1db0d86..7d471fc 100644 --- a/tasks.py +++ b/tasks.py @@ -1,6 +1,4 @@ import asyncio -import json -import ssl import threading from .crud import get_relays From 64426d187c31c1ae50f2b020b0d53a5a0e403c09 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 22 Jun 2023 17:26:47 +0300 Subject: [PATCH 19/36] feat: add predefined relay list plus code format --- templates/nostrclient/index.html | 191 +++++++++---------------------- 1 file changed, 53 insertions(+), 138 deletions(-) diff --git a/templates/nostrclient/index.html b/templates/nostrclient/index.html index 10fd4a5..7e83b6d 100644 --- a/templates/nostrclient/index.html +++ b/templates/nostrclient/index.html @@ -6,18 +6,18 @@
- +
- Add relay - + + + + + + + +
@@ -29,36 +29,18 @@
Nostrclient
- +
- + @@ -255,6 +256,13 @@ label: 'Ping', field: 'ping' } + , + { + name: 'delete', + align: 'center', + label: '', + field: '' + } ], pagination: { rowsPerPage: 10 @@ -329,8 +337,14 @@ this.relayToAdd = relayUrl await this.addRelay() }, + showDeleteRelayDialog: function (url) { + LNbits.utils + .confirmDialog(' Are you sure you want to remove this relay?') + .onOk(async () => { + this.deleteRelay(url) + }) + }, deleteRelay(url) { - console.log('DELETE RELAY ' + url) LNbits.api .request( 'DELETE', @@ -338,12 +352,14 @@ this.g.user.wallets[0].adminkey, { url: url } ) - .then(function (response) { - if (response.data) { - console.log(response.data) + .then((response) => { + const relayIndex = this.nostrrelayLinks.indexOf(r => r.url === url) + if (relayIndex !== -1) { + this.nostrrelayLinks.splice(relayIndex, 1) } }) - .catch(function (error) { + .catch((error) => { + console.error(error) LNbits.utils.notifyApiError(error) }) }, From ce8b95c2c7a700885ff533be126bd37876ccf8d7 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Mon, 26 Jun 2023 10:21:06 +0300 Subject: [PATCH 27/36] feat: restart disconnected relays --- __init__.py | 4 +++- nostr/relay.py | 44 +++++++++++++++++++++--------------------- nostr/relay_manager.py | 37 ++++++++++++++++++++++++----------- tasks.py | 12 ++++++++++++ 4 files changed, 63 insertions(+), 34 deletions(-) diff --git a/__init__.py b/__init__.py index e98d8b8..ec6f4b4 100644 --- a/__init__.py +++ b/__init__.py @@ -36,7 +36,7 @@ def nostr_renderer(): return template_renderer(["lnbits/extensions/nostrclient/templates"]) -from .tasks import init_relays, subscribe_events +from .tasks import check_relays, init_relays, subscribe_events from .views import * # noqa from .views_api import * # noqa @@ -47,3 +47,5 @@ def nostrclient_start(): scheduled_tasks.append(task1) task2 = loop.create_task(catch_everything_and_restart(subscribe_events)) scheduled_tasks.append(task2) + task3 = loop.create_task(catch_everything_and_restart(check_relays)) + scheduled_tasks.append(task3) diff --git a/nostr/relay.py b/nostr/relay.py index cc992f0..6ff16f5 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -3,6 +3,7 @@ import time from queue import Queue from threading import Lock +from loguru import logger from websocket import WebSocketApp from .event import Event @@ -69,17 +70,12 @@ class Relay: def close(self): self.ws.close() + self.connected = False self.shutdown = True - def check_reconnect(self): - try: - self.close() - except: - pass - self.connected = False - if self.reconnect: - time.sleep(self.error_counter**2) - self.connect(self.ssl_options, self.proxy) + @property + def error_threshold_reached(self): + return self.error_threshold and self.error_counter > self.error_threshold @property def ping(self): @@ -104,6 +100,7 @@ class Relay: self.ws.send(message) except Exception as e: if self.shutdown: + logger.warning(f"Closing queue worker for {self.url}") break else: time.sleep(0.1) @@ -127,31 +124,34 @@ class Relay: ], } - def _on_open(self, class_obj): + def _on_open(self, _): + logger.info(f"Connected to relay: '{self.url}'.") self.connected = True - pass + - def _on_close(self, class_obj, status_code, message): - self.connected = False - if self.error_threshold and self.error_counter > self.error_threshold: - pass - else: - self.check_reconnect() - pass + def _on_close(self, _, status_code, message): + logger.warning(f"Connection to relay {self.url} closed. Status: '{status_code}'. Message: '{message}'.") + self.close() - def _on_message(self, class_obj, message: str): + + + + def _on_message(self, _, message: str): if self._is_valid_message(message): self.num_received_events += 1 self.message_pool.add_message(message, self.url) + else: + logger.debug(f"Invalid relay message: '{message}'.") - def _on_error(self, class_obj, error): + def _on_error(self, _, error): + logger.warning(f"Relay error: '{str(error)}'") self.connected = False self.error_counter += 1 - def _on_ping(self, class_obj, message): + def _on_ping(self, _*): return - def _on_pong(self, class_obj, message): + def _on_pong(self, _*): return def _is_valid_message(self, message: str) -> bool: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index ddc833c..3ccc2fd 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -2,6 +2,8 @@ import ssl import threading +from loguru import logger + from .filter import Filters from .message_pool import MessagePool from .relay import Relay, RelayPolicy @@ -22,7 +24,7 @@ class RelayManager: self._subscriptions_lock = threading.Lock() def add_relay(self, url: str, read: bool = True, write: bool = True) -> Relay: - if url in self.relays: + if url in list(self.relays.keys()): return with self._subscriptions_lock: @@ -32,7 +34,7 @@ class RelayManager: relay = Relay(url, policy, self.message_pool, subscriptions) self.relays[url] = relay - self.open_connection( + self._open_connection( relay, {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification @@ -62,8 +64,23 @@ class RelayManager: for relay in self.relays.values(): relay.close_subscription(id) + def check_and_restart_relays(self): + stopped_relays = [r for r in self.relays.values() if r.shutdown] + for relay in stopped_relays: + logger.info(f"Restarting connection to relay '{relay.url}'") + self._restart_relay(relay) - def open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None): + + def close_connections(self): + for relay in self.relays.values(): + relay.close() + + def publish_message(self, message: str): + for relay in self.relays.values(): + if relay.policy.should_write: + relay.publish(message) + + def _open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None): self.threads[relay.url] = threading.Thread( target=relay.connect, args=(ssl_options, proxy), @@ -79,11 +96,9 @@ class RelayManager: ) self.queue_threads[relay.url].start() - def close_connections(self): - for relay in self.relays.values(): - relay.close() - - def publish_message(self, message: str): - for relay in self.relays.values(): - if relay.policy.should_write: - relay.publish(message) + def _restart_relay(self, relay: Relay): + if relay.error_threshold_reached: + return + self.remove_relay(relay.url) + new_relay = self.add_relay(relay.url) + new_relay.error_counter = relay.error_counter \ No newline at end of file diff --git a/tasks.py b/tasks.py index 40ca9d9..05057e7 100644 --- a/tasks.py +++ b/tasks.py @@ -1,6 +1,8 @@ import asyncio import threading +from loguru import logger + from . import nostr from .crud import get_relays from .nostr.message_pool import EndOfStoredEventsMessage, EventMessage, NoticeMessage @@ -17,6 +19,16 @@ async def init_relays(): await nostr.client.connect() +async def check_relays(): + """ Check relays that have been disconnected """ + while True: + try: + await asyncio.sleep(20) + nostr.client.relay_manager.check_and_restart_relays() + except Exception as e: + logger.warning(f"Cannot restart relays: '{str(e)}'.") + + async def subscribe_events(): while not any([r.connected for r in nostr.client.relay_manager.relays.values()]): await asyncio.sleep(2) From dabc26f8a6eab854efe5c3e56e8a6a3fb62612e5 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Mon, 26 Jun 2023 10:53:48 +0300 Subject: [PATCH 28/36] fix: unused params --- nostr/relay.py | 4 ++-- nostr/relay_manager.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 6ff16f5..d98a219 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -148,10 +148,10 @@ class Relay: self.connected = False self.error_counter += 1 - def _on_ping(self, _*): + def _on_ping(self, *_): return - def _on_pong(self, _*): + def _on_pong(self, *_): return def _is_valid_message(self, message: str) -> bool: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 3ccc2fd..01889d9 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -67,7 +67,6 @@ class RelayManager: def check_and_restart_relays(self): stopped_relays = [r for r in self.relays.values() if r.shutdown] for relay in stopped_relays: - logger.info(f"Restarting connection to relay '{relay.url}'") self._restart_relay(relay) @@ -99,6 +98,8 @@ class RelayManager: def _restart_relay(self, relay: Relay): if relay.error_threshold_reached: return + logger.info(f"Restarting connection to relay '{relay.url}'") + self.remove_relay(relay.url) new_relay = self.add_relay(relay.url) new_relay.error_counter = relay.error_counter \ No newline at end of file From f8d578e6aa9f3d3d881cf0fd52d23e18aa553a31 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Mon, 26 Jun 2023 12:20:06 +0300 Subject: [PATCH 29/36] feat: improve error handling and reporting --- models.py | 8 ++- nostr/message_type.py | 11 ++++- nostr/relay.py | 84 ++++++++++++++++++++------------ nostr/relay_manager.py | 3 +- templates/nostrclient/index.html | 19 ++++++++ views_api.py | 18 ++++--- 6 files changed, 101 insertions(+), 42 deletions(-) diff --git a/models.py b/models.py index fe12e3b..1006605 100644 --- a/models.py +++ b/models.py @@ -8,12 +8,18 @@ from pydantic import BaseModel, Field from lnbits.helpers import urlsafe_short_hash +class RelayStatus(BaseModel): + num_sent_events: Optional[int] = 0 + num_received_events: Optional[int] = 0 + error_counter: Optional[int] = 0 + error_list: Optional[List] = [] + class Relay(BaseModel): id: Optional[str] = None url: Optional[str] = None connected: Optional[bool] = None connected_string: Optional[str] = None - status: Optional[str] = None + status: Optional[RelayStatus] = None active: Optional[bool] = None ping: Optional[int] = None diff --git a/nostr/message_type.py b/nostr/message_type.py index 3f5206b..d37cdfd 100644 --- a/nostr/message_type.py +++ b/nostr/message_type.py @@ -3,13 +3,20 @@ class ClientMessageType: REQUEST = "REQ" CLOSE = "CLOSE" + class RelayMessageType: EVENT = "EVENT" NOTICE = "NOTICE" END_OF_STORED_EVENTS = "EOSE" + COMMAND_RESULT = "OK" @staticmethod def is_valid(type: str) -> bool: - if type == RelayMessageType.EVENT or type == RelayMessageType.NOTICE or type == RelayMessageType.END_OF_STORED_EVENTS: + if ( + type == RelayMessageType.EVENT + or type == RelayMessageType.NOTICE + or type == RelayMessageType.END_OF_STORED_EVENTS + or type == RelayMessageType.COMMAND_RESULT + ): return True - return False \ No newline at end of file + return False diff --git a/nostr/relay.py b/nostr/relay.py index d98a219..4c989c2 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -2,6 +2,7 @@ import json import time from queue import Queue from threading import Lock +from typing import List from loguru import logger from websocket import WebSocketApp @@ -39,6 +40,7 @@ class Relay: self.shutdown: bool = False self.error_counter: int = 0 self.error_threshold: int = 100 + self.error_list: List[str] = [] self.num_received_events: int = 0 self.num_sent_events: int = 0 self.num_subscriptions: int = 0 @@ -100,7 +102,7 @@ class Relay: self.ws.send(message) except Exception as e: if self.shutdown: - logger.warning(f"Closing queue worker for {self.url}") + logger.warning(f"Closing queue worker for '{self.url}'.") break else: time.sleep(0.1) @@ -133,18 +135,14 @@ class Relay: logger.warning(f"Connection to relay {self.url} closed. Status: '{status_code}'. Message: '{message}'.") self.close() - - - def _on_message(self, _, message: str): if self._is_valid_message(message): self.num_received_events += 1 self.message_pool.add_message(message, self.url) - else: - logger.debug(f"Invalid relay message: '{message}'.") def _on_error(self, _, error): logger.warning(f"Relay error: '{str(error)}'") + self._append_error_message(str(error)) self.connected = False self.error_counter += 1 @@ -161,33 +159,57 @@ class Relay: message_json = json.loads(message) message_type = message_json[0] + if not RelayMessageType.is_valid(message_type): return False + if message_type == RelayMessageType.EVENT: - if not len(message_json) == 3: - return False - - subscription_id = message_json[1] - with self.lock: - if subscription_id not in self.subscriptions: - return False - - e = message_json[2] - event = Event( - e["content"], - e["pubkey"], - e["created_at"], - e["kind"], - e["tags"], - e["sig"], - ) - if not event.verify(): - return False - - with self.lock: - subscription = self.subscriptions[subscription_id] - - if subscription.filters and not subscription.filters.match(event): - return False + return self._is_valid_event_message(message_json) + + if message_type == RelayMessageType.COMMAND_RESULT: + return self._is_valid_command_result_message(message, message_json) return True + + def _is_valid_event_message(self, message_json): + if not len(message_json) == 3: + return False + + subscription_id = message_json[1] + with self.lock: + if subscription_id not in self.subscriptions: + return False + + e = message_json[2] + event = Event( + e["content"], + e["pubkey"], + e["created_at"], + e["kind"], + e["tags"], + e["sig"], + ) + if not event.verify(): + return False + + with self.lock: + subscription = self.subscriptions[subscription_id] + + if subscription.filters and not subscription.filters.match(event): + return False + + return True + + def _is_valid_command_result_message(self, message, message_json): + if not len(message_json) < 3: + return False + + if message_json[2] != True: + logger.warning(f"Relay '{self.url}' negative command result: '{message}'") + self._append_error_message(message) + return False + + return True + + def _append_error_message(self, message): + self.error_list = ([message] + self.error_list)[:20] \ No newline at end of file diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 01889d9..5838308 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -102,4 +102,5 @@ class RelayManager: self.remove_relay(relay.url) new_relay = self.add_relay(relay.url) - new_relay.error_counter = relay.error_counter \ No newline at end of file + new_relay.error_counter = relay.error_counter + new_relay.error_list = relay.error_list \ No newline at end of file diff --git a/templates/nostrclient/index.html b/templates/nostrclient/index.html index f200973..8018a92 100644 --- a/templates/nostrclient/index.html +++ b/templates/nostrclient/index.html @@ -51,6 +51,18 @@
+
+
+ ⬆️ + ⬇️ + ⚠️ + + + +
+
+
+
{{ col.value }} @@ -196,6 +208,13 @@ obj._data = _.clone(obj) obj.theTime = obj.time * 60 - (Date.now() / 1000 - obj.timestamp) obj.time = obj.time + 'mins' + obj.status = { + sentEvents: obj.status.num_sent_events, + receveidEvents: obj.status.num_received_events, + errorCount: obj.status.error_counter, + errorList: obj.status.error_list + + } obj.ping = obj.ping + ' ms' diff --git a/views_api.py b/views_api.py index 316ec4d..f918e3c 100644 --- a/views_api.py +++ b/views_api.py @@ -24,19 +24,23 @@ all_routers: list[NostrRouter] = [] async def api_get_relays() -> RelayList: relays = RelayList(__root__=[]) for url, r in nostr.client.relay_manager.relays.items(): - status_text = ( - f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}" - ) - connected_text = "🟢" if r.connected else "🔴" + # status_text = ( + # f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}" + # ) + # connected_text = "🟢" if r.connected else "🔴" relay_id = urlsafe_short_hash() relays.__root__.append( Relay( id=relay_id, url=url, - connected_string=connected_text, - status=status_text, + connected=r.connected, + status={ + "num_sent_events": r.num_sent_events, + "num_received_events": r.num_received_events, + "error_counter": r.error_counter, + "error_list": r.error_list + }, ping=r.ping, - connected=True, active=True, ) ) From d619b965e71e59143331b523482add58deda684f Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Mon, 26 Jun 2023 12:59:58 +0300 Subject: [PATCH 30/36] fix: UI for connected --- templates/nostrclient/index.html | 36 ++++++++++++++++---------------- views_api.py | 4 ---- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/templates/nostrclient/index.html b/templates/nostrclient/index.html index 8018a92..57c3b73 100644 --- a/templates/nostrclient/index.html +++ b/templates/nostrclient/index.html @@ -50,30 +50,30 @@ @@ -252,10 +252,10 @@ relayTable: { columns: [ { - name: 'connected_string', + name: 'connected', align: 'left', label: '', - field: 'connected_string' + field: 'connected' }, { name: 'relay', diff --git a/views_api.py b/views_api.py index f918e3c..b681f50 100644 --- a/views_api.py +++ b/views_api.py @@ -24,10 +24,6 @@ all_routers: list[NostrRouter] = [] async def api_get_relays() -> RelayList: relays = RelayList(__root__=[]) for url, r in nostr.client.relay_manager.relays.items(): - # status_text = ( - # f"⬆️ {r.num_sent_events} ⬇️ {r.num_received_events} ⚠️ {r.error_counter}" - # ) - # connected_text = "🟢" if r.connected else "🔴" relay_id = urlsafe_short_hash() relays.__root__.append( Relay( From ada5b2a51d4c805854adab568fd939edb0e57572 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 29 Jun 2023 15:48:28 +0300 Subject: [PATCH 31/36] fix: unsubscribe --- router.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/router.py b/router.py index 3ee9d0a..264d4a4 100644 --- a/router.py +++ b/router.py @@ -39,7 +39,11 @@ class NostrRouter: self.connected = False break - await self._handle_client_to_nostr(json_str) + try: + await self._handle_client_to_nostr(json_str) + except Exception as e: + logger.debug(f"Failed to handle client message: '{str(e)}'.") + async def nostr_to_client(self): """Sends responses from relays back to the client. Polls the subscriptions of this client @@ -49,10 +53,13 @@ class NostrRouter: that we had previously rewritten in order to avoid collisions when multiple clients use the same id. """ while True and self.connected: - await self._handle_subscriptions() - self._handle_notices() - - await asyncio.sleep(0.1) + try: + await self._handle_subscriptions() + self._handle_notices() + await asyncio.sleep(0.1) + except Exception as e: + logger.debug(f"Failed to handle response for client: '{str(e)}'.") + async def start(self): @@ -112,9 +119,8 @@ class NostrRouter: def _handle_notices(self): while len(NostrRouter.received_subscription_notices): my_event = NostrRouter.received_subscription_notices.pop(0) - event_to_forward = ["NOTICE", my_event.content] # note: we don't send it to the user because we don't know who should receive it - logger.debug("Nostrclient: Received notice: ", event_to_forward[1]) + logger.info(f"Relay ('{my_event.url}') notice: '{my_event.content}']") @@ -143,9 +149,11 @@ class NostrRouter: receive the callbacks on it later. Will rewrite the subscription id since we expect multiple clients to use the router and want to avoid subscription id collisions """ + json_data = json.loads(json_str) assert len(json_data) + if json_data[0] == "REQ": self._handle_client_req(json_data) return @@ -176,4 +184,7 @@ class NostrRouter: def _handle_client_close(self, subscription_id): subscription_id_rewritten = next((k for k, v in self.original_subscription_ids.items() if v == subscription_id), None) if subscription_id_rewritten: + self.original_subscription_ids.pop(subscription_id_rewritten) nostr.client.relay_manager.close_subscription(subscription_id_rewritten) + else: + logger.debug(f"Failed to unsubscribe from '{subscription_id}.'") From 39c2f881c85c1337f5634106e82db72f73f65bf9 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Fri, 30 Jun 2023 10:52:16 +0300 Subject: [PATCH 32/36] feat: revive relay after 24 hours from the last error --- nostr/relay.py | 7 +++++-- nostr/relay_manager.py | 8 +++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 4c989c2..da3496d 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -41,6 +41,8 @@ class Relay: self.error_counter: int = 0 self.error_threshold: int = 100 self.error_list: List[str] = [] + self.notice_list: List[str] = [] + self.last_error_date: int = 0 self.num_received_events: int = 0 self.num_sent_events: int = 0 self.num_subscriptions: int = 0 @@ -77,7 +79,7 @@ class Relay: @property def error_threshold_reached(self): - return self.error_threshold and self.error_counter > self.error_threshold + return self.error_threshold and self.error_counter >= self.error_threshold @property def ping(self): @@ -212,4 +214,5 @@ class Relay: return True def _append_error_message(self, message): - self.error_list = ([message] + self.error_list)[:20] \ No newline at end of file + self.error_list = ([message] + self.error_list)[:20] + self.last_error_date = int(time.time()) \ No newline at end of file diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 5838308..f8f852c 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,6 +1,7 @@ import ssl import threading +import time from loguru import logger @@ -97,7 +98,12 @@ class RelayManager: def _restart_relay(self, relay: Relay): if relay.error_threshold_reached: - return + time_since_last_error = time.time() - relay.last_error_date + if time_since_last_error < 60 * 60 * 24: # last day + return + relay.error_counter = 0 + relay.error_list = [] + logger.info(f"Restarting connection to relay '{relay.url}'") self.remove_relay(relay.url) From f244f60c562f8747e8b9dba91440e177a962271d Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Fri, 30 Jun 2023 10:53:39 +0300 Subject: [PATCH 33/36] fix: make it 2 hours --- nostr/relay_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index f8f852c..da650d6 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -99,7 +99,7 @@ class RelayManager: def _restart_relay(self, relay: Relay): if relay.error_threshold_reached: time_since_last_error = time.time() - relay.last_error_date - if time_since_last_error < 60 * 60 * 24: # last day + if time_since_last_error < 60 * 60 * 2: # last day return relay.error_counter = 0 relay.error_list = [] From 1601f71b03ffc2b53e6630f00495d83d50d41583 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Fri, 30 Jun 2023 11:46:47 +0300 Subject: [PATCH 34/36] feat: show relay NOTICE --- models.py | 1 + nostr/relay.py | 4 +++- nostr/relay_manager.py | 7 +++++- router.py | 1 + templates/nostrclient/index.html | 38 ++++++++++++++++++++++++++------ views_api.py | 3 ++- 6 files changed, 44 insertions(+), 10 deletions(-) diff --git a/models.py b/models.py index 1006605..88651fc 100644 --- a/models.py +++ b/models.py @@ -13,6 +13,7 @@ class RelayStatus(BaseModel): num_received_events: Optional[int] = 0 error_counter: Optional[int] = 0 error_list: Optional[List] = [] + notice_list: Optional[List] = [] class Relay(BaseModel): id: Optional[str] = None diff --git a/nostr/relay.py b/nostr/relay.py index da3496d..8b081a3 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -128,11 +128,13 @@ class Relay: ], } + def add_notice(self, notice: str): + self.notice_list = ([notice] + self.notice_list)[:20] + def _on_open(self, _): logger.info(f"Connected to relay: '{self.url}'.") self.connected = True - def _on_close(self, _, status_code, message): logger.warning(f"Connection to relay {self.url} closed. Status: '{status_code}'. Message: '{message}'.") self.close() diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index da650d6..b2df735 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -6,7 +6,7 @@ import time from loguru import logger from .filter import Filters -from .message_pool import MessagePool +from .message_pool import MessagePool, NoticeMessage from .relay import Relay, RelayPolicy from .subscription import Subscription @@ -80,6 +80,11 @@ class RelayManager: if relay.policy.should_write: relay.publish(message) + def handle_notice(self, notice: NoticeMessage): + relay = next((r for r in self.relays.values() if r.url == notice.url)) + if relay: + relay.add_notice(notice.content) + def _open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None): self.threads[relay.url] = threading.Thread( target=relay.connect, diff --git a/router.py b/router.py index 264d4a4..0982f76 100644 --- a/router.py +++ b/router.py @@ -121,6 +121,7 @@ class NostrRouter: my_event = NostrRouter.received_subscription_notices.pop(0) # note: we don't send it to the user because we don't know who should receive it logger.info(f"Relay ('{my_event.url}') notice: '{my_event.content}']") + nostr.client.relay_manager.handle_notice(my_event) diff --git a/templates/nostrclient/index.html b/templates/nostrclient/index.html index 57c3b73..82b149e 100644 --- a/templates/nostrclient/index.html +++ b/templates/nostrclient/index.html @@ -58,12 +58,15 @@
⬆️ ⬇️ - ⚠️ - + + ⚠️ + - -
-
+ + ⓘ + +
+
@@ -198,6 +201,17 @@
+ + + + + +
+ Close +
+
+
+
{% endraw %} {% endblock %} {% block scripts %} {{ window_vars(user) }} @@ -212,8 +226,8 @@ sentEvents: obj.status.num_sent_events, receveidEvents: obj.status.num_received_events, errorCount: obj.status.error_counter, - errorList: obj.status.error_list - + errorList: obj.status.error_list, + noticeList: obj.status.notice_list } obj.ping = obj.ping + ' ms' @@ -226,6 +240,7 @@ 'HH:mm:ss' ) } + console.log('### obj', obj) return obj } @@ -239,6 +254,10 @@ relayToAdd: '', nostrrelayLinks: [], filter: '', + logData: { + show: false, + data: null + }, testData: { show: false, wsConnection: null, @@ -492,6 +511,11 @@ console.warn(error) } }, + showLogDataDialog: function (data = []) { + console.log('### showLogDataDialog', data) + this.logData.data = data.join('\n') + this.logData.show = true + }, exportlnurldeviceCSV: function () { var self = this LNbits.utils.exportCSV(self.relayTable.columns, this.nostrLinks) diff --git a/views_api.py b/views_api.py index b681f50..b6b4527 100644 --- a/views_api.py +++ b/views_api.py @@ -34,7 +34,8 @@ async def api_get_relays() -> RelayList: "num_sent_events": r.num_sent_events, "num_received_events": r.num_received_events, "error_counter": r.error_counter, - "error_list": r.error_list + "error_list": r.error_list, + "notice_list": r.notice_list, }, ping=r.ping, active=True, From 80b86bf00c751cbcca11701bd9c3ab4ac9f2bd94 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Fri, 30 Jun 2023 12:05:28 +0300 Subject: [PATCH 35/36] fix: await even if error --- router.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/router.py b/router.py index 0982f76..e85653c 100644 --- a/router.py +++ b/router.py @@ -56,10 +56,9 @@ class NostrRouter: try: await self._handle_subscriptions() self._handle_notices() - await asyncio.sleep(0.1) except Exception as e: logger.debug(f"Failed to handle response for client: '{str(e)}'.") - + await asyncio.sleep(0.1) async def start(self): From 147af04c20505aa42b21acb9903bf4193a7b504c Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Tue, 4 Jul 2023 10:02:47 +0300 Subject: [PATCH 36/36] fix: remove unnecessary `async` --- nostr/client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/client/client.py b/nostr/client/client.py index e033262..66d722c 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -14,7 +14,7 @@ class NostrClient: if connect: self.connect() - async def connect(self): + def connect(self): for relay in self.relays: self.relay_manager.add_relay(relay)