feat: keep customer profiles up to date

This commit is contained in:
Vlad Stan 2023-03-27 17:19:51 +03:00
parent e7b16dd17f
commit 89f46fff35
7 changed files with 175 additions and 5 deletions

55
crud.py
View file

@ -5,6 +5,8 @@ from lnbits.helpers import urlsafe_short_hash
from . import db from . import db
from .models import ( from .models import (
Customer,
CustomerProfile,
DirectMessage, DirectMessage,
Merchant, Merchant,
MerchantConfig, MerchantConfig,
@ -602,3 +604,56 @@ async def get_public_keys_for_direct_messages(merchant_id: str) -> List[str]:
(merchant_id), (merchant_id),
) )
return [row[0] for row in rows] return [row[0] for row in rows]
######################################## CUSTOMERS ########################################
async def create_customer(merchant_id: str, data: Customer) -> Customer:
await db.execute(
f"""
INSERT INTO nostrmarket.customers (merchant_id, public_key, meta)
VALUES (?, ?, ?)
""",
(
merchant_id,
data.public_key,
json.dumps(data.profile) if data.profile else "{}",
),
)
customer = await get_customer(merchant_id, data.public_key)
assert customer, "Newly created customer couldn't be retrieved"
return customer
async def get_customer(merchant_id: str, public_key: str) -> Optional[Customer]:
row = await db.fetchone(
"SELECT * FROM nostrmarket.customers WHERE merchant_id = ? AND public_key = ?",
(
merchant_id,
public_key,
),
)
return Customer.from_row(row) if row else None
async def get_cusomers(merchant_id: str) -> List[Customer]:
rows = await db.fetchall(
"SELECT * FROM nostrmarket.customers WHERE merchant_id = ?", (merchant_id,)
)
return [Customer.from_row(row) for row in rows]
async def get_all_customers() -> List[Customer]:
rows = await db.fetchall("SELECT * FROM nostrmarket.customers")
return [Customer.from_row(row) for row in rows]
async def update_customer_profile(
public_key: str, event_created_at: int, profile: CustomerProfile
):
await db.execute(
f"UPDATE nostrmarket.customers SET event_created_at = ?, meta = ? WHERE public_key = ?",
(event_created_at, json.dumps(profile.dict()), public_key),
)

View file

@ -125,3 +125,17 @@ async def m001_initial(db):
await db.execute( await db.execute(
"CREATE INDEX idx_event_id ON nostrmarket.direct_messages (event_id)" "CREATE INDEX idx_event_id ON nostrmarket.direct_messages (event_id)"
) )
"""
Initial customers table.
"""
await db.execute(
"""
CREATE TABLE nostrmarket.customers (
merchant_id TEXT NOT NULL,
public_key TEXT NOT NULL,
event_created_at INTEGER,
meta TEXT NOT NULL DEFAULT '{}'
);
"""
)

View file

@ -427,3 +427,24 @@ class DirectMessage(PartialDirectMessage):
def from_row(cls, row: Row) -> "DirectMessage": def from_row(cls, row: Row) -> "DirectMessage":
dm = cls(**dict(row)) dm = cls(**dict(row))
return dm return dm
######################################## CUSTOMERS ########################################
class CustomerProfile(BaseModel):
name: Optional[str]
about: Optional[str]
class Customer(BaseModel):
merchant_id: str
public_key: str
event_created_at: Optional[int]
profile: Optional[CustomerProfile]
@classmethod
def from_row(cls, row: Row) -> "Customer":
customer = cls(**dict(row))
customer.profile = CustomerProfile(**json.loads(row["meta"]))
return customer

View file

@ -70,7 +70,7 @@ class NostrClient:
async def subscribe_to_direct_messages(self, public_key: str, since: int): async def subscribe_to_direct_messages(self, public_key: str, since: int):
in_messages_filter = {"kind": 4, "#p": [public_key]} in_messages_filter = {"kind": 4, "#p": [public_key]}
out_messages_filter = {"kind": 4, "authors": [public_key]} out_messages_filter = {"kind": 4, "authors": [public_key]}
if since != 0: if since and since != 0:
in_messages_filter["since"] = since in_messages_filter["since"] = since
out_messages_filter["since"] = since out_messages_filter["since"] = since
@ -92,6 +92,15 @@ class NostrClient:
["REQ", f"product-events:{public_key}", product_filter] ["REQ", f"product-events:{public_key}", product_filter]
) )
async def subscribe_to_user_profile(self, public_key: str, since: int):
profile_filter = {"kind": 0, "authors": [public_key]}
if since and since != 0:
profile_filter["since"] = since + 1
await self.send_req_queue.put(
["REQ", f"user-profile-events:{public_key}", profile_filter]
)
async def unsubscribe_from_direct_messages(self, public_key: str): async def unsubscribe_from_direct_messages(self, public_key: str):
await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"]) await self.send_req_queue.put(["CLOSE", f"direct-messages-in:{public_key}"])
await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"]) await self.send_req_queue.put(["CLOSE", f"direct-messages-out:{public_key}"])

View file

@ -4,11 +4,15 @@ from typing import List, Optional, Tuple
from loguru import logger from loguru import logger
from lnbits.core import create_invoice, get_wallet from lnbits.core import create_invoice, get_wallet
from lnbits.core.services import websocketUpdater
from . import nostr_client from . import nostr_client
from .crud import ( from .crud import (
CustomerProfile,
create_customer,
create_direct_message, create_direct_message,
create_order, create_order,
get_customer,
get_merchant_by_pubkey, get_merchant_by_pubkey,
get_order, get_order,
get_order_by_event_id, get_order_by_event_id,
@ -16,6 +20,7 @@ from .crud import (
get_products_by_ids, get_products_by_ids,
get_stalls, get_stalls,
get_wallet_for_product, get_wallet_for_product,
update_customer_profile,
update_order_paid_status, update_order_paid_status,
update_product, update_product,
update_product_quantity, update_product_quantity,
@ -23,6 +28,7 @@ from .crud import (
) )
from .helpers import order_from_json from .helpers import order_from_json
from .models import ( from .models import (
Customer,
Merchant, Merchant,
Nostrable, Nostrable,
Order, Order,
@ -85,6 +91,16 @@ async def create_new_order(
extra=await OrderExtra.from_products(products), extra=await OrderExtra.from_products(products),
) )
await create_order(merchant.id, order) await create_order(merchant.id, order)
await websocketUpdater(
merchant.id,
json.dumps(
{
"type": "new-order",
"stallId": products[0].stall_id,
"customerPubkey": data.public_key,
}
),
)
return PaymentRequest( return PaymentRequest(
id=data.id, payment_options=[PaymentOption(type="ln", link=invoice)] id=data.id, payment_options=[PaymentOption(type="ln", link=invoice)]
@ -206,9 +222,11 @@ async def process_nostr_message(msg: str):
type, *rest = json.loads(msg) type, *rest = json.loads(msg)
if type.upper() == "EVENT": if type.upper() == "EVENT":
subscription_id, event = rest subscription_id, event = rest
_, merchant_public_key = subscription_id.split(":")
event = NostrEvent(**event) event = NostrEvent(**event)
if event.kind == 0:
await _handle_customer_profile_update(event)
if event.kind == 4: if event.kind == 4:
_, merchant_public_key = subscription_id.split(":")
await _handle_nip04_message(merchant_public_key, event) await _handle_nip04_message(merchant_public_key, event)
return return
except Exception as ex: except Exception as ex:
@ -235,7 +253,7 @@ async def _handle_nip04_message(merchant_public_key: str, event: NostrEvent):
async def _handle_incoming_dms( async def _handle_incoming_dms(
event: NostrEvent, merchant: Merchant, clear_text_msg: str event: NostrEvent, merchant: Merchant, clear_text_msg: str
): ):
dm_content = await _handle_dirrect_message( dm_reply = await _handle_dirrect_message(
merchant.id, merchant.id,
merchant.public_key, merchant.public_key,
event.pubkey, event.pubkey,
@ -243,10 +261,17 @@ async def _handle_incoming_dms(
event.created_at, event.created_at,
clear_text_msg, clear_text_msg,
) )
if dm_content: if dm_reply:
dm_event = merchant.build_dm_event(dm_content, event.pubkey) dm_event = merchant.build_dm_event(dm_reply, event.pubkey)
await nostr_client.publish_nostr_event(dm_event) await nostr_client.publish_nostr_event(dm_event)
customer = await get_customer(merchant.id, event.pubkey)
if not customer:
await create_customer(
merchant.id, Customer(merchant_id=merchant.id, public_key=event.pubkey)
)
await nostr_client.subscribe_to_user_profile(event.pubkey, 0)
async def _handle_outgoing_dms( async def _handle_outgoing_dms(
event: NostrEvent, merchant: Merchant, clear_text_msg: str event: NostrEvent, merchant: Merchant, clear_text_msg: str
@ -308,3 +333,18 @@ async def _handle_new_order(order: PartialOrder) -> Optional[str]:
return json.dumps(new_order.dict(), separators=(",", ":"), ensure_ascii=False) return json.dumps(new_order.dict(), separators=(",", ":"), ensure_ascii=False)
return None return None
async def _handle_customer_profile_update(event: NostrEvent):
try:
profile = json.loads(event.content)
await update_customer_profile(
event.pubkey,
event.created_at,
CustomerProfile(
name=profile["name"] if "name" in profile else "",
about=profile["about"] if "about" in profile else "",
),
)
except Exception as ex:
logger.warning(ex)

View file

@ -102,10 +102,36 @@ const merchant = async () => {
}, },
customerSelectedForOrder: function (customerPubkey) { customerSelectedForOrder: function (customerPubkey) {
this.activeChatCustomer = customerPubkey this.activeChatCustomer = customerPubkey
},
waitForNotifications: function () {
try {
const scheme = location.protocol === 'http:' ? 'ws' : 'wss'
const port = location.port ? `:${location.port}` : ''
const wsUrl = `${scheme}://${document.domain}${port}/api/v1/ws/${this.merchant.id}`
const wsConnection = new WebSocket(wsUrl)
console.log('### waiting for events')
wsConnection.onmessage = e => {
console.log('### e', e)
this.$q.notify({
timeout: 5000,
type: 'positive',
message: 'New Update',
caption: `something here`
})
}
} catch (error) {
this.$q.notify({
timeout: 5000,
type: 'warning',
message: 'Failed to watch for updated',
caption: `${error}`
})
}
} }
}, },
created: async function () { created: async function () {
await this.getMerchant() await this.getMerchant()
await this.waitForNotifications()
} }
}) })
} }

View file

@ -4,6 +4,7 @@ from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener from lnbits.tasks import register_invoice_listener
from .crud import ( from .crud import (
get_all_customers,
get_last_direct_messages_time, get_last_direct_messages_time,
get_last_order_time, get_last_order_time,
get_public_keys_for_merchants, get_public_keys_for_merchants,
@ -42,6 +43,10 @@ async def wait_for_nostr_events(nostr_client: NostrClient):
await nostr_client.subscribe_to_direct_messages(p, since) await nostr_client.subscribe_to_direct_messages(p, since)
customers = await get_all_customers()
for c in customers:
await nostr_client.subscribe_to_user_profile(c.public_key, c.event_created_at)
while True: while True:
message = await nostr_client.get_event() message = await nostr_client.get_event()
await process_nostr_message(message) await process_nostr_message(message)