From 89f46fff35b1435aad94c5382395958006998930 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Mon, 27 Mar 2023 17:19:51 +0300 Subject: [PATCH] feat: keep customer profiles up to date --- crud.py | 55 +++++++++++++++++++++++++++++++++++++++++++ migrations.py | 14 +++++++++++ models.py | 21 +++++++++++++++++ nostr/nostr_client.py | 11 ++++++++- services.py | 48 +++++++++++++++++++++++++++++++++---- static/js/index.js | 26 ++++++++++++++++++++ tasks.py | 5 ++++ 7 files changed, 175 insertions(+), 5 deletions(-) diff --git a/crud.py b/crud.py index 2029a2e..fb70c79 100644 --- a/crud.py +++ b/crud.py @@ -5,6 +5,8 @@ from lnbits.helpers import urlsafe_short_hash from . import db from .models import ( + Customer, + CustomerProfile, DirectMessage, Merchant, MerchantConfig, @@ -602,3 +604,56 @@ async def get_public_keys_for_direct_messages(merchant_id: str) -> List[str]: (merchant_id), ) return [row[0] for row in rows] + + +######################################## CUSTOMERS ######################################## + + +async def create_customer(merchant_id: str, data: Customer) -> Customer: + await db.execute( + f""" + INSERT INTO nostrmarket.customers (merchant_id, public_key, meta) + VALUES (?, ?, ?) + """, + ( + merchant_id, + data.public_key, + json.dumps(data.profile) if data.profile else "{}", + ), + ) + + customer = await get_customer(merchant_id, data.public_key) + assert customer, "Newly created customer couldn't be retrieved" + return customer + + +async def get_customer(merchant_id: str, public_key: str) -> Optional[Customer]: + row = await db.fetchone( + "SELECT * FROM nostrmarket.customers WHERE merchant_id = ? AND public_key = ?", + ( + merchant_id, + public_key, + ), + ) + return Customer.from_row(row) if row else None + + +async def get_cusomers(merchant_id: str) -> List[Customer]: + rows = await db.fetchall( + "SELECT * FROM nostrmarket.customers WHERE merchant_id = ?", (merchant_id,) + ) + return [Customer.from_row(row) for row in rows] + + +async def get_all_customers() -> List[Customer]: + rows = await db.fetchall("SELECT * FROM nostrmarket.customers") + return [Customer.from_row(row) for row in rows] + + +async def update_customer_profile( + public_key: str, event_created_at: int, profile: CustomerProfile +): + await db.execute( + f"UPDATE nostrmarket.customers SET event_created_at = ?, meta = ? WHERE public_key = ?", + (event_created_at, json.dumps(profile.dict()), public_key), + ) diff --git a/migrations.py b/migrations.py index 530f12e..9edc050 100644 --- a/migrations.py +++ b/migrations.py @@ -125,3 +125,17 @@ async def m001_initial(db): await db.execute( "CREATE INDEX idx_event_id ON nostrmarket.direct_messages (event_id)" ) + + """ + Initial customers table. + """ + await db.execute( + """ + CREATE TABLE nostrmarket.customers ( + merchant_id TEXT NOT NULL, + public_key TEXT NOT NULL, + event_created_at INTEGER, + meta TEXT NOT NULL DEFAULT '{}' + ); + """ + ) diff --git a/models.py b/models.py index 29d91e9..fbb43de 100644 --- a/models.py +++ b/models.py @@ -427,3 +427,24 @@ class DirectMessage(PartialDirectMessage): def from_row(cls, row: Row) -> "DirectMessage": dm = cls(**dict(row)) return dm + + +######################################## CUSTOMERS ######################################## + + +class CustomerProfile(BaseModel): + name: Optional[str] + about: Optional[str] + + +class Customer(BaseModel): + merchant_id: str + public_key: str + event_created_at: Optional[int] + profile: Optional[CustomerProfile] + + @classmethod + def from_row(cls, row: Row) -> "Customer": + customer = cls(**dict(row)) + customer.profile = CustomerProfile(**json.loads(row["meta"])) + return customer diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index a2a936c..965c827 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -70,7 +70,7 @@ class NostrClient: async def subscribe_to_direct_messages(self, public_key: str, since: int): in_messages_filter = {"kind": 4, "#p": [public_key]} out_messages_filter = {"kind": 4, "authors": [public_key]} - if since != 0: + if since and since != 0: in_messages_filter["since"] = since out_messages_filter["since"] = since @@ -92,6 +92,15 @@ class NostrClient: ["REQ", f"product-events:{public_key}", product_filter] ) + async def subscribe_to_user_profile(self, public_key: str, since: int): + profile_filter = {"kind": 0, "authors": [public_key]} + if since and since != 0: + profile_filter["since"] = since + 1 + + await self.send_req_queue.put( + ["REQ", f"user-profile-events:{public_key}", profile_filter] + ) + async def unsubscribe_from_direct_messages(self, public_key: str): await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) diff --git a/services.py b/services.py index 288ed43..d653457 100644 --- a/services.py +++ b/services.py @@ -4,11 +4,15 @@ from typing import List, Optional, Tuple from loguru import logger from lnbits.core import create_invoice, get_wallet +from lnbits.core.services import websocketUpdater from . import nostr_client from .crud import ( + CustomerProfile, + create_customer, create_direct_message, create_order, + get_customer, get_merchant_by_pubkey, get_order, get_order_by_event_id, @@ -16,6 +20,7 @@ from .crud import ( get_products_by_ids, get_stalls, get_wallet_for_product, + update_customer_profile, update_order_paid_status, update_product, update_product_quantity, @@ -23,6 +28,7 @@ from .crud import ( ) from .helpers import order_from_json from .models import ( + Customer, Merchant, Nostrable, Order, @@ -85,6 +91,16 @@ async def create_new_order( extra=await OrderExtra.from_products(products), ) await create_order(merchant.id, order) + await websocketUpdater( + merchant.id, + json.dumps( + { + "type": "new-order", + "stallId": products[0].stall_id, + "customerPubkey": data.public_key, + } + ), + ) return PaymentRequest( id=data.id, payment_options=[PaymentOption(type="ln", link=invoice)] @@ -206,9 +222,11 @@ async def process_nostr_message(msg: str): type, *rest = json.loads(msg) if type.upper() == "EVENT": subscription_id, event = rest - _, merchant_public_key = subscription_id.split(":") event = NostrEvent(**event) + if event.kind == 0: + await _handle_customer_profile_update(event) if event.kind == 4: + _, merchant_public_key = subscription_id.split(":") await _handle_nip04_message(merchant_public_key, event) return except Exception as ex: @@ -235,7 +253,7 @@ async def _handle_nip04_message(merchant_public_key: str, event: NostrEvent): async def _handle_incoming_dms( event: NostrEvent, merchant: Merchant, clear_text_msg: str ): - dm_content = await _handle_dirrect_message( + dm_reply = await _handle_dirrect_message( merchant.id, merchant.public_key, event.pubkey, @@ -243,10 +261,17 @@ async def _handle_incoming_dms( event.created_at, clear_text_msg, ) - if dm_content: - dm_event = merchant.build_dm_event(dm_content, event.pubkey) + if dm_reply: + dm_event = merchant.build_dm_event(dm_reply, event.pubkey) await nostr_client.publish_nostr_event(dm_event) + customer = await get_customer(merchant.id, event.pubkey) + if not customer: + await create_customer( + merchant.id, Customer(merchant_id=merchant.id, public_key=event.pubkey) + ) + await nostr_client.subscribe_to_user_profile(event.pubkey, 0) + async def _handle_outgoing_dms( event: NostrEvent, merchant: Merchant, clear_text_msg: str @@ -308,3 +333,18 @@ async def _handle_new_order(order: PartialOrder) -> Optional[str]: return json.dumps(new_order.dict(), separators=(",", ":"), ensure_ascii=False) return None + + +async def _handle_customer_profile_update(event: NostrEvent): + try: + profile = json.loads(event.content) + await update_customer_profile( + event.pubkey, + event.created_at, + CustomerProfile( + name=profile["name"] if "name" in profile else "", + about=profile["about"] if "about" in profile else "", + ), + ) + except Exception as ex: + logger.warning(ex) diff --git a/static/js/index.js b/static/js/index.js index 58b2c5c..5d572b9 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -102,10 +102,36 @@ const merchant = async () => { }, customerSelectedForOrder: function (customerPubkey) { this.activeChatCustomer = customerPubkey + }, + waitForNotifications: function () { + try { + const scheme = location.protocol === 'http:' ? 'ws' : 'wss' + const port = location.port ? `:${location.port}` : '' + const wsUrl = `${scheme}://${document.domain}${port}/api/v1/ws/${this.merchant.id}` + const wsConnection = new WebSocket(wsUrl) + console.log('### waiting for events') + wsConnection.onmessage = e => { + console.log('### e', e) + this.$q.notify({ + timeout: 5000, + type: 'positive', + message: 'New Update', + caption: `something here` + }) + } + } catch (error) { + this.$q.notify({ + timeout: 5000, + type: 'warning', + message: 'Failed to watch for updated', + caption: `${error}` + }) + } } }, created: async function () { await this.getMerchant() + await this.waitForNotifications() } }) } diff --git a/tasks.py b/tasks.py index ad791e3..35ff479 100644 --- a/tasks.py +++ b/tasks.py @@ -4,6 +4,7 @@ from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener from .crud import ( + get_all_customers, get_last_direct_messages_time, get_last_order_time, get_public_keys_for_merchants, @@ -42,6 +43,10 @@ async def wait_for_nostr_events(nostr_client: NostrClient): await nostr_client.subscribe_to_direct_messages(p, since) + customers = await get_all_customers() + for c in customers: + await nostr_client.subscribe_to_user_profile(c.public_key, c.event_created_at) + while True: message = await nostr_client.get_event() await process_nostr_message(message)