NeahNew/lib/services/notifications/notification-service.ts
2025-05-04 11:06:56 +02:00

315 lines
10 KiB
TypeScript

import { Notification, NotificationCount } from '@/lib/types/notification';
import { NotificationAdapter } from './notification-adapter.interface';
import { LeantimeAdapter } from './leantime-adapter';
import { getRedisClient } from '@/lib/redis';
/**
 * Aggregates notifications from every registered source adapter and caches the
 * merged results in Redis.
 *
 * Reads are served from cache when an entry exists; when a cached entry has
 * less than half of its TTL left, a background refresh is scheduled. The
 * mark-as-read operations invalidate the user's cached count and lists.
 */
export class NotificationService {
  /** Registered adapters, keyed by their `sourceName`. */
  private adapters: Map<string, NotificationAdapter> = new Map();
  private static instance: NotificationService;

  // Cache keys and TTLs (all TTLs are in seconds)
  private static NOTIFICATION_COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`;
  private static NOTIFICATIONS_LIST_CACHE_KEY = (userId: string, page: number, limit: number) =>
    `notifications:list:${userId}:${page}:${limit}`;
  private static COUNT_CACHE_TTL = 30; // 30 seconds
  private static LIST_CACHE_TTL = 300; // 5 minutes
  private static REFRESH_LOCK_KEY = (userId: string) => `notifications:refresh:lock:${userId}`;
  private static REFRESH_LOCK_TTL = 30; // 30 seconds

  constructor() {
    // Register adapters
    this.registerAdapter(new LeantimeAdapter());
    // More adapters will be added as they are implemented
    // this.registerAdapter(new NextcloudAdapter());
    // this.registerAdapter(new GiteaAdapter());
    // this.registerAdapter(new DolibarrAdapter());
    // this.registerAdapter(new MoodleAdapter());
  }

  /**
   * Get the singleton instance of the notification service, creating it on
   * first use.
   */
  public static getInstance(): NotificationService {
    if (!NotificationService.instance) {
      NotificationService.instance = new NotificationService();
    }
    return NotificationService.instance;
  }

  /**
   * Register a notification adapter under its `sourceName`. A later adapter
   * with the same name replaces the earlier one.
   */
  private registerAdapter(adapter: NotificationAdapter): void {
    this.adapters.set(adapter.sourceName, adapter);
    console.log(`Registered notification adapter: ${adapter.sourceName}`);
  }

  /**
   * Get all notifications for a user from all configured sources.
   *
   * Serves the cached page when present (scheduling a background refresh if
   * the entry is past half of its TTL); otherwise fetches from the adapters.
   * Any cache read problem, including a malformed cached payload, is logged
   * and treated as a cache miss.
   *
   * @param userId - User whose notifications are requested.
   * @param page - Page number forwarded to each adapter.
   * @param limit - Page size forwarded to each adapter.
   * @returns Notifications from all sources, newest first.
   */
  async getNotifications(userId: string, page = 1, limit = 20): Promise<Notification[]> {
    const redis = getRedisClient();
    const cacheKey = NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit);

    // Try to get from cache first
    try {
      const cachedData = await redis.get(cacheKey);
      if (cachedData) {
        const parsed: unknown = JSON.parse(cachedData);
        const cached = NotificationService.reviveNotifications(parsed);
        console.log(`[NOTIFICATION_SERVICE] Using cached notifications for user ${userId}`);

        // Schedule background refresh if TTL is less than half the original value
        const ttl = await redis.ttl(cacheKey);
        if (ttl < NotificationService.LIST_CACHE_TTL / 2) {
          // Fire-and-forget: the method handles its own errors.
          void this.scheduleBackgroundRefresh(userId);
        }
        return cached;
      }
    } catch (error) {
      console.error('Error retrieving notifications from cache:', error);
    }

    // No usable cached data, fetch from all adapters
    return this.fetchAndCacheNotifications(userId, page, limit);
  }

  /**
   * Get notification counts for a user, aggregated over all configured
   * sources. Uses the same cache-then-fetch strategy as `getNotifications`.
   *
   * @param userId - User whose counts are requested.
   * @returns Summed `total`/`unread` plus the merged per-source counts.
   */
  async getNotificationCount(userId: string): Promise<NotificationCount> {
    const redis = getRedisClient();
    const cacheKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);

    // Try to get from cache first
    try {
      const cachedData = await redis.get(cacheKey);
      if (cachedData) {
        const cachedCount: NotificationCount = JSON.parse(cachedData);
        console.log(`[NOTIFICATION_SERVICE] Using cached notification counts for user ${userId}`);

        // Schedule background refresh if TTL is less than half the original value
        const ttl = await redis.ttl(cacheKey);
        if (ttl < NotificationService.COUNT_CACHE_TTL / 2) {
          // Fire-and-forget: the method handles its own errors.
          void this.scheduleBackgroundRefresh(userId);
        }
        return cachedCount;
      }
    } catch (error) {
      console.error('Error retrieving notification counts from cache:', error);
    }

    // No usable cached data, fetch counts from all adapters
    return this.fetchAndCacheCount(userId);
  }

  /**
   * Mark a notification as read.
   *
   * The owning adapter is selected by the prefix of the notification ID
   * (format: "source-id"); the full, unmodified ID is passed to the adapter.
   *
   * @returns `false` when no adapter is registered for the prefix, otherwise
   *          the adapter's result. Adapter errors propagate to the caller.
   */
  async markAsRead(userId: string, notificationId: string): Promise<boolean> {
    const [source] = notificationId.split('-');
    const adapter = source ? this.adapters.get(source) : undefined;
    if (!adapter) {
      return false;
    }

    const success = await adapter.markAsRead(userId, notificationId);
    if (success) {
      // Invalidate caches
      await this.invalidateCache(userId);
    }
    return success;
  }

  /**
   * Mark all notifications from all sources as read.
   *
   * Unconfigured adapters count as successful; an adapter that throws counts
   * as failed. The user's caches are invalidated even on partial failure,
   * because the adapters that did succeed have already changed state.
   *
   * @returns `true` only if every adapter reported success.
   */
  async markAllAsRead(userId: string): Promise<boolean> {
    const results = await Promise.all(
      Array.from(this.adapters.values()).map(async (adapter): Promise<boolean> => {
        try {
          return (await adapter.isConfigured()) ? await adapter.markAllAsRead(userId) : true;
        } catch (error) {
          console.error(`Error marking all notifications as read for ${adapter.sourceName}:`, error);
          return false;
        }
      })
    );

    await this.invalidateCache(userId);
    return results.every(result => result);
  }

  /**
   * Fetch one page of notifications from every configured adapter, merge and
   * sort them newest first, and store the result in the list cache. Never
   * reads the cache, so it is also what the background refresh uses.
   *
   * A failing adapter is logged and contributes no notifications; a failing
   * cache write is logged and does not affect the returned value.
   */
  private async fetchAndCacheNotifications(
    userId: string,
    page: number,
    limit: number
  ): Promise<Notification[]> {
    console.log(`[NOTIFICATION_SERVICE] Fetching notifications for user ${userId}`);

    const results = await Promise.all(
      Array.from(this.adapters.values()).map(async (adapter): Promise<Notification[]> => {
        try {
          return (await adapter.isConfigured())
            ? await adapter.getNotifications(userId, page, limit)
            : [];
        } catch (error) {
          console.error(`Error fetching notifications from ${adapter.sourceName}:`, error);
          return [];
        }
      })
    );

    // Combine all notifications
    const allNotifications: Notification[] = [];
    for (const notifications of results) {
      allNotifications.push(...notifications);
    }

    // Sort by timestamp (newest first)
    allNotifications.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());

    // Store in cache
    try {
      await getRedisClient().set(
        NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit),
        JSON.stringify(allNotifications),
        'EX',
        NotificationService.LIST_CACHE_TTL
      );
    } catch (error) {
      console.error('Error caching notifications:', error);
    }

    return allNotifications;
  }

  /**
   * Fetch counts from every configured adapter, aggregate them, and store the
   * result in the count cache. Never reads the cache.
   *
   * A failing adapter is logged and skipped; a failing cache write is logged
   * and does not affect the returned value.
   */
  private async fetchAndCacheCount(userId: string): Promise<NotificationCount> {
    console.log(`[NOTIFICATION_SERVICE] Fetching notification counts for user ${userId}`);

    const results = await Promise.all(
      Array.from(this.adapters.values()).map(async adapter => {
        try {
          return (await adapter.isConfigured())
            ? await adapter.getNotificationCount(userId)
            : null;
        } catch (error) {
          console.error(`Error fetching notification count from ${adapter.sourceName}:`, error);
          return null;
        }
      })
    );

    // Combine all counts
    const aggregatedCount: NotificationCount = {
      total: 0,
      unread: 0,
      sources: {}
    };
    for (const count of results) {
      if (!count) continue;
      aggregatedCount.total += count.total;
      aggregatedCount.unread += count.unread;
      // Merge source-specific counts (a later adapter overwrites the same key)
      for (const [source, sourceCount] of Object.entries(count.sources)) {
        aggregatedCount.sources[source] = sourceCount;
      }
    }

    // Store in cache
    try {
      await getRedisClient().set(
        NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId),
        JSON.stringify(aggregatedCount),
        'EX',
        NotificationService.COUNT_CACHE_TTL
      );
    } catch (error) {
      console.error('Error caching notification counts:', error);
    }

    return aggregatedCount;
  }

  /**
   * Turn a parsed cache payload back into notifications.
   *
   * JSON serialisation turns `Date` values into strings, so `timestamp` is
   * rebuilt as a `Date` to match what a fresh fetch returns.
   * NOTE(review): only `timestamp` is revived; if `Notification` has other
   * `Date` fields they still come back as strings - confirm against the type.
   *
   * @throws Error if the payload is not an array (callers treat this as a
   *         cache miss).
   */
  private static reviveNotifications(raw: unknown): Notification[] {
    if (!Array.isArray(raw)) {
      throw new Error('Cached notification list is not an array');
    }
    return raw.map((item: Notification) => ({
      ...item,
      timestamp: new Date(item.timestamp)
    }));
  }

  /**
   * Invalidate notification caches for a user: the count entry and every
   * cached list page. Failures are logged and never thrown.
   */
  private async invalidateCache(userId: string): Promise<void> {
    try {
      const redis = getRedisClient();
      const countKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);
      const listKeysPattern = `notifications:list:${userId}:*`;

      // Delete count cache
      await redis.del(countKey);

      // Find and delete list caches.
      // NOTE(review): KEYS walks the whole keyspace and blocks Redis while it
      // runs; consider switching to incremental SCAN if the keyspace is large.
      const listKeys = await redis.keys(listKeysPattern);
      if (listKeys.length > 0) {
        await redis.del(...listKeys);
      }
      console.log(`[NOTIFICATION_SERVICE] Invalidated notification caches for user ${userId}`);
    } catch (error) {
      console.error('Error invalidating notification caches:', error);
    }
  }

  /**
   * Schedule a background refresh of notification data.
   *
   * Acquires a per-user lock (SET ... NX with a TTL) so only one refresh runs
   * at a time, then defers the work with `setTimeout` so the caller is not
   * blocked. Callers do not await this method, so it never rejects: a Redis
   * failure while acquiring the lock is logged and the refresh is skipped.
   */
  private async scheduleBackgroundRefresh(userId: string): Promise<void> {
    const lockKey = NotificationService.REFRESH_LOCK_KEY(userId);

    try {
      // Try to acquire a lock to prevent multiple refreshes
      const lockAcquired = await getRedisClient().set(
        lockKey,
        Date.now().toString(),
        'EX',
        NotificationService.REFRESH_LOCK_TTL,
        'NX' // Set only if the key doesn't exist
      );
      if (!lockAcquired) {
        // Another process is already refreshing
        return;
      }
    } catch (error) {
      console.error(`[NOTIFICATION_SERVICE] Could not acquire refresh lock for user ${userId}:`, error);
      return;
    }

    // Use setTimeout to make this non-blocking
    setTimeout(() => {
      void this.runBackgroundRefresh(userId, lockKey);
    }, 0);
  }

  /**
   * Body of the background refresh: re-fetches the counts and the first page
   * of notifications directly from the adapters (bypassing the cache read, so
   * the cached entries are actually replaced), then releases the lock.
   *
   * Skips the fetch if a refresh for this user was recorded within the last
   * minute. Never rejects; all failures are logged.
   */
  private async runBackgroundRefresh(userId: string, lockKey: string): Promise<void> {
    try {
      const redis = getRedisClient();

      // Check if we've refreshed recently (within the last minute)
      // to avoid excessive refreshes from multiple tabs/components
      const refreshKey = `notifications:last_refresh:${userId}`;
      const lastRefresh = await redis.get(refreshKey);
      if (lastRefresh) {
        const lastRefreshTime = parseInt(lastRefresh, 10);
        if (Date.now() - lastRefreshTime < 60000) {
          console.log(`[NOTIFICATION_SERVICE] Skipping background refresh for user ${userId} - refreshed recently`);
          return;
        }
      }

      console.log(`[NOTIFICATION_SERVICE] Background refresh started for user ${userId}`);

      // Set last refresh time
      await redis.set(refreshKey, Date.now().toString(), 'EX', 120); // 2 minute TTL

      // Refresh counts and notifications (for first page)
      await this.fetchAndCacheCount(userId);
      await this.fetchAndCacheNotifications(userId, 1, 20);

      console.log(`[NOTIFICATION_SERVICE] Background refresh completed for user ${userId}`);
    } catch (error) {
      console.error(`[NOTIFICATION_SERVICE] Background refresh failed for user ${userId}:`, error);
    } finally {
      // Release the lock; if this fails the lock still expires via its TTL.
      try {
        await getRedisClient().del(lockKey);
      } catch (error) {
        console.error(`[NOTIFICATION_SERVICE] Failed to release refresh lock for user ${userId}:`, error);
      }
    }
  }
}