import { getRedisClient } from '@/lib/redis'; import { logger } from '@/lib/logger'; import { NotificationCount } from '@/lib/types/notification'; export interface NotificationItem { id: string; title: string; message: string; link?: string; timestamp: string; metadata?: Record; } /** * Simple registry that stores notification counts from widgets * Widgets update their counts, and the badge reads the aggregated count */ export class NotificationRegistry { private static instance: NotificationRegistry; private static COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`; private static ITEMS_CACHE_KEY = (userId: string, source: string) => `notifications:items:${userId}:${source}`; private static COUNT_CACHE_TTL = 300; // 5 minutes (increased to prevent premature expiration) private static ITEMS_CACHE_TTL = 300; // 5 minutes for items public static getInstance(): NotificationRegistry { if (!NotificationRegistry.instance) { NotificationRegistry.instance = new NotificationRegistry(); } return NotificationRegistry.instance; } /** * Record count from a widget (called when widget detects new items) */ async recordCount( userId: string, source: string, count: number, items?: NotificationItem[] ): Promise { const redis = getRedisClient(); const cacheKey = NotificationRegistry.COUNT_CACHE_KEY(userId); // Get current aggregated count let currentCount: NotificationCount = { total: 0, unread: 0, sources: {}, }; try { const cached = await redis.get(cacheKey); if (cached) { currentCount = JSON.parse(cached); } } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error reading cache', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), error: error instanceof Error ? error.message : String(error), }); } // Update count for this source // If count is 1 and it's a call, increment. Otherwise, set the count. const previousSourceCount = currentCount.sources[source]?.unread || 0; const isCallIncrement = count === 1 && items && items.length > 0 && items[0].metadata?.type === 'call'; if (isCallIncrement) { // For calls, increment the existing count currentCount.sources[source] = { total: previousSourceCount + count, unread: previousSourceCount + count, }; logger.debug('[NOTIFICATION_REGISTRY] Call count incremented', { source, previousCount: previousSourceCount, newCount: currentCount.sources[source].unread, }); } else { // For regular updates, set the count (widgets send their total) // Only update if the new count is different or if we're setting it for the first time if (count !== previousSourceCount || !currentCount.sources[source]) { currentCount.sources[source] = { total: count, unread: count, }; logger.debug('[NOTIFICATION_REGISTRY] Count updated', { previousCount: previousSourceCount, newCount: count, }); } else { // Count hasn't changed, but refresh the TTL to keep it alive logger.debug('[NOTIFICATION_REGISTRY] Count unchanged, refreshing TTL', { count, }); } } // Recalculate total currentCount.unread = Object.values(currentCount.sources).reduce( (sum, s) => sum + s.unread, 0 ); currentCount.total = currentCount.unread; // Store in cache try { await redis.set( cacheKey, JSON.stringify(currentCount), 'EX', NotificationRegistry.COUNT_CACHE_TTL ); logger.debug('[NOTIFICATION_REGISTRY] Count updated', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), count, totalUnread: currentCount.unread, previousCount: previousSourceCount, }); } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error updating cache', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), error: error instanceof Error ? error.message : String(error), }); } // Store items if provided (for dropdown display) if (items && items.length > 0) { try { const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source); // Limit to 50 items per source await redis.set( itemsKey, JSON.stringify(items.slice(0, 50)), 'EX', NotificationRegistry.ITEMS_CACHE_TTL ); logger.debug('[NOTIFICATION_REGISTRY] Items stored', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), itemsCount: items.length, }); } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error storing items', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), error: error instanceof Error ? error.message : String(error), }); } } } /** * Get aggregated count (called by the badge) */ async getCount(userId: string): Promise { const redis = getRedisClient(); const cacheKey = NotificationRegistry.COUNT_CACHE_KEY(userId); try { const cached = await redis.get(cacheKey); if (cached) { const count = JSON.parse(cached); logger.debug('[NOTIFICATION_REGISTRY] Count retrieved from cache', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), totalUnread: count.unread, sourcesCount: Object.keys(count.sources).length, }); return count; } } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error reading cache', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), error: error instanceof Error ? error.message : String(error), }); } // If no cache, return empty count logger.debug('[NOTIFICATION_REGISTRY] No cache found, returning empty count', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), }); return { total: 0, unread: 0, sources: {}, }; } /** * Get notifications (items) from all sources (for dropdown) */ async getNotifications(userId: string, limit: number = 20): Promise { const redis = getRedisClient(); const sources = ['email', 'rocketchat', 'leantime', 'calendar']; const allItems: any[] = []; for (const source of sources) { try { const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source); const items = await redis.get(itemsKey); if (items) { const parsed = JSON.parse(items); // Transform items to Notification format const transformed = parsed.map((item: NotificationItem) => { // Parse timestamp - handle both string and Date let timestamp: Date; if (typeof item.timestamp === 'string') { timestamp = new Date(item.timestamp); } else if (item.timestamp && typeof item.timestamp === 'object' && 'getTime' in item.timestamp) { // Handle Date object (from JSON.parse) timestamp = new Date(item.timestamp as any); } else { timestamp = new Date(); // Fallback to now } return { id: `${source}-${item.id}`, source: source as 'leantime' | 'rocketchat' | 'email' | 'calendar', sourceId: item.id, type: source, title: item.title, message: item.message, link: item.link, isRead: false, // Widgets only send unread items timestamp, priority: 'normal' as const, user: { id: userId, }, metadata: item.metadata, }; }); allItems.push(...transformed); } } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error reading items', { error: error instanceof Error ? error.message : String(error), }); } } // Sort by timestamp (newest first) allItems.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()); logger.debug('[NOTIFICATION_REGISTRY] Notifications retrieved', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), total: allItems.length, limit, returned: Math.min(allItems.length, limit), }); return allItems.slice(0, limit); } /** * Invalidate cache (for force refresh) */ async invalidateCache(userId: string): Promise { try { const redis = getRedisClient(); const countKey = NotificationRegistry.COUNT_CACHE_KEY(userId); const sources = ['email', 'rocketchat', 'leantime', 'calendar']; // Delete count cache await redis.del(countKey); // Delete items caches for (const source of sources) { const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source); await redis.del(itemsKey); } logger.debug('[NOTIFICATION_REGISTRY] Cache invalidated', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), }); } catch (error) { logger.error('[NOTIFICATION_REGISTRY] Error invalidating cache', { userIdHash: Buffer.from(userId).toString('base64').slice(0, 12), error: error instanceof Error ? error.message : String(error), }); } } }