Use private realay enpoint (#97)

* feat: use private ws endpoint

* chore: bump `min_lnbits_version`

* fix: retry logic

* fix: restart logic

* chore: fux log message
This commit is contained in:
Vlad Stan 2024-01-22 13:50:40 +02:00 committed by GitHub
parent 55624cc2b5
commit 91e42ee686
4 changed files with 62 additions and 54 deletions

View file

@ -8,7 +8,7 @@ from loguru import logger
from websocket import WebSocketApp
from lnbits.app import settings
from lnbits.helpers import urlsafe_short_hash
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
from .event import NostrEvent
@ -19,23 +19,22 @@ class NostrClient:
self.send_req_queue: Queue = Queue()
self.ws: WebSocketApp = None
self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
self.running = False
async def connect_to_nostrclient_ws(
self, on_open: Callable, on_message: Callable
) -> WebSocketApp:
def on_error(_, error):
logger.warning(error)
self.send_req_queue.put_nowait(ValueError("Websocket error."))
self.recieve_event_queue.put_nowait(ValueError("Websocket error."))
@property
def is_websocket_connected(self):
if not self.ws:
return False
return self.ws.keep_running
def on_close(_, status_code, message):
logger.warning(f"Websocket closed: '{status_code}' '{message}'")
self.send_req_queue.put_nowait(ValueError("Websocket close."))
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))
async def connect_to_nostrclient_ws(self) -> WebSocketApp:
logger.debug(f"Connecting to websockets for 'nostrclient' extension...")
logger.debug(f"Subscribing to websockets for nostrclient extension")
relay_endpoint = encrypt_internal_message("relay")
on_open, on_message, on_error, on_close = self._ws_handlers()
ws = WebSocketApp(
f"ws://localhost:{settings.port}/nostrclient/api/v1/relay",
f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}",
on_message=on_message,
on_open=on_open,
on_close=on_close,
@ -48,42 +47,28 @@ class NostrClient:
return ws
async def run_forever(self):
self.running = True
while self.running:
try:
if not self.is_websocket_connected:
self.ws = await self.connect_to_nostrclient_ws()
# be sure the connection is open
await asyncio.sleep(5)
req = await self.send_req_queue.get()
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
await asyncio.sleep(60)
async def get_event(self):
value = await self.recieve_event_queue.get()
if isinstance(value, ValueError):
raise value
return value
async def run_forever(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")
def on_message(_, message):
self.recieve_event_queue.put_nowait(message)
self._safe_ws_stop()
running = True
while running:
try:
req = None
if not self.ws:
self.ws = await self.connect_to_nostrclient_ws(on_open, on_message)
# be sure the connection is open
await asyncio.sleep(3)
req = await self.send_req_queue.get()
if isinstance(req, ValueError):
running = False
logger.warning(str(req))
else:
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(ex)
if req:
await self.send_req_queue.put(req)
self._safe_ws_stop()
await asyncio.sleep(5)
async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()])
@ -109,7 +94,7 @@ class NostrClient:
await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)
logger.debug(
f"Subscribed to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}"
f"Subscribing to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}"
)
async def merchant_temp_subscription(self, pk, duration=10):
@ -190,19 +175,36 @@ class NostrClient:
pass
self.ws = None
def _ws_handlers(self):
def on_open(_):
logger.info("Connected to 'nostrclient' websocket")
def on_message(_, message):
self.recieve_event_queue.put_nowait(message)
def on_error(_, error):
logger.warning(error)
def on_close(x, status_code, message):
logger.warning(f"Websocket closed: {x}: '{status_code}' '{message}'")
# force re-subscribe
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))
return on_open, on_message, on_error, on_close
async def restart(self):
await self.unsubscribe_merchants()
# Give some time for the CLOSE events to propagate before restarting
await asyncio.sleep(10)
logger.info("Restating NostrClient...")
await self.send_req_queue.put(ValueError("Restarting NostrClient..."))
logger.info("Restarting NostrClient...")
await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))
self._safe_ws_stop()
async def stop(self):
await self.unsubscribe_merchants()
self.running = False
# Give some time for the CLOSE events to propagate before closing the connection
await asyncio.sleep(10)