diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index 0e38029..967bc1b 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -67,6 +67,7 @@ class NostrClient: async def get_event(self): value = await self.recieve_event_queue.get() if isinstance(value, ValueError): + logger.error(f"[NOSTRMARKET] ❌ Queue returned error: {value}") raise value return value @@ -93,12 +94,6 @@ class NostrClient: self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32] await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters) - logger.info( - f"[NOSTRMARKET DEBUG] Subscribing to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}" - ) - logger.info(f"[NOSTRMARKET DEBUG] Subscription filters: {merchant_filters}") - logger.info(f"[NOSTRMARKET DEBUG] Public keys: {public_keys}") - async def merchant_temp_subscription(self, pk, duration=10): dm_filters = self._filters_for_direct_messages([pk], 0) stall_filters = self._filters_for_stall_events([pk], 0) @@ -179,20 +174,21 @@ class NostrClient: def _ws_handlers(self): def on_open(_): - logger.info("[NOSTRMARKET DEBUG] Connected to 'nostrclient' websocket") + logger.debug("[NOSTRMARKET DEBUG] ✅ Connected to 'nostrclient' websocket successfully") def on_message(_, message): - logger.info(f"[NOSTRMARKET DEBUG] Received websocket message: {message[:200]}...") + logger.debug(f"[NOSTRMARKET DEBUG] 📨 Received websocket message: {message[:200]}...") try: self.recieve_event_queue.put_nowait(message) + logger.debug(f"[NOSTRMARKET DEBUG] 📤 Message queued successfully") except Exception as e: - logger.error(f"[NOSTRMARKET DEBUG] ❌ Failed to queue message: {e}") + logger.error(f"[NOSTRMARKET] ❌ Failed to queue message: {e}") def on_error(_, error): - logger.warning(f"[NOSTRMARKET DEBUG] ❌ Websocket error: {error}") + logger.warning(f"[NOSTRMARKET] ❌ Websocket error: {error}") def on_close(x, status_code, message): - logger.warning(f"[NOSTRMARKET DEBUG] 🔌 Websocket closed: {x}: '{status_code}' '{message}'") + logger.warning(f"[NOSTRMARKET] 🔌 Websocket closed: {x}: '{status_code}' '{message}'") # force re-subscribe self.recieve_event_queue.put_nowait(ValueError("Websocket close.")) diff --git a/services.py b/services.py index 35838ec..59d02ad 100644 --- a/services.py +++ b/services.py @@ -85,10 +85,13 @@ async def create_new_order( async def build_order_with_payment( merchant_id: str, merchant_public_key: str, data: PartialOrder ): + products = await get_products_by_ids( merchant_id, [p.product_id for p in data.items] ) + data.validate_order_items(products) + shipping_zone = await get_zone(merchant_id, data.shipping_id) assert shipping_zone, f"Shipping zone not found for order '{data.id}'" @@ -96,6 +99,7 @@ async def build_order_with_payment( product_cost_sat, shipping_cost_sat = await data.costs_in_sats( products, shipping_zone.id, shipping_zone.cost ) + receipt = data.receipt(products, shipping_zone.id, shipping_zone.cost) wallet_id = await get_wallet_for_product(data.items[0].product_id) @@ -106,6 +110,7 @@ async def build_order_with_payment( merchant_id, product_ids, data.items ) if not success: + logger.error(f"[NOSTRMARKET] ❌ Product quantity check failed: {message}") raise ValueError(message) total_amount_sat = round(product_cost_sat + shipping_cost_sat) @@ -317,9 +322,14 @@ async def compute_products_new_quantity( async def process_nostr_message(msg: str): try: - type_, *rest = json.loads(msg) + parsed_msg = json.loads(msg) + type_, *rest = parsed_msg + if type_.upper() == "EVENT": + if len(rest) < 2: + logger.warning(f"[NOSTRMARKET] ⚠️ EVENT message missing data: {rest}") + return _, event = rest event = NostrEvent(**event) if event.kind == 0: @@ -331,11 +341,14 @@ async def process_nostr_message(msg: str): elif event.kind == 30018: await _handle_product(event) else: - logger.info(f"[NOSTRMARKET] Unhandled event kind: {event.kind} - event: {event.id}") + logger.info(f"[NOSTRMARKET] ❓ Unhandled event kind: {event.kind} - event: {event.id}") return + else: + logger.info(f"[NOSTRMARKET] 🔄 Non-EVENT message type: {type_}") except Exception as ex: - logger.error(f"[NOSTRMARKET] Error processing nostr message: {ex}") + logger.error(f"[NOSTRMARKET] ❌ Error processing nostr message: {ex}") + logger.error(f"[NOSTRMARKET] 📄 Raw message that failed: {msg}") async def create_or_update_order_from_dm( @@ -417,28 +430,29 @@ async def extract_customer_order_from_dm( async def _handle_nip04_message(event: NostrEvent): - merchant_public_key = event.pubkey - merchant = await get_merchant_by_pubkey(merchant_public_key) - - if not merchant: - p_tags = event.tag_values("p") - if len(p_tags) and p_tags[0]: - merchant_public_key = p_tags[0] - merchant = await get_merchant_by_pubkey(merchant_public_key) - - assert merchant, f"Merchant not found for public key '{merchant_public_key}'" - - if event.pubkey == merchant_public_key: - assert len(event.tag_values("p")) != 0, "Outgong message has no 'p' tag" - clear_text_msg = merchant.decrypt_message( + + p_tags = event.tag_values("p") + + # PRIORITY 1: Check if any recipient (p_tag) is a merchant → incoming message TO merchant + for p_tag in p_tags: + if p_tag: + potential_merchant = await get_merchant_by_pubkey(p_tag) + if potential_merchant: + clear_text_msg = potential_merchant.decrypt_message(event.content, event.pubkey) + await _handle_incoming_dms(event, potential_merchant, clear_text_msg) + return # IMPORTANT: Return immediately to prevent double processing + + # PRIORITY 2: If no recipient merchant found, check if sender is a merchant → outgoing message FROM merchant + sender_merchant = await get_merchant_by_pubkey(event.pubkey) + if sender_merchant: + assert len(event.tag_values("p")) != 0, "Outgoing message has no 'p' tag" + clear_text_msg = sender_merchant.decrypt_message( event.content, event.tag_values("p")[0] ) - await _handle_outgoing_dms(event, merchant, clear_text_msg) - elif event.has_tag_value("p", merchant_public_key): - clear_text_msg = merchant.decrypt_message(event.content, event.pubkey) - await _handle_incoming_dms(event, merchant, clear_text_msg) - else: - logger.warning(f"[NOSTRMARKET] Bad NIP04 event: '{event.id}' - pubkey: {event.pubkey}, merchant: {merchant_public_key}") + await _handle_outgoing_dms(event, sender_merchant, clear_text_msg) + return # IMPORTANT: Return immediately + + # No merchant found in either direction async def _handle_incoming_dms( @@ -580,6 +594,7 @@ async def _handle_new_order( wallet = await get_wallet(wallet_id) assert wallet, f"Cannot find wallet for product id: {first_product_id}" + payment_req = await create_new_order(merchant_public_key, partial_order) if payment_req is None: @@ -654,8 +669,11 @@ async def subscribe_to_all_merchants(): last_stall_time = await get_last_stall_update_time() last_prod_time = await get_last_product_update_time() + # Make dm_time more lenient by subtracting 5 minutes to avoid missing recent events + lenient_dm_time = max(0, last_dm_time - 300) if last_dm_time > 0 else 0 + await nostr_client.subscribe_merchants( - public_keys, last_dm_time, last_stall_time, last_prod_time, 0 + public_keys, lenient_dm_time, last_stall_time, last_prod_time, 0 )