362 lines
13 KiB
TypeScript
362 lines
13 KiB
TypeScript
import { Notification, NotificationCount } from '@/lib/types/notification';
|
|
import { NotificationAdapter } from './notification-adapter.interface';
|
|
import { LeantimeAdapter } from './leantime-adapter';
|
|
import { getRedisClient } from '@/lib/redis';
|
|
|
|
/**
 * Aggregates notifications from every registered source adapter and caches
 * the merged results in Redis.
 *
 * Reads are cache-first. When a cached entry is past half of its TTL, a
 * background refresh is scheduled (guarded by a Redis lock) that re-fetches
 * from the adapters and overwrites the cache. Use {@link getInstance} to
 * share a single service per process.
 */
export class NotificationService {
  /** Registered adapters, keyed by `adapter.sourceName`. */
  private readonly adapters = new Map<string, NotificationAdapter>();
  private static instance: NotificationService | undefined;

  // Cache keys and TTLs (TTLs are in seconds; they are passed to Redis `EX`)
  private static readonly NOTIFICATION_COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`;
  private static readonly NOTIFICATIONS_LIST_CACHE_KEY = (userId: string, page: number, limit: number) =>
    `notifications:list:${userId}:${page}:${limit}`;
  private static readonly COUNT_CACHE_TTL = 30; // 30 seconds
  private static readonly LIST_CACHE_TTL = 300; // 5 minutes
  private static readonly REFRESH_LOCK_KEY = (userId: string) => `notifications:refresh:lock:${userId}`;
  private static readonly REFRESH_LOCK_TTL = 30; // 30 seconds

  constructor() {
    console.log('[NOTIFICATION_SERVICE] Initializing notification service');

    // Register adapters
    this.registerAdapter(new LeantimeAdapter());

    // More adapters will be added as they are implemented
    // this.registerAdapter(new NextcloudAdapter());
    // this.registerAdapter(new GiteaAdapter());
    // this.registerAdapter(new DolibarrAdapter());
    // this.registerAdapter(new MoodleAdapter());

    console.log('[NOTIFICATION_SERVICE] Registered adapters:', Array.from(this.adapters.keys()));
  }

  /**
   * Get the singleton instance of the notification service, creating it on
   * first use.
   */
  public static getInstance(): NotificationService {
    if (!NotificationService.instance) {
      console.log('[NOTIFICATION_SERVICE] Creating new notification service instance');
      NotificationService.instance = new NotificationService();
    }
    return NotificationService.instance;
  }

  /**
   * Register a notification adapter under its `sourceName`. Registering a
   * second adapter with the same name replaces the first.
   */
  private registerAdapter(adapter: NotificationAdapter): void {
    this.adapters.set(adapter.sourceName, adapter);
    console.log(`[NOTIFICATION_SERVICE] Registered notification adapter: ${adapter.sourceName}`);
  }

  /**
   * Get all notifications for a user from all configured sources.
   *
   * Served from the Redis cache when possible; otherwise fetched from the
   * adapters, sorted newest first and cached. Cache failures are logged and
   * fall through to the adapters.
   *
   * @param userId - User whose notifications are requested.
   * @param page - Page number forwarded to every adapter.
   * @param limit - Page size forwarded to every adapter. The merged result
   *   can hold up to `limit` items per adapter.
   * @returns Merged notifications, newest first.
   */
  async getNotifications(userId: string, page = 1, limit = 20): Promise<Notification[]> {
    console.log(`[NOTIFICATION_SERVICE] getNotifications called for user ${userId}, page ${page}, limit ${limit}`);
    const cacheKey = NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit);

    // Try to get from cache first
    try {
      const redis = getRedisClient();
      const cachedData = await redis.get(cacheKey);
      if (cachedData) {
        console.log(`[NOTIFICATION_SERVICE] Using cached notifications for user ${userId}`);

        // Schedule background refresh if TTL is less than half the original value
        const ttl = await redis.ttl(cacheKey);
        if (ttl < NotificationService.LIST_CACHE_TTL / 2) {
          // Fire-and-forget: scheduleBackgroundRefresh handles its own errors.
          void this.scheduleBackgroundRefresh(userId);
        }

        // A corrupt entry throws here and falls through to a fresh fetch.
        return NotificationService.reviveNotifications(JSON.parse(cachedData));
      }
    } catch (error) {
      console.error('[NOTIFICATION_SERVICE] Error retrieving notifications from cache:', error);
    }

    // No usable cached data, fetch from all adapters
    return this.fetchAndCacheNotifications(userId, page, limit);
  }

  /**
   * Get notification counts for a user, summed over all configured sources.
   *
   * Served from the Redis cache when possible; otherwise fetched from the
   * adapters and cached. Cache failures are logged and fall through to the
   * adapters.
   *
   * @param userId - User whose counts are requested.
   * @returns Aggregated totals plus the per-source breakdown reported by the adapters.
   */
  async getNotificationCount(userId: string): Promise<NotificationCount> {
    console.log(`[NOTIFICATION_SERVICE] getNotificationCount called for user ${userId}`);
    const cacheKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);

    // Try to get from cache first
    try {
      const redis = getRedisClient();
      const cachedData = await redis.get(cacheKey);
      if (cachedData) {
        console.log(`[NOTIFICATION_SERVICE] Using cached notification counts for user ${userId}`);

        // Schedule background refresh if TTL is less than half the original value
        const ttl = await redis.ttl(cacheKey);
        if (ttl < NotificationService.COUNT_CACHE_TTL / 2) {
          // Fire-and-forget: scheduleBackgroundRefresh handles its own errors.
          void this.scheduleBackgroundRefresh(userId);
        }

        return JSON.parse(cachedData) as NotificationCount;
      }
    } catch (error) {
      console.error('[NOTIFICATION_SERVICE] Error retrieving notification counts from cache:', error);
    }

    // No usable cached data, fetch counts from all adapters
    return this.fetchAndCacheNotificationCount(userId);
  }

  /**
   * Mark a notification as read.
   *
   * The adapter is selected from the part of the ID before the first hyphen
   * (format: "source-id"); the full, unmodified ID is handed to the adapter.
   * The user's caches are invalidated when the adapter reports success.
   *
   * @returns `false` when no adapter is registered for the source or the
   *   adapter reports failure. Errors thrown by the adapter propagate.
   */
  async markAsRead(userId: string, notificationId: string): Promise<boolean> {
    const [source] = notificationId.split('-');
    const adapter = source ? this.adapters.get(source) : undefined;
    if (!adapter) {
      return false;
    }

    const success = await adapter.markAsRead(userId, notificationId);
    if (success) {
      // Invalidate caches
      await this.invalidateCache(userId);
    }

    return success;
  }

  /**
   * Mark all notifications from all sources as read.
   *
   * Adapters that are not configured count as successful. The user's caches
   * are invalidated even when some adapter fails, because the adapters that
   * did succeed have already changed state and cached data would be stale.
   *
   * @returns `true` only when every adapter succeeded.
   */
  async markAllAsRead(userId: string): Promise<boolean> {
    const results = await Promise.all(
      Array.from(this.adapters.values()).map(async (adapter): Promise<boolean> => {
        try {
          const configured = await adapter.isConfigured();
          return configured ? await adapter.markAllAsRead(userId) : true;
        } catch (error) {
          console.error(`Error marking all notifications as read for ${adapter.sourceName}:`, error);
          return false;
        }
      })
    );

    // Invalidate caches
    await this.invalidateCache(userId);

    return results.every(Boolean);
  }

  /**
   * Fetch one page of notifications from every configured adapter, merge and
   * sort them (newest first), and write the result to the list cache.
   * Never reads the cache, so it is safe to use for refreshing it.
   */
  private async fetchAndCacheNotifications(userId: string, page: number, limit: number): Promise<Notification[]> {
    console.log(`[NOTIFICATION_SERVICE] Fetching notifications for user ${userId} from ${this.adapters.size} adapters`);

    const adapterEntries = Array.from(this.adapters.entries());
    console.log(`[NOTIFICATION_SERVICE] Available adapters: ${adapterEntries.map(([name]) => name).join(', ')}`);

    const results = await Promise.all(
      adapterEntries.map(async ([name, adapter]): Promise<{ name: string; notifications: Notification[] }> => {
        console.log(`[NOTIFICATION_SERVICE] Checking if adapter ${name} is configured`);
        try {
          const configured = await adapter.isConfigured();
          console.log(`[NOTIFICATION_SERVICE] Adapter ${name} is configured: ${configured}`);

          if (!configured) {
            console.log(`[NOTIFICATION_SERVICE] Skipping adapter ${name} as it is not configured`);
            return { name, notifications: [] };
          }

          console.log(`[NOTIFICATION_SERVICE] Fetching notifications from ${name} for user ${userId}`);
          const notifications = await adapter.getNotifications(userId, page, limit);
          console.log(`[NOTIFICATION_SERVICE] Got ${notifications.length} notifications from ${name}`);
          return { name, notifications };
        } catch (error) {
          // One failing source must not hide the notifications of the others.
          console.error(`[NOTIFICATION_SERVICE] Error fetching notifications from ${name}:`, error);
          return { name, notifications: [] };
        }
      })
    );

    // Combine all notifications
    const allNotifications: Notification[] = [];
    for (const { name, notifications } of results) {
      console.log(`[NOTIFICATION_SERVICE] Adding ${notifications.length} notifications from ${name}`);
      allNotifications.push(...notifications);
    }

    // Sort by timestamp (newest first)
    allNotifications.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
    console.log(`[NOTIFICATION_SERVICE] Total notifications after sorting: ${allNotifications.length}`);

    // Store in cache (best effort: the fetched data is returned either way)
    try {
      await getRedisClient().set(
        NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit),
        JSON.stringify(allNotifications),
        'EX',
        NotificationService.LIST_CACHE_TTL
      );
      console.log(`[NOTIFICATION_SERVICE] Cached ${allNotifications.length} notifications for user ${userId}`);
    } catch (error) {
      console.error('[NOTIFICATION_SERVICE] Error caching notifications:', error);
    }

    return allNotifications;
  }

  /**
   * Fetch counts from every configured adapter, aggregate them, and write
   * the result to the count cache. Never reads the cache, so it is safe to
   * use for refreshing it.
   */
  private async fetchAndCacheNotificationCount(userId: string): Promise<NotificationCount> {
    console.log(`[NOTIFICATION_SERVICE] Fetching notification counts for user ${userId} from ${this.adapters.size} adapters`);

    const aggregatedCount: NotificationCount = {
      total: 0,
      unread: 0,
      sources: {}
    };

    const adapterEntries = Array.from(this.adapters.entries());
    console.log(`[NOTIFICATION_SERVICE] Available adapters for count: ${adapterEntries.map(([name]) => name).join(', ')}`);

    const results = await Promise.all(
      adapterEntries.map(async ([name, adapter]): Promise<{ name: string; count: NotificationCount | null }> => {
        console.log(`[NOTIFICATION_SERVICE] Checking if adapter ${name} is configured for count`);
        try {
          const configured = await adapter.isConfigured();
          console.log(`[NOTIFICATION_SERVICE] Adapter ${name} is configured for count: ${configured}`);

          if (!configured) {
            console.log(`[NOTIFICATION_SERVICE] Skipping adapter ${name} for count as it is not configured`);
            return { name, count: null };
          }

          console.log(`[NOTIFICATION_SERVICE] Fetching notification count from ${name} for user ${userId}`);
          const count = await adapter.getNotificationCount(userId);
          console.log(`[NOTIFICATION_SERVICE] Got count from ${name}:`, count);
          return { name, count };
        } catch (error) {
          // One failing source must not break the aggregate.
          console.error(`[NOTIFICATION_SERVICE] Error fetching notification count from ${name}:`, error);
          return { name, count: null };
        }
      })
    );

    // Combine all counts
    for (const { name, count } of results) {
      if (!count) continue;

      console.log(`[NOTIFICATION_SERVICE] Adding counts from ${name}: total=${count.total}, unread=${count.unread}`);

      aggregatedCount.total += count.total;
      aggregatedCount.unread += count.unread;

      // Merge source-specific counts (a later adapter wins on a duplicate source key)
      for (const [source, sourceCount] of Object.entries(count.sources)) {
        aggregatedCount.sources[source] = sourceCount;
      }
    }

    console.log(`[NOTIFICATION_SERVICE] Aggregated counts for user ${userId}:`, aggregatedCount);

    // Store in cache (best effort: the fetched data is returned either way)
    try {
      await getRedisClient().set(
        NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId),
        JSON.stringify(aggregatedCount),
        'EX',
        NotificationService.COUNT_CACHE_TTL
      );
      console.log(`[NOTIFICATION_SERVICE] Cached notification counts for user ${userId}`);
    } catch (error) {
      console.error('[NOTIFICATION_SERVICE] Error caching notification counts:', error);
    }

    return aggregatedCount;
  }

  /**
   * Rebuild a notification list from parsed cache JSON. `JSON.stringify`
   * turns `Date` values into strings, so `timestamp` is converted back to a
   * `Date` to keep cached results shaped like freshly fetched ones.
   *
   * @throws Error when the parsed value is not an array.
   */
  private static reviveNotifications(parsed: unknown): Notification[] {
    if (!Array.isArray(parsed)) {
      throw new Error('Cached notification list is not an array');
    }
    return parsed.map((item) => ({ ...item, timestamp: new Date(item.timestamp) }));
  }

  /**
   * Escape the characters that are special in Redis glob patterns so a
   * value can be embedded literally in a `MATCH` pattern.
   */
  private static escapeGlob(value: string): string {
    return value.replace(/[\\*?[\]]/g, '\\$&');
  }

  /**
   * Invalidate notification caches for a user: the count entry and every
   * cached list page. Failures are logged, never thrown.
   */
  private async invalidateCache(userId: string): Promise<void> {
    try {
      const redis = getRedisClient();

      const countKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);
      // Escape the user id so glob characters in it cannot match other users' keys.
      const listKeysPattern = `notifications:list:${NotificationService.escapeGlob(userId)}:*`;

      // Delete count cache
      await redis.del(countKey);

      // Find and delete list caches. SCAN walks the keyspace incrementally,
      // unlike KEYS, which examines every key in one blocking command.
      let cursor = '0';
      do {
        const [nextCursor, listKeys] = await redis.scan(cursor, 'MATCH', listKeysPattern, 'COUNT', 100);
        if (listKeys.length > 0) {
          await redis.del(...listKeys);
        }
        cursor = nextCursor;
      } while (cursor !== '0');

      console.log(`[NOTIFICATION_SERVICE] Invalidated notification caches for user ${userId}`);
    } catch (error) {
      console.error('Error invalidating notification caches:', error);
    }
  }

  /**
   * Schedule a background refresh of notification data.
   *
   * Takes a per-user Redis lock (`SET ... NX`) so only one refresh runs at a
   * time, then defers the work with `setTimeout` so the caller is not
   * blocked. This method never rejects, which makes it safe to call without
   * awaiting.
   */
  private async scheduleBackgroundRefresh(userId: string): Promise<void> {
    const lockKey = NotificationService.REFRESH_LOCK_KEY(userId);

    try {
      // Try to acquire a lock to prevent multiple refreshes
      const lockAcquired = await getRedisClient().set(
        lockKey,
        Date.now().toString(),
        'EX',
        NotificationService.REFRESH_LOCK_TTL,
        'NX' // Set only if the key doesn't exist
      );

      if (!lockAcquired) {
        // Another process is already refreshing
        return;
      }
    } catch (error) {
      console.error(`[NOTIFICATION_SERVICE] Could not acquire refresh lock for user ${userId}:`, error);
      return;
    }

    // Use setTimeout to make this non-blocking
    setTimeout(() => {
      void this.runBackgroundRefresh(userId, lockKey);
    }, 0);
  }

  /**
   * Body of the background refresh: skips when a refresh ran within the last
   * minute, otherwise re-fetches counts and the first list page straight
   * from the adapters (bypassing the cache read, which would only return the
   * entry being refreshed). Always releases the lock; never rejects.
   */
  private async runBackgroundRefresh(userId: string, lockKey: string): Promise<void> {
    try {
      const redis = getRedisClient();

      // Check if we've refreshed recently (within the last minute)
      // to avoid excessive refreshes from multiple tabs/components
      const refreshKey = `notifications:last_refresh:${userId}`;
      const lastRefresh = await redis.get(refreshKey);

      if (lastRefresh) {
        const lastRefreshTime = Number.parseInt(lastRefresh, 10);

        // If refreshed in the last minute, skip
        if (Date.now() - lastRefreshTime < 60000) {
          console.log(`[NOTIFICATION_SERVICE] Skipping background refresh for user ${userId} - refreshed recently`);
          return;
        }
      }

      console.log(`[NOTIFICATION_SERVICE] Background refresh started for user ${userId}`);

      // Set last refresh time
      await redis.set(refreshKey, Date.now().toString(), 'EX', 120); // 2 minute TTL

      // Refresh counts and notifications (for first page)
      await Promise.all([
        this.fetchAndCacheNotificationCount(userId),
        this.fetchAndCacheNotifications(userId, 1, 20)
      ]);

      console.log(`[NOTIFICATION_SERVICE] Background refresh completed for user ${userId}`);
    } catch (error) {
      console.error(`[NOTIFICATION_SERVICE] Background refresh failed for user ${userId}:`, error);
    } finally {
      // Release the lock. Best effort: if this fails the lock expires by its TTL.
      try {
        await getRedisClient().del(lockKey);
      } catch {
        // Intentionally ignored; see comment above.
      }
    }
  }
}
|