diff --git a/crud.py b/crud.py index 7b274bb..b1dd13e 100644 --- a/crud.py +++ b/crud.py @@ -72,12 +72,12 @@ async def get_merchant_by_pubkey(public_key: str) -> Optional[Merchant]: return Merchant.from_row(row) if row else None -async def get_public_keys_for_merchants() -> List[str]: +async def get_merchants_ids_with_pubkeys() -> List[str]: rows = await db.fetchall( - """SELECT public_key FROM nostrmarket.merchants""", + """SELECT id, public_key FROM nostrmarket.merchants""", ) - return [row[0] for row in rows] + return [(row[0], row[1]) for row in rows] async def get_merchant_for_user(user_id: str) -> Optional[Merchant]: @@ -100,7 +100,7 @@ async def delete_merchant(merchant_id: str) -> None: async def create_zone(merchant_id: str, data: PartialZone) -> Zone: - zone_id = urlsafe_short_hash() + zone_id = data.id or urlsafe_short_hash() await db.execute( f""" INSERT INTO nostrmarket.zones (id, merchant_id, name, currency, cost, regions) @@ -168,12 +168,14 @@ async def delete_merchant_zones(merchant_id: str) -> None: async def create_stall(merchant_id: str, data: PartialStall) -> Stall: - stall_id = urlsafe_short_hash() + stall_id = data.id or urlsafe_short_hash() await db.execute( f""" - INSERT INTO nostrmarket.stalls (merchant_id, id, wallet, name, currency, zones, meta) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO nostrmarket.stalls + (merchant_id, id, wallet, name, currency, pending, event_id, event_created_at, zones, meta) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO NOTHING """, ( merchant_id, @@ -181,6 +183,9 @@ async def create_stall(merchant_id: str, data: PartialStall) -> Stall: data.wallet, data.name, data.currency, + data.pending, + data.event_id, + data.event_created_at, json.dumps( [z.dict() for z in data.shipping_zones] ), # todo: cost is float. should be int for sats @@ -204,30 +209,42 @@ async def get_stall(merchant_id: str, stall_id: str) -> Optional[Stall]: return Stall.from_row(row) if row else None -async def get_stalls(merchant_id: str) -> List[Stall]: +async def get_stalls(merchant_id: str, pending: Optional[bool] = False) -> List[Stall]: rows = await db.fetchall( - "SELECT * FROM nostrmarket.stalls WHERE merchant_id = ?", - (merchant_id,), + "SELECT * FROM nostrmarket.stalls WHERE merchant_id = ? AND pending = ?", + (merchant_id, pending,), ) return [Stall.from_row(row) for row in rows] +async def get_last_stall_update_time(merchant_id: str) -> int: + row = await db.fetchone( + """ + SELECT event_created_at FROM nostrmarket.stalls + WHERE merchant_id = ? ORDER BY event_created_at DESC LIMIT 1 + """, + (merchant_id,), + ) + return row[0] or 0 if row else 0 async def update_stall(merchant_id: str, stall: Stall) -> Optional[Stall]: await db.execute( f""" - UPDATE nostrmarket.stalls SET wallet = ?, name = ?, currency = ?, zones = ?, meta = ? + UPDATE nostrmarket.stalls SET wallet = ?, name = ?, currency = ?, pending = ?, event_id = ?, event_created_at = ?, zones = ?, meta = ? WHERE merchant_id = ? AND id = ? """, ( stall.wallet, stall.name, stall.currency, + stall.pending, + stall.event_id, + stall.event_created_at, json.dumps( [z.dict() for z in stall.shipping_zones] ), # todo: cost is float. should be int for sats json.dumps(stall.config.dict()), merchant_id, - stall.id, + stall.id ), ) return await get_stall(merchant_id, stall.id) @@ -254,12 +271,14 @@ async def delete_merchant_stalls(merchant_id: str) -> None: async def create_product(merchant_id: str, data: PartialProduct) -> Product: - product_id = urlsafe_short_hash() + product_id = data.id or urlsafe_short_hash() await db.execute( f""" - INSERT INTO nostrmarket.products (merchant_id, id, stall_id, name, price, quantity, image_urls, category_list, meta) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO nostrmarket.products + (merchant_id, id, stall_id, name, price, quantity, pending, event_id, event_created_at, image_urls, category_list, meta) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO NOTHING """, ( merchant_id, @@ -268,6 +287,9 @@ async def create_product(merchant_id: str, data: PartialProduct) -> Product: data.name, data.price, data.quantity, + data.pending, + data.event_id, + data.event_created_at, json.dumps(data.images), json.dumps(data.categories), json.dumps(data.config.dict()), @@ -283,13 +305,16 @@ async def update_product(merchant_id: str, product: Product) -> Product: await db.execute( f""" - UPDATE nostrmarket.products set name = ?, price = ?, quantity = ?, image_urls = ?, category_list = ?, meta = ? + UPDATE nostrmarket.products set name = ?, price = ?, quantity = ?, pending = ?, event_id =?, event_created_at = ?, image_urls = ?, category_list = ?, meta = ? WHERE merchant_id = ? AND id = ? """, ( product.name, product.price, product.quantity, + product.pending, + product.event_id, + product.event_created_at, json.dumps(product.images), json.dumps(product.categories), json.dumps(product.config.dict()), @@ -328,10 +353,10 @@ async def get_product(merchant_id: str, product_id: str) -> Optional[Product]: return Product.from_row(row) if row else None -async def get_products(merchant_id: str, stall_id: str) -> List[Product]: +async def get_products(merchant_id: str, stall_id: str, pending: Optional[bool] = False) -> List[Product]: rows = await db.fetchall( - "SELECT * FROM nostrmarket.products WHERE merchant_id = ? AND stall_id = ?", - (merchant_id, stall_id), + "SELECT * FROM nostrmarket.products WHERE merchant_id = ? AND stall_id = ? AND pending = ?", + (merchant_id, stall_id, pending), ) return [Product.from_row(row) for row in rows] @@ -341,7 +366,11 @@ async def get_products_by_ids( ) -> List[Product]: q = ",".join(["?"] * len(product_ids)) rows = await db.fetchall( - f"SELECT id, stall_id, name, price, quantity, category_list, meta FROM nostrmarket.products WHERE merchant_id = ? AND id IN ({q})", + f""" + SELECT id, stall_id, name, price, quantity, category_list, meta + FROM nostrmarket.products + WHERE merchant_id = ? AND pending = false AND id IN ({q}) + """, (merchant_id, *product_ids), ) return [Product.from_row(row) for row in rows] @@ -353,12 +382,21 @@ async def get_wallet_for_product(product_id: str) -> Optional[str]: SELECT s.wallet FROM nostrmarket.products p INNER JOIN nostrmarket.stalls s ON p.stall_id = s.id - WHERE p.id=? + WHERE p.id = ? AND p.pending = false AND s.pending = false """, (product_id,), ) return row[0] if row else None +async def get_last_product_update_time(merchant_id: str) -> int: + row = await db.fetchone( + """ + SELECT event_created_at FROM nostrmarket.products + WHERE merchant_id = ? ORDER BY event_created_at DESC LIMIT 1 + """, + (merchant_id,), + ) + return row[0] or 0 if row else 0 async def delete_product(merchant_id: str, product_id: str) -> None: await db.execute( @@ -456,7 +494,7 @@ async def get_orders(merchant_id: str, **kwargs) -> List[Order]: q = f"AND {q}" values = (v for v in kwargs.values() if v != None) rows = await db.fetchall( - f"SELECT * FROM nostrmarket.orders WHERE merchant_id = ? {q} ORDER BY time DESC", + f"SELECT * FROM nostrmarket.orders WHERE merchant_id = ? {q} ORDER BY event_created_at DESC", (merchant_id, *values), ) return [Order.from_row(row) for row in rows] @@ -479,17 +517,29 @@ async def get_orders_for_stall( return [Order.from_row(row) for row in rows] -async def get_last_order_time(public_key: str) -> int: +async def get_last_order_time(merchant_id: str) -> int: row = await db.fetchone( """ SELECT event_created_at FROM nostrmarket.orders - WHERE merchant_public_key = ? ORDER BY event_created_at DESC LIMIT 1 + WHERE merchant_id = ? ORDER BY event_created_at DESC LIMIT 1 """, - (public_key,), + (merchant_id,), ) return row[0] if row else 0 +async def update_order(merchant_id: str, order_id: str, **kwargs) -> Optional[Order]: + q = ", ".join([f"{field[0]} = ?" for field in kwargs.items()]) + await db.execute( + f""" + UPDATE nostrmarket.orders SET {q} WHERE merchant_id = ? and id = ? + """, + (*kwargs.values(), merchant_id, order_id) + ) + + return await get_order(merchant_id, order_id) + + async def update_order_paid_status(order_id: str, paid: bool) -> Optional[Order]: await db.execute( f"UPDATE nostrmarket.orders SET paid = ? WHERE id = ?", @@ -533,8 +583,8 @@ async def create_direct_message( dm_id = urlsafe_short_hash() await db.execute( f""" - INSERT INTO nostrmarket.direct_messages (merchant_id, id, event_id, event_created_at, message, public_key, incoming) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO nostrmarket.direct_messages (merchant_id, id, event_id, event_created_at, message, public_key, type, incoming) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(event_id) DO NOTHING """, ( @@ -544,6 +594,7 @@ async def create_direct_message( dm.event_created_at, dm.message, dm.public_key, + dm.type, dm.incoming, ), ) @@ -586,14 +637,24 @@ async def get_direct_messages(merchant_id: str, public_key: str) -> List[DirectM ) return [DirectMessage.from_row(row) for row in rows] +async def get_orders_from_direct_messages(merchant_id: str) -> List[DirectMessage]: + rows = await db.fetchall( + "SELECT * FROM nostrmarket.direct_messages WHERE merchant_id = ? AND type >= 0 ORDER BY event_created_at, type", + (merchant_id), + ) + return [DirectMessage.from_row(row) for row in rows] -async def get_last_direct_messages_time(public_key: str) -> int: + + + + +async def get_last_direct_messages_time(merchant_id: str) -> int: row = await db.fetchone( """ SELECT event_created_at FROM nostrmarket.direct_messages - WHERE public_key = ? ORDER BY event_created_at DESC LIMIT 1 + WHERE merchant_id = ? ORDER BY event_created_at DESC LIMIT 1 """, - (public_key,), + (merchant_id,), ) return row[0] if row else 0 @@ -644,8 +705,13 @@ async def get_customers(merchant_id: str) -> List[Customer]: return [Customer.from_row(row) for row in rows] -async def get_all_customers() -> List[Customer]: - rows = await db.fetchall("SELECT * FROM nostrmarket.customers") +async def get_all_unique_customers() -> List[Customer]: + q = """ + SELECT public_key, MAX(merchant_id) as merchant_id, MAX(event_created_at) + FROM nostrmarket.customers + GROUP BY public_key + """ + rows = await db.fetchall(q) return [Customer.from_row(row) for row in rows] @@ -658,15 +724,15 @@ async def update_customer_profile( ) -async def increment_customer_unread_messages(public_key: str): +async def increment_customer_unread_messages(merchant_id: str, public_key: str): await db.execute( - f"UPDATE nostrmarket.customers SET unread_messages = unread_messages + 1 WHERE public_key = ?", - (public_key,), + f"UPDATE nostrmarket.customers SET unread_messages = unread_messages + 1 WHERE merchant_id = ? AND public_key = ?", + (merchant_id, public_key,), ) - -async def update_customer_no_unread_messages(public_key: str): +#??? two merchants +async def update_customer_no_unread_messages(merchant_id: str, public_key: str): await db.execute( - f"UPDATE nostrmarket.customers SET unread_messages = 0 WHERE public_key = ?", - (public_key,), + f"UPDATE nostrmarket.customers SET unread_messages = 0 WHERE merchant_id =? AND public_key = ?", + (merchant_id, public_key,), ) diff --git a/helpers.py b/helpers.py index a880e24..f7e0d88 100644 --- a/helpers.py +++ b/helpers.py @@ -75,14 +75,6 @@ def copy_x(output, x32, y32, data): return 1 -def order_from_json(s: str) -> Tuple[Optional[Any], Optional[str]]: - try: - order = json.loads(s) - return (order, s) if (type(order) is dict) and "items" in order else (None, s) - except ValueError: - return None, s - - def normalize_public_key(pubkey: str) -> str: if pubkey.startswith("npub1"): _, decoded_data = bech32_decode(pubkey) diff --git a/migrations.py b/migrations.py index f3251f5..eb336d3 100644 --- a/migrations.py +++ b/migrations.py @@ -141,3 +141,30 @@ async def m001_initial(db): ); """ ) + +async def m002_update_stall_and_product(db): + await db.execute( + "ALTER TABLE nostrmarket.stalls ADD COLUMN pending BOOLEAN NOT NULL DEFAULT false;" + ) + await db.execute( + "ALTER TABLE nostrmarket.stalls ADD COLUMN event_id TEXT;" + ) + await db.execute( + "ALTER TABLE nostrmarket.stalls ADD COLUMN event_created_at INTEGER;" + ) + + await db.execute( + "ALTER TABLE nostrmarket.products ADD COLUMN pending BOOLEAN NOT NULL DEFAULT false;" + ) + await db.execute( + "ALTER TABLE nostrmarket.products ADD COLUMN event_id TEXT;" + ) + await db.execute( + "ALTER TABLE nostrmarket.products ADD COLUMN event_created_at INTEGER;" + ) + + +async def m003_update_direct_message_type(db): + await db.execute( + "ALTER TABLE nostrmarket.direct_messages ADD COLUMN type INTEGER NOT NULL DEFAULT -1;" + ) \ No newline at end of file diff --git a/models.py b/models.py index 1f07f71..f2d6a2a 100644 --- a/models.py +++ b/models.py @@ -1,8 +1,9 @@ import json import time from abc import abstractmethod +from enum import Enum from sqlite3 import Row -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple from pydantic import BaseModel @@ -120,6 +121,7 @@ class Merchant(PartialMerchant, Nostrable): ######################################## ZONES ######################################## class PartialZone(BaseModel): + id: Optional[str] name: Optional[str] currency: str cost: float @@ -140,19 +142,22 @@ class Zone(PartialZone): class StallConfig(BaseModel): - """Last published nostr event id for this Stall""" - - event_id: Optional[str] image_url: Optional[str] description: Optional[str] class PartialStall(BaseModel): + id: Optional[str] wallet: str name: str currency: str = "sat" shipping_zones: List[Zone] = [] config: StallConfig = StallConfig() + pending: bool = False + + """Last published nostr event for this Stall""" + event_id: Optional[str] + event_created_at: Optional[int] def validate_stall(self): for z in self.shipping_zones: @@ -189,7 +194,7 @@ class Stall(PartialStall, Nostrable): pubkey=pubkey, created_at=round(time.time()), kind=5, - tags=[["e", self.config.event_id]], + tags=[["e", self.event_id]], content=f"Stall '{self.name}' deleted", ) delete_event.id = delete_event.event_id @@ -204,24 +209,29 @@ class Stall(PartialStall, Nostrable): return stall -######################################## STALLS ######################################## +######################################## PRODUCTS ######################################## class ProductConfig(BaseModel): - event_id: Optional[str] description: Optional[str] currency: Optional[str] class PartialProduct(BaseModel): + id: Optional[str] stall_id: str name: str categories: List[str] = [] images: List[str] = [] price: float quantity: int + pending: bool = False config: ProductConfig = ProductConfig() + """Last published nostr event for this Product""" + event_id: Optional[str] + event_created_at: Optional[int] + class Product(PartialProduct, Nostrable): id: str @@ -255,7 +265,7 @@ class Product(PartialProduct, Nostrable): pubkey=pubkey, created_at=round(time.time()), kind=5, - tags=[["e", self.config.event_id]], + tags=[["e", self.event_id]], content=f"Product '{self.name}' deleted", ) delete_event.id = delete_event.event_id @@ -300,7 +310,7 @@ class OrderExtra(BaseModel): @classmethod async def from_products(cls, products: List[Product]): - currency = products[0].config.currency + currency = products[0].config.currency if len(products) else "sat" exchange_rate = ( (await btc_price(currency)) if currency and currency != "sat" else 1 ) @@ -401,14 +411,34 @@ class PaymentRequest(BaseModel): ######################################## MESSAGE ######################################## + +class DirectMessageType(Enum): + """Various types os direct messages.""" + PLAIN_TEXT = -1 + CUSTOMER_ORDER = 0 + PAYMENT_REQUEST = 1 + ORDER_PAID_OR_SHIPPED = 2 + class PartialDirectMessage(BaseModel): event_id: Optional[str] event_created_at: Optional[int] message: str public_key: str + type: int = DirectMessageType.PLAIN_TEXT.value incoming: bool = False time: Optional[int] + @classmethod + def parse_message(cls, msg) -> Tuple[DirectMessageType, Optional[Any]]: + try: + msg_json = json.loads(msg) + if "type" in msg_json: + return DirectMessageType(msg_json["type"]), msg_json + + return DirectMessageType.PLAIN_TEXT, None + except Exception: + return DirectMessageType.PLAIN_TEXT, None + class DirectMessage(PartialDirectMessage): id: str @@ -419,6 +449,7 @@ class DirectMessage(PartialDirectMessage): return dm + ######################################## CUSTOMERS ######################################## @@ -437,5 +468,5 @@ class Customer(BaseModel): @classmethod def from_row(cls, row: Row) -> "Customer": customer = cls(**dict(row)) - customer.profile = CustomerProfile(**json.loads(row["meta"])) + customer.profile = CustomerProfile(**json.loads(row["meta"])) if "meta" in row else None return customer diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index ea78bac..175a1c6 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -2,7 +2,7 @@ import asyncio import json from asyncio import Queue from threading import Thread -from typing import Callable +from typing import Callable, List from loguru import logger from websocket import WebSocketApp @@ -18,11 +18,6 @@ class NostrClient: self.send_req_queue: Queue = Queue() self.ws: WebSocketApp = None - async def restart(self): - await self.send_req_queue.put(ValueError("Restarting NostrClient...")) - await self.recieve_event_queue.put(ValueError("Restarting NostrClient...")) - self.ws.close() - self.ws = None async def connect_to_nostrclient_ws( self, on_open: Callable, on_message: Callable @@ -96,36 +91,80 @@ class NostrClient: ["REQ", f"direct-messages-out:{public_key}", out_messages_filter] ) - async def subscribe_to_merchant_events(self, public_key: str, since: int): + logger.debug(f"Subscribed to direct-messages '{public_key}'.") + + async def subscribe_to_stall_events(self, public_key: str, since: int): stall_filter = {"kinds": [30017], "authors": [public_key]} - product_filter = {"kinds": [30018], "authors": [public_key]} + if since and since != 0: + stall_filter["since"] = since await self.send_req_queue.put( ["REQ", f"stall-events:{public_key}", stall_filter] ) + + logger.debug(f"Subscribed to stall-events: '{public_key}'.") + + async def subscribe_to_product_events(self, public_key: str, since: int): + product_filter = {"kinds": [30018], "authors": [public_key]} + if since and since != 0: + product_filter["since"] = since + await self.send_req_queue.put( ["REQ", f"product-events:{public_key}", product_filter] ) + logger.debug(f"Subscribed to product-events: '{public_key}'.") + async def subscribe_to_user_profile(self, public_key: str, since: int): profile_filter = {"kinds": [0], "authors": [public_key]} if since and since != 0: profile_filter["since"] = since + 1 - await self.send_req_queue.put( - ["REQ", f"user-profile-events:{public_key}", profile_filter] - ) + # Disabled for now. The number of clients can grow large. + # Some relays only allow a small number of subscriptions. + # There is the risk that more important subscriptions will be blocked. + # await self.send_req_queue.put( + # ["REQ", f"user-profile-events:{public_key}", profile_filter] + # ) async def unsubscribe_from_direct_messages(self, public_key: str): await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) + + logger.debug(f"Unsubscribed from direct-messages '{public_key}'.") async def unsubscribe_from_merchant_events(self, public_key: str): await self.send_req_queue.put(["CLOSE", f"stall-events:{public_key}"]) await self.send_req_queue.put(["CLOSE", f"product-events:{public_key}"]) - def stop(self): - try: - self.ws.close() - except Exception as ex: - logger.warning(ex) + logger.debug(f"Unsubscribed from stall-events and product-events '{public_key}'.") + + async def restart(self, public_keys: List[str]): + await self.unsubscribe_merchants(public_keys) + # Give some time for the CLOSE events to propagate before restarting + await asyncio.sleep(10) + + logger.info("Restating NostrClient...") + await self.send_req_queue.put(ValueError("Restarting NostrClient...")) + await self.recieve_event_queue.put(ValueError("Restarting NostrClient...")) + + self.ws.close() + self.ws = None + + + async def stop(self, public_keys: List[str]): + await self.unsubscribe_merchants(public_keys) + + # Give some time for the CLOSE events to propagate before closing the connection + await asyncio.sleep(10) + self.ws.close() + self.ws = None + + async def unsubscribe_merchants(self, public_keys: List[str]): + for pk in public_keys: + try: + await self.unsubscribe_from_direct_messages(pk) + await self.unsubscribe_from_merchant_events(pk) + except Exception as ex: + logger.warning(ex) + diff --git a/services.py b/services.py index 57bb86a..054cb44 100644 --- a/services.py +++ b/services.py @@ -3,6 +3,7 @@ from typing import List, Optional, Tuple from loguru import logger +from lnbits.bolt11 import decode from lnbits.core import create_invoice, get_wallet from lnbits.core.services import websocketUpdater @@ -12,6 +13,8 @@ from .crud import ( create_customer, create_direct_message, create_order, + create_product, + create_stall, get_customer, get_merchant_by_pubkey, get_order, @@ -23,17 +26,21 @@ from .crud import ( get_zone, increment_customer_unread_messages, update_customer_profile, + update_order, update_order_paid_status, + update_order_shipped_status, update_product, update_product_quantity, update_stall, ) -from .helpers import order_from_json from .models import ( Customer, + DirectMessage, + DirectMessageType, Merchant, Nostrable, Order, + OrderContact, OrderExtra, OrderItem, OrderStatusUpdate, @@ -42,6 +49,7 @@ from .models import ( PaymentOption, PaymentRequest, Product, + Stall, ) from .nostr.event import NostrEvent @@ -126,10 +134,12 @@ async def update_merchant_to_nostr( products = await get_products(merchant.id, stall.id) for product in products: event = await sign_and_send_to_nostr(merchant, product, delete_merchant) - product.config.event_id = event.id + product.event_id = event.id + product.event_created_at = event.created_at await update_product(merchant.id, product) event = await sign_and_send_to_nostr(merchant, stall, delete_merchant) - stall.config.event_id = event.id + stall.event_id = event.id + stall.event_created_at = event.created_at await update_stall(merchant.id, stall) if delete_merchant: # merchant profile updates not supported yet @@ -163,6 +173,16 @@ async def handle_order_paid(order_id: str, merchant_pubkey: str): # todo: lock success, message = await update_products_for_order(merchant, order) await notify_client_of_order_status(order, merchant, success, message) + + await websocketUpdater( + merchant.id, + json.dumps( + { + "type": "order-paid", + "orderId": order_id, + } + ), + ) except Exception as ex: logger.warning(ex) @@ -179,12 +199,26 @@ async def notify_client_of_order_status( shipped=order.shipped, ) dm_content = json.dumps( - order_status.dict(), separators=(",", ":"), ensure_ascii=False + {"type": DirectMessageType.ORDER_PAID_OR_SHIPPED.value, **order_status.dict()}, + separators=(",", ":"), + ensure_ascii=False, ) else: dm_content = f"Order cannot be fulfilled. Reason: {message}" dm_event = merchant.build_dm_event(dm_content, order.public_key) + + dm = PartialDirectMessage( + event_id=dm_event.id, + event_created_at=dm_event.created_at, + message=dm_content, + public_key=order.public_key, + type=DirectMessageType.ORDER_PAID_OR_SHIPPED.value + if success + else DirectMessageType.PLAIN_TEXT.value, + ) + await create_direct_message(merchant.id, dm) + await nostr_client.publish_nostr_event(dm_event) @@ -201,7 +235,7 @@ async def update_products_for_order( for p in products: product = await update_product_quantity(p.id, p.quantity) event = await sign_and_send_to_nostr(merchant, product) - product.config.event_id = event.id + product.event_id = event.id await update_product(merchant.id, product) return True, "ok" @@ -233,19 +267,84 @@ async def compute_products_new_quantity( async def process_nostr_message(msg: str): try: type, *rest = json.loads(msg) + if type.upper() == "EVENT": subscription_id, event = rest event = NostrEvent(**event) + print("kind: ", event.kind, ": ", msg) if event.kind == 0: await _handle_customer_profile_update(event) - if event.kind == 4: + elif event.kind == 4: _, merchant_public_key = subscription_id.split(":") await _handle_nip04_message(merchant_public_key, event) + elif event.kind == 30017: + await _handle_stall(event) + elif event.kind == 30018: + await _handle_product(event) return + except Exception as ex: logger.warning(ex) +async def create_or_update_order_from_dm(merchant_id: str, merchant_pubkey: str, dm: DirectMessage): + type, value = PartialDirectMessage.parse_message(dm.message) + if "id" not in value: + return + + if type == DirectMessageType.CUSTOMER_ORDER: + order = await extract_order_from_dm(merchant_id, merchant_pubkey, dm, value) + new_order = await create_order(merchant_id, order) + if new_order.stall_id == "None" and order.stall_id != "None": + await update_order(merchant_id, order.id, **{ + "stall_id": order.stall_id, + "extra_data": json.dumps(order.extra.dict()) + }) + return + + if type == DirectMessageType.PAYMENT_REQUEST: + payment_request = PaymentRequest(**value) + pr = next((o.link for o in payment_request.payment_options if o.type == "ln"), None) + if not pr: + return + invoice = decode(pr) + await update_order(merchant_id, payment_request.id, **{ + "total": invoice.amount_msat / 1000, + "invoice_id": invoice.payment_hash + }) + return + + if type == DirectMessageType.ORDER_PAID_OR_SHIPPED: + order_update = OrderStatusUpdate(**value) + if order_update.paid: + await update_order_paid_status(order_update.id, True) + if order_update.shipped: + await update_order_shipped_status(merchant_id, order_update.id, True) + + +async def extract_order_from_dm(merchant_id: str, merchant_pubkey: str, dm: DirectMessage, value): + order_items = [OrderItem(**i) for i in value.get("items", [])] + products = await get_products_by_ids(merchant_id, [p.product_id for p in order_items]) + extra = await OrderExtra.from_products(products) + order = Order( + id=value.get("id"), + event_id=dm.event_id, + event_created_at=dm.event_created_at, + public_key=dm.public_key, + merchant_public_key=merchant_pubkey, + shipping_id=value.get("shipping_id", "None"), + items=order_items, + contact=OrderContact(**value.get("contact")) if value.get("contact") else None, + address=value.get("address"), + stall_id=products[0].stall_id if len(products) else "None", + invoice_id="None", + total=0, + extra=extra + ) + + return order + + async def _handle_nip04_message(merchant_public_key: str, event: NostrEvent): merchant = await get_merchant_by_pubkey(merchant_public_key) assert merchant, f"Merchant not found for public key '{merchant_public_key}'" @@ -270,7 +369,7 @@ async def _handle_incoming_dms( if not customer: await _handle_new_customer(event, merchant) else: - await increment_customer_unread_messages(event.pubkey) + await increment_customer_unread_messages(merchant.id, event.pubkey) dm_reply = await _handle_dirrect_message( merchant.id, @@ -282,6 +381,14 @@ async def _handle_incoming_dms( ) if dm_reply: dm_event = merchant.build_dm_event(dm_reply, event.pubkey) + dm = PartialDirectMessage( + event_id=dm_event.id, + event_created_at=dm_event.created_at, + message=dm_reply, + public_key=event.pubkey, + type=DirectMessageType.PAYMENT_REQUEST.value, + ) + await create_direct_message(merchant.id, dm) await nostr_client.publish_nostr_event(dm_event) @@ -289,12 +396,14 @@ async def _handle_outgoing_dms( event: NostrEvent, merchant: Merchant, clear_text_msg: str ): sent_to = event.tag_values("p") + type, _ = PartialDirectMessage.parse_message(clear_text_msg) if len(sent_to) != 0: dm = PartialDirectMessage( event_id=event.id, event_created_at=event.created_at, - message=clear_text_msg, # exclude if json + message=clear_text_msg, public_key=sent_to[0], + type=type.value ) await create_direct_message(merchant.id, dm) @@ -307,22 +416,24 @@ async def _handle_dirrect_message( event_created_at: int, msg: str, ) -> Optional[str]: - order, text_msg = order_from_json(msg) + type, order = PartialDirectMessage.parse_message(msg) try: dm = PartialDirectMessage( event_id=event_id, event_created_at=event_created_at, - message=text_msg, + message=msg, public_key=from_pubkey, incoming=True, + type=type.value, ) - await create_direct_message(merchant_id, dm) + new_dm = await create_direct_message(merchant_id, dm) + # todo: do the same for new order await websocketUpdater( merchant_id, - json.dumps({"type": "new-direct-message", "customerPubkey": from_pubkey}), + json.dumps({"type": "new-direct-message", "customerPubkey": from_pubkey, "data": new_dm.dict()}), ) - if order: + if type == DirectMessageType.CUSTOMER_ORDER: order["public_key"] = from_pubkey order["merchant_public_key"] = merchant_public_key order["event_id"] = event_id @@ -338,17 +449,23 @@ async def _handle_dirrect_message( async def _handle_new_order(order: PartialOrder) -> Optional[str]: order.validate_order() - first_product_id = order.items[0].product_id - wallet_id = await get_wallet_for_product(first_product_id) - assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}" + try: + first_product_id = order.items[0].product_id + wallet_id = await get_wallet_for_product(first_product_id) + assert wallet_id, f"Cannot find wallet id for product id: {first_product_id}" - wallet = await get_wallet(wallet_id) - assert wallet, f"Cannot find wallet for product id: {first_product_id}" + wallet = await get_wallet(wallet_id) + assert wallet, f"Cannot find wallet for product id: {first_product_id}" - new_order = await create_new_order(order.merchant_public_key, order) - if new_order: - return json.dumps(new_order.dict(), separators=(",", ":"), ensure_ascii=False) + + payment_req = await create_new_order(order.merchant_public_key, order) + except Exception as e: + payment_req = PaymentRequest(id=order.id, message=str(e), payment_options=[]) + if payment_req: + response = {"type": DirectMessageType.PAYMENT_REQUEST.value, **payment_req.dict()} + return json.dumps(response, separators=(",", ":"), ensure_ascii=False) + return None @@ -372,3 +489,58 @@ async def _handle_customer_profile_update(event: NostrEvent): ) except Exception as ex: logger.warning(ex) + + +async def _handle_stall(event: NostrEvent): + try: + merchant = await get_merchant_by_pubkey(event.pubkey) + assert merchant, f"Merchant not found for public key '{event.pubkey}'" + stall_json = json.loads(event.content) + + if "id" not in stall_json: + return + + stall = Stall( + id=stall_json["id"], + name=stall_json.get("name", "Recoverd Stall (no name)"), + wallet="", + currency=stall_json.get("currency", "sat"), + shipping_zones=stall_json.get("shipping", []), + pending=True, + event_id=event.id, + event_created_at=event.created_at, + ) + stall.config.description = stall_json.get("description", "") + await create_stall(merchant.id, stall) + + except Exception as ex: + logger.error(ex) + + +async def _handle_product(event: NostrEvent): + try: + merchant = await get_merchant_by_pubkey(event.pubkey) + assert merchant, f"Merchant not found for public key '{event.pubkey}'" + product_json = json.loads(event.content) + + assert "id" in product_json, "Product is missing ID" + assert "stall_id" in product_json, "Product is missing Stall ID" + + product = Product( + id=product_json["id"], + stall_id=product_json["stall_id"], + name=product_json.get("name", "Recoverd Product (no name)"), + images=product_json.get("images", []), + categories=event.tag_values("t"), + price=product_json.get("price", 0), + quantity=product_json.get("quantity", 0), + pending=True, + event_id=event.id, + event_created_at=event.created_at, + ) + product.config.description = product_json.get("description", "") + product.config.currency = product_json.get("currency", "sat") + await create_product(merchant.id, product) + + except Exception as ex: + logger.error(ex) diff --git a/static/components/customer-stall/customer-stall.html b/static/components/customer-stall/customer-stall.html index 5787cd3..0fc6364 100644 --- a/static/components/customer-stall/customer-stall.html +++ b/static/components/customer-stall/customer-stall.html @@ -110,7 +110,8 @@