import { createHash } from 'crypto';

import { Notification, NotificationCount } from '@/lib/types/notification';
import { NotificationAdapter } from './notification-adapter.interface';
import { getServerSession } from 'next-auth';
import { authOptions } from "@/app/api/auth/options";
import { getRedisClient } from '@/lib/redis';
import { logger } from '@/lib/logger';

// Leantime notification type from their API
interface LeantimeNotification {
  id: number;
  userId: number;
  username: string;
  message: string;
  type: string;
  moduleId: number;
  url: string;
  read: number; // 0 for unread, 1 for read
  date: string; // ISO format date
}

/** JSON-RPC method names used by this adapter. */
const RPC_GET_ALL_NOTIFICATIONS = 'leantime.rpc.Notifications.Notifications.getAllNotifications';
const RPC_MARK_NOTIFICATION_READ = 'leantime.rpc.Notifications.Notifications.markNotificationRead';
const RPC_GET_ALL_USERS = 'leantime.rpc.users.getAll';

/** JSON-RPC error codes that this adapter retries. */
const RETRYABLE_RPC_ERROR_CODES: readonly number[] = [-32603, -32000];

/** Renders any thrown value as a log-friendly string. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Exponential backoff (1s, 2s, 4s, ...) capped at `capMs`. */
function backoffDelay(attempt: number, capMs: number): number {
  return Math.min(1000 * Math.pow(2, attempt), capMs);
}

/** HTTP statuses worth retrying: server errors (5xx) and rate limiting (429). */
function isRetryableStatus(status: number): boolean {
  return status >= 500 || status === 429;
}

/**
 * Short, non-reversible tag identifying an email address in logs.
 *
 * A truncated SHA-256 digest is used rather than truncated base64, because
 * base64 can be decoded straight back to the leading characters of the address.
 */
function hashEmailForLog(email: string): string {
  return createHash('sha256').update(email.toLowerCase()).digest('hex').slice(0, 12);
}

/** True when a raw notification's `read` flag marks it as unread (`0`, `false` or `'0'`). */
function isUnread(raw: { read?: unknown } | null | undefined): boolean {
  const flag = raw?.read;
  return flag === 0 || flag === false || flag === '0';
}

/**
 * Notification adapter backed by the Leantime JSON-RPC API.
 *
 * The Leantime user is resolved from the email of the current next-auth session;
 * both the session email and the resolved Leantime user ID are cached in Redis.
 * Public methods never throw: failures are logged and reported as an empty
 * list, a zero count, or `false`.
 */
export class LeantimeAdapter implements NotificationAdapter {
  readonly sourceName = 'leantime';
  private apiUrl: string;
  private apiToken: string;

  private static readonly USER_ID_CACHE_TTL = 3600; // 1 hour
  private static readonly USER_ID_CACHE_KEY_PREFIX = 'leantime:userid:';
  private static readonly USER_EMAIL_CACHE_TTL = 1800; // 30 minutes
  private static readonly USER_EMAIL_CACHE_KEY_PREFIX = 'leantime:useremail:';

  /** Maximum number of notifications fetched when counting or marking all as read. */
  private static readonly BULK_FETCH_LIMIT = 1000;
  /** markAllAsRead: notifications marked in parallel per batch. */
  private static readonly MARK_BATCH_SIZE = 15;
  /** markAllAsRead: pause between batches, in milliseconds. */
  private static readonly MARK_BATCH_DELAY_MS = 200;
  /** markAllAsRead: retries per notification after the first attempt. */
  private static readonly MARK_MAX_RETRIES = 2;
  /** User lookup: retries after the first attempt. */
  private static readonly USER_LOOKUP_MAX_RETRIES = 3;

  constructor() {
    this.apiUrl = process.env.LEANTIME_API_URL || '';
    this.apiToken = process.env.LEANTIME_TOKEN || '';

    logger.debug('[LEANTIME_ADAPTER] Initialized', {
      hasApiUrl: !!this.apiUrl,
      hasApiToken: !!this.apiToken,
    });
  }

  /**
   * Invalidate cached Leantime user ID for an email.
   * Useful when user data changes or for debugging.
   *
   * @param email Email whose cached ID should be dropped (matched case-insensitively).
   */
  static async invalidateUserIdCache(email: string): Promise<void> {
    try {
      const redis = getRedisClient();
      const cacheKey = `${LeantimeAdapter.USER_ID_CACHE_KEY_PREFIX}${email.toLowerCase()}`;
      await redis.del(cacheKey);
      logger.info('[LEANTIME_ADAPTER] Invalidated user ID cache', {
        emailHash: hashEmailForLog(email),
      });
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error invalidating user ID cache', {
        error: describeError(error),
      });
    }
  }

  /**
   * Fetches one page of notifications for the current session user.
   *
   * @param userId Application user ID, copied onto each returned notification.
   * @param page 1-based page number.
   * @param limit Page size.
   * @returns The transformed notifications, or `[]` on any failure.
   */
  async getNotifications(userId: string, page = 1, limit = 20): Promise<Notification[]> {
    logger.debug('[LEANTIME_ADAPTER] getNotifications called', {
      userId,
      page,
      limit,
    });

    try {
      // Get the user's email directly from the session
      const email = await this.getUserEmail();
      if (!email) {
        logger.error('[LEANTIME_ADAPTER] Could not get user email from session');
        return [];
      }

      const leantimeUserId = await this.getLeantimeUserId(email);
      if (!leantimeUserId) {
        logger.error('[LEANTIME_ADAPTER] User not found in Leantime', {
          emailHash: hashEmailForLog(email),
        });
        return [];
      }

      // Calculate pagination limits; never send a negative offset for page <= 0.
      const limitStart = Math.max(0, (page - 1) * limit);
      const response = await this.fetchNotificationsRaw(leantimeUserId, limitStart, limit);

      logger.debug('[LEANTIME_ADAPTER] getNotifications response status', {
        status: response.status,
      });

      if (!response.ok) {
        const errorText = await response.text();
        logger.error('[LEANTIME_ADAPTER] Failed to fetch Leantime notifications', {
          status: response.status,
          bodyPreview: errorText.substring(0, 200),
        });
        return [];
      }

      const data = await response.json();

      if (!data?.result || !Array.isArray(data.result)) {
        if (data?.error) {
          logger.error('[LEANTIME_ADAPTER] API error in getNotifications', {
            message: data.error.message,
            code: data.error.code,
          });
        } else {
          logger.error('[LEANTIME_ADAPTER] Invalid response format from Leantime notifications API');
        }
        return [];
      }

      const notifications = this.transformNotifications(data.result, userId);
      logger.debug('[LEANTIME_ADAPTER] Transformed notifications count', {
        count: notifications.length,
      });
      return notifications;
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error fetching Leantime notifications', {
        error: describeError(error),
      });
      return [];
    }
  }

  /**
   * Counts total and unread notifications for the current session user.
   *
   * Counts are computed from a single API fetch of at most
   * `BULK_FETCH_LIMIT` notifications, so they are capped at that number.
   *
   * @param userId Application user ID (used for logging only).
   * @returns The counts, or all zeros on any failure.
   */
  async getNotificationCount(userId: string): Promise<NotificationCount> {
    logger.debug('[LEANTIME_ADAPTER] getNotificationCount called', { userId });

    try {
      const email = await this.getUserEmail();
      if (!email) {
        logger.error('[LEANTIME_ADAPTER] Could not get user email for count');
        return LeantimeAdapter.emptyCount();
      }

      const leantimeUserId = await this.getLeantimeUserId(email);
      if (!leantimeUserId) {
        logger.error('[LEANTIME_ADAPTER] Could not get Leantime user ID for count');
        return LeantimeAdapter.emptyCount();
      }

      // Fetch directly from the API to get an accurate count
      const response = await this.fetchNotificationsRaw(
        leantimeUserId,
        0,
        LeantimeAdapter.BULK_FETCH_LIMIT,
      );

      if (!response.ok) {
        logger.error('[LEANTIME_ADAPTER] Failed to fetch notifications for count', {
          status: response.status,
        });
        return LeantimeAdapter.emptyCount();
      }

      const data = await response.json();

      if (data?.error || !Array.isArray(data?.result)) {
        logger.error('[LEANTIME_ADAPTER] Error or invalid response for count', {
          error: data?.error,
        });
        return LeantimeAdapter.emptyCount();
      }

      // Count total and unread from raw data
      const rawNotifications: any[] = data.result;
      const totalCount = rawNotifications.length;
      const unreadCount = rawNotifications.filter(isUnread).length;
      // If we got exactly the fetch limit, there might be more
      const hasMore = totalCount === LeantimeAdapter.BULK_FETCH_LIMIT;

      logger.debug('[LEANTIME_ADAPTER] Notification counts', {
        total: totalCount,
        unread: unreadCount,
        hasMore: hasMore,
      });

      return {
        total: totalCount,
        unread: unreadCount,
        sources: {
          leantime: {
            total: totalCount,
            unread: unreadCount,
          },
        },
      };
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error fetching notification count', {
        error: describeError(error),
      });
      return LeantimeAdapter.emptyCount();
    }
  }

  /**
   * Marks a single notification as read.
   *
   * @param userId Application user ID (used for logging only).
   * @param notificationId Compound ID of the form `leantime-<numeric id>`.
   * @returns `true` when Leantime reports a truthy result, `false` otherwise.
   */
  async markAsRead(userId: string, notificationId: string): Promise<boolean> {
    logger.debug('[LEANTIME_ADAPTER] markAsRead called', {
      userId,
      notificationId,
    });

    try {
      // Extract the source ID from our compound ID (strip only a true prefix)
      const prefix = `${this.sourceName}-`;
      const sourceId = notificationId.startsWith(prefix)
        ? notificationId.slice(prefix.length)
        : notificationId;
      const numericId = parseInt(sourceId, 10);
      if (Number.isNaN(numericId)) {
        // JSON.stringify(NaN) is "null"; refuse rather than send `id: null` to the API.
        logger.error('[LEANTIME_ADAPTER] markAsRead received a non-numeric notification ID', {
          notificationId,
        });
        return false;
      }

      // Get user email and ID
      const email = await this.getUserEmail();
      if (!email) {
        logger.error('[LEANTIME_ADAPTER] Could not get user email from session');
        return false;
      }

      const leantimeUserId = await this.getLeantimeUserId(email);
      if (!leantimeUserId) {
        logger.error('[LEANTIME_ADAPTER] User not found in Leantime', {
          emailHash: hashEmailForLog(email),
        });
        return false;
      }

      // According to Leantime docs: method is markNotificationRead, params are id and userId
      const response = await this.postJsonRpc(RPC_MARK_NOTIFICATION_READ, {
        id: numericId,
        userId: leantimeUserId,
      });

      logger.debug('[LEANTIME_ADAPTER] markAsRead response status', {
        status: response.status,
      });

      if (!response.ok) {
        const errorText = await response.text();
        logger.error('[LEANTIME_ADAPTER] markAsRead HTTP error', {
          status: response.status,
          bodyPreview: errorText.substring(0, 200),
        });
        return false;
      }

      const responseText = await response.text();
      let data;
      try {
        data = JSON.parse(responseText);
      } catch (parseError) {
        logger.error('[LEANTIME_ADAPTER] markAsRead failed to parse response', {
          error: describeError(parseError),
        });
        return false;
      }

      if (data?.error) {
        logger.error('[LEANTIME_ADAPTER] markAsRead API error', {
          error: data.error,
        });
        return false;
      }

      // Any truthy result (true, "true", 1, ...) counts as success.
      const success = Boolean(data?.result);
      logger.debug('[LEANTIME_ADAPTER] markAsRead success', { success });
      return success;
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error marking notification as read', {
        error: describeError(error),
      });
      return false;
    }
  }

  /**
   * Marks every unread notification of the current session user as read.
   *
   * Leantime doesn't have a "mark all as read" method, so this fetches up to
   * `BULK_FETCH_LIMIT` notifications and marks each unread one individually,
   * in parallel batches with a pause between batches.
   *
   * @param userId Application user ID (used for logging only).
   * @returns `true` when nothing was unread or at least 80% were marked.
   */
  async markAllAsRead(userId: string): Promise<boolean> {
    logger.info('[LEANTIME_ADAPTER] markAllAsRead START', {
      userId,
      hasApiUrl: !!this.apiUrl,
      hasApiToken: !!this.apiToken,
    });

    try {
      // Get user email and ID
      const email = await this.getUserEmail();
      if (!email) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead could not get user email from session');
        return false;
      }

      const leantimeUserId = await this.getLeantimeUserId(email);
      if (!leantimeUserId) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead user not found in Leantime', {
          emailHash: hashEmailForLog(email),
        });
        return false;
      }

      // Fetch all notifications directly from the API to get fresh data
      const fetchResponse = await this.fetchNotificationsRaw(
        leantimeUserId,
        0,
        LeantimeAdapter.BULK_FETCH_LIMIT,
      );

      if (!fetchResponse.ok) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead failed to fetch notifications', {
          status: fetchResponse.status,
        });
        return false;
      }

      const fetchData = await fetchResponse.json();

      if (fetchData?.error) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead error fetching notifications', {
          error: fetchData.error,
        });
        return false;
      }

      const rawNotifications: any[] = Array.isArray(fetchData?.result) ? fetchData.result : [];
      const unreadNotifications = rawNotifications.filter(isUnread);

      logger.info('[LEANTIME_ADAPTER] markAllAsRead unread notifications', {
        unreadCount: unreadNotifications.length,
        total: rawNotifications.length,
      });

      if (unreadNotifications.length === 0) {
        logger.info('[LEANTIME_ADAPTER] markAllAsRead no unread notifications');
        return true;
      }

      // Keep only notifications whose ID is (or parses to) a number.
      const notificationIds: number[] = unreadNotifications
        .map((raw) => (typeof raw.id === 'number' ? raw.id : parseInt(String(raw.id), 10)))
        .filter((id: number) => !Number.isNaN(id));

      const batchSize = LeantimeAdapter.MARK_BATCH_SIZE;
      logger.info('[LEANTIME_ADAPTER] markAllAsRead processing notifications', {
        count: notificationIds.length,
        batchSize,
      });

      let successCount = 0;
      const failedNotifications: number[] = [];
      const totalBatches = Math.ceil(notificationIds.length / batchSize);

      // Mark notifications in batches to prevent API overload and connection resets
      for (let start = 0; start < notificationIds.length; start += batchSize) {
        const batch = notificationIds.slice(start, start + batchSize);

        logger.debug('[LEANTIME_ADAPTER] markAllAsRead processing batch', {
          batchNumber: Math.floor(start / batchSize) + 1,
          totalBatches,
          batchSize: batch.length,
        });

        // Process batch in parallel
        const outcomes = await Promise.all(
          batch.map(async (id) => ({
            id,
            ok: await this.markSingleNotification(id, leantimeUserId),
          })),
        );
        for (const { id, ok } of outcomes) {
          if (ok) {
            successCount++;
          } else {
            failedNotifications.push(id);
          }
        }

        // Add delay between batches (except for the last batch)
        if (start + batchSize < notificationIds.length) {
          await sleep(LeantimeAdapter.MARK_BATCH_DELAY_MS);
        }
      }

      let failureCount = failedNotifications.length;

      // Retry failed notifications once more, unless every single one failed
      // (a total failure suggests the API itself is unavailable).
      if (failedNotifications.length > 0 && failedNotifications.length < notificationIds.length) {
        logger.info('[LEANTIME_ADAPTER] markAllAsRead retrying failed notifications', {
          failedCount: failedNotifications.length,
        });

        const retryOutcomes = await Promise.all(
          failedNotifications.map((id) => this.markSingleNotification(id, leantimeUserId, 0)),
        );
        const recovered = retryOutcomes.filter(Boolean).length;
        successCount += recovered;
        failureCount -= recovered;
      }

      logger.info('[LEANTIME_ADAPTER] markAllAsRead final results', {
        successCount,
        failureCount,
        total: notificationIds.length,
      });

      // Consider it successful if majority were marked (at least 80% success rate)
      const successRate = notificationIds.length > 0 ? successCount / notificationIds.length : 0;
      const success = successRate >= 0.8;

      logger.info('[LEANTIME_ADAPTER] markAllAsRead END', {
        successRate,
        success,
      });

      return success;
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] markAllAsRead exception', {
        error: describeError(error),
      });
      return false;
    }
  }

  /** Reports whether both the API URL and the API token are set. */
  async isConfigured(): Promise<boolean> {
    return !!(this.apiUrl && this.apiToken);
  }

  /** Zero-valued count returned whenever counting fails. */
  private static emptyCount(): NotificationCount {
    return {
      total: 0,
      unread: 0,
      sources: {
        leantime: {
          total: 0,
          unread: 0,
        },
      },
    };
  }

  /**
   * POSTs a JSON-RPC 2.0 request to the Leantime API.
   *
   * @param method Fully qualified RPC method name.
   * @param params Optional params object; the key is omitted from the body when undefined.
   */
  private postJsonRpc(method: string, params?: Record<string, unknown>): Promise<Response> {
    return fetch(`${this.apiUrl}/api/jsonrpc`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': this.apiToken,
      },
      body: JSON.stringify({ jsonrpc: '2.0', method, params, id: 1 }),
    });
  }

  /** Requests a window of notifications (read and unread) for a Leantime user. */
  private fetchNotificationsRaw(
    leantimeUserId: number,
    limitStart: number,
    limitEnd: number,
  ): Promise<Response> {
    return this.postJsonRpc(RPC_GET_ALL_NOTIFICATIONS, {
      userId: leantimeUserId,
      showNewOnly: 0, // Get all notifications, not just unread
      limitStart,
      limitEnd,
      filterOptions: [], // No additional filters
    });
  }

  /**
   * Marks one notification as read on behalf of markAllAsRead, retrying with
   * exponential backoff (max 2s) on 5xx/429 responses, on the retryable
   * JSON-RPC error codes, and on thrown errors.
   *
   * @returns `true` when Leantime reports a truthy result.
   */
  private async markSingleNotification(
    notificationId: number,
    leantimeUserId: number,
    retryCount = 0,
  ): Promise<boolean> {
    const maxRetries = LeantimeAdapter.MARK_MAX_RETRIES;
    const canRetry = retryCount < maxRetries;

    const retry = async (logMessage: string): Promise<boolean> => {
      const delay = backoffDelay(retryCount, 2000);
      logger.debug(logMessage, {
        notificationId,
        delay,
        attempt: retryCount + 1,
        maxRetries,
      });
      await sleep(delay);
      return this.markSingleNotification(notificationId, leantimeUserId, retryCount + 1);
    };

    try {
      const response = await this.postJsonRpc(RPC_MARK_NOTIFICATION_READ, {
        id: notificationId,
        userId: leantimeUserId,
      });

      if (!response.ok) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead failed to mark notification', {
          notificationId,
          status: response.status,
        });

        // Retry on server errors (5xx) or rate limiting (429)
        if (isRetryableStatus(response.status) && canRetry) {
          return retry('[LEANTIME_ADAPTER] Retrying notification after HTTP error');
        }
        return false;
      }

      const data = await response.json();

      if (data?.error) {
        logger.error('[LEANTIME_ADAPTER] markAllAsRead JSON-RPC error marking notification', {
          notificationId,
          error: data.error,
        });

        // Retry on certain JSON-RPC errors
        if (canRetry && RETRYABLE_RPC_ERROR_CODES.includes(data.error.code)) {
          return retry('[LEANTIME_ADAPTER] Retrying notification after JSON-RPC error');
        }
        return false;
      }

      return Boolean(data?.result);
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] markAllAsRead exception marking notification', {
        notificationId,
        error: describeError(error),
      });

      // Retry on network errors
      if (canRetry && error instanceof Error) {
        return retry('[LEANTIME_ADAPTER] Retrying notification after network error');
      }
      return false;
    }
  }

  /**
   * Converts raw Leantime notifications into the application's format.
   *
   * Entries without any usable ID are skipped (with a warning) so one
   * malformed record cannot discard the whole page.
   */
  private transformNotifications(data: any[], userId: string): Notification[] {
    if (!Array.isArray(data)) {
      return [];
    }

    const transformed: Notification[] = [];

    for (const notification of data) {
      // Determine properties from notification object
      // Adjust these based on actual structure of Leantime notifications
      const id = notification?.id || notification?._id || notification?.notificationId;
      if (!id) {
        logger.warn('[LEANTIME_ADAPTER] Skipping notification without an id');
        continue;
      }

      const message = notification.message || notification.text || notification.content || '';
      const type = notification.type || 'notification';
      const read = notification.read || notification.isRead || 0;
      const date =
        notification.date ||
        notification.datetime ||
        notification.createdDate ||
        new Date().toISOString();
      const url = String(notification.url || notification.link || '');

      // Relative URLs are resolved against the Leantime base URL.
      const link = url.startsWith('http')
        ? url
        : `${this.apiUrl}${url.startsWith('/') ? '' : '/'}${url}`;

      transformed.push({
        id: `${this.sourceName}-${id}`,
        source: this.sourceName as 'leantime',
        sourceId: id.toString(),
        type: type,
        title: type, // Use type as title if no specific title field
        message: message,
        link,
        // Accept the string form too, mirroring the '0' check used for unread counting.
        isRead: read === 1 || read === true || read === '1',
        timestamp: new Date(date),
        priority: 'normal',
        user: {
          id: userId,
          name: notification.username || notification.userName || '',
        },
        metadata: {
          // Include any other useful fields from notification
          moduleId: notification.moduleId || notification.module || '',
          projectId: notification.projectId || '',
        },
      });
    }

    return transformed;
  }

  /**
   * Gets the current session user's email, with Redis caching keyed by session user ID.
   *
   * @returns The email, or `null` when there is no session, no user ID, or no email.
   */
  private async getUserEmail(): Promise<string | null> {
    try {
      // Get user ID from session first (for cache key)
      const session = await getServerSession(authOptions);
      if (!session || !session.user?.id) {
        return null;
      }

      const userId = session.user.id;
      const emailCacheKey = `${LeantimeAdapter.USER_EMAIL_CACHE_KEY_PREFIX}${userId}`;

      // Check cache first; a cache failure falls through to the session value.
      try {
        const redis = getRedisClient();
        const cachedEmail = await redis.get(emailCacheKey);
        if (cachedEmail) {
          logger.debug('[LEANTIME_ADAPTER] Found cached email for user', { userId });
          return cachedEmail;
        }
      } catch (cacheError) {
        logger.warn('[LEANTIME_ADAPTER] Error checking email cache, will fetch from session', {
          error: describeError(cacheError),
        });
      }

      // Get from session
      if (!session.user?.email) {
        return null;
      }

      const email = session.user.email;

      // Cache the email
      try {
        const redis = getRedisClient();
        await redis.set(emailCacheKey, email, 'EX', LeantimeAdapter.USER_EMAIL_CACHE_TTL);
        logger.debug('[LEANTIME_ADAPTER] Cached user email', { userId });
      } catch (cacheError) {
        logger.warn('[LEANTIME_ADAPTER] Failed to cache email (non-fatal)', {
          error: describeError(cacheError),
        });
      }

      return email;
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error getting user email from session', {
        error: describeError(error),
      });
      return null;
    }
  }

  /**
   * Gets the Leantime user ID for an email, with Redis caching and retry logic.
   *
   * @param email Email to look up (matched case-insensitively).
   * @param retryCount Attempt index to start from; callers normally omit it.
   * @returns The numeric Leantime user ID, or `null` when not found or on failure.
   */
  private async getLeantimeUserId(email: string, retryCount = 0): Promise<number | null> {
    const cacheKey = `${LeantimeAdapter.USER_ID_CACHE_KEY_PREFIX}${email.toLowerCase()}`;

    try {
      if (!this.apiToken) {
        logger.error('[LEANTIME_ADAPTER] No API token available for getLeantimeUserId');
        return null;
      }

      // Check Redis cache first; a cache failure falls through to the API.
      try {
        const redis = getRedisClient();
        const cachedUserId = await redis.get(cacheKey);
        if (cachedUserId) {
          const userId = parseInt(cachedUserId, 10);
          if (!Number.isNaN(userId)) {
            logger.debug('[LEANTIME_ADAPTER] Found cached Leantime user ID', {
              emailHash: hashEmailForLog(email),
              userId,
            });
            return userId;
          }
        }
      } catch (cacheError) {
        logger.warn('[LEANTIME_ADAPTER] Error checking cache for user ID, will fetch from API', {
          error: describeError(cacheError),
        });
      }

      return await this.lookupLeantimeUserId(email, cacheKey, retryCount);
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Fatal error getting Leantime user ID', {
        error: describeError(error),
      });
      return null;
    }
  }

  /**
   * Fetches all Leantime users, finds the one matching `email`, and caches its ID.
   * Retries with exponential backoff (max 5s) on 5xx/429 responses, on the
   * retryable JSON-RPC error codes, and on thrown errors.
   */
  private async lookupLeantimeUserId(
    email: string,
    cacheKey: string,
    attempt: number,
  ): Promise<number | null> {
    const maxRetries = LeantimeAdapter.USER_LOOKUP_MAX_RETRIES;
    const canRetry = attempt < maxRetries;

    const retry = async (logMessage: string): Promise<number | null> => {
      const delay = backoffDelay(attempt, 5000);
      logger.debug(logMessage, {
        attempt: attempt + 1,
        delay,
      });
      await sleep(delay);
      return this.lookupLeantimeUserId(email, cacheKey, attempt + 1);
    };

    try {
      logger.debug('[LEANTIME_ADAPTER] Fetching Leantime user ID', {
        emailHash: hashEmailForLog(email),
        attempt: attempt + 1,
        maxRetries,
      });

      const response = await this.postJsonRpc(RPC_GET_ALL_USERS);

      if (!response.ok) {
        const errorText = await response.text();
        logger.error('[LEANTIME_ADAPTER] User lookup API HTTP error', {
          status: response.status,
          bodyPreview: errorText.substring(0, 200),
        });

        // Retry on server errors (5xx) or rate limiting (429)
        if (isRetryableStatus(response.status) && canRetry) {
          return retry('[LEANTIME_ADAPTER] Retrying user lookup after HTTP error');
        }
        return null;
      }

      const data = await response.json();

      if (data?.error) {
        logger.error('[LEANTIME_ADAPTER] User lookup JSON-RPC error', {
          error: data.error,
        });

        // Retry on certain errors
        if (canRetry && RETRYABLE_RPC_ERROR_CODES.includes(data.error.code)) {
          return retry('[LEANTIME_ADAPTER] Retrying user lookup after JSON-RPC error');
        }
        return null;
      }

      if (!data?.result || !Array.isArray(data.result)) {
        logger.error('[LEANTIME_ADAPTER] Invalid user lookup response format');
        return null;
      }

      // Find user with matching email (check both username and email fields).
      // Both comparisons ignore case, in line with the lower-cased cache key.
      const wanted = email.toLowerCase();
      const user = data.result.find(
        (u: any) =>
          (typeof u?.username === 'string' && u.username.toLowerCase() === wanted) ||
          (typeof u?.email === 'string' && u.email.toLowerCase() === wanted),
      );

      if (user && user.id) {
        const userId = typeof user.id === 'number' ? user.id : parseInt(String(user.id), 10);

        if (!Number.isNaN(userId)) {
          // Cache the result; continue even if caching fails
          try {
            const redis = getRedisClient();
            await redis.set(cacheKey, userId.toString(), 'EX', LeantimeAdapter.USER_ID_CACHE_TTL);
            logger.debug('[LEANTIME_ADAPTER] Cached Leantime user ID', {
              emailHash: hashEmailForLog(email),
              userId,
            });
          } catch (cacheError) {
            logger.warn('[LEANTIME_ADAPTER] Failed to cache user ID (non-fatal)', {
              error: describeError(cacheError),
            });
          }

          return userId;
        }
      }

      logger.warn('[LEANTIME_ADAPTER] User not found in Leantime', {
        emailHash: hashEmailForLog(email),
      });
      return null;
    } catch (error) {
      logger.error('[LEANTIME_ADAPTER] Error fetching user ID', {
        attempt: attempt + 1,
        error: describeError(error),
      });

      // Retry on network errors
      if (canRetry && error instanceof Error) {
        return retry('[LEANTIME_ADAPTER] Retrying user lookup after network error');
      }
      return null;
    }
  }
}