feat: Integrate Relay Hub for centralized Nostr connection management

- Introduce a new composable, useRelayHub, to manage all Nostr WebSocket connections, enhancing connection stability and performance.
- Update existing components and composables to utilize the Relay Hub for connecting, publishing events, and subscribing to updates, streamlining the overall architecture.
- Add a RelayHubStatus component to display connection status and health metrics, improving user feedback on the connection state.
- Implement a RelayHubDemo page to showcase the functionality of the Relay Hub, including connection tests and subscription management.
- Ensure proper error handling and logging throughout the integration process to facilitate debugging and user experience.
This commit is contained in:
padreug 2025-08-10 11:48:33 +02:00
parent df7e461c91
commit 7d7bee8e77
14 changed files with 1982 additions and 955 deletions

View file

@ -1,5 +1,6 @@
import { SimplePool, type Filter, type Event } from 'nostr-tools'
import { type Filter, type Event } from 'nostr-tools'
import { getReplyInfo, EventKinds } from './events'
import { relayHub } from './relayHub'
export interface NostrClientConfig {
relays: string[]
@ -16,45 +17,40 @@ export interface NostrNote extends Event {
}
export class NostrClient {
private pool: SimplePool
private relays: string[]
private _isConnected: boolean = false
// private _isConnected: boolean = false
constructor(config: NostrClientConfig) {
this.pool = new SimplePool()
this.relays = config.relays
}
get isConnected(): boolean {
return this._isConnected
}
get poolInstance(): SimplePool {
return this.pool
return relayHub.isConnected
}
async connect(): Promise<void> {
try {
// Try to connect to at least one relay
const connections = await Promise.allSettled(
this.relays.map(relay => this.pool.ensureRelay(relay))
)
// Check if at least one connection was successful
this._isConnected = connections.some(result => result.status === 'fulfilled')
if (!this._isConnected) {
throw new Error('Failed to connect to any relay')
// The relay hub should already be initialized by the time this is called
if (!relayHub.isInitialized) {
throw new Error('RelayHub not initialized. Please ensure the app has initialized the relay hub first.')
}
// Check if we're already connected
if (relayHub.isConnected) {
return
}
// Try to connect using the relay hub
await relayHub.connect()
} catch (error) {
this._isConnected = false
throw error
}
}
disconnect(): void {
this.pool.close(this.relays)
this._isConnected = false
// Note: We don't disconnect the relay hub here as other components might be using it
// The relay hub will be managed at the app level
console.log('Client disconnected (relay hub remains active)')
}
async fetchNotes(options: {
@ -78,23 +74,8 @@ export class NostrClient {
]
try {
// Use proper subscription method to get multiple events
const allEvents: Event[] = []
const subscription = this.pool.subscribeMany(this.relays, filters, {
onevent(event: Event) {
allEvents.push(event)
},
oneose() {
// End of stored events - subscription is complete for initial fetch
}
})
// Wait for events to be collected (give it time to get all stored events)
await new Promise(resolve => setTimeout(resolve, 2000))
// Close the subscription since we just want the initial fetch
subscription.close()
// Use the relay hub to query events
const allEvents = await relayHub.queryEvents(filters, this.relays)
const noteEvents = [allEvents] // Wrap in array to match expected format
@ -139,21 +120,13 @@ export class NostrClient {
* Publish an event to all connected relays
*/
async publishEvent(event: Event): Promise<void> {
if (!this._isConnected) {
if (!relayHub.isConnected) {
throw new Error('Not connected to any relays')
}
try {
const results = await Promise.allSettled(
this.relays.map(relay => this.pool.publish([relay], event))
)
const failures = results.filter(result => result.status === 'rejected')
if (failures.length === results.length) {
throw new Error('Failed to publish to any relay')
}
console.log(`Published event ${event.id} to ${results.length - failures.length}/${results.length} relays`)
const result = await relayHub.publishEvent(event)
console.log(`Published event ${event.id} to ${result.success}/${result.total} relays`)
} catch (error) {
console.error('Failed to publish event:', error)
throw error
@ -171,21 +144,12 @@ export class NostrClient {
}
try {
const events = await Promise.all(
this.relays.map(async (relay) => {
try {
return await this.pool.querySync([relay], filter)
} catch (error) {
console.warn(`Failed to fetch replies from relay ${relay}:`, error)
return []
}
})
)
const events = await relayHub.queryEvents([filter], this.relays)
// Flatten and deduplicate events by ID
const uniqueEvents = Array.from(
new Map(
events.flat().map(event => [event.id, event])
events.map(event => [event.id, event])
).values()
)
@ -255,49 +219,12 @@ export class NostrClient {
try {
console.log('Fetching events with filters:', JSON.stringify(filters, null, 2))
const allEvents: Event[] = []
// Try each relay individually to identify problematic relays
const relayResults = await Promise.allSettled(
this.relays.map(async (relay) => {
try {
console.log(`Fetching from relay: ${relay}`)
const relayEvents: Event[] = []
const subscription = this.pool.subscribeMany([relay], filters, {
onevent(event: Event) {
console.log(`Received event from ${relay}:`, event)
relayEvents.push(event)
},
oneose() {
console.log(`End of stored events from ${relay}`)
}
})
// Wait for events to be collected
await new Promise(resolve => setTimeout(resolve, 2000))
subscription.close()
console.log(`Found ${relayEvents.length} events from ${relay}`)
return relayEvents
} catch (error) {
console.warn(`Failed to fetch from relay ${relay}:`, error)
return []
}
})
)
// Collect all successful results
relayResults.forEach((result, index) => {
if (result.status === 'fulfilled') {
allEvents.push(...result.value)
} else {
console.warn(`Relay ${this.relays[index]} failed:`, result.reason)
}
})
const events = await relayHub.queryEvents(filters, this.relays)
// Deduplicate events by ID
const uniqueEvents = Array.from(
new Map(allEvents.map(event => [event.id, event])).values()
new Map(events.map(event => [event.id, event])).values()
)
return uniqueEvents
@ -321,23 +248,13 @@ export class NostrClient {
}
try {
const events = await Promise.all(
this.relays.map(async (relay) => {
try {
return await this.pool.querySync([relay], filter)
} catch (error) {
console.warn(`Failed to fetch profiles from relay ${relay}:`, error)
return []
}
})
)
const events = await relayHub.queryEvents([filter], this.relays)
const profiles = new Map<string, any>()
const allEvents = events.flat()
// Get the latest profile for each pubkey
pubkeys.forEach(pubkey => {
const userEvents = allEvents
const userEvents = events
.filter(event => event.pubkey === pubkey)
.sort((a, b) => b.created_at - a.created_at)
@ -365,32 +282,25 @@ export class NostrClient {
since: Math.floor(Date.now() / 1000)
}]
// Subscribe to each relay individually
const unsubscribes = this.relays.map(relay => {
const sub = this.pool.subscribeMany(
[relay],
filters,
{
onevent: (event: Event) => {
const replyInfo = getReplyInfo(event)
onNote({
...event,
replyCount: 0,
reactionCount: 0,
reactions: {},
isReply: replyInfo.isReply,
replyTo: replyInfo.replyTo,
mentions: replyInfo.mentions
})
}
}
)
return () => sub.close()
// Use the relay hub to subscribe
const unsubscribe = relayHub.subscribe({
id: `notes-subscription-${Date.now()}`,
filters,
relays: this.relays,
onEvent: (event: Event) => {
const replyInfo = getReplyInfo(event)
onNote({
...event,
replyCount: 0,
reactionCount: 0,
reactions: {},
isReply: replyInfo.isReply,
replyTo: replyInfo.replyTo,
mentions: replyInfo.mentions
})
}
})
// Return a function that unsubscribes from all relays
return () => {
unsubscribes.forEach(unsub => unsub())
}
return unsubscribe
}
}

480
src/lib/nostr/relayHub.ts Normal file
View file

@ -0,0 +1,480 @@
import { SimplePool, type Filter, type Event, type Relay } from 'nostr-tools'
// Simple EventEmitter implementation for browser compatibility
class EventEmitter {
private events: { [key: string]: Function[] } = {}
on(event: string, listener: Function): void {
if (!this.events[event]) {
this.events[event] = []
}
this.events[event].push(listener)
}
off(event: string, listener: Function): void {
if (!this.events[event]) return
const index = this.events[event].indexOf(listener)
if (index > -1) {
this.events[event].splice(index, 1)
}
}
emit(event: string, ...args: any[]): void {
if (!this.events[event]) return
this.events[event].forEach(listener => listener(...args))
}
removeAllListeners(event?: string): void {
if (event) {
delete this.events[event]
} else {
this.events = {}
}
}
}
export interface RelayConfig {
url: string
read: boolean
write: boolean
priority?: number // Lower number = higher priority
}
export interface SubscriptionConfig {
id: string
filters: Filter[]
relays?: string[] // If not specified, uses all connected relays
onEvent?: (event: Event) => void
onEose?: () => void
onClose?: () => void
}
export interface RelayStatus {
url: string
connected: boolean
lastSeen: number
error?: string
latency?: number
}
export class RelayHub extends EventEmitter {
private pool: SimplePool
private relayConfigs: Map<string, RelayConfig> = new Map()
private connectedRelays: Map<string, Relay> = new Map()
private subscriptions: Map<string, any> = new Map()
public isInitialized = false
private reconnectInterval?: NodeJS.Timeout
private healthCheckInterval?: NodeJS.Timeout
private mobileVisibilityHandler?: () => void
// Connection state
private _isConnected = false
private _connectionAttempts = 0
private readonly maxReconnectAttempts = 5
private readonly reconnectDelay = 5000 // 5 seconds
private readonly healthCheckIntervalMs = 30000 // 30 seconds
constructor() {
super()
this.pool = new SimplePool()
this.setupMobileVisibilityHandling()
}
get isConnected(): boolean {
return this._isConnected
}
get connectedRelayCount(): number {
return this.connectedRelays.size
}
get totalRelayCount(): number {
return this.relayConfigs.size
}
get relayStatuses(): RelayStatus[] {
return Array.from(this.relayConfigs.values()).map(config => {
const relay = this.connectedRelays.get(config.url)
return {
url: config.url,
connected: !!relay,
lastSeen: relay ? Date.now() : 0,
error: relay ? undefined : 'Not connected',
latency: relay ? 0 : undefined // TODO: Implement actual latency measurement
}
})
}
/**
* Initialize the relay hub with relay configurations
*/
async initialize(relayUrls: string[]): Promise<void> {
if (this.isInitialized) {
console.warn('RelayHub already initialized')
return
}
// Convert URLs to relay configs
this.relayConfigs.clear()
relayUrls.forEach((url, index) => {
this.relayConfigs.set(url, {
url,
read: true,
write: true,
priority: index
})
})
// Start connection management
await this.connect()
this.startHealthCheck()
this.isInitialized = true
console.log(`RelayHub initialized with ${relayUrls.length} relays`)
}
/**
* Connect to all configured relays
*/
async connect(): Promise<void> {
if (this.relayConfigs.size === 0) {
throw new Error('No relay configurations found. Call initialize() first.')
}
try {
this._connectionAttempts++
console.log(`Connecting to ${this.relayConfigs.size} relays (attempt ${this._connectionAttempts})`)
// Connect to relays in priority order
const sortedRelays = Array.from(this.relayConfigs.values())
.sort((a, b) => (a.priority || 0) - (b.priority || 0))
const connectionPromises = sortedRelays.map(async (config) => {
try {
const relay = await this.pool.ensureRelay(config.url)
this.connectedRelays.set(config.url, relay)
console.log(`Connected to relay: ${config.url}`)
return { url: config.url, success: true }
} catch (error) {
console.error(`Failed to connect to relay ${config.url}:`, error)
return { url: config.url, success: false, error }
}
})
const results = await Promise.allSettled(connectionPromises)
const successfulConnections = results.filter(
result => result.status === 'fulfilled' && result.value.success
)
if (successfulConnections.length > 0) {
this._isConnected = true
this._connectionAttempts = 0
this.emit('connected', successfulConnections.length)
console.log(`Successfully connected to ${successfulConnections.length}/${this.relayConfigs.size} relays`)
} else {
throw new Error('Failed to connect to any relay')
}
} catch (error) {
this._isConnected = false
this.emit('connectionError', error)
console.error('Connection failed:', error)
// Schedule reconnection if we haven't exceeded max attempts
if (this._connectionAttempts < this.maxReconnectAttempts) {
this.scheduleReconnect()
} else {
this.emit('maxReconnectAttemptsReached')
console.error('Max reconnection attempts reached')
}
}
}
/**
* Disconnect from all relays
*/
disconnect(): void {
console.log('Disconnecting from all relays')
// Clear intervals
if (this.reconnectInterval) {
clearTimeout(this.reconnectInterval)
this.reconnectInterval = undefined
}
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval)
this.healthCheckInterval = undefined
}
// Close all subscriptions
this.subscriptions.forEach(sub => sub.close())
this.subscriptions.clear()
// Close all relay connections
this.pool.close(Array.from(this.relayConfigs.keys()))
this.connectedRelays.clear()
this._isConnected = false
this.emit('disconnected')
}
/**
* Subscribe to events from relays
*/
subscribe(config: SubscriptionConfig): () => void {
if (!this.isInitialized) {
throw new Error('RelayHub not initialized. Call initialize() first.')
}
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
// Determine which relays to use
const targetRelays = config.relays || Array.from(this.connectedRelays.keys())
const availableRelays = targetRelays.filter(url => this.connectedRelays.has(url))
if (availableRelays.length === 0) {
throw new Error('No available relays for subscription')
}
console.log(`Creating subscription ${config.id} on ${availableRelays.length} relays`)
// Create subscription using the pool
const subscription = this.pool.subscribeMany(availableRelays, config.filters, {
onevent: (event: Event) => {
config.onEvent?.(event)
this.emit('event', { subscriptionId: config.id, event, relay: 'unknown' })
},
oneose: () => {
config.onEose?.()
this.emit('eose', { subscriptionId: config.id })
}
})
// Store subscription for cleanup
this.subscriptions.set(config.id, subscription)
// Return unsubscribe function
return () => {
this.unsubscribe(config.id)
}
}
/**
* Unsubscribe from a specific subscription
*/
unsubscribe(subscriptionId: string): void {
const subscription = this.subscriptions.get(subscriptionId)
if (subscription) {
subscription.close()
this.subscriptions.delete(subscriptionId)
console.log(`Unsubscribed from ${subscriptionId}`)
}
}
/**
* Publish an event to all connected relays
*/
async publishEvent(event: Event): Promise<{ success: number; total: number }> {
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
const relayUrls = Array.from(this.connectedRelays.keys())
const results = await Promise.allSettled(
relayUrls.map(relay => this.pool.publish([relay], event))
)
const successful = results.filter(result => result.status === 'fulfilled').length
const total = results.length
console.log(`Published event ${event.id} to ${successful}/${total} relays`)
this.emit('eventPublished', { eventId: event.id, success: successful, total })
return { success: successful, total }
}
/**
* Query events from relays (one-time fetch)
*/
async queryEvents(filters: Filter[], relays?: string[]): Promise<Event[]> {
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
const targetRelays = relays || Array.from(this.connectedRelays.keys())
const availableRelays = targetRelays.filter(url => this.connectedRelays.has(url))
if (availableRelays.length === 0) {
throw new Error('No available relays for query')
}
try {
// Query each filter separately and combine results
const allEvents: Event[] = []
for (const filter of filters) {
const events = await this.pool.querySync(availableRelays, filter)
allEvents.push(...events)
}
console.log(`Queried ${allEvents.length} events from ${availableRelays.length} relays`)
return allEvents
} catch (error) {
console.error('Query failed:', error)
throw error
}
}
/**
* Get a specific relay instance
*/
getRelay(url: string): Relay | undefined {
return this.connectedRelays.get(url)
}
/**
* Check if a specific relay is connected
*/
isRelayConnected(url: string): boolean {
return this.connectedRelays.has(url)
}
/**
* Force reconnection to all relays
*/
async reconnect(): Promise<void> {
console.log('Forcing reconnection to all relays')
this.disconnect()
await this.connect()
}
/**
* Schedule automatic reconnection
*/
private scheduleReconnect(): void {
if (this.reconnectInterval) {
clearTimeout(this.reconnectInterval)
}
this.reconnectInterval = setTimeout(async () => {
console.log('Attempting automatic reconnection...')
await this.connect()
}, this.reconnectDelay)
}
/**
* Start health check monitoring
*/
private startHealthCheck(): void {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval)
}
this.healthCheckInterval = setInterval(() => {
this.performHealthCheck()
}, this.healthCheckIntervalMs)
}
/**
* Perform health check on all relays
*/
private async performHealthCheck(): Promise<void> {
if (!this._isConnected) return
console.log('Performing relay health check...')
const disconnectedRelays: string[] = []
// Check each relay connection
for (const [url] of this.connectedRelays) {
try {
// Try to send a ping or check connection status
// For now, we'll just check if the relay is still in our connected relays map
if (!this.connectedRelays.has(url)) {
disconnectedRelays.push(url)
}
} catch (error) {
console.warn(`Health check failed for relay ${url}:`, error)
disconnectedRelays.push(url)
}
}
// Remove disconnected relays
disconnectedRelays.forEach(url => {
this.connectedRelays.delete(url)
console.log(`Removed disconnected relay: ${url}`)
})
// Update connection status
if (this.connectedRelays.size === 0) {
this._isConnected = false
this.emit('allRelaysDisconnected')
console.warn('All relays disconnected, attempting reconnection...')
await this.connect()
} else if (this.connectedRelays.size < this.relayConfigs.size) {
this.emit('partialDisconnection', {
connected: this.connectedRelays.size,
total: this.relayConfigs.size
})
}
}
/**
* Setup mobile visibility handling for better WebSocket management
*/
private setupMobileVisibilityHandling(): void {
// Handle page visibility changes (mobile app backgrounding)
if (typeof document !== 'undefined') {
this.mobileVisibilityHandler = () => {
if (document.hidden) {
console.log('Page hidden, maintaining WebSocket connections')
// Keep connections alive but reduce activity
} else {
console.log('Page visible, resuming normal WebSocket activity')
// Resume normal activity and check connections
this.performHealthCheck()
}
}
document.addEventListener('visibilitychange', this.mobileVisibilityHandler)
}
// Handle online/offline events
if (typeof window !== 'undefined') {
window.addEventListener('online', () => {
console.log('Network online, checking relay connections...')
this.performHealthCheck()
})
window.addEventListener('offline', () => {
console.log('Network offline, marking as disconnected...')
this._isConnected = false
this.emit('networkOffline')
})
}
}
/**
* Cleanup resources
*/
destroy(): void {
console.log('Destroying RelayHub...')
// Remove event listeners
if (this.mobileVisibilityHandler && typeof document !== 'undefined') {
document.removeEventListener('visibilitychange', this.mobileVisibilityHandler)
}
if (typeof window !== 'undefined') {
window.removeEventListener('online', () => {})
window.removeEventListener('offline', () => {})
}
// Disconnect and cleanup
this.disconnect()
this.removeAllListeners()
this.isInitialized = false
}
}
// Export singleton instance
export const relayHub = new RelayHub()