chore: testing

This commit is contained in:
Vlad Stan 2023-02-17 10:30:15 +02:00
parent 8c24109dd3
commit 30ab2b8f70

View file

@ -97,8 +97,7 @@ class NostrClientConnection:
self._auth_challenge: Optional[str] = None
self._auth_challenge_created_at = 0
self._last_event_timestamp = 0 # in hours
self._event_count_per_timestamp = 0
self.event_validator = EventValidator(self.relay_id)
self.broadcast_event: Optional[
Callable[[NostrClientConnection, NostrEvent], Awaitable[None]]
@ -184,7 +183,7 @@ class NostrClientConnection:
resp_nip20: List[Any] = ["OK", e.id]
if e.is_auth_response_event:
valid, message = self._validate_auth_event(e)
valid, message = self.event_validator.validate_auth_event(e, self._auth_challenge)
if not valid:
resp_nip20 += [valid, message]
await self._send_msg(resp_nip20)
@ -201,7 +200,8 @@ class NostrClientConnection:
await self._send_msg(resp_nip20)
return None
valid, message = await self._validate_write(e)
publisher_pubkey = self.pubkey if self.pubkey else e.pubkey
valid, message = await self.event_validator.validate_write(e, publisher_pubkey)
if not valid:
resp_nip20 += [valid, message]
await self._send_msg(resp_nip20)
@ -285,7 +285,48 @@ class NostrClientConnection:
and len(self.filters) >= self.client_config.max_client_filters
)
def _validate_auth_event(self, e: NostrEvent) -> Tuple[bool, str]:
def _auth_challenge_expired(self):
if self._auth_challenge_created_at == 0:
return True
current_time_seconds = round(time.time())
chanllenge_max_age_seconds = 300 # 5 min
return (
current_time_seconds - self._auth_challenge_created_at
) >= chanllenge_max_age_seconds
def _current_auth_challenge(self):
if self._auth_challenge_expired():
self._auth_challenge = self.relay_id + ":" + urlsafe_short_hash()
self._auth_challenge_created_at = round(time.time())
return self._auth_challenge
class EventValidator:
def __init__(self, relay_id: str):
self.relay_id = relay_id
self.client_config: RelaySpec
self._last_event_timestamp = 0 # in hours
self._event_count_per_timestamp = 0
async def validate_write(self, e: NostrEvent, publisher_pubkey: str) -> Tuple[bool, str]:
valid, message = self._validate_event(e)
if not valid:
return (valid, message)
if e.is_ephemeral_event:
return True, ""
valid, message = await self._validate_storage(publisher_pubkey, e.size_bytes)
if not valid:
return (valid, message)
return True, ""
def validate_auth_event(self, e: NostrEvent, auth_challenge: Optional[str]) -> Tuple[bool, str]:
valid, message = self._validate_event(e)
if not valid:
return (valid, message)
@ -298,26 +339,11 @@ class NostrClientConnection:
if self.client_config.domain != extract_domain(relay_tag[0]):
return False, "error: wrong relay domain for auth event"
if self._auth_challenge != challenge_tag[0]:
if auth_challenge != challenge_tag[0]:
return False, "error: wrong chanlange value for auth event"
return True, ""
async def _validate_write(self, e: NostrEvent) -> Tuple[bool, str]:
valid, message = self._validate_event(e)
if not valid:
return (valid, message)
if e.is_ephemeral_event:
return True, ""
publisher_pubkey = self.pubkey if self.pubkey else e.pubkey
valid, message = await self._validate_storage(publisher_pubkey, e.size_bytes)
if not valid:
return (valid, message)
return True, ""
def _validate_event(self, e: NostrEvent) -> Tuple[bool, str]:
if self._exceeded_max_events_per_hour():
return False, f"Exceeded max events per hour limit'!"
@ -372,6 +398,7 @@ class NostrClientConnection:
return True, ""
def _exceeded_max_events_per_hour(self) -> bool:
if self.client_config.max_events_per_hour == 0:
return False
@ -395,19 +422,4 @@ class NostrClientConnection:
if self.client_config.created_at_in_future != 0:
if created_at > (current_time + self.client_config.created_at_in_future):
return False, "created_at is too much into the future"
return True, ""
def _auth_challenge_expired(self):
if self._auth_challenge_created_at == 0:
return True
current_time_seconds = round(time.time())
chanllenge_max_age_seconds = 300 # 5 min
return (
current_time_seconds - self._auth_challenge_created_at
) >= chanllenge_max_age_seconds
def _current_auth_challenge(self):
if self._auth_challenge_expired():
self._auth_challenge = self.relay_id + ":" + urlsafe_short_hash()
self._auth_challenge_created_at = round(time.time())
return self._auth_challenge
return True, ""