Stabilize (#24)
* refactor: clean-up * refactor: extra logs plus try-catch * refactor: do not use bare `except` * refactor: clean-up redundant fields * chore: pass code checks * chore: code format * refactor: code clean-up * fix: refactoring stuff * refactor: remove un-used file * chore: code clean-up * chore: code clean-up * chore: code-format fix * refactor: remove nostr.client wrapper * refactor: code clean-up * chore: code format * refactor: remove `RelayList` class * refactor: extract smaller methods with try-catch * fix: better exception handling * fix: remove redundant filters * fix: simplify event * chore: code format * fix: code check * fix: code check * fix: simplify `REQ` * fix: more clean-ups * refactor: use simpler method * refactor: re-order and rename * fix: stop logic * fix: subscription close before disconnect * chore: play commit
This commit is contained in:
parent
ab185bd2c4
commit
16ae9d15a1
20 changed files with 522 additions and 717 deletions
|
|
@ -1,21 +1,15 @@
|
|||
|
||||
import asyncio
|
||||
import ssl
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .filter import Filters
|
||||
from .message_pool import MessagePool, NoticeMessage
|
||||
from .relay import Relay, RelayPolicy
|
||||
from .relay import Relay
|
||||
from .subscription import Subscription
|
||||
|
||||
|
||||
class RelayException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RelayManager:
|
||||
def __init__(self) -> None:
|
||||
self.relays: dict[str, Relay] = {}
|
||||
|
|
@ -25,72 +19,97 @@ class RelayManager:
|
|||
self._cached_subscriptions: dict[str, Subscription] = {}
|
||||
self._subscriptions_lock = threading.Lock()
|
||||
|
||||
def add_relay(self, url: str, read: bool = True, write: bool = True) -> Relay:
|
||||
def add_relay(self, url: str) -> Relay:
|
||||
if url in list(self.relays.keys()):
|
||||
return
|
||||
|
||||
with self._subscriptions_lock:
|
||||
subscriptions = self._cached_subscriptions.copy()
|
||||
logger.debug(f"Relay '{url}' already present.")
|
||||
return self.relays[url]
|
||||
|
||||
policy = RelayPolicy(read, write)
|
||||
relay = Relay(url, policy, self.message_pool, subscriptions)
|
||||
relay = Relay(url, self.message_pool)
|
||||
self.relays[url] = relay
|
||||
|
||||
self._open_connection(
|
||||
relay,
|
||||
{"cert_reqs": ssl.CERT_NONE}
|
||||
) # NOTE: This disables ssl certificate verification
|
||||
self._open_connection(relay)
|
||||
|
||||
relay.publish_subscriptions()
|
||||
relay.publish_subscriptions(list(self._cached_subscriptions.values()))
|
||||
return relay
|
||||
|
||||
def remove_relay(self, url: str):
|
||||
self.relays[url].close()
|
||||
self.relays.pop(url)
|
||||
self.threads[url].join(timeout=5)
|
||||
self.threads.pop(url)
|
||||
self.queue_threads[url].join(timeout=5)
|
||||
self.queue_threads.pop(url)
|
||||
|
||||
try:
|
||||
self.relays[url].close()
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
|
||||
def add_subscription(self, id: str, filters: Filters):
|
||||
if url in self.relays:
|
||||
self.relays.pop(url)
|
||||
|
||||
try:
|
||||
self.threads[url].join(timeout=5)
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
|
||||
if url in self.threads:
|
||||
self.threads.pop(url)
|
||||
|
||||
try:
|
||||
self.queue_threads[url].join(timeout=5)
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
|
||||
if url in self.queue_threads:
|
||||
self.queue_threads.pop(url)
|
||||
|
||||
def remove_relays(self):
|
||||
relay_urls = list(self.relays.keys())
|
||||
for url in relay_urls:
|
||||
self.remove_relay(url)
|
||||
|
||||
def add_subscription(self, id: str, filters: List[str]):
|
||||
s = Subscription(id, filters)
|
||||
with self._subscriptions_lock:
|
||||
self._cached_subscriptions[id] = Subscription(id, filters)
|
||||
self._cached_subscriptions[id] = s
|
||||
|
||||
for relay in self.relays.values():
|
||||
relay.add_subscription(id, filters)
|
||||
relay.publish_subscriptions([s])
|
||||
|
||||
def close_subscription(self, id: str):
|
||||
with self._subscriptions_lock:
|
||||
self._cached_subscriptions.pop(id)
|
||||
try:
|
||||
with self._subscriptions_lock:
|
||||
if id in self._cached_subscriptions:
|
||||
self._cached_subscriptions.pop(id)
|
||||
|
||||
for relay in self.relays.values():
|
||||
relay.close_subscription(id)
|
||||
for relay in self.relays.values():
|
||||
relay.close_subscription(id)
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
|
||||
def close_subscriptions(self, subscriptions: List[str]):
|
||||
for id in subscriptions:
|
||||
self.close_subscription(id)
|
||||
|
||||
def close_all_subscriptions(self):
|
||||
all_subscriptions = list(self._cached_subscriptions.keys())
|
||||
self.close_subscriptions(all_subscriptions)
|
||||
|
||||
def check_and_restart_relays(self):
|
||||
stopped_relays = [r for r in self.relays.values() if r.shutdown]
|
||||
for relay in stopped_relays:
|
||||
self._restart_relay(relay)
|
||||
|
||||
|
||||
def close_connections(self):
|
||||
for relay in self.relays.values():
|
||||
relay.close()
|
||||
|
||||
def publish_message(self, message: str):
|
||||
for relay in self.relays.values():
|
||||
if relay.policy.should_write:
|
||||
relay.publish(message)
|
||||
relay.publish(message)
|
||||
|
||||
def handle_notice(self, notice: NoticeMessage):
|
||||
relay = next((r for r in self.relays.values() if r.url == notice.url))
|
||||
if relay:
|
||||
relay.add_notice(notice.content)
|
||||
|
||||
def _open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None):
|
||||
def _open_connection(self, relay: Relay):
|
||||
self.threads[relay.url] = threading.Thread(
|
||||
target=relay.connect,
|
||||
args=(ssl_options, proxy),
|
||||
name=f"{relay.url}-thread",
|
||||
daemon=True,
|
||||
)
|
||||
|
|
@ -98,7 +117,7 @@ class RelayManager:
|
|||
|
||||
def wrap_async_queue_worker():
|
||||
asyncio.run(relay.queue_worker())
|
||||
|
||||
|
||||
self.queue_threads[relay.url] = threading.Thread(
|
||||
target=wrap_async_queue_worker,
|
||||
name=f"{relay.url}-queue",
|
||||
|
|
@ -108,14 +127,16 @@ class RelayManager:
|
|||
|
||||
def _restart_relay(self, relay: Relay):
|
||||
time_since_last_error = time.time() - relay.last_error_date
|
||||
|
||||
min_wait_time = min(60 * relay.error_counter, 60 * 60 * 24) # try at least once a day
|
||||
|
||||
min_wait_time = min(
|
||||
60 * relay.error_counter, 60 * 60
|
||||
) # try at least once an hour
|
||||
if time_since_last_error < min_wait_time:
|
||||
return
|
||||
|
||||
|
||||
logger.info(f"Restarting connection to relay '{relay.url}'")
|
||||
|
||||
self.remove_relay(relay.url)
|
||||
new_relay = self.add_relay(relay.url)
|
||||
new_relay.error_counter = relay.error_counter
|
||||
new_relay.error_list = relay.error_list
|
||||
new_relay.error_list = relay.error_list
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue