171 lines
4.7 KiB
Python
171 lines
4.7 KiB
Python
import hashlib
|
|
import json
|
|
from enum import Enum
|
|
from sqlite3 import Row
|
|
from typing import List, Optional
|
|
|
|
from pydantic import BaseModel, Field
|
|
from secp256k1 import PublicKey
|
|
|
|
|
|
class RelayConfig(BaseModel):
    """Settings attached to a relay.

    Most fields carry a camelCase alias; because
    ``allow_population_by_field_name`` is enabled, a field can be populated
    through either its Python name or its alias.

    Every field is annotated explicitly with the type its default value
    implies, instead of relying on pydantic inferring the type from an
    un-annotated default.
    """

    is_paid_relay: bool = Field(False, alias="isPaidRelay")
    # ``wallet`` is the only field declared without an alias.
    wallet: str = Field("")
    cost_to_join: int = Field(0, alias="costToJoin")
    free_storage: int = Field(0, alias="freeStorage")
    storage_cost_per_kb: int = Field(0, alias="storageCostPerKb")
    max_client_filters: int = Field(0, alias="maxClientFilters")
    # Plain ``list``: element types are deliberately not constrained here so
    # validation stays as permissive as the type implied by the ``[]`` default.
    # NOTE(review): entries are presumably public-key strings -- confirm
    # before tightening to ``List[str]``.
    allowed_public_keys: list = Field([], alias="allowedPublicKeys")
    blocked_public_keys: list = Field([], alias="blockedPublicKeys")

    class Config:
        allow_population_by_field_name = True
|
|
|
|
class NostrRelay(BaseModel):
    """A relay record: identity fields, an ``active`` flag and its config.

    ``config`` defaults to a ``RelayConfig`` built with no arguments;
    ``from_row`` replaces it with the one decoded from the row's ``meta``
    column.
    """

    id: str
    name: str
    description: Optional[str]
    pubkey: Optional[str]
    contact: Optional[str]
    active: bool = False

    config: "RelayConfig" = RelayConfig()

    @classmethod
    def from_row(cls, row: Row) -> "NostrRelay":
        """Build a relay from a database row.

        All columns of ``row`` are passed to the constructor as keyword
        arguments, then ``config`` is overwritten with a ``RelayConfig``
        created from the JSON document stored in ``row["meta"]``.
        """
        columns = dict(row)
        instance = cls(**columns)
        meta = json.loads(row["meta"])
        instance.config = RelayConfig(**meta)
        return instance

    @classmethod
    def info(cls) -> dict:
        """Return the fixed description of the relay software.

        NOTE(review): presumably served as the relay information document
        (NIP-11 is among the listed NIPs) -- confirm against the caller.
        """
        relay_info = {
            "contact": "https://t.me/lnbits",
            "supported_nips": [1, 9, 11, 15, 20],
            "software": "LNbits",
            "version": "",
        }
        return relay_info
|
|
|
|
|
|
|
|
|
|
|
|
class NostrEventType(str, Enum):
    """Message type labels.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    EVENT = "EVENT"
    REQ = "REQ"
    CLOSE = "CLOSE"
|
|
|
|
|
|
class NostrEvent(BaseModel):
    """A Nostr event plus helpers to hash, classify and verify it."""

    id: str
    pubkey: str
    created_at: int
    kind: int
    tags: List[List[str]] = []
    content: str = ""
    sig: str

    def serialize(self) -> List:
        """Return the list that is hashed to obtain the event id."""
        return [0, self.pubkey, self.created_at, self.kind, self.tags, self.content]

    def serialize_json(self) -> str:
        """Return ``serialize()`` as compact JSON.

        Separators carry no whitespace and non-ASCII characters are emitted
        as-is (``ensure_ascii=False``).
        """
        return json.dumps(self.serialize(), separators=(",", ":"), ensure_ascii=False)

    @property
    def event_id(self) -> str:
        """Hex SHA-256 digest of the UTF-8 encoded ``serialize_json()`` output."""
        payload = self.serialize_json()
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def is_replaceable_event(self) -> bool:
        """Return True when ``kind`` is 0 or 3."""
        return self.kind in (0, 3)

    def is_delete_event(self) -> bool:
        """Return True when ``kind`` is 5."""
        return self.kind == 5

    def check_signature(self) -> None:
        """Validate the event id and its signature.

        Raises:
            ValueError: if ``id`` differs from the computed ``event_id``, if
                ``pubkey`` cannot be parsed, if ``sig`` is not the hex
                encoding of exactly 64 bytes, or if verification fails.
        """
        event_id = self.event_id
        if self.id != event_id:
            raise ValueError(
                f"Invalid event id. Expected: '{event_id}' got '{self.id}'"
            )

        try:
            # "02" is prepended before parsing -- presumably to present the
            # stored key in compressed form; confirm against the key format.
            pub_key = PublicKey(bytes.fromhex("02" + self.pubkey), True)
        except Exception as exc:
            raise ValueError(
                f"Invalid public key: '{self.pubkey}' for event '{self.id}'"
            ) from exc

        invalid_signature = f"Invalid signature: '{self.sig}' for event '{self.id}'"
        try:
            signature = bytes.fromhex(self.sig)
        except ValueError as exc:
            # Report malformed hex with the same message as a failed check.
            raise ValueError(invalid_signature) from exc
        # A Schnorr signature is exactly 64 bytes; reject any other length
        # up front rather than handing an odd-sized buffer to the verifier.
        if len(signature) != 64:
            raise ValueError(invalid_signature)

        valid_signature = pub_key.schnorr_verify(
            bytes.fromhex(event_id), signature, None, raw=True
        )
        if not valid_signature:
            raise ValueError(invalid_signature)

    def serialize_response(self, subscription_id):
        """Return ``[NostrEventType.EVENT, subscription_id, <fields as dict>]``."""
        return [NostrEventType.EVENT, subscription_id, dict(self)]

    @classmethod
    def from_row(cls, row: Row) -> "NostrEvent":
        """Build an event by passing every column of ``row`` as a keyword."""
        return cls(**dict(row))
|
|
|
|
|
|
class NostrFilter(BaseModel):
    """Subscription filter that can be tested against a ``NostrEvent``.

    ``e`` and ``p`` are aliased to ``"#e"`` and ``"#p"``. ``limit`` is
    declared but not consulted by ``matches`` or ``is_empty``.
    """

    subscription_id: Optional[str]

    ids: List[str] = []
    authors: List[str] = []
    kinds: List[int] = []
    e: List[str] = Field([], alias="#e")
    p: List[str] = Field([], alias="#p")
    since: Optional[int]
    until: Optional[int]
    limit: Optional[int]

    def matches(self, e: NostrEvent) -> bool:
        """Return True when event ``e`` satisfies every populated criterion.

        An empty list or a falsy ``since``/``until`` means "no constraint".
        """
        # TODO: prefix ("starts with") matching; ids and authors are
        # currently compared by exact membership only.
        if self.ids and e.id not in self.ids:
            return False
        if self.authors and e.pubkey not in self.authors:
            return False
        if self.kinds and e.kind not in self.kinds:
            return False

        if self.since and e.created_at < self.since:
            return False
        if self.until and self.until > 0 and e.created_at > self.until:
            return False

        # Both tag constraints must hold.
        return self.tag_in_list(e.tags, "e") and self.tag_in_list(e.tags, "p")

    def tag_in_list(self, event_tags, tag_name) -> bool:
        """Check the filter's ``tag_name`` values against an event's tags.

        Returns True when the filter holds no values for ``tag_name``, or
        when at least one event tag named ``tag_name`` has a value listed in
        the filter. Tags with fewer than two elements carry no value and are
        skipped, so a malformed tag cannot raise ``IndexError``.
        """
        filter_tags = dict(self).get(tag_name, [])
        if not filter_tags:
            return True

        return any(
            len(tag) >= 2 and tag[0] == tag_name and tag[1] in filter_tags
            for tag in event_tags
        )

    def is_empty(self):
        """Return True when no id, author, kind, tag or time bound is set."""
        return not (
            self.ids
            or self.authors
            or self.kinds
            or self.e
            or self.p
            or self.since
            or self.until
        )
|