Compare commits
No commits in common. "3a8c16d1552b28ca2ab7bd88e469dc3c40004434" and "e8ddc4b6977bed3d56e9cf64cc29b3aba5a14059" have entirely different histories.
3a8c16d155
...
e8ddc4b697
9 changed files with 596 additions and 184 deletions
69
crud.py
69
crud.py
|
|
@ -1,4 +1,5 @@
|
|||
import json
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from lnbits.helpers import urlsafe_short_hash
|
||||
|
||||
|
|
@ -43,7 +44,7 @@ async def create_merchant(user_id: str, m: PartialMerchant) -> Merchant:
|
|||
|
||||
async def update_merchant(
|
||||
user_id: str, merchant_id: str, config: MerchantConfig
|
||||
) -> Merchant | None:
|
||||
) -> Optional[Merchant]:
|
||||
await db.execute(
|
||||
f"""
|
||||
UPDATE nostrmarket.merchants SET meta = :meta, time = {db.timestamp_now}
|
||||
|
|
@ -54,7 +55,7 @@ async def update_merchant(
|
|||
return await get_merchant(user_id, merchant_id)
|
||||
|
||||
|
||||
async def touch_merchant(user_id: str, merchant_id: str) -> Merchant | None:
|
||||
async def touch_merchant(user_id: str, merchant_id: str) -> Optional[Merchant]:
|
||||
await db.execute(
|
||||
f"""
|
||||
UPDATE nostrmarket.merchants SET time = {db.timestamp_now}
|
||||
|
|
@ -65,7 +66,7 @@ async def touch_merchant(user_id: str, merchant_id: str) -> Merchant | None:
|
|||
return await get_merchant(user_id, merchant_id)
|
||||
|
||||
|
||||
async def get_merchant(user_id: str, merchant_id: str) -> Merchant | None:
|
||||
async def get_merchant(user_id: str, merchant_id: str) -> Optional[Merchant]:
|
||||
row: dict = await db.fetchone(
|
||||
"""SELECT * FROM nostrmarket.merchants WHERE user_id = :user_id AND id = :id""",
|
||||
{
|
||||
|
|
@ -77,7 +78,7 @@ async def get_merchant(user_id: str, merchant_id: str) -> Merchant | None:
|
|||
return Merchant.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_merchant_by_pubkey(public_key: str) -> Merchant | None:
|
||||
async def get_merchant_by_pubkey(public_key: str) -> Optional[Merchant]:
|
||||
row: dict = await db.fetchone(
|
||||
"""SELECT * FROM nostrmarket.merchants WHERE public_key = :public_key""",
|
||||
{"public_key": public_key},
|
||||
|
|
@ -86,7 +87,7 @@ async def get_merchant_by_pubkey(public_key: str) -> Merchant | None:
|
|||
return Merchant.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_merchants_ids_with_pubkeys() -> list[tuple[str, str]]:
|
||||
async def get_merchants_ids_with_pubkeys() -> List[Tuple[str, str]]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"""SELECT id, public_key FROM nostrmarket.merchants""",
|
||||
)
|
||||
|
|
@ -94,7 +95,7 @@ async def get_merchants_ids_with_pubkeys() -> list[tuple[str, str]]:
|
|||
return [(row["id"], row["public_key"]) for row in rows]
|
||||
|
||||
|
||||
async def get_merchant_for_user(user_id: str) -> Merchant | None:
|
||||
async def get_merchant_for_user(user_id: str) -> Optional[Merchant]:
|
||||
row: dict = await db.fetchone(
|
||||
"""SELECT * FROM nostrmarket.merchants WHERE user_id = :user_id """,
|
||||
{"user_id": user_id},
|
||||
|
|
@ -137,7 +138,7 @@ async def create_zone(merchant_id: str, data: Zone) -> Zone:
|
|||
return zone
|
||||
|
||||
|
||||
async def update_zone(merchant_id: str, z: Zone) -> Zone | None:
|
||||
async def update_zone(merchant_id: str, z: Zone) -> Optional[Zone]:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE nostrmarket.zones
|
||||
|
|
@ -156,7 +157,7 @@ async def update_zone(merchant_id: str, z: Zone) -> Zone | None:
|
|||
return await get_zone(merchant_id, z.id)
|
||||
|
||||
|
||||
async def get_zone(merchant_id: str, zone_id: str) -> Zone | None:
|
||||
async def get_zone(merchant_id: str, zone_id: str) -> Optional[Zone]:
|
||||
row: dict = await db.fetchone(
|
||||
"SELECT * FROM nostrmarket.zones WHERE merchant_id = :merchant_id AND id = :id",
|
||||
{
|
||||
|
|
@ -167,7 +168,7 @@ async def get_zone(merchant_id: str, zone_id: str) -> Zone | None:
|
|||
return Zone.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_zones(merchant_id: str) -> list[Zone]:
|
||||
async def get_zones(merchant_id: str) -> List[Zone]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"SELECT * FROM nostrmarket.zones WHERE merchant_id = :merchant_id",
|
||||
{"merchant_id": merchant_id},
|
||||
|
|
@ -234,7 +235,7 @@ async def create_stall(merchant_id: str, data: Stall) -> Stall:
|
|||
return stall
|
||||
|
||||
|
||||
async def get_stall(merchant_id: str, stall_id: str) -> Stall | None:
|
||||
async def get_stall(merchant_id: str, stall_id: str) -> Optional[Stall]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.stalls
|
||||
|
|
@ -248,7 +249,7 @@ async def get_stall(merchant_id: str, stall_id: str) -> Stall | None:
|
|||
return Stall.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_stalls(merchant_id: str, pending: bool | None = False) -> list[Stall]:
|
||||
async def get_stalls(merchant_id: str, pending: Optional[bool] = False) -> List[Stall]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.stalls
|
||||
|
|
@ -273,7 +274,7 @@ async def get_last_stall_update_time() -> int:
|
|||
return row["event_created_at"] or 0 if row else 0
|
||||
|
||||
|
||||
async def update_stall(merchant_id: str, stall: Stall) -> Stall | None:
|
||||
async def update_stall(merchant_id: str, stall: Stall) -> Optional[Stall]:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE nostrmarket.stalls
|
||||
|
|
@ -397,7 +398,9 @@ async def update_product(merchant_id: str, product: Product) -> Product:
|
|||
return updated_product
|
||||
|
||||
|
||||
async def update_product_quantity(product_id: str, new_quantity: int) -> Product | None:
|
||||
async def update_product_quantity(
|
||||
product_id: str, new_quantity: int
|
||||
) -> Optional[Product]:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE nostrmarket.products SET quantity = :quantity
|
||||
|
|
@ -412,7 +415,7 @@ async def update_product_quantity(product_id: str, new_quantity: int) -> Product
|
|||
return Product.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_product(merchant_id: str, product_id: str) -> Product | None:
|
||||
async def get_product(merchant_id: str, product_id: str) -> Optional[Product]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.products
|
||||
|
|
@ -428,8 +431,8 @@ async def get_product(merchant_id: str, product_id: str) -> Product | None:
|
|||
|
||||
|
||||
async def get_products(
|
||||
merchant_id: str, stall_id: str, pending: bool | None = False
|
||||
) -> list[Product]:
|
||||
merchant_id: str, stall_id: str, pending: Optional[bool] = False
|
||||
) -> List[Product]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.products
|
||||
|
|
@ -442,8 +445,8 @@ async def get_products(
|
|||
|
||||
|
||||
async def get_products_by_ids(
|
||||
merchant_id: str, product_ids: list[str]
|
||||
) -> list[Product]:
|
||||
merchant_id: str, product_ids: List[str]
|
||||
) -> List[Product]:
|
||||
# todo: revisit
|
||||
|
||||
keys = []
|
||||
|
|
@ -464,7 +467,7 @@ async def get_products_by_ids(
|
|||
return [Product.from_row(row) for row in rows]
|
||||
|
||||
|
||||
async def get_wallet_for_product(product_id: str) -> str | None:
|
||||
async def get_wallet_for_product(product_id: str) -> Optional[str]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT s.wallet as wallet FROM nostrmarket.products p
|
||||
|
|
@ -571,7 +574,7 @@ async def create_order(merchant_id: str, o: Order) -> Order:
|
|||
return order
|
||||
|
||||
|
||||
async def get_order(merchant_id: str, order_id: str) -> Order | None:
|
||||
async def get_order(merchant_id: str, order_id: str) -> Optional[Order]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.orders
|
||||
|
|
@ -585,7 +588,7 @@ async def get_order(merchant_id: str, order_id: str) -> Order | None:
|
|||
return Order.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_order_by_event_id(merchant_id: str, event_id: str) -> Order | None:
|
||||
async def get_order_by_event_id(merchant_id: str, event_id: str) -> Optional[Order]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.orders
|
||||
|
|
@ -599,7 +602,7 @@ async def get_order_by_event_id(merchant_id: str, event_id: str) -> Order | None
|
|||
return Order.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_orders(merchant_id: str, **kwargs) -> list[Order]:
|
||||
async def get_orders(merchant_id: str, **kwargs) -> List[Order]:
|
||||
q = " AND ".join(
|
||||
[
|
||||
f"{field[0]} = :{field[0]}"
|
||||
|
|
@ -626,7 +629,7 @@ async def get_orders(merchant_id: str, **kwargs) -> list[Order]:
|
|||
|
||||
async def get_orders_for_stall(
|
||||
merchant_id: str, stall_id: str, **kwargs
|
||||
) -> list[Order]:
|
||||
) -> List[Order]:
|
||||
q = " AND ".join(
|
||||
[
|
||||
f"{field[0]} = :{field[0]}"
|
||||
|
|
@ -651,7 +654,7 @@ async def get_orders_for_stall(
|
|||
return [Order.from_row(row) for row in rows]
|
||||
|
||||
|
||||
async def update_order(merchant_id: str, order_id: str, **kwargs) -> Order | None:
|
||||
async def update_order(merchant_id: str, order_id: str, **kwargs) -> Optional[Order]:
|
||||
q = ", ".join(
|
||||
[
|
||||
f"{field[0]} = :{field[0]}"
|
||||
|
|
@ -675,7 +678,7 @@ async def update_order(merchant_id: str, order_id: str, **kwargs) -> Order | Non
|
|||
return await get_order(merchant_id, order_id)
|
||||
|
||||
|
||||
async def update_order_paid_status(order_id: str, paid: bool) -> Order | None:
|
||||
async def update_order_paid_status(order_id: str, paid: bool) -> Optional[Order]:
|
||||
await db.execute(
|
||||
"UPDATE nostrmarket.orders SET paid = :paid WHERE id = :id",
|
||||
{"paid": paid, "id": order_id},
|
||||
|
|
@ -689,7 +692,7 @@ async def update_order_paid_status(order_id: str, paid: bool) -> Order | None:
|
|||
|
||||
async def update_order_shipped_status(
|
||||
merchant_id: str, order_id: str, shipped: bool
|
||||
) -> Order | None:
|
||||
) -> Optional[Order]:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE nostrmarket.orders
|
||||
|
|
@ -753,7 +756,7 @@ async def create_direct_message(
|
|||
return msg
|
||||
|
||||
|
||||
async def get_direct_message(merchant_id: str, dm_id: str) -> DirectMessage | None:
|
||||
async def get_direct_message(merchant_id: str, dm_id: str) -> Optional[DirectMessage]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.direct_messages
|
||||
|
|
@ -769,7 +772,7 @@ async def get_direct_message(merchant_id: str, dm_id: str) -> DirectMessage | No
|
|||
|
||||
async def get_direct_message_by_event_id(
|
||||
merchant_id: str, event_id: str
|
||||
) -> DirectMessage | None:
|
||||
) -> Optional[DirectMessage]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.direct_messages
|
||||
|
|
@ -783,7 +786,7 @@ async def get_direct_message_by_event_id(
|
|||
return DirectMessage.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_direct_messages(merchant_id: str, public_key: str) -> list[DirectMessage]:
|
||||
async def get_direct_messages(merchant_id: str, public_key: str) -> List[DirectMessage]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.direct_messages
|
||||
|
|
@ -795,7 +798,7 @@ async def get_direct_messages(merchant_id: str, public_key: str) -> list[DirectM
|
|||
return [DirectMessage.from_row(row) for row in rows]
|
||||
|
||||
|
||||
async def get_orders_from_direct_messages(merchant_id: str) -> list[DirectMessage]:
|
||||
async def get_orders_from_direct_messages(merchant_id: str) -> List[DirectMessage]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.direct_messages
|
||||
|
|
@ -856,7 +859,7 @@ async def create_customer(merchant_id: str, data: Customer) -> Customer:
|
|||
return customer
|
||||
|
||||
|
||||
async def get_customer(merchant_id: str, public_key: str) -> Customer | None:
|
||||
async def get_customer(merchant_id: str, public_key: str) -> Optional[Customer]:
|
||||
row: dict = await db.fetchone(
|
||||
"""
|
||||
SELECT * FROM nostrmarket.customers
|
||||
|
|
@ -870,7 +873,7 @@ async def get_customer(merchant_id: str, public_key: str) -> Customer | None:
|
|||
return Customer.from_row(row) if row else None
|
||||
|
||||
|
||||
async def get_customers(merchant_id: str) -> list[Customer]:
|
||||
async def get_customers(merchant_id: str) -> List[Customer]:
|
||||
rows: list[dict] = await db.fetchall(
|
||||
"SELECT * FROM nostrmarket.customers WHERE merchant_id = :merchant_id",
|
||||
{"merchant_id": merchant_id},
|
||||
|
|
@ -878,7 +881,7 @@ async def get_customers(merchant_id: str) -> list[Customer]:
|
|||
return [Customer.from_row(row) for row in rows]
|
||||
|
||||
|
||||
async def get_all_unique_customers() -> list[Customer]:
|
||||
async def get_all_unique_customers() -> List[Customer]:
|
||||
q = """
|
||||
SELECT public_key, MAX(merchant_id) as merchant_id, MAX(event_created_at)
|
||||
FROM nostrmarket.customers
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import base64
|
||||
import secrets
|
||||
from typing import Optional
|
||||
|
||||
import secp256k1
|
||||
from bech32 import bech32_decode, convertbits
|
||||
|
|
@ -32,7 +33,7 @@ def decrypt_message(encoded_message: str, encryption_key) -> str:
|
|||
return unpadded_data.decode()
|
||||
|
||||
|
||||
def encrypt_message(message: str, encryption_key, iv: bytes | None = None) -> str:
|
||||
def encrypt_message(message: str, encryption_key, iv: Optional[bytes] = None) -> str:
|
||||
padder = padding.PKCS7(128).padder()
|
||||
padded_data = padder.update(message.encode()) + padder.finalize()
|
||||
|
||||
|
|
|
|||
320
misc-docs/ORDER-DISCOVERY-ANALYSIS.md
Normal file
320
misc-docs/ORDER-DISCOVERY-ANALYSIS.md
Normal file
|
|
@ -0,0 +1,320 @@
|
|||
# Nostrmarket Order Discovery Analysis
|
||||
|
||||
## Executive Summary
|
||||
|
||||
This document analyzes the order discovery mechanism in the Nostrmarket extension and identifies why merchants must manually refresh to see new orders instead of receiving them automatically through persistent subscriptions.
|
||||
|
||||
---
|
||||
|
||||
## Current Architecture
|
||||
|
||||
### Two Subscription Systems
|
||||
|
||||
The Nostrmarket extension implements two distinct subscription mechanisms for receiving Nostr events:
|
||||
|
||||
#### 1. **Persistent Subscriptions (Background Task)**
|
||||
|
||||
**Purpose**: Continuous monitoring for new orders, products, and merchant events
|
||||
|
||||
**Implementation**:
|
||||
|
||||
- Runs via `wait_for_nostr_events()` background task
|
||||
- Initiated on extension startup (15-second delay)
|
||||
- Creates subscription ID: `nostrmarket-{hash}`
|
||||
- Monitors all merchant public keys continuously
|
||||
|
||||
**Code Location**: `/nostrmarket/tasks.py:37-49`
|
||||
|
||||
```python
|
||||
async def wait_for_nostr_events(nostr_client: NostrClient):
|
||||
while True:
|
||||
try:
|
||||
await subscribe_to_all_merchants()
|
||||
while True:
|
||||
message = await nostr_client.get_event()
|
||||
await process_nostr_message(message)
|
||||
```
|
||||
|
||||
**Subscription Filters**:
|
||||
|
||||
- Direct messages (kind 4) - for orders
|
||||
- Stall events (kind 30017)
|
||||
- Product events (kind 30018)
|
||||
- Profile updates (kind 0)
|
||||
|
||||
#### 2. **Temporary Subscriptions (Manual Refresh)**
|
||||
|
||||
**Purpose**: Catch up on missed events when merchant clicks "Refresh from Nostr"
|
||||
|
||||
**Implementation**:
|
||||
|
||||
- Duration: 10 seconds only
|
||||
- Triggered by user action
|
||||
- Creates subscription ID: `merchant-{hash}`
|
||||
- Fetches ALL events from time=0
|
||||
|
||||
**Code Location**: `/nostrmarket/nostr/nostr_client.py:100-120`
|
||||
|
||||
```python
|
||||
async def merchant_temp_subscription(self, pk, duration=10):
|
||||
dm_filters = self._filters_for_direct_messages([pk], 0)
|
||||
# ... creates filters with time=0 (all history)
|
||||
await self.send_req_queue.put(["REQ", subscription_id] + merchant_filters)
|
||||
asyncio.create_task(unsubscribe_with_delay(subscription_id, duration))
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Problem Identification
|
||||
|
||||
### Why Manual Refresh is Required
|
||||
|
||||
#### **Issue 1: Timing Window Problem**
|
||||
|
||||
The persistent subscription uses timestamps from the last database update:
|
||||
|
||||
```python
|
||||
async def subscribe_to_all_merchants():
|
||||
last_dm_time = await get_last_direct_messages_created_at()
|
||||
last_stall_time = await get_last_stall_update_time()
|
||||
last_prod_time = await get_last_product_update_time()
|
||||
|
||||
await nostr_client.subscribe_merchants(
|
||||
public_keys, last_dm_time, last_stall_time, last_prod_time, 0
|
||||
)
|
||||
```
|
||||
|
||||
**Problem**: Events that occur between:
|
||||
|
||||
- The last database update time
|
||||
- When the subscription becomes active
|
||||
...are potentially missed
|
||||
|
||||
#### **Issue 2: Connection Stability**
|
||||
|
||||
The WebSocket connection between components may be unstable:
|
||||
|
||||
```
|
||||
[Nostrmarket] <--WebSocket--> [Nostrclient] <--WebSocket--> [Nostr Relays]
|
||||
Extension Extension (Global)
|
||||
```
|
||||
|
||||
**Potential failure points**:
|
||||
|
||||
1. Connection drops between nostrmarket → nostrclient
|
||||
2. Connection drops between nostrclient → relays
|
||||
3. Reconnection doesn't re-establish subscriptions
|
||||
|
||||
#### **Issue 3: Subscription State Management**
|
||||
|
||||
**Current behavior**:
|
||||
|
||||
- Single persistent subscription per merchant
|
||||
- No automatic resubscription on failure
|
||||
- No heartbeat/keepalive mechanism
|
||||
- No verification that subscription is active
|
||||
|
||||
#### **Issue 4: Event Processing Delays**
|
||||
|
||||
The startup sequence has intentional delays:
|
||||
|
||||
```python
|
||||
async def _subscribe_to_nostr_client():
|
||||
await asyncio.sleep(10) # Wait for nostrclient
|
||||
await nostr_client.run_forever()
|
||||
|
||||
async def _wait_for_nostr_events():
|
||||
await asyncio.sleep(15) # Wait for extension init
|
||||
await wait_for_nostr_events(nostr_client)
|
||||
```
|
||||
|
||||
**Problem**: Orders arriving during initialization are missed
|
||||
|
||||
---
|
||||
|
||||
## Why Manual Refresh Works
|
||||
|
||||
The temporary subscription succeeds because:
|
||||
|
||||
1. **Fetches from time=0**: Gets ALL historical events
|
||||
2. **Fresh connection**: Creates new subscription request
|
||||
3. **Immediate processing**: No startup delays
|
||||
4. **Direct feedback**: User sees results immediately
|
||||
|
||||
```python
|
||||
# Temporary subscription uses time=0 (all events)
|
||||
dm_filters = self._filters_for_direct_messages([pk], 0) # ← 0 means all time
|
||||
|
||||
# Persistent subscription uses last update time
|
||||
dm_filters = self._filters_for_direct_messages(public_keys, dm_time) # ← can miss events
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Impact Analysis
|
||||
|
||||
### User Experience Issues
|
||||
|
||||
1. **Merchants miss orders** without manual refresh
|
||||
2. **No real-time notifications** for new orders
|
||||
3. **Uncertainty** about order status
|
||||
4. **Extra manual steps** required
|
||||
5. **Delayed order fulfillment**
|
||||
|
||||
### Technical Implications
|
||||
|
||||
1. **Not truly decentralized** - requires active monitoring
|
||||
2. **Scalability concerns** - manual refresh doesn't scale
|
||||
3. **Reliability issues** - depends on user action
|
||||
4. **Performance overhead** - fetching all events repeatedly
|
||||
|
||||
---
|
||||
|
||||
## Recommended Solutions
|
||||
|
||||
### Solution A: Enhanced Persistent Subscriptions
|
||||
|
||||
**Implement redundant subscription mechanisms:**
|
||||
|
||||
```python
|
||||
class EnhancedSubscriptionManager:
|
||||
def __init__(self):
|
||||
self.last_heartbeat = time.time()
|
||||
self.subscription_active = False
|
||||
|
||||
async def maintain_subscription(self):
|
||||
while True:
|
||||
if not self.subscription_active or \
|
||||
time.time() - self.last_heartbeat > 30:
|
||||
await self.resubscribe_with_overlap()
|
||||
await asyncio.sleep(10)
|
||||
|
||||
async def resubscribe_with_overlap(self):
|
||||
# Use timestamp with 5-minute overlap
|
||||
overlap_time = int(time.time()) - 300
|
||||
await subscribe_to_all_merchants(since=overlap_time)
|
||||
```
|
||||
|
||||
### Solution B: Periodic Auto-Refresh
|
||||
|
||||
**Add automatic temporary subscriptions:**
|
||||
|
||||
```python
|
||||
async def auto_refresh_loop():
|
||||
while True:
|
||||
await asyncio.sleep(60) # Every minute
|
||||
merchants = await get_all_active_merchants()
|
||||
for merchant in merchants:
|
||||
await merchant_temp_subscription(merchant.pubkey, duration=5)
|
||||
```
|
||||
|
||||
### Solution C: WebSocket Health Monitoring
|
||||
|
||||
**Implement connection health checks:**
|
||||
|
||||
```python
|
||||
class WebSocketHealthMonitor:
|
||||
async def check_connection_health(self):
|
||||
try:
|
||||
# Send ping to nostrclient
|
||||
response = await nostr_client.ping(timeout=5)
|
||||
if not response:
|
||||
await self.reconnect_and_resubscribe()
|
||||
except Exception:
|
||||
await self.reconnect_and_resubscribe()
|
||||
```
|
||||
|
||||
### Solution D: Event Gap Detection
|
||||
|
||||
**Detect and fill gaps in event sequence:**
|
||||
|
||||
```python
|
||||
async def detect_event_gaps():
|
||||
# Check for gaps in event timestamps
|
||||
last_known = await get_last_event_time()
|
||||
current_time = int(time.time())
|
||||
|
||||
if current_time - last_known > 60: # 1 minute gap
|
||||
# Perform temporary subscription to fill gap
|
||||
await fetch_missing_events(since=last_known)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Implementation Priority
|
||||
|
||||
### Phase 1: Quick Fixes (1-2 days)
|
||||
|
||||
1. [DONE] Increase temp subscription duration (10s → 30s)
|
||||
2. [DONE] Add connection health logging
|
||||
3. [DONE] Reduce startup delays
|
||||
|
||||
### Phase 2: Reliability (3-5 days)
|
||||
|
||||
1. [TODO] Implement subscription heartbeat
|
||||
2. [TODO] Add automatic resubscription on failure
|
||||
3. [TODO] Create event gap detection
|
||||
|
||||
### Phase 3: Full Solution (1-2 weeks)
|
||||
|
||||
1. [TODO] WebSocket connection monitoring
|
||||
2. [TODO] Redundant subscription system
|
||||
3. [TODO] Real-time order notifications
|
||||
4. [TODO] Event deduplication logic
|
||||
|
||||
---
|
||||
|
||||
## Testing Recommendations
|
||||
|
||||
### Test Scenarios
|
||||
|
||||
1. **Order during startup**: Send order within 15 seconds of server start
|
||||
2. **Long-running test**: Keep server running for 24 hours, send periodic orders
|
||||
3. **Connection interruption**: Disconnect nostrclient, send order, reconnect
|
||||
4. **High volume**: Send 100 orders rapidly
|
||||
5. **Network latency**: Add artificial delay between components
|
||||
|
||||
### Monitoring Metrics
|
||||
|
||||
- Time between order sent → order discovered
|
||||
- Percentage of orders requiring manual refresh
|
||||
- WebSocket connection uptime
|
||||
- Subscription success rate
|
||||
- Event processing latency
|
||||
|
||||
---
|
||||
|
||||
## Conclusion
|
||||
|
||||
The current order discovery system relies on manual refresh due to:
|
||||
|
||||
1. **Timing gaps** in persistent subscriptions
|
||||
2. **Connection stability** issues
|
||||
3. **Lack of redundancy** in subscription management
|
||||
4. **No automatic recovery** mechanisms
|
||||
|
||||
While the temporary subscription (manual refresh) provides a workaround, a proper solution requires implementing connection monitoring, subscription health checks, and automatic gap-filling mechanisms to ensure reliable real-time order discovery.
|
||||
|
||||
---
|
||||
|
||||
## Appendix: Code References
|
||||
|
||||
### Key Files
|
||||
|
||||
- `/nostrmarket/tasks.py` - Background task management
|
||||
- `/nostrmarket/nostr/nostr_client.py` - Nostr client implementation
|
||||
- `/nostrmarket/services.py` - Order processing logic
|
||||
- `/nostrmarket/views_api.py` - API endpoints for refresh
|
||||
|
||||
### Relevant Functions
|
||||
|
||||
- `wait_for_nostr_events()` - Main event loop
|
||||
- `subscribe_to_all_merchants()` - Persistent subscription
|
||||
- `merchant_temp_subscription()` - Manual refresh
|
||||
- `process_nostr_message()` - Event processing
|
||||
|
||||
---
|
||||
|
||||
_Document prepared: January 2025_
|
||||
_Analysis based on: Nostrmarket v1.0_
|
||||
_Status: Active Investigation_
|
||||
123
models.py
123
models.py
|
|
@ -2,7 +2,7 @@ import json
|
|||
import time
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
from lnbits.utils.exchange_rates import btc_price, fiat_amount_as_satoshis
|
||||
from pydantic import BaseModel
|
||||
|
|
@ -32,16 +32,21 @@ class Nostrable:
|
|||
|
||||
|
||||
class MerchantProfile(BaseModel):
|
||||
name: str | None = None
|
||||
about: str | None = None
|
||||
picture: str | None = None
|
||||
name: Optional[str] = None
|
||||
about: Optional[str] = None
|
||||
picture: Optional[str] = None
|
||||
|
||||
|
||||
class MerchantConfig(MerchantProfile):
|
||||
event_id: str | None = None
|
||||
sync_from_nostr: bool = False
|
||||
active: bool = False
|
||||
restore_in_progress: bool | None = False
|
||||
event_id: Optional[str] = None
|
||||
sync_from_nostr = False
|
||||
# TODO: switched to True for AIO demo; determine if we leave this as True
|
||||
active: bool = True
|
||||
restore_in_progress: Optional[bool] = False
|
||||
|
||||
|
||||
class CreateMerchantRequest(BaseModel):
|
||||
config: MerchantConfig = MerchantConfig()
|
||||
|
||||
|
||||
class PartialMerchant(BaseModel):
|
||||
|
|
@ -52,7 +57,7 @@ class PartialMerchant(BaseModel):
|
|||
|
||||
class Merchant(PartialMerchant, Nostrable):
|
||||
id: str
|
||||
time: int | None = 0
|
||||
time: Optional[int] = 0
|
||||
|
||||
def sign_hash(self, hash_: bytes) -> str:
|
||||
return sign_message_hash(self.private_key, hash_)
|
||||
|
|
@ -122,11 +127,11 @@ class Merchant(PartialMerchant, Nostrable):
|
|||
|
||||
######################################## ZONES ########################################
|
||||
class Zone(BaseModel):
|
||||
id: str | None = None
|
||||
name: str | None = None
|
||||
id: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
currency: str
|
||||
cost: float
|
||||
countries: list[str] = []
|
||||
countries: List[str] = []
|
||||
|
||||
@classmethod
|
||||
def from_row(cls, row: dict) -> "Zone":
|
||||
|
|
@ -139,22 +144,22 @@ class Zone(BaseModel):
|
|||
|
||||
|
||||
class StallConfig(BaseModel):
|
||||
image_url: str | None = None
|
||||
description: str | None = None
|
||||
image_url: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class Stall(BaseModel, Nostrable):
|
||||
id: str | None = None
|
||||
id: Optional[str] = None
|
||||
wallet: str
|
||||
name: str
|
||||
currency: str = "sat"
|
||||
shipping_zones: list[Zone] = []
|
||||
shipping_zones: List[Zone] = []
|
||||
config: StallConfig = StallConfig()
|
||||
pending: bool = False
|
||||
|
||||
"""Last published nostr event for this Stall"""
|
||||
event_id: str | None = None
|
||||
event_created_at: int | None = None
|
||||
event_id: Optional[str] = None
|
||||
event_created_at: Optional[int] = None
|
||||
|
||||
def validate_stall(self):
|
||||
for z in self.shipping_zones:
|
||||
|
|
@ -212,19 +217,19 @@ class ProductShippingCost(BaseModel):
|
|||
|
||||
|
||||
class ProductConfig(BaseModel):
|
||||
description: str | None = None
|
||||
currency: str | None = None
|
||||
use_autoreply: bool | None = False
|
||||
autoreply_message: str | None = None
|
||||
shipping: list[ProductShippingCost] = []
|
||||
description: Optional[str] = None
|
||||
currency: Optional[str] = None
|
||||
use_autoreply: Optional[bool] = False
|
||||
autoreply_message: Optional[str] = None
|
||||
shipping: List[ProductShippingCost] = []
|
||||
|
||||
|
||||
class Product(BaseModel, Nostrable):
|
||||
id: str | None = None
|
||||
id: Optional[str] = None
|
||||
stall_id: str
|
||||
name: str
|
||||
categories: list[str] = []
|
||||
images: list[str] = []
|
||||
categories: List[str] = []
|
||||
images: List[str] = []
|
||||
price: float
|
||||
quantity: int
|
||||
active: bool = True
|
||||
|
|
@ -232,8 +237,8 @@ class Product(BaseModel, Nostrable):
|
|||
config: ProductConfig = ProductConfig()
|
||||
|
||||
"""Last published nostr event for this Product"""
|
||||
event_id: str | None = None
|
||||
event_created_at: int | None = None
|
||||
event_id: Optional[str] = None
|
||||
event_created_at: Optional[int] = None
|
||||
|
||||
def to_nostr_event(self, pubkey: str) -> NostrEvent:
|
||||
content = {
|
||||
|
|
@ -290,7 +295,7 @@ class ProductOverview(BaseModel):
|
|||
id: str
|
||||
name: str
|
||||
price: float
|
||||
product_shipping_cost: float | None = None
|
||||
product_shipping_cost: Optional[float] = None
|
||||
|
||||
@classmethod
|
||||
def from_product(cls, p: Product) -> "ProductOverview":
|
||||
|
|
@ -307,21 +312,21 @@ class OrderItem(BaseModel):
|
|||
|
||||
|
||||
class OrderContact(BaseModel):
|
||||
nostr: str | None = None
|
||||
phone: str | None = None
|
||||
email: str | None = None
|
||||
nostr: Optional[str] = None
|
||||
phone: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
|
||||
|
||||
class OrderExtra(BaseModel):
|
||||
products: list[ProductOverview]
|
||||
products: List[ProductOverview]
|
||||
currency: str
|
||||
btc_price: str
|
||||
shipping_cost: float = 0
|
||||
shipping_cost_sat: float = 0
|
||||
fail_message: str | None = None
|
||||
fail_message: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
async def from_products(cls, products: list[Product]):
|
||||
async def from_products(cls, products: List[Product]):
|
||||
currency = products[0].config.currency if len(products) else "sat"
|
||||
exchange_rate = (
|
||||
await btc_price(currency) if currency and currency != "sat" else 1
|
||||
|
|
@ -337,19 +342,19 @@ class OrderExtra(BaseModel):
|
|||
|
||||
class PartialOrder(BaseModel):
|
||||
id: str
|
||||
event_id: str | None = None
|
||||
event_created_at: int | None = None
|
||||
event_id: Optional[str] = None
|
||||
event_created_at: Optional[int] = None
|
||||
public_key: str
|
||||
merchant_public_key: str
|
||||
shipping_id: str
|
||||
items: list[OrderItem]
|
||||
contact: OrderContact | None = None
|
||||
address: str | None = None
|
||||
items: List[OrderItem]
|
||||
contact: Optional[OrderContact] = None
|
||||
address: Optional[str] = None
|
||||
|
||||
def validate_order(self):
|
||||
assert len(self.items) != 0, f"Order has no items. Order: '{self.id}'"
|
||||
|
||||
def validate_order_items(self, product_list: list[Product]):
|
||||
def validate_order_items(self, product_list: List[Product]):
|
||||
assert len(self.items) != 0, f"Order has no items. Order: '{self.id}'"
|
||||
assert (
|
||||
len(product_list) != 0
|
||||
|
|
@ -370,8 +375,8 @@ class PartialOrder(BaseModel):
|
|||
)
|
||||
|
||||
async def costs_in_sats(
|
||||
self, products: list[Product], shipping_id: str, stall_shipping_cost: float
|
||||
) -> tuple[float, float]:
|
||||
self, products: List[Product], shipping_id: str, stall_shipping_cost: float
|
||||
) -> Tuple[float, float]:
|
||||
product_prices = {}
|
||||
for p in products:
|
||||
product_shipping_cost = next(
|
||||
|
|
@ -400,7 +405,7 @@ class PartialOrder(BaseModel):
|
|||
return product_cost, stall_shipping_cost
|
||||
|
||||
def receipt(
|
||||
self, products: list[Product], shipping_id: str, stall_shipping_cost: float
|
||||
self, products: List[Product], shipping_id: str, stall_shipping_cost: float
|
||||
) -> str:
|
||||
if len(products) == 0:
|
||||
return "[No Products]"
|
||||
|
|
@ -449,7 +454,7 @@ class Order(PartialOrder):
|
|||
total: float
|
||||
paid: bool = False
|
||||
shipped: bool = False
|
||||
time: int | None = None
|
||||
time: Optional[int] = None
|
||||
extra: OrderExtra
|
||||
|
||||
@classmethod
|
||||
|
|
@ -463,14 +468,14 @@ class Order(PartialOrder):
|
|||
|
||||
class OrderStatusUpdate(BaseModel):
|
||||
id: str
|
||||
message: str | None = None
|
||||
paid: bool | None = False
|
||||
shipped: bool | None = None
|
||||
message: Optional[str] = None
|
||||
paid: Optional[bool] = False
|
||||
shipped: Optional[bool] = None
|
||||
|
||||
|
||||
class OrderReissue(BaseModel):
|
||||
id: str
|
||||
shipping_id: str | None = None
|
||||
shipping_id: Optional[str] = None
|
||||
|
||||
|
||||
class PaymentOption(BaseModel):
|
||||
|
|
@ -480,8 +485,8 @@ class PaymentOption(BaseModel):
|
|||
|
||||
class PaymentRequest(BaseModel):
|
||||
id: str
|
||||
message: str | None = None
|
||||
payment_options: list[PaymentOption]
|
||||
message: Optional[str] = None
|
||||
payment_options: List[PaymentOption]
|
||||
|
||||
|
||||
######################################## MESSAGE #######################################
|
||||
|
|
@ -497,16 +502,16 @@ class DirectMessageType(Enum):
|
|||
|
||||
|
||||
class PartialDirectMessage(BaseModel):
|
||||
event_id: str | None = None
|
||||
event_created_at: int | None = None
|
||||
event_id: Optional[str] = None
|
||||
event_created_at: Optional[int] = None
|
||||
message: str
|
||||
public_key: str
|
||||
type: int = DirectMessageType.PLAIN_TEXT.value
|
||||
incoming: bool = False
|
||||
time: int | None = None
|
||||
time: Optional[int] = None
|
||||
|
||||
@classmethod
|
||||
def parse_message(cls, msg) -> tuple[DirectMessageType, Any | None]:
|
||||
def parse_message(cls, msg) -> Tuple[DirectMessageType, Optional[Any]]:
|
||||
try:
|
||||
msg_json = json.loads(msg)
|
||||
if "type" in msg_json:
|
||||
|
|
@ -529,15 +534,15 @@ class DirectMessage(PartialDirectMessage):
|
|||
|
||||
|
||||
class CustomerProfile(BaseModel):
|
||||
name: str | None = None
|
||||
about: str | None = None
|
||||
name: Optional[str] = None
|
||||
about: Optional[str] = None
|
||||
|
||||
|
||||
class Customer(BaseModel):
|
||||
merchant_id: str
|
||||
public_key: str
|
||||
event_created_at: int | None = None
|
||||
profile: CustomerProfile | None = None
|
||||
event_created_at: Optional[int] = None
|
||||
profile: Optional[CustomerProfile] = None
|
||||
unread_messages: int = 0
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -31,9 +31,11 @@ class NostrClient:
|
|||
logger.debug(f"Connecting to websockets for 'nostrclient' extension...")
|
||||
|
||||
relay_endpoint = encrypt_internal_message("relay", urlsafe=True)
|
||||
ws_url = f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}"
|
||||
|
||||
on_open, on_message, on_error, on_close = self._ws_handlers()
|
||||
ws = WebSocketApp(
|
||||
f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}",
|
||||
ws_url,
|
||||
on_message=on_message,
|
||||
on_open=on_open,
|
||||
on_close=on_close,
|
||||
|
|
@ -65,6 +67,7 @@ class NostrClient:
|
|||
async def get_event(self):
|
||||
value = await self.recieve_event_queue.get()
|
||||
if isinstance(value, ValueError):
|
||||
logger.error(f"[NOSTRMARKET] ❌ Queue returned error: {value}")
|
||||
raise value
|
||||
return value
|
||||
|
||||
|
|
@ -91,10 +94,6 @@ class NostrClient:
|
|||
self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
|
||||
await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)
|
||||
|
||||
logger.debug(
|
||||
f"Subscribing to events for: {len(public_keys)} keys. New subscription id: {self.subscription_id}"
|
||||
)
|
||||
|
||||
async def merchant_temp_subscription(self, pk, duration=10):
|
||||
dm_filters = self._filters_for_direct_messages([pk], 0)
|
||||
stall_filters = self._filters_for_stall_events([pk], 0)
|
||||
|
|
@ -175,16 +174,21 @@ class NostrClient:
|
|||
|
||||
def _ws_handlers(self):
|
||||
def on_open(_):
|
||||
logger.info("Connected to 'nostrclient' websocket")
|
||||
logger.debug("[NOSTRMARKET DEBUG] ✅ Connected to 'nostrclient' websocket successfully")
|
||||
|
||||
def on_message(_, message):
|
||||
self.recieve_event_queue.put_nowait(message)
|
||||
logger.debug(f"[NOSTRMARKET DEBUG] 📨 Received websocket message: {message[:200]}...")
|
||||
try:
|
||||
self.recieve_event_queue.put_nowait(message)
|
||||
logger.debug(f"[NOSTRMARKET DEBUG] 📤 Message queued successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"[NOSTRMARKET] ❌ Failed to queue message: {e}")
|
||||
|
||||
def on_error(_, error):
|
||||
logger.warning(error)
|
||||
logger.warning(f"[NOSTRMARKET] ❌ Websocket error: {error}")
|
||||
|
||||
def on_close(x, status_code, message):
|
||||
logger.warning(f"Websocket closed: {x}: '{status_code}' '{message}'")
|
||||
logger.warning(f"[NOSTRMARKET] 🔌 Websocket closed: {x}: '{status_code}' '{message}'")
|
||||
# force re-subscribe
|
||||
self.recieve_event_queue.put_nowait(ValueError("Websocket close."))
|
||||
|
||||
|
|
|
|||
130
services.py
130
services.py
|
|
@ -1,7 +1,8 @@
|
|||
import asyncio
|
||||
import json
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from bolt11 import decode
|
||||
from lnbits.bolt11 import decode
|
||||
from lnbits.core.crud import get_wallet
|
||||
from lnbits.core.services import create_invoice, websocket_updater
|
||||
from loguru import logger
|
||||
|
|
@ -59,11 +60,12 @@ from .nostr.event import NostrEvent
|
|||
|
||||
async def create_new_order(
|
||||
merchant_public_key: str, data: PartialOrder
|
||||
) -> PaymentRequest | None:
|
||||
) -> Optional[PaymentRequest]:
|
||||
merchant = await get_merchant_by_pubkey(merchant_public_key)
|
||||
assert merchant, "Cannot find merchant for order!"
|
||||
|
||||
if await get_order(merchant.id, data.id):
|
||||
existing_order = await get_order(merchant.id, data.id)
|
||||
if existing_order:
|
||||
return None
|
||||
if data.event_id and await get_order_by_event_id(merchant.id, data.event_id):
|
||||
return None
|
||||
|
|
@ -73,20 +75,24 @@ async def create_new_order(
|
|||
)
|
||||
await create_order(merchant.id, order)
|
||||
|
||||
return PaymentRequest(
|
||||
payment_request = PaymentRequest(
|
||||
id=data.id,
|
||||
payment_options=[PaymentOption(type="ln", link=invoice)],
|
||||
message=receipt,
|
||||
)
|
||||
return payment_request
|
||||
|
||||
|
||||
async def build_order_with_payment(
|
||||
merchant_id: str, merchant_public_key: str, data: PartialOrder
|
||||
):
|
||||
|
||||
products = await get_products_by_ids(
|
||||
merchant_id, [p.product_id for p in data.items]
|
||||
)
|
||||
|
||||
data.validate_order_items(products)
|
||||
|
||||
shipping_zone = await get_zone(merchant_id, data.shipping_id)
|
||||
assert shipping_zone, f"Shipping zone not found for order '{data.id}'"
|
||||
|
||||
|
|
@ -94,6 +100,7 @@ async def build_order_with_payment(
|
|||
product_cost_sat, shipping_cost_sat = await data.costs_in_sats(
|
||||
products, shipping_zone.id, shipping_zone.cost
|
||||
)
|
||||
|
||||
receipt = data.receipt(products, shipping_zone.id, shipping_zone.cost)
|
||||
|
||||
wallet_id = await get_wallet_for_product(data.items[0].product_id)
|
||||
|
|
@ -104,11 +111,13 @@ async def build_order_with_payment(
|
|||
merchant_id, product_ids, data.items
|
||||
)
|
||||
if not success:
|
||||
logger.error(f"[NOSTRMARKET] ❌ Product quantity check failed: {message}")
|
||||
raise ValueError(message)
|
||||
|
||||
total_amount_sat = round(product_cost_sat + shipping_cost_sat)
|
||||
payment = await create_invoice(
|
||||
wallet_id=wallet_id,
|
||||
amount=round(product_cost_sat + shipping_cost_sat),
|
||||
amount=total_amount_sat,
|
||||
memo=f"Order '{data.id}' for pubkey '{data.public_key}'",
|
||||
extra={
|
||||
"tag": "nostrmarket",
|
||||
|
|
@ -136,7 +145,7 @@ async def update_merchant_to_nostr(
|
|||
merchant: Merchant, delete_merchant=False
|
||||
) -> Merchant:
|
||||
stalls = await get_stalls(merchant.id)
|
||||
event: NostrEvent | None = None
|
||||
event: Optional[NostrEvent] = None
|
||||
for stall in stalls:
|
||||
assert stall.id
|
||||
products = await get_products(merchant.id, stall.id)
|
||||
|
|
@ -221,7 +230,7 @@ async def notify_client_of_order_status(
|
|||
|
||||
async def update_products_for_order(
|
||||
merchant: Merchant, order: Order
|
||||
) -> tuple[bool, str]:
|
||||
) -> Tuple[bool, str]:
|
||||
product_ids = [i.product_id for i in order.items]
|
||||
success, products, message = await compute_products_new_quantity(
|
||||
merchant.id, product_ids, order.items
|
||||
|
|
@ -289,9 +298,9 @@ async def send_dm(
|
|||
|
||||
|
||||
async def compute_products_new_quantity(
|
||||
merchant_id: str, product_ids: list[str], items: list[OrderItem]
|
||||
) -> tuple[bool, list[Product], str]:
|
||||
products: list[Product] = await get_products_by_ids(merchant_id, product_ids)
|
||||
merchant_id: str, product_ids: List[str], items: List[OrderItem]
|
||||
) -> Tuple[bool, List[Product], str]:
|
||||
products: List[Product] = await get_products_by_ids(merchant_id, product_ids)
|
||||
|
||||
for p in products:
|
||||
required_quantity = next(
|
||||
|
|
@ -314,11 +323,17 @@ async def compute_products_new_quantity(
|
|||
|
||||
async def process_nostr_message(msg: str):
|
||||
try:
|
||||
type_, *rest = json.loads(msg)
|
||||
parsed_msg = json.loads(msg)
|
||||
type_, *rest = parsed_msg
|
||||
|
||||
|
||||
if type_.upper() == "EVENT":
|
||||
if len(rest) < 2:
|
||||
logger.warning(f"[NOSTRMARKET] ⚠️ EVENT message missing data: {rest}")
|
||||
return
|
||||
_, event = rest
|
||||
event = NostrEvent(**event)
|
||||
|
||||
if event.kind == 0:
|
||||
await _handle_customer_profile_update(event)
|
||||
elif event.kind == 4:
|
||||
|
|
@ -327,10 +342,15 @@ async def process_nostr_message(msg: str):
|
|||
await _handle_stall(event)
|
||||
elif event.kind == 30018:
|
||||
await _handle_product(event)
|
||||
else:
|
||||
logger.info(f"[NOSTRMARKET] ❓ Unhandled event kind: {event.kind} - event: {event.id}")
|
||||
return
|
||||
else:
|
||||
logger.info(f"[NOSTRMARKET] 🔄 Non-EVENT message type: {type_}")
|
||||
|
||||
except Exception as ex:
|
||||
logger.debug(ex)
|
||||
logger.error(f"[NOSTRMARKET] ❌ Error processing nostr message: {ex}")
|
||||
logger.error(f"[NOSTRMARKET] 📄 Raw message that failed: {msg}")
|
||||
|
||||
|
||||
async def create_or_update_order_from_dm(
|
||||
|
|
@ -412,28 +432,29 @@ async def extract_customer_order_from_dm(
|
|||
|
||||
|
||||
async def _handle_nip04_message(event: NostrEvent):
|
||||
merchant_public_key = event.pubkey
|
||||
merchant = await get_merchant_by_pubkey(merchant_public_key)
|
||||
|
||||
if not merchant:
|
||||
p_tags = event.tag_values("p")
|
||||
if len(p_tags) and p_tags[0]:
|
||||
merchant_public_key = p_tags[0]
|
||||
merchant = await get_merchant_by_pubkey(merchant_public_key)
|
||||
|
||||
assert merchant, f"Merchant not found for public key '{merchant_public_key}'"
|
||||
|
||||
if event.pubkey == merchant_public_key:
|
||||
assert len(event.tag_values("p")) != 0, "Outgong message has no 'p' tag"
|
||||
clear_text_msg = merchant.decrypt_message(
|
||||
|
||||
p_tags = event.tag_values("p")
|
||||
|
||||
# PRIORITY 1: Check if any recipient (p_tag) is a merchant → incoming message TO merchant
|
||||
for p_tag in p_tags:
|
||||
if p_tag:
|
||||
potential_merchant = await get_merchant_by_pubkey(p_tag)
|
||||
if potential_merchant:
|
||||
clear_text_msg = potential_merchant.decrypt_message(event.content, event.pubkey)
|
||||
await _handle_incoming_dms(event, potential_merchant, clear_text_msg)
|
||||
return # IMPORTANT: Return immediately to prevent double processing
|
||||
|
||||
# PRIORITY 2: If no recipient merchant found, check if sender is a merchant → outgoing message FROM merchant
|
||||
sender_merchant = await get_merchant_by_pubkey(event.pubkey)
|
||||
if sender_merchant:
|
||||
assert len(event.tag_values("p")) != 0, "Outgoing message has no 'p' tag"
|
||||
clear_text_msg = sender_merchant.decrypt_message(
|
||||
event.content, event.tag_values("p")[0]
|
||||
)
|
||||
await _handle_outgoing_dms(event, merchant, clear_text_msg)
|
||||
elif event.has_tag_value("p", merchant_public_key):
|
||||
clear_text_msg = merchant.decrypt_message(event.content, event.pubkey)
|
||||
await _handle_incoming_dms(event, merchant, clear_text_msg)
|
||||
else:
|
||||
logger.warning(f"Bad NIP04 event: '{event.id}'")
|
||||
await _handle_outgoing_dms(event, sender_merchant, clear_text_msg)
|
||||
return # IMPORTANT: Return immediately
|
||||
|
||||
# No merchant found in either direction
|
||||
|
||||
|
||||
async def _handle_incoming_dms(
|
||||
|
|
@ -483,17 +504,18 @@ async def _handle_outgoing_dms(
|
|||
|
||||
async def _handle_incoming_structured_dm(
|
||||
merchant: Merchant, dm: DirectMessage, json_data: dict
|
||||
) -> tuple[DirectMessageType, str | None]:
|
||||
) -> Tuple[DirectMessageType, Optional[str]]:
|
||||
try:
|
||||
if dm.type == DirectMessageType.CUSTOMER_ORDER.value and merchant.config.active:
|
||||
json_resp = await _handle_new_order(
|
||||
merchant.id, merchant.public_key, dm, json_data
|
||||
)
|
||||
|
||||
return DirectMessageType.PAYMENT_REQUEST, json_resp
|
||||
else:
|
||||
logger.info(f"[NOSTRMARKET] Skipping order processing - type: {dm.type}, expected: {DirectMessageType.CUSTOMER_ORDER.value}, merchant_active: {merchant.config.active}")
|
||||
|
||||
except Exception as ex:
|
||||
logger.warning(ex)
|
||||
logger.error(f"[NOSTRMARKET] Error in _handle_incoming_structured_dm: {ex}")
|
||||
|
||||
return DirectMessageType.PLAIN_TEXT, None
|
||||
|
||||
|
|
@ -574,9 +596,31 @@ async def _handle_new_order(
|
|||
wallet = await get_wallet(wallet_id)
|
||||
assert wallet, f"Cannot find wallet for product id: {first_product_id}"
|
||||
|
||||
|
||||
payment_req = await create_new_order(merchant_public_key, partial_order)
|
||||
|
||||
if payment_req is None:
|
||||
# Return existing order data instead of creating a failed order
|
||||
existing_order = await get_order(merchant_id, partial_order.id)
|
||||
if existing_order and existing_order.invoice_id != "None":
|
||||
# Order exists with invoice, return existing payment request
|
||||
duplicate_response = json.dumps({
|
||||
"type": DirectMessageType.PAYMENT_REQUEST.value,
|
||||
"id": existing_order.id,
|
||||
"message": "Order already received and processed",
|
||||
"payment_options": []
|
||||
}, separators=(",", ":"), ensure_ascii=False)
|
||||
return duplicate_response
|
||||
else:
|
||||
# Order exists but no invoice, skip processing
|
||||
logger.info(f"[NOSTRMARKET] Order exists but no invoice, returning empty string")
|
||||
return ""
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(e)
|
||||
logger.error(f"[NOSTRMARKET] Error creating order: {e}")
|
||||
logger.error(f"[NOSTRMARKET] Order data: {json_data}")
|
||||
logger.error(f"[NOSTRMARKET] Exception type: {type(e).__name__}")
|
||||
logger.error(f"[NOSTRMARKET] Exception details: {str(e)}")
|
||||
payment_req = await create_new_failed_order(
|
||||
merchant_id,
|
||||
merchant_public_key,
|
||||
|
|
@ -584,12 +628,17 @@ async def _handle_new_order(
|
|||
json_data,
|
||||
"Order received, but cannot be processed. Please contact merchant.",
|
||||
)
|
||||
assert payment_req
|
||||
|
||||
if not payment_req:
|
||||
logger.error(f"[NOSTRMARKET] No payment request returned for order: {partial_order.id}")
|
||||
return ""
|
||||
|
||||
response = {
|
||||
"type": DirectMessageType.PAYMENT_REQUEST.value,
|
||||
**payment_req.dict(),
|
||||
}
|
||||
return json.dumps(response, separators=(",", ":"), ensure_ascii=False)
|
||||
response_json = json.dumps(response, separators=(",", ":"), ensure_ascii=False)
|
||||
return response_json
|
||||
|
||||
|
||||
async def create_new_failed_order(
|
||||
|
|
@ -622,8 +671,11 @@ async def subscribe_to_all_merchants():
|
|||
last_stall_time = await get_last_stall_update_time()
|
||||
last_prod_time = await get_last_product_update_time()
|
||||
|
||||
# Make dm_time more lenient by subtracting 5 minutes to avoid missing recent events
|
||||
lenient_dm_time = max(0, last_dm_time - 300) if last_dm_time > 0 else 0
|
||||
|
||||
await nostr_client.subscribe_merchants(
|
||||
public_keys, last_dm_time, last_stall_time, last_prod_time, 0
|
||||
public_keys, lenient_dm_time, last_stall_time, last_prod_time, 0
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -21,26 +21,18 @@ window.app = Vue.createApp({
|
|||
},
|
||||
methods: {
|
||||
generateKeys: async function () {
|
||||
const privateKey = nostr.generatePrivateKey()
|
||||
await this.createMerchant(privateKey)
|
||||
// No longer need to generate keys here - the backend will use user's existing keypairs
|
||||
await this.createMerchant()
|
||||
},
|
||||
importKeys: async function () {
|
||||
this.importKeyDialog.show = false
|
||||
let privateKey = this.importKeyDialog.data.privateKey
|
||||
if (!privateKey) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
if (privateKey.toLowerCase().startsWith('nsec')) {
|
||||
privateKey = nostr.nip19.decode(privateKey).data
|
||||
}
|
||||
} catch (error) {
|
||||
this.$q.notify({
|
||||
type: 'negative',
|
||||
message: `${error}`
|
||||
})
|
||||
}
|
||||
await this.createMerchant(privateKey)
|
||||
// Import keys functionality removed since we use user's native keypairs
|
||||
// Show a message that this is no longer needed
|
||||
this.$q.notify({
|
||||
type: 'info',
|
||||
message: 'Merchants now use your account Nostr keys automatically. Key import is no longer needed.',
|
||||
timeout: 3000
|
||||
})
|
||||
},
|
||||
showImportKeysDialog: async function () {
|
||||
this.importKeyDialog.show = true
|
||||
|
|
@ -94,12 +86,9 @@ window.app = Vue.createApp({
|
|||
this.activeChatCustomer = ''
|
||||
this.showKeys = false
|
||||
},
|
||||
createMerchant: async function (privateKey) {
|
||||
createMerchant: async function () {
|
||||
try {
|
||||
const pubkey = nostr.getPublicKey(privateKey)
|
||||
const payload = {
|
||||
private_key: privateKey,
|
||||
public_key: pubkey,
|
||||
config: {}
|
||||
}
|
||||
const {data} = await LNbits.api.request(
|
||||
|
|
|
|||
6
tasks.py
6
tasks.py
|
|
@ -35,13 +35,17 @@ async def on_invoice_paid(payment: Payment) -> None:
|
|||
|
||||
|
||||
async def wait_for_nostr_events(nostr_client: NostrClient):
|
||||
logger.info("[NOSTRMARKET DEBUG] Starting wait_for_nostr_events task")
|
||||
while True:
|
||||
try:
|
||||
logger.info("[NOSTRMARKET DEBUG] Subscribing to all merchants...")
|
||||
await subscribe_to_all_merchants()
|
||||
|
||||
while True:
|
||||
logger.debug("[NOSTRMARKET DEBUG] Waiting for nostr event...")
|
||||
message = await nostr_client.get_event()
|
||||
logger.info(f"[NOSTRMARKET DEBUG] Received event from nostr_client: {message[:100]}...")
|
||||
await process_nostr_message(message)
|
||||
except Exception as e:
|
||||
logger.warning(f"Subcription failed. Will retry in one minute: {e}")
|
||||
logger.warning(f"[NOSTRMARKET DEBUG] Subscription failed. Will retry in 10 seconds: {e}")
|
||||
await asyncio.sleep(10)
|
||||
|
|
|
|||
76
views_api.py
76
views_api.py
|
|
@ -1,15 +1,18 @@
|
|||
import json
|
||||
from http import HTTPStatus
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import Depends
|
||||
from fastapi.exceptions import HTTPException
|
||||
from lnbits.core.models import WalletTypeInfo
|
||||
from lnbits.core.crud import get_account, update_account
|
||||
from lnbits.core.services import websocket_updater
|
||||
from lnbits.decorators import (
|
||||
WalletTypeInfo,
|
||||
require_admin_key,
|
||||
require_invoice_key,
|
||||
)
|
||||
from lnbits.utils.exchange_rates import currencies
|
||||
from lnbits.utils.nostr import generate_keypair
|
||||
from loguru import logger
|
||||
|
||||
from . import nostr_client, nostrmarket_ext
|
||||
|
|
@ -58,6 +61,7 @@ from .crud import (
|
|||
)
|
||||
from .helpers import normalize_public_key
|
||||
from .models import (
|
||||
CreateMerchantRequest,
|
||||
Customer,
|
||||
DirectMessage,
|
||||
DirectMessageType,
|
||||
|
|
@ -89,18 +93,48 @@ from .services import (
|
|||
|
||||
@nostrmarket_ext.post("/api/v1/merchant")
|
||||
async def api_create_merchant(
|
||||
data: PartialMerchant,
|
||||
data: CreateMerchantRequest,
|
||||
wallet: WalletTypeInfo = Depends(require_admin_key),
|
||||
) -> Merchant:
|
||||
|
||||
try:
|
||||
merchant = await get_merchant_by_pubkey(data.public_key)
|
||||
assert merchant is None, "A merchant already uses this public key"
|
||||
|
||||
# Check if merchant already exists for this user
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant is None, "A merchant already exists for this user"
|
||||
|
||||
merchant = await create_merchant(wallet.wallet.user, data)
|
||||
# Get user's account to access their Nostr keypairs
|
||||
account = await get_account(wallet.wallet.user)
|
||||
if not account:
|
||||
raise HTTPException(
|
||||
status_code=HTTPStatus.NOT_FOUND,
|
||||
detail="User account not found",
|
||||
)
|
||||
|
||||
# Check if user has Nostr keypairs, generate them if not
|
||||
if not account.pubkey or not account.prvkey:
|
||||
# Generate new keypair for user
|
||||
private_key, public_key = generate_keypair()
|
||||
|
||||
# Update user account with new keypairs
|
||||
account.pubkey = public_key
|
||||
account.prvkey = private_key
|
||||
await update_account(account)
|
||||
else:
|
||||
public_key = account.pubkey
|
||||
private_key = account.prvkey
|
||||
|
||||
# Check if another merchant is already using this public key
|
||||
existing_merchant = await get_merchant_by_pubkey(public_key)
|
||||
assert existing_merchant is None, "A merchant already uses this public key"
|
||||
|
||||
# Create PartialMerchant with user's keypairs
|
||||
partial_merchant = PartialMerchant(
|
||||
private_key=private_key,
|
||||
public_key=public_key,
|
||||
config=data.config
|
||||
)
|
||||
|
||||
merchant = await create_merchant(wallet.wallet.user, partial_merchant)
|
||||
|
||||
await create_zone(
|
||||
merchant.id,
|
||||
|
|
@ -115,7 +149,7 @@ async def api_create_merchant(
|
|||
|
||||
await resubscribe_to_all_merchants()
|
||||
|
||||
await nostr_client.merchant_temp_subscription(data.public_key)
|
||||
await nostr_client.merchant_temp_subscription(public_key)
|
||||
|
||||
return merchant
|
||||
except AssertionError as ex:
|
||||
|
|
@ -134,7 +168,7 @@ async def api_create_merchant(
|
|||
@nostrmarket_ext.get("/api/v1/merchant")
|
||||
async def api_get_merchant(
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
) -> Merchant | None:
|
||||
) -> Optional[Merchant]:
|
||||
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
|
|
@ -302,7 +336,7 @@ async def api_delete_merchant_on_nostr(
|
|||
@nostrmarket_ext.get("/api/v1/zone")
|
||||
async def api_get_zones(
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
) -> list[Zone]:
|
||||
) -> List[Zone]:
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant, "Merchant cannot be found"
|
||||
|
|
@ -502,7 +536,7 @@ async def api_get_stall(
|
|||
|
||||
@nostrmarket_ext.get("/api/v1/stall")
|
||||
async def api_get_stalls(
|
||||
pending: bool | None = False,
|
||||
pending: Optional[bool] = False,
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
):
|
||||
try:
|
||||
|
|
@ -526,7 +560,7 @@ async def api_get_stalls(
|
|||
@nostrmarket_ext.get("/api/v1/stall/product/{stall_id}")
|
||||
async def api_get_stall_products(
|
||||
stall_id: str,
|
||||
pending: bool | None = False,
|
||||
pending: Optional[bool] = False,
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
):
|
||||
try:
|
||||
|
|
@ -550,9 +584,9 @@ async def api_get_stall_products(
|
|||
@nostrmarket_ext.get("/api/v1/stall/order/{stall_id}")
|
||||
async def api_get_stall_orders(
|
||||
stall_id: str,
|
||||
paid: bool | None = None,
|
||||
shipped: bool | None = None,
|
||||
pubkey: str | None = None,
|
||||
paid: Optional[bool] = None,
|
||||
shipped: Optional[bool] = None,
|
||||
pubkey: Optional[str] = None,
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
):
|
||||
try:
|
||||
|
|
@ -686,7 +720,7 @@ async def api_update_product(
|
|||
async def api_get_product(
|
||||
product_id: str,
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
) -> Product | None:
|
||||
) -> Optional[Product]:
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant, "Merchant cannot be found"
|
||||
|
|
@ -771,9 +805,9 @@ async def api_get_order(
|
|||
|
||||
@nostrmarket_ext.get("/api/v1/order")
|
||||
async def api_get_orders(
|
||||
paid: bool | None = None,
|
||||
shipped: bool | None = None,
|
||||
pubkey: str | None = None,
|
||||
paid: Optional[bool] = None,
|
||||
shipped: Optional[bool] = None,
|
||||
pubkey: Optional[str] = None,
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
):
|
||||
try:
|
||||
|
|
@ -859,7 +893,7 @@ async def api_update_order_status(
|
|||
async def api_restore_order(
|
||||
event_id: str,
|
||||
wallet: WalletTypeInfo = Depends(require_admin_key),
|
||||
) -> Order | None:
|
||||
) -> Optional[Order]:
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant, "Merchant cannot be found"
|
||||
|
|
@ -986,7 +1020,7 @@ async def api_reissue_order_invoice(
|
|||
@nostrmarket_ext.get("/api/v1/message/{public_key}")
|
||||
async def api_get_messages(
|
||||
public_key: str, wallet: WalletTypeInfo = Depends(require_invoice_key)
|
||||
) -> list[DirectMessage]:
|
||||
) -> List[DirectMessage]:
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant, "Merchant cannot be found"
|
||||
|
|
@ -1042,7 +1076,7 @@ async def api_create_message(
|
|||
@nostrmarket_ext.get("/api/v1/customer")
|
||||
async def api_get_customers(
|
||||
wallet: WalletTypeInfo = Depends(require_invoice_key),
|
||||
) -> list[Customer]:
|
||||
) -> List[Customer]:
|
||||
try:
|
||||
merchant = await get_merchant_for_user(wallet.wallet.user)
|
||||
assert merchant, "Merchant cannot be found"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue