feat: improve error handling and reporting

This commit is contained in:
Vlad Stan 2023-06-26 12:20:06 +03:00
parent dabc26f8a6
commit f8d578e6aa
6 changed files with 101 additions and 42 deletions

View file

@ -3,13 +3,20 @@ class ClientMessageType:
REQUEST = "REQ"
CLOSE = "CLOSE"
class RelayMessageType:
EVENT = "EVENT"
NOTICE = "NOTICE"
END_OF_STORED_EVENTS = "EOSE"
COMMAND_RESULT = "OK"
@staticmethod
def is_valid(type: str) -> bool:
if type == RelayMessageType.EVENT or type == RelayMessageType.NOTICE or type == RelayMessageType.END_OF_STORED_EVENTS:
if (
type == RelayMessageType.EVENT
or type == RelayMessageType.NOTICE
or type == RelayMessageType.END_OF_STORED_EVENTS
or type == RelayMessageType.COMMAND_RESULT
):
return True
return False
return False

View file

@ -2,6 +2,7 @@ import json
import time
from queue import Queue
from threading import Lock
from typing import List
from loguru import logger
from websocket import WebSocketApp
@ -39,6 +40,7 @@ class Relay:
self.shutdown: bool = False
self.error_counter: int = 0
self.error_threshold: int = 100
self.error_list: List[str] = []
self.num_received_events: int = 0
self.num_sent_events: int = 0
self.num_subscriptions: int = 0
@ -100,7 +102,7 @@ class Relay:
self.ws.send(message)
except Exception as e:
if self.shutdown:
logger.warning(f"Closing queue worker for {self.url}")
logger.warning(f"Closing queue worker for '{self.url}'.")
break
else:
time.sleep(0.1)
@ -133,18 +135,14 @@ class Relay:
logger.warning(f"Connection to relay {self.url} closed. Status: '{status_code}'. Message: '{message}'.")
self.close()
def _on_message(self, _, message: str):
if self._is_valid_message(message):
self.num_received_events += 1
self.message_pool.add_message(message, self.url)
else:
logger.debug(f"Invalid relay message: '{message}'.")
def _on_error(self, _, error):
logger.warning(f"Relay error: '{str(error)}'")
self._append_error_message(str(error))
self.connected = False
self.error_counter += 1
@ -161,33 +159,57 @@ class Relay:
message_json = json.loads(message)
message_type = message_json[0]
if not RelayMessageType.is_valid(message_type):
return False
if message_type == RelayMessageType.EVENT:
if not len(message_json) == 3:
return False
subscription_id = message_json[1]
with self.lock:
if subscription_id not in self.subscriptions:
return False
e = message_json[2]
event = Event(
e["content"],
e["pubkey"],
e["created_at"],
e["kind"],
e["tags"],
e["sig"],
)
if not event.verify():
return False
with self.lock:
subscription = self.subscriptions[subscription_id]
if subscription.filters and not subscription.filters.match(event):
return False
return self._is_valid_event_message(message_json)
if message_type == RelayMessageType.COMMAND_RESULT:
return self._is_valid_command_result_message(message, message_json)
return True
def _is_valid_event_message(self, message_json):
if not len(message_json) == 3:
return False
subscription_id = message_json[1]
with self.lock:
if subscription_id not in self.subscriptions:
return False
e = message_json[2]
event = Event(
e["content"],
e["pubkey"],
e["created_at"],
e["kind"],
e["tags"],
e["sig"],
)
if not event.verify():
return False
with self.lock:
subscription = self.subscriptions[subscription_id]
if subscription.filters and not subscription.filters.match(event):
return False
return True
def _is_valid_command_result_message(self, message, message_json):
if not len(message_json) < 3:
return False
if message_json[2] != True:
logger.warning(f"Relay '{self.url}' negative command result: '{message}'")
self._append_error_message(message)
return False
return True
def _append_error_message(self, message):
self.error_list = ([message] + self.error_list)[:20]

View file

@ -102,4 +102,5 @@ class RelayManager:
self.remove_relay(relay.url)
new_relay = self.add_relay(relay.url)
new_relay.error_counter = relay.error_counter
new_relay.error_counter = relay.error_counter
new_relay.error_list = relay.error_list