import { Notification, NotificationCount } from '@/lib/types/notification'; import { NotificationAdapter } from './notification-adapter.interface'; import { LeantimeAdapter } from './leantime-adapter'; import { getRedisClient } from '@/lib/redis'; export class NotificationService { private adapters: Map = new Map(); private static instance: NotificationService; // Cache keys and TTLs private static NOTIFICATION_COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`; private static NOTIFICATIONS_LIST_CACHE_KEY = (userId: string, page: number, limit: number) => `notifications:list:${userId}:${page}:${limit}`; private static COUNT_CACHE_TTL = 30; // 30 seconds private static LIST_CACHE_TTL = 300; // 5 minutes private static REFRESH_LOCK_KEY = (userId: string) => `notifications:refresh:lock:${userId}`; private static REFRESH_LOCK_TTL = 30; // 30 seconds constructor() { // Register adapters this.registerAdapter(new LeantimeAdapter()); // More adapters will be added as they are implemented // this.registerAdapter(new NextcloudAdapter()); // this.registerAdapter(new GiteaAdapter()); // this.registerAdapter(new DolibarrAdapter()); // this.registerAdapter(new MoodleAdapter()); } /** * Get the singleton instance of the notification service */ public static getInstance(): NotificationService { if (!NotificationService.instance) { NotificationService.instance = new NotificationService(); } return NotificationService.instance; } /** * Register a notification adapter */ private registerAdapter(adapter: NotificationAdapter): void { this.adapters.set(adapter.sourceName, adapter); console.log(`Registered notification adapter: ${adapter.sourceName}`); } /** * Get all notifications for a user from all configured sources */ async getNotifications(userId: string, page = 1, limit = 20): Promise { const redis = getRedisClient(); const cacheKey = NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit); // Try to get from cache first try { const cachedData = await redis.get(cacheKey); if (cachedData) { console.log(`[NOTIFICATION_SERVICE] Using cached notifications for user ${userId}`); // Schedule background refresh if TTL is less than half the original value const ttl = await redis.ttl(cacheKey); if (ttl < NotificationService.LIST_CACHE_TTL / 2) { this.scheduleBackgroundRefresh(userId); } return JSON.parse(cachedData); } } catch (error) { console.error('Error retrieving notifications from cache:', error); } // No cached data, fetch from all adapters console.log(`[NOTIFICATION_SERVICE] Fetching notifications for user ${userId}`); const allNotifications: Notification[] = []; const promises = Array.from(this.adapters.values()) .map(adapter => adapter.isConfigured() .then(configured => configured ? adapter.getNotifications(userId, page, limit) : []) .catch(error => { console.error(`Error fetching notifications from ${adapter.sourceName}:`, error); return []; }) ); const results = await Promise.all(promises); // Combine all notifications results.forEach(notifications => { allNotifications.push(...notifications); }); // Sort by timestamp (newest first) allNotifications.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()); // Store in cache try { await redis.set( cacheKey, JSON.stringify(allNotifications), 'EX', NotificationService.LIST_CACHE_TTL ); } catch (error) { console.error('Error caching notifications:', error); } return allNotifications; } /** * Get notification counts for a user */ async getNotificationCount(userId: string): Promise { const redis = getRedisClient(); const cacheKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId); // Try to get from cache first try { const cachedData = await redis.get(cacheKey); if (cachedData) { console.log(`[NOTIFICATION_SERVICE] Using cached notification counts for user ${userId}`); // Schedule background refresh if TTL is less than half the original value const ttl = await redis.ttl(cacheKey); if (ttl < NotificationService.COUNT_CACHE_TTL / 2) { this.scheduleBackgroundRefresh(userId); } return JSON.parse(cachedData); } } catch (error) { console.error('Error retrieving notification counts from cache:', error); } // No cached data, fetch counts from all adapters console.log(`[NOTIFICATION_SERVICE] Fetching notification counts for user ${userId}`); const aggregatedCount: NotificationCount = { total: 0, unread: 0, sources: {} }; const promises = Array.from(this.adapters.values()) .map(adapter => adapter.isConfigured() .then(configured => configured ? adapter.getNotificationCount(userId) : null) .catch(error => { console.error(`Error fetching notification count from ${adapter.sourceName}:`, error); return null; }) ); const results = await Promise.all(promises); // Combine all counts results.forEach(count => { if (!count) return; aggregatedCount.total += count.total; aggregatedCount.unread += count.unread; // Merge source-specific counts Object.entries(count.sources).forEach(([source, sourceCount]) => { aggregatedCount.sources[source] = sourceCount; }); }); // Store in cache try { await redis.set( cacheKey, JSON.stringify(aggregatedCount), 'EX', NotificationService.COUNT_CACHE_TTL ); } catch (error) { console.error('Error caching notification counts:', error); } return aggregatedCount; } /** * Mark a notification as read */ async markAsRead(userId: string, notificationId: string): Promise { // Extract the source from the notification ID (format: "source-id") const [source, ...idParts] = notificationId.split('-'); const sourceId = idParts.join('-'); // Reconstruct the ID in case it has hyphens if (!source || !this.adapters.has(source)) { return false; } const adapter = this.adapters.get(source)!; const success = await adapter.markAsRead(userId, notificationId); if (success) { // Invalidate caches await this.invalidateCache(userId); } return success; } /** * Mark all notifications from all sources as read */ async markAllAsRead(userId: string): Promise { const promises = Array.from(this.adapters.values()) .map(adapter => adapter.isConfigured() .then(configured => configured ? adapter.markAllAsRead(userId) : true) .catch(error => { console.error(`Error marking all notifications as read for ${adapter.sourceName}:`, error); return false; }) ); const results = await Promise.all(promises); const success = results.every(result => result); if (success) { // Invalidate caches await this.invalidateCache(userId); } return success; } /** * Invalidate notification caches for a user */ private async invalidateCache(userId: string): Promise { try { const redis = getRedisClient(); // Get all cache keys for this user const countKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId); const listKeysPattern = `notifications:list:${userId}:*`; // Delete count cache await redis.del(countKey); // Find and delete list caches const listKeys = await redis.keys(listKeysPattern); if (listKeys.length > 0) { await redis.del(...listKeys); } console.log(`[NOTIFICATION_SERVICE] Invalidated notification caches for user ${userId}`); } catch (error) { console.error('Error invalidating notification caches:', error); } } /** * Schedule a background refresh of notification data */ private async scheduleBackgroundRefresh(userId: string): Promise { const redis = getRedisClient(); const lockKey = NotificationService.REFRESH_LOCK_KEY(userId); // Try to acquire a lock to prevent multiple refreshes const lockAcquired = await redis.set( lockKey, Date.now().toString(), 'EX', NotificationService.REFRESH_LOCK_TTL, 'NX' // Set only if the key doesn't exist ); if (!lockAcquired) { // Another process is already refreshing return; } // Use setTimeout to make this non-blocking setTimeout(async () => { try { // Check if we've refreshed recently (within the last minute) // to avoid excessive refreshes from multiple tabs/components const refreshKey = `notifications:last_refresh:${userId}`; const lastRefresh = await redis.get(refreshKey); if (lastRefresh) { const lastRefreshTime = parseInt(lastRefresh, 10); const now = Date.now(); // If refreshed in the last minute, skip if (now - lastRefreshTime < 60000) { console.log(`[NOTIFICATION_SERVICE] Skipping background refresh for user ${userId} - refreshed recently`); return; } } console.log(`[NOTIFICATION_SERVICE] Background refresh started for user ${userId}`); // Set last refresh time await redis.set(refreshKey, Date.now().toString(), 'EX', 120); // 2 minute TTL // Refresh counts and notifications (for first page) await this.getNotificationCount(userId); await this.getNotifications(userId, 1, 20); console.log(`[NOTIFICATION_SERVICE] Background refresh completed for user ${userId}`); } catch (error) { console.error(`[NOTIFICATION_SERVICE] Background refresh failed for user ${userId}:`, error); } finally { // Release the lock await redis.del(lockKey).catch(() => {}); } }, 0); } }