NeahStable/lib/services/notifications/notification-registry.ts
2026-01-16 17:24:36 +01:00

286 lines
8.7 KiB
TypeScript

import { getRedisClient } from '@/lib/redis';
import { logger } from '@/lib/logger';
import { NotificationCount } from '@/lib/types/notification';
export interface NotificationItem {
id: string;
title: string;
message: string;
link?: string;
timestamp: string;
metadata?: Record<string, any>;
}
/**
* Simple registry that stores notification counts from widgets
* Widgets update their counts, and the badge reads the aggregated count
*/
export class NotificationRegistry {
private static instance: NotificationRegistry;
private static COUNT_CACHE_KEY = (userId: string) => `notifications:count:${userId}`;
private static ITEMS_CACHE_KEY = (userId: string, source: string) =>
`notifications:items:${userId}:${source}`;
private static COUNT_CACHE_TTL = 300; // 5 minutes (increased to prevent premature expiration)
private static ITEMS_CACHE_TTL = 300; // 5 minutes for items
public static getInstance(): NotificationRegistry {
if (!NotificationRegistry.instance) {
NotificationRegistry.instance = new NotificationRegistry();
}
return NotificationRegistry.instance;
}
/**
* Record count from a widget (called when widget detects new items)
*/
async recordCount(
userId: string,
source: string,
count: number,
items?: NotificationItem[]
): Promise<void> {
const redis = getRedisClient();
const cacheKey = NotificationRegistry.COUNT_CACHE_KEY(userId);
// Get current aggregated count
let currentCount: NotificationCount = {
total: 0,
unread: 0,
sources: {},
};
try {
const cached = await redis.get(cacheKey);
if (cached) {
currentCount = JSON.parse(cached);
}
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error reading cache', {
userId,
error: error instanceof Error ? error.message : String(error),
});
}
// Update count for this source
// If count is 1 and it's a call, increment. Otherwise, set the count.
const previousSourceCount = currentCount.sources[source]?.unread || 0;
const isCallIncrement = count === 1 && items && items.length > 0 && items[0].metadata?.type === 'call';
if (isCallIncrement) {
// For calls, increment the existing count
currentCount.sources[source] = {
total: previousSourceCount + count,
unread: previousSourceCount + count,
};
logger.debug('[NOTIFICATION_REGISTRY] Call count incremented', {
source,
previousCount: previousSourceCount,
newCount: currentCount.sources[source].unread,
});
} else {
// For regular updates, set the count (widgets send their total)
// Only update if the new count is different or if we're setting it for the first time
if (count !== previousSourceCount || !currentCount.sources[source]) {
currentCount.sources[source] = {
total: count,
unread: count,
};
logger.debug('[NOTIFICATION_REGISTRY] Count updated', {
source,
previousCount: previousSourceCount,
newCount: count,
});
} else {
// Count hasn't changed, but refresh the TTL to keep it alive
logger.debug('[NOTIFICATION_REGISTRY] Count unchanged, refreshing TTL', {
source,
count,
});
}
}
// Recalculate total
currentCount.unread = Object.values(currentCount.sources).reduce(
(sum, s) => sum + s.unread,
0
);
currentCount.total = currentCount.unread;
// Store in cache
try {
await redis.set(
cacheKey,
JSON.stringify(currentCount),
'EX',
NotificationRegistry.COUNT_CACHE_TTL
);
logger.debug('[NOTIFICATION_REGISTRY] Count updated', {
userId,
source,
count,
totalUnread: currentCount.unread,
previousCount: previousSourceCount,
});
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error updating cache', {
userId,
error: error instanceof Error ? error.message : String(error),
});
}
// Store items if provided (for dropdown display)
if (items && items.length > 0) {
try {
const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source);
// Limit to 50 items per source
await redis.set(
itemsKey,
JSON.stringify(items.slice(0, 50)),
'EX',
NotificationRegistry.ITEMS_CACHE_TTL
);
logger.debug('[NOTIFICATION_REGISTRY] Items stored', {
userId,
source,
itemsCount: items.length,
});
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error storing items', {
userId,
source,
error: error instanceof Error ? error.message : String(error),
});
}
}
}
/**
* Get aggregated count (called by the badge)
*/
async getCount(userId: string): Promise<NotificationCount> {
const redis = getRedisClient();
const cacheKey = NotificationRegistry.COUNT_CACHE_KEY(userId);
try {
const cached = await redis.get(cacheKey);
if (cached) {
const count = JSON.parse(cached);
logger.debug('[NOTIFICATION_REGISTRY] Count retrieved from cache', {
userId,
totalUnread: count.unread,
sources: Object.keys(count.sources),
});
return count;
}
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error reading cache', {
userId,
error: error instanceof Error ? error.message : String(error),
});
}
// If no cache, return empty count
logger.debug('[NOTIFICATION_REGISTRY] No cache found, returning empty count', { userId });
return {
total: 0,
unread: 0,
sources: {},
};
}
/**
* Get notifications (items) from all sources (for dropdown)
*/
async getNotifications(userId: string, limit: number = 20): Promise<any[]> {
const redis = getRedisClient();
const sources = ['email', 'rocketchat', 'leantime', 'calendar'];
const allItems: any[] = [];
for (const source of sources) {
try {
const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source);
const items = await redis.get(itemsKey);
if (items) {
const parsed = JSON.parse(items);
// Transform items to Notification format
const transformed = parsed.map((item: NotificationItem) => {
// Parse timestamp - handle both string and Date
let timestamp: Date;
if (typeof item.timestamp === 'string') {
timestamp = new Date(item.timestamp);
} else if (item.timestamp instanceof Date) {
timestamp = item.timestamp;
} else {
timestamp = new Date(); // Fallback to now
}
return {
id: `${source}-${item.id}`,
source: source as 'leantime' | 'rocketchat' | 'email' | 'calendar',
sourceId: item.id,
type: source,
title: item.title,
message: item.message,
link: item.link,
isRead: false, // Widgets only send unread items
timestamp,
priority: 'normal' as const,
user: {
id: userId,
},
metadata: item.metadata,
};
});
allItems.push(...transformed);
}
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error reading items', {
source,
error: error instanceof Error ? error.message : String(error),
});
}
}
// Sort by timestamp (newest first)
allItems.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
logger.debug('[NOTIFICATION_REGISTRY] Notifications retrieved', {
userId,
total: allItems.length,
limit,
returned: Math.min(allItems.length, limit),
});
return allItems.slice(0, limit);
}
/**
* Invalidate cache (for force refresh)
*/
async invalidateCache(userId: string): Promise<void> {
try {
const redis = getRedisClient();
const countKey = NotificationRegistry.COUNT_CACHE_KEY(userId);
const sources = ['email', 'rocketchat', 'leantime', 'calendar'];
// Delete count cache
await redis.del(countKey);
// Delete items caches
for (const source of sources) {
const itemsKey = NotificationRegistry.ITEMS_CACHE_KEY(userId, source);
await redis.del(itemsKey);
}
logger.debug('[NOTIFICATION_REGISTRY] Cache invalidated', { userId });
} catch (error) {
logger.error('[NOTIFICATION_REGISTRY] Error invalidating cache', {
userId,
error: error instanceof Error ? error.message : String(error),
});
}
}
}