Merge pull request #1 from lnbits/fix_filters

filter subscribtion over `websockets`
This commit is contained in:
calle 2023-02-23 12:57:07 +01:00 committed by GitHub
commit ff4d15252a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 19 deletions

24
.gitignore vendored Normal file
View file

@ -0,0 +1,24 @@
.DS_Store
._*
__pycache__
*.py[cod]
*$py.class
.mypy_cache
.vscode
*-lock.json
*.egg
*.egg-info
.coverage
.pytest_cache
.webassets-cache
htmlcov
test-reports
tests/data/*.sqlite3
*.swo
*.swp
*.pyo
*.pyc
*.env

View file

@ -73,6 +73,7 @@
icon="cancel" icon="cancel"
color="pink" color="pink"
></q-btn> ></q-btn>
</q-td>
</q-tr> </q-tr>
</template> </template>
{% endraw %} {% endraw %}
@ -203,14 +204,17 @@
}) })
}, },
addRelay() { addRelay() {
if (!this.relayToAdd.startsWith("wss://")) { if (
this.relayToAdd = "" !this.relayToAdd.startsWith('wss://') &&
!this.relayToAdd.startsWith('ws://')
) {
this.relayToAdd = ''
this.$q.notify({ this.$q.notify({
timeout: 5000, timeout: 5000,
type: 'warning', type: 'warning',
message: `Invalid relay URL.`, message: `Invalid relay URL.`,
caption: "Should start with wss://" caption: "Should start with 'wss://'' or 'ws://'"
}) })
return return
} }
console.log("ADD RELAY " + this.relayToAdd) console.log("ADD RELAY " + this.relayToAdd)

View file

@ -2,13 +2,15 @@ from http import HTTPStatus
import asyncio import asyncio
import ssl import ssl
import json import json
from fastapi import Request from typing import List
from fastapi import Request, WebSocket
from fastapi.param_functions import Query from fastapi.param_functions import Query
from fastapi.params import Depends from fastapi.params import Depends
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException from starlette.exceptions import HTTPException
from sse_starlette.sse import EventSourceResponse from sse_starlette.sse import EventSourceResponse
from loguru import logger
from . import nostrclient_ext from . import nostrclient_ext
@ -93,8 +95,35 @@ async def api_post_event(event: Event):
@nostrclient_ext.post("/api/v1/filters") @nostrclient_ext.post("/api/v1/filters")
async def api_subscribe(filters: Filters): async def api_subscribe(filters: Filters):
nostr_filters = init_filters(filters.__root__)
return EventSourceResponse(
event_getter(nostr_filters),
ping=20,
media_type="text/event-stream",
)
@nostrclient_ext.websocket("/api/v1/filters")
async def ws_filter_subscribe(websocket: WebSocket):
await websocket.accept()
while True:
json_data = await websocket.receive_text()
try:
data = json.loads(json_data)
filters = data if isinstance(data, list) else [data]
filters = [Filter.parse_obj(f) for f in filters]
nostr_filters = init_filters(filters)
async for message in event_getter(nostr_filters):
await websocket.send_text(message)
except Exception as e:
logger.warning(e)
def init_filters(filters: List[Filter]):
filter_list = [] filter_list = []
for filter in filters.__root__: for filter in filters:
filter_list.append( filter_list.append(
NostrFilter( NostrFilter(
event_ids=filter.ids, event_ids=filter.ids,
@ -116,15 +145,11 @@ async def api_subscribe(filters: Filters):
request.extend(nostr_filters.to_json_array()) request.extend(nostr_filters.to_json_array())
message = json.dumps(request) message = json.dumps(request)
client.relay_manager.publish_message(message) client.relay_manager.publish_message(message)
return nostr_filters
async def event_getter():
while True:
event = await received_event_queue.get()
if nostr_filters.match(event):
yield event.to_message()
return EventSourceResponse( async def event_getter(nostr_filters):
event_getter(), while True:
ping=20, event = await received_event_queue.get()
media_type="text/event-stream", if nostr_filters.match(event):
) yield event.to_message()