diff --git a/nostr/client/client.py b/nostr/client/client.py index e033262..db07a06 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -1,4 +1,4 @@ -import time +import asyncio from typing import List from ..relay_manager import RelayManager @@ -21,7 +21,7 @@ class NostrClient: def close(self): self.relay_manager.close_connections() - def subscribe( + async def subscribe( self, callback_events_func=None, callback_notices_func=None, @@ -41,4 +41,4 @@ class NostrClient: if callback_eosenotices_func: callback_eosenotices_func(event_msg) - time.sleep(0.1) + await asyncio.sleep(0.5) diff --git a/nostr/relay.py b/nostr/relay.py index 0583bba..caacba0 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,3 +1,4 @@ +import asyncio import json import time from queue import Queue @@ -95,7 +96,7 @@ class Relay: json_str = json.dumps(["REQ", s["id"], s["filters"][0]]) self.publish(json_str) - def queue_worker(self): + async def queue_worker(self): while True: if self.connected: try: @@ -105,7 +106,7 @@ class Relay: except: pass else: - time.sleep(0.1) + await asyncio.sleep(1) if self.shutdown: logger.warning(f"Closing queue worker for '{self.url}'.") diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index a551253..f639fb0 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,4 +1,5 @@ +import asyncio import ssl import threading import time @@ -95,20 +96,22 @@ class RelayManager: ) self.threads[relay.url].start() + def wrap_async_queue_worker(): + asyncio.run(relay.queue_worker()) + self.queue_threads[relay.url] = threading.Thread( - target=relay.queue_worker, + target=wrap_async_queue_worker, name=f"{relay.url}-queue", daemon=True, ) self.queue_threads[relay.url].start() def _restart_relay(self, relay: Relay): - if relay.error_threshold_reached: - time_since_last_error = time.time() - relay.last_error_date - if time_since_last_error < 60 * 60 * 2: # last day - return - relay.error_counter = 0 - relay.error_list = [] + time_since_last_error = time.time() - relay.last_error_date + + min_wait_time = min(60 * relay.error_counter, 60 * 60 * 24) # try at least once a day + if time_since_last_error < min_wait_time: + return logger.info(f"Restarting connection to relay '{relay.url}'") diff --git a/router.py b/router.py index e85653c..86a8f41 100644 --- a/router.py +++ b/router.py @@ -170,13 +170,13 @@ class NostrRouter: subscription_id = json_data[1] subscription_id_rewritten = urlsafe_short_hash() self.original_subscription_ids[subscription_id_rewritten] = subscription_id - fltr = json_data[2] + fltr = json_data[2:] filters = self._marshall_nostr_filters(fltr) nostr.client.relay_manager.add_subscription( subscription_id_rewritten, filters ) - request_rewritten = json.dumps([json_data[0], subscription_id_rewritten, fltr]) + request_rewritten = json.dumps([json_data[0], subscription_id_rewritten] + fltr) self.subscriptions.append(subscription_id_rewritten) nostr.client.relay_manager.publish_message(request_rewritten) diff --git a/tasks.py b/tasks.py index 05057e7..4c316bc 100644 --- a/tasks.py +++ b/tasks.py @@ -66,13 +66,15 @@ async def subscribe_events(): return - t = threading.Thread( - target=nostr.client.subscribe, - args=( + def wrap_async_subscribe(): + asyncio.run(nostr.client.subscribe( callback_events, callback_notices, callback_eose_notices, - ), + )) + + t = threading.Thread( + target=wrap_async_subscribe, name="Nostr-event-subscription", daemon=True, )