NeahNew/lib/services/notifications/notification-service.ts
2026-01-07 10:32:42 +01:00

530 lines
18 KiB
TypeScript

import { Notification, NotificationCount } from '@/lib/types/notification';
import { NotificationAdapter } from './notification-adapter.interface';
import { LeantimeAdapter } from './leantime-adapter';
import { getRedisClient } from '@/lib/redis';
import { logger } from '@/lib/logger';
export class NotificationService {
private adapters: Map<string, NotificationAdapter> = new Map();
private static instance: NotificationService;
// Cache keys and TTLs - All aligned to 30 seconds for consistency
private static NOTIFICATION_COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`;
private static NOTIFICATIONS_LIST_CACHE_KEY = (userId: string, page: number, limit: number) =>
`notifications:list:${userId}:${page}:${limit}`;
private static COUNT_CACHE_TTL = 30; // 30 seconds (aligned with refresh interval)
private static LIST_CACHE_TTL = 30; // 30 seconds (aligned with count cache for consistency)
private static REFRESH_LOCK_KEY = (userId: string) => `notifications:refresh:lock:${userId}`;
private static REFRESH_LOCK_TTL = 30; // 30 seconds
constructor() {
logger.debug('[NOTIFICATION_SERVICE] Initializing notification service');
// Register adapters
this.registerAdapter(new LeantimeAdapter());
// More adapters will be added as they are implemented
// this.registerAdapter(new NextcloudAdapter());
// this.registerAdapter(new GiteaAdapter());
// this.registerAdapter(new DolibarrAdapter());
// this.registerAdapter(new MoodleAdapter());
logger.debug('[NOTIFICATION_SERVICE] Registered adapters', {
adapters: Array.from(this.adapters.keys()),
});
}
/**
* Get the singleton instance of the notification service
*/
public static getInstance(): NotificationService {
if (!NotificationService.instance) {
logger.debug('[NOTIFICATION_SERVICE] Creating new notification service instance');
NotificationService.instance = new NotificationService();
}
return NotificationService.instance;
}
/**
* Register a notification adapter
*/
private registerAdapter(adapter: NotificationAdapter): void {
this.adapters.set(adapter.sourceName, adapter);
logger.debug('[NOTIFICATION_SERVICE] Registered notification adapter', {
adapter: adapter.sourceName,
});
}
/**
* Get all notifications for a user from all configured sources
*/
async getNotifications(userId: string, page = 1, limit = 20): Promise<Notification[]> {
logger.debug('[NOTIFICATION_SERVICE] getNotifications called', {
userId,
page,
limit,
});
const redis = getRedisClient();
const cacheKey = NotificationService.NOTIFICATIONS_LIST_CACHE_KEY(userId, page, limit);
// Try to get from cache first
try {
const cachedData = await redis.get(cacheKey);
if (cachedData) {
logger.debug('[NOTIFICATION_SERVICE] Using cached notifications', {
userId,
});
// Schedule background refresh if TTL is less than half the original value
const ttl = await redis.ttl(cacheKey);
if (ttl < NotificationService.LIST_CACHE_TTL / 2) {
// Background refresh is now handled by unified refresh system
// Only schedule if unified refresh is not active
}
return JSON.parse(cachedData);
}
} catch (error) {
console.error('[NOTIFICATION_SERVICE] Error retrieving notifications from cache:', error);
}
// No cached data, fetch from all adapters
logger.debug('[NOTIFICATION_SERVICE] Fetching notifications from adapters', {
userId,
adapterCount: this.adapters.size,
});
const allNotifications: Notification[] = [];
const adapterEntries = Array.from(this.adapters.entries());
logger.debug('[NOTIFICATION_SERVICE] Available adapters', {
adapters: adapterEntries.map(([name]) => name),
});
const promises = adapterEntries.map(async ([name, adapter]) => {
logger.debug('[NOTIFICATION_SERVICE] Checking adapter configuration', {
adapter: name,
});
try {
const configured = await adapter.isConfigured();
logger.debug('[NOTIFICATION_SERVICE] Adapter configuration result', {
adapter: name,
configured,
});
if (configured) {
logger.debug('[NOTIFICATION_SERVICE] Fetching notifications from adapter', {
adapter: name,
userId,
});
const notifications = await adapter.getNotifications(userId, page, limit);
logger.debug('[NOTIFICATION_SERVICE] Adapter notifications fetched', {
adapter: name,
count: notifications.length,
});
return notifications;
} else {
logger.debug('[NOTIFICATION_SERVICE] Skipping adapter (not configured)', {
adapter: name,
});
return [];
}
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error fetching notifications from adapter', {
adapter: name,
error: error instanceof Error ? error.message : String(error),
});
return [];
}
});
const results = await Promise.all(promises);
// Combine all notifications
results.forEach((notifications, index) => {
const adapterName = adapterEntries[index][0];
logger.debug('[NOTIFICATION_SERVICE] Adding notifications from adapter', {
adapter: adapterName,
count: notifications.length,
});
allNotifications.push(...notifications);
});
// Sort by timestamp (newest first)
allNotifications.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
logger.debug('[NOTIFICATION_SERVICE] Notifications sorted', {
total: allNotifications.length,
});
// Store in cache
try {
await redis.set(
cacheKey,
JSON.stringify(allNotifications),
'EX',
NotificationService.LIST_CACHE_TTL
);
logger.debug('[NOTIFICATION_SERVICE] Cached notifications', {
userId,
count: allNotifications.length,
});
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error caching notifications', {
error: error instanceof Error ? error.message : String(error),
});
}
return allNotifications;
}
/**
* Get notification counts for a user
*/
async getNotificationCount(userId: string): Promise<NotificationCount> {
logger.debug('[NOTIFICATION_SERVICE] getNotificationCount called', { userId });
const redis = getRedisClient();
const cacheKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);
// Try to get from cache first
try {
const cachedData = await redis.get(cacheKey);
if (cachedData) {
logger.debug('[NOTIFICATION_SERVICE] Using cached notification counts', { userId });
// Background refresh is now handled by unified refresh system
// Cache TTL is aligned with refresh interval (30s)
return JSON.parse(cachedData);
}
} catch (error) {
console.error('[NOTIFICATION_SERVICE] Error retrieving notification counts from cache:', error);
}
// No cached data, fetch counts from all adapters
logger.debug('[NOTIFICATION_SERVICE] Fetching notification counts from adapters', {
userId,
adapterCount: this.adapters.size,
});
const aggregatedCount: NotificationCount = {
total: 0,
unread: 0,
sources: {}
};
const adapterEntries = Array.from(this.adapters.entries());
logger.debug('[NOTIFICATION_SERVICE] Available adapters for count', {
adapters: adapterEntries.map(([name]) => name),
});
const promises = adapterEntries.map(async ([name, adapter]) => {
logger.debug('[NOTIFICATION_SERVICE] Checking adapter configuration for count', {
adapter: name,
});
try {
const configured = await adapter.isConfigured();
logger.debug('[NOTIFICATION_SERVICE] Adapter configuration result for count', {
adapter: name,
configured,
});
if (configured) {
logger.debug('[NOTIFICATION_SERVICE] Fetching notification count from adapter', {
adapter: name,
userId,
});
const count = await adapter.getNotificationCount(userId);
logger.debug('[NOTIFICATION_SERVICE] Adapter count fetched', {
adapter: name,
total: count.total,
unread: count.unread,
});
return count;
} else {
logger.debug('[NOTIFICATION_SERVICE] Skipping adapter for count (not configured)', {
adapter: name,
});
return null;
}
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error fetching notification count from adapter', {
adapter: name,
error: error instanceof Error ? error.message : String(error),
});
return null;
}
});
const results = await Promise.all(promises);
// Combine all counts
results.forEach((count, index) => {
if (!count) return;
const adapterName = adapterEntries[index][0];
logger.debug('[NOTIFICATION_SERVICE] Adding counts from adapter', {
adapter: adapterName,
total: count.total,
unread: count.unread,
});
aggregatedCount.total += count.total;
aggregatedCount.unread += count.unread;
// Merge source-specific counts
Object.entries(count.sources).forEach(([source, sourceCount]) => {
aggregatedCount.sources[source] = sourceCount;
});
});
logger.debug('[NOTIFICATION_SERVICE] Aggregated counts', {
userId,
total: aggregatedCount.total,
unread: aggregatedCount.unread,
});
// Store in cache
try {
await redis.set(
cacheKey,
JSON.stringify(aggregatedCount),
'EX',
NotificationService.COUNT_CACHE_TTL
);
logger.debug('[NOTIFICATION_SERVICE] Cached notification counts', {
userId,
total: aggregatedCount.total,
unread: aggregatedCount.unread,
});
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error caching notification counts', {
error: error instanceof Error ? error.message : String(error),
});
}
return aggregatedCount;
}
/**
* Mark a notification as read
*/
async markAsRead(userId: string, notificationId: string): Promise<boolean> {
// Extract the source from the notification ID (format: "source-id")
const [source, ...idParts] = notificationId.split('-');
const sourceId = idParts.join('-'); // Reconstruct the ID in case it has hyphens
if (!source || !this.adapters.has(source)) {
logger.warn('[NOTIFICATION_SERVICE] markAsRead invalid source or adapter not found', {
source,
});
// Still invalidate cache to ensure fresh data
await this.invalidateCache(userId);
return false;
}
const adapter = this.adapters.get(source)!;
let success = false;
try {
success = await adapter.markAsRead(userId, notificationId);
logger.debug('[NOTIFICATION_SERVICE] markAsRead result', {
userId,
notificationId,
success,
});
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] markAsRead error', {
error: error instanceof Error ? error.message : String(error),
});
success = false;
}
// Always invalidate cache after marking attempt (even on failure)
// This ensures fresh data on next fetch, even if the operation partially failed
logger.debug('[NOTIFICATION_SERVICE] markAsRead invalidating cache', {
userId,
success,
});
await this.invalidateCache(userId);
return success;
}
/**
* Mark all notifications from all sources as read
*/
async markAllAsRead(userId: string): Promise<boolean> {
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead called', {
userId,
adapters: Array.from(this.adapters.keys()),
});
const promises = Array.from(this.adapters.values())
.map(async (adapter) => {
const adapterName = adapter.sourceName;
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead processing adapter', {
adapter: adapterName,
});
try {
const configured = await adapter.isConfigured();
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead adapter configuration', {
adapter: adapterName,
configured,
});
if (!configured) {
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead skipping adapter (not configured)', {
adapter: adapterName,
});
return true; // Not configured, so nothing to mark (treat as success)
}
logger.debug('[NOTIFICATION_SERVICE] Calling markAllAsRead on adapter', {
adapter: adapterName,
});
const result = await adapter.markAllAsRead(userId);
logger.debug('[NOTIFICATION_SERVICE] Adapter markAllAsRead result', {
adapter: adapterName,
result,
});
return result;
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error marking all notifications as read for adapter', {
adapter: adapterName,
error: error instanceof Error ? error.message : String(error),
});
return false;
}
});
const results = await Promise.all(promises);
const success = results.every(result => result);
const anySuccess = results.some(result => result);
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead results', {
results,
success,
anySuccess,
});
// Always invalidate cache after marking attempt (even on failure)
// This ensures fresh data on next fetch, even if the operation failed
// The user might have marked some notifications manually, or the operation might have partially succeeded
logger.debug('[NOTIFICATION_SERVICE] markAllAsRead invalidating caches', {
userId,
anySuccess,
success,
});
await this.invalidateCache(userId);
return success;
}
/**
* Invalidate notification caches for a user
*/
private async invalidateCache(userId: string): Promise<void> {
try {
const redis = getRedisClient();
// Get all cache keys for this user
const countKey = NotificationService.NOTIFICATION_COUNT_CACHE_KEY(userId);
const listKeysPattern = `notifications:list:${userId}:*`;
// Delete count cache
await redis.del(countKey);
// Find and delete list caches using SCAN to avoid blocking Redis
let cursor = "0";
do {
const [nextCursor, keys] = await redis.scan(
cursor,
"MATCH",
listKeysPattern,
"COUNT",
100
);
cursor = nextCursor;
if (keys.length > 0) {
await redis.del(...keys);
}
} while (cursor !== "0");
logger.debug('[NOTIFICATION_SERVICE] Invalidated notification caches', {
userId,
});
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Error invalidating notification caches', {
error: error instanceof Error ? error.message : String(error),
});
}
}
/**
* Schedule a background refresh of notification data
*/
private async scheduleBackgroundRefresh(userId: string): Promise<void> {
const redis = getRedisClient();
const lockKey = NotificationService.REFRESH_LOCK_KEY(userId);
// Try to acquire a lock to prevent multiple refreshes
const lockAcquired = await redis.set(
lockKey,
Date.now().toString(),
'EX',
NotificationService.REFRESH_LOCK_TTL,
'NX' // Set only if the key doesn't exist
);
if (!lockAcquired) {
// Another process is already refreshing
return;
}
// Use setTimeout to make this non-blocking
setTimeout(async () => {
try {
// Check if we've refreshed recently (within the last minute)
// to avoid excessive refreshes from multiple tabs/components
const refreshKey = `notifications:last_refresh:${userId}`;
const lastRefresh = await redis.get(refreshKey);
if (lastRefresh) {
const lastRefreshTime = parseInt(lastRefresh, 10);
const now = Date.now();
// If refreshed in the last minute, skip
if (now - lastRefreshTime < 60000) {
logger.debug('[NOTIFICATION_SERVICE] Skipping background refresh (recently refreshed)', {
userId,
});
return;
}
}
logger.debug('[NOTIFICATION_SERVICE] Background refresh started', {
userId,
});
// Set last refresh time
await redis.set(refreshKey, Date.now().toString(), 'EX', 120); // 2 minute TTL
// Refresh counts and notifications (for first page)
await this.getNotificationCount(userId);
await this.getNotifications(userId, 1, 20);
logger.debug('[NOTIFICATION_SERVICE] Background refresh completed', {
userId,
});
} catch (error) {
logger.error('[NOTIFICATION_SERVICE] Background refresh failed', {
userId,
error: error instanceof Error ? error.message : String(error),
});
} finally {
// Release the lock
await redis.del(lockKey).catch(() => {});
}
}, 0);
}
}