import { Notification, NotificationCount } from '@/lib/types/notification'; import { NotificationAdapter } from './notification-adapter.interface'; import { getServerSession } from 'next-auth'; import { authOptions } from "@/app/api/auth/options"; import { getRedisClient } from '@/lib/redis'; // Leantime notification type from their API interface LeantimeNotification { id: number; userId: number; username: string; message: string; type: string; moduleId: number; url: string; read: number; // 0 for unread, 1 for read date: string; // ISO format date } export class LeantimeAdapter implements NotificationAdapter { readonly sourceName = 'leantime'; private apiUrl: string; private apiToken: string; private static readonly USER_ID_CACHE_TTL = 3600; // 1 hour private static readonly USER_ID_CACHE_KEY_PREFIX = 'leantime:userid:'; private static readonly USER_EMAIL_CACHE_TTL = 1800; // 30 minutes private static readonly USER_EMAIL_CACHE_KEY_PREFIX = 'leantime:useremail:'; constructor() { this.apiUrl = process.env.LEANTIME_API_URL || ''; this.apiToken = process.env.LEANTIME_TOKEN || ''; console.log('[LEANTIME_ADAPTER] Initialized with API URL and token'); } /** * Invalidate cached Leantime user ID for an email * Useful when user data changes or for debugging */ static async invalidateUserIdCache(email: string): Promise { try { const redis = getRedisClient(); const cacheKey = `${LeantimeAdapter.USER_ID_CACHE_KEY_PREFIX}${email.toLowerCase()}`; await redis.del(cacheKey); console.log(`[LEANTIME_ADAPTER] Invalidated user ID cache for ${email}`); } catch (error) { console.error(`[LEANTIME_ADAPTER] Error invalidating user ID cache:`, error); } } async getNotifications(userId: string, page = 1, limit = 20): Promise { console.log(`[LEANTIME_ADAPTER] getNotifications called for userId: ${userId}, page: ${page}, limit: ${limit}`); try { // Get the user's email directly from the session const email = await this.getUserEmail(); console.log(`[LEANTIME_ADAPTER] Retrieved email from session:`, email || 'null'); if (!email) { console.error('[LEANTIME_ADAPTER] Could not get user email from session'); return []; } const leantimeUserId = await this.getLeantimeUserId(email); console.log(`[LEANTIME_ADAPTER] Retrieved Leantime userId for email ${email}:`, leantimeUserId || 'null'); if (!leantimeUserId) { console.error('[LEANTIME_ADAPTER] User not found in Leantime:', email); return []; } // Calculate pagination limits const limitStart = (page - 1) * limit; const limitEnd = limit; // Make request to Leantime API using the correct jsonrpc method console.log('[LEANTIME_ADAPTER] Sending request to get all notifications'); const jsonRpcBody = { jsonrpc: '2.0', method: 'leantime.rpc.Notifications.Notifications.getAllNotifications', params: { userId: leantimeUserId, showNewOnly: 0, // Get all notifications, not just unread limitStart: limitStart, limitEnd: limitEnd, filterOptions: [] // No additional filters }, id: 1 }; console.log('[LEANTIME_ADAPTER] Request body:', JSON.stringify(jsonRpcBody)); const response = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify(jsonRpcBody) }); console.log('[LEANTIME_ADAPTER] Response status:', response.status); if (!response.ok) { const errorText = await response.text(); console.error('[LEANTIME_ADAPTER] Failed to fetch Leantime notifications:', { status: response.status, body: errorText.substring(0, 200) + (errorText.length > 200 ? '...' : '') }); return []; } const responseText = await response.text(); console.log('[LEANTIME_ADAPTER] Raw response (truncated):', responseText.substring(0, 200) + (responseText.length > 200 ? '...' : '')); const data = JSON.parse(responseText); console.log('[LEANTIME_ADAPTER] Parsed response data:', { hasResult: !!data.result, resultIsArray: Array.isArray(data.result), resultLength: Array.isArray(data.result) ? data.result.length : 'n/a', error: data.error }); if (!data.result || !Array.isArray(data.result)) { if (data.error) { console.error(`[LEANTIME_ADAPTER] API error: ${data.error.message || JSON.stringify(data.error)}`); } else { console.error('[LEANTIME_ADAPTER] Invalid response format from Leantime notifications API'); } return []; } const notifications = this.transformNotifications(data.result, userId); console.log('[LEANTIME_ADAPTER] Transformed notifications count:', notifications.length); return notifications; } catch (error) { console.error('[LEANTIME_ADAPTER] Error fetching Leantime notifications:', error); return []; } } async getNotificationCount(userId: string): Promise { console.log(`[LEANTIME_ADAPTER] getNotificationCount called for userId: ${userId}`); try { // Fetch notifications directly from API for accurate counting (bypassing cache) // Fetch up to 1000 notifications to get accurate count const email = await this.getUserEmail(); if (!email) { console.error('[LEANTIME_ADAPTER] Could not get user email for count'); return { total: 0, unread: 0, sources: { leantime: { total: 0, unread: 0 } } }; } const leantimeUserId = await this.getLeantimeUserId(email); if (!leantimeUserId) { console.error('[LEANTIME_ADAPTER] Could not get Leantime user ID for count'); return { total: 0, unread: 0, sources: { leantime: { total: 0, unread: 0 } } }; } // Fetch directly from API (bypassing cache) to get accurate count const jsonRpcBody = { jsonrpc: '2.0', method: 'leantime.rpc.Notifications.Notifications.getAllNotifications', params: { userId: leantimeUserId, showNewOnly: 0, // Get all notifications limitStart: 0, limitEnd: 1000, // Fetch up to 1000 for accurate counting filterOptions: [] }, id: 1 }; const response = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify(jsonRpcBody) }); if (!response.ok) { console.error(`[LEANTIME_ADAPTER] Failed to fetch notifications for count: HTTP ${response.status}`); return { total: 0, unread: 0, sources: { leantime: { total: 0, unread: 0 } } }; } const data = await response.json(); if (data.error || !Array.isArray(data.result)) { console.error('[LEANTIME_ADAPTER] Error or invalid response for count:', data.error); return { total: 0, unread: 0, sources: { leantime: { total: 0, unread: 0 } } }; } const rawNotifications = data.result; // Count total and unread from raw data const totalCount = rawNotifications.length; const unreadCount = rawNotifications.filter((n: any) => n.read === 0 || n.read === false || n.read === '0' ).length; const hasMore = totalCount === 1000; // If we got exactly 1000, there might be more console.log('[LEANTIME_ADAPTER] Notification counts:', { total: totalCount, unread: unreadCount, read: totalCount - unreadCount, hasMore: hasMore, note: hasMore ? 'WARNING: May have more than 1000 notifications total' : 'OK' }); return { total: totalCount, unread: unreadCount, sources: { leantime: { total: totalCount, unread: unreadCount } } }; } catch (error) { console.error('[LEANTIME_ADAPTER] Error fetching notification count:', error); return { total: 0, unread: 0, sources: { leantime: { total: 0, unread: 0 } } }; } } async markAsRead(userId: string, notificationId: string): Promise { console.log(`[LEANTIME_ADAPTER] markAsRead called for ${notificationId}`); try { // Extract the source ID from our compound ID const sourceId = notificationId.replace(`${this.sourceName}-`, ''); // Get user email and ID const email = await this.getUserEmail(); if (!email) { console.error('[LEANTIME_ADAPTER] Could not get user email from session'); return false; } const leantimeUserId = await this.getLeantimeUserId(email); if (!leantimeUserId) { console.error('[LEANTIME_ADAPTER] User not found in Leantime:', email); return false; } // Make request to Leantime API to mark notification as read // According to Leantime docs: method is markNotificationRead, params are id and userId const jsonRpcBody = { jsonrpc: '2.0', method: 'leantime.rpc.Notifications.Notifications.markNotificationRead', params: { id: parseInt(sourceId), userId: leantimeUserId }, id: 1 }; console.log(`[LEANTIME_ADAPTER] markAsRead - Request body:`, JSON.stringify(jsonRpcBody)); const response = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify(jsonRpcBody) }); console.log(`[LEANTIME_ADAPTER] markAsRead - Response status: ${response.status}`); if (!response.ok) { const errorText = await response.text(); console.error(`[LEANTIME_ADAPTER] markAsRead - HTTP Error ${response.status}:`, errorText.substring(0, 500)); return false; } const responseText = await response.text(); console.log(`[LEANTIME_ADAPTER] markAsRead - Response body:`, responseText.substring(0, 200)); let data; try { data = JSON.parse(responseText); } catch (parseError) { console.error(`[LEANTIME_ADAPTER] markAsRead - Failed to parse response:`, parseError); return false; } if (data.error) { console.error(`[LEANTIME_ADAPTER] markAsRead - API Error:`, data.error); return false; } const success = data.result === true || data.result === "true" || !!data.result; console.log(`[LEANTIME_ADAPTER] markAsRead - Success: ${success}`); return success; } catch (error) { console.error('[LEANTIME_ADAPTER] Error marking notification as read:', error); return false; } } async markAllAsRead(userId: string): Promise { // CRITICAL: This should ALWAYS appear if method is called // Using multiple logging methods to ensure visibility process.stdout.write(`\n[LEANTIME_ADAPTER] ===== markAllAsRead START =====\n`); console.log(`[LEANTIME_ADAPTER] ===== markAllAsRead START =====`); console.error(`[LEANTIME_ADAPTER] ===== markAllAsRead START (via console.error) =====`); console.log(`[LEANTIME_ADAPTER] markAllAsRead called for userId: ${userId}`); console.log(`[LEANTIME_ADAPTER] API URL: ${this.apiUrl}`); console.log(`[LEANTIME_ADAPTER] Has API Token: ${!!this.apiToken}`); try { // Get user email and ID const email = await this.getUserEmail(); if (!email) { console.error('[LEANTIME_ADAPTER] markAllAsRead - Could not get user email from session'); return false; } console.log(`[LEANTIME_ADAPTER] markAllAsRead - User email: ${email}`); const leantimeUserId = await this.getLeantimeUserId(email); if (!leantimeUserId) { console.error('[LEANTIME_ADAPTER] markAllAsRead - User not found in Leantime:', email); return false; } console.log(`[LEANTIME_ADAPTER] markAllAsRead - Leantime user ID: ${leantimeUserId}`); // Leantime doesn't have a "mark all as read" method, so we need to: // 1. Fetch all unread notifications directly from API (bypassing any cache) // 2. Mark each one individually using markNotificationRead console.log(`[LEANTIME_ADAPTER] markAllAsRead - Fetching all unread notifications directly from API`); // Fetch all notifications directly from API (up to 1000) to get fresh data (not cached) const jsonRpcBody = { jsonrpc: '2.0', method: 'leantime.rpc.Notifications.Notifications.getAllNotifications', params: { userId: leantimeUserId, showNewOnly: 0, // Get all, not just unread limitStart: 0, limitEnd: 1000, filterOptions: [] }, id: 1 }; console.log(`[LEANTIME_ADAPTER] markAllAsRead - Fetching notifications from API`); const fetchResponse = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify(jsonRpcBody) }); if (!fetchResponse.ok) { console.error(`[LEANTIME_ADAPTER] markAllAsRead - Failed to fetch notifications: HTTP ${fetchResponse.status}`); return false; } const fetchData = await fetchResponse.json(); if (fetchData.error) { console.error(`[LEANTIME_ADAPTER] markAllAsRead - Error fetching notifications:`, fetchData.error); return false; } // Transform the raw Leantime notifications to our format const rawNotifications = Array.isArray(fetchData.result) ? fetchData.result : []; const unreadNotifications = rawNotifications .filter((n: any) => n.read === 0 || n.read === false || n.read === '0') .map((n: any) => ({ id: n.id, sourceId: String(n.id) })); console.log(`[LEANTIME_ADAPTER] markAllAsRead - Found ${unreadNotifications.length} unread notifications to mark (from ${rawNotifications.length} total)`); if (unreadNotifications.length === 0) { console.log(`[LEANTIME_ADAPTER] markAllAsRead - No unread notifications, returning success`); return true; } // Mark notifications in batches to prevent API overload and connection resets const BATCH_SIZE = 15; // Process 15 notifications at a time const BATCH_DELAY = 200; // 200ms delay between batches const MAX_RETRIES = 2; // Retry failed notifications up to 2 times let successCount = 0; let failureCount = 0; const failedNotifications: number[] = []; // Helper function to mark a single notification const markSingleNotification = async (notificationId: number, retryCount = 0): Promise => { try { const jsonRpcBody = { jsonrpc: '2.0', method: 'leantime.rpc.Notifications.Notifications.markNotificationRead', params: { id: notificationId, userId: leantimeUserId }, id: 1 }; const response = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify(jsonRpcBody) }); if (!response.ok) { console.error(`[LEANTIME_ADAPTER] markAllAsRead - Failed to mark notification ${notificationId}: HTTP ${response.status}`); // Retry on server errors (5xx) or rate limiting (429) if ((response.status >= 500 || response.status === 429) && retryCount < MAX_RETRIES) { const delay = Math.min(1000 * Math.pow(2, retryCount), 2000); // Exponential backoff, max 2s console.log(`[LEANTIME_ADAPTER] Retrying notification ${notificationId} in ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRIES})`); await new Promise(resolve => setTimeout(resolve, delay)); return markSingleNotification(notificationId, retryCount + 1); } return false; } const data = await response.json(); if (data.error) { console.error(`[LEANTIME_ADAPTER] markAllAsRead - Error marking notification ${notificationId}:`, data.error); // Retry on certain JSON-RPC errors if (retryCount < MAX_RETRIES && (data.error.code === -32603 || data.error.code === -32000)) { const delay = Math.min(1000 * Math.pow(2, retryCount), 2000); console.log(`[LEANTIME_ADAPTER] Retrying notification ${notificationId} after error in ${delay}ms`); await new Promise(resolve => setTimeout(resolve, delay)); return markSingleNotification(notificationId, retryCount + 1); } return false; } return data.result === true || data.result === "true" || !!data.result; } catch (error) { console.error(`[LEANTIME_ADAPTER] markAllAsRead - Exception marking notification ${notificationId}:`, error); // Retry on network errors if (retryCount < MAX_RETRIES && error instanceof Error) { const delay = Math.min(1000 * Math.pow(2, retryCount), 2000); console.log(`[LEANTIME_ADAPTER] Retrying notification ${notificationId} after network error in ${delay}ms`); await new Promise(resolve => setTimeout(resolve, delay)); return markSingleNotification(notificationId, retryCount + 1); } return false; } }; // Process notifications in batches const notificationIds = unreadNotifications .map((n: { id: number | string; sourceId: string }) => { const id = typeof n.id === 'number' ? n.id : parseInt(String(n.id || n.sourceId)); return isNaN(id) ? null : id; }) .filter((id): id is number => id !== null); console.log(`[LEANTIME_ADAPTER] markAllAsRead - Processing ${notificationIds.length} notifications in batches of ${BATCH_SIZE}`); // Split into batches for (let i = 0; i < notificationIds.length; i += BATCH_SIZE) { const batch = notificationIds.slice(i, i + BATCH_SIZE); const batchNumber = Math.floor(i / BATCH_SIZE) + 1; const totalBatches = Math.ceil(notificationIds.length / BATCH_SIZE); console.log(`[LEANTIME_ADAPTER] markAllAsRead - Processing batch ${batchNumber}/${totalBatches} (${batch.length} notifications)`); // Process batch in parallel const batchResults = await Promise.all( batch.map(async (notificationId) => { const result = await markSingleNotification(notificationId); if (result) { successCount++; } else { failureCount++; failedNotifications.push(notificationId); } return result; }) ); // Add delay between batches (except for the last batch) if (i + BATCH_SIZE < notificationIds.length) { await new Promise(resolve => setTimeout(resolve, BATCH_DELAY)); } } // Retry failed notifications once more if (failedNotifications.length > 0 && failedNotifications.length < notificationIds.length) { console.log(`[LEANTIME_ADAPTER] markAllAsRead - Retrying ${failedNotifications.length} failed notifications`); const retryResults = await Promise.all( failedNotifications.map(async (notificationId) => { const result = await markSingleNotification(notificationId, 0); if (result) { successCount++; failureCount--; } return result; }) ); } console.log(`[LEANTIME_ADAPTER] markAllAsRead - Final results: ${successCount} succeeded, ${failureCount} failed out of ${notificationIds.length} total`); // Consider it successful if majority were marked (at least 80% success rate) const successRate = notificationIds.length > 0 ? successCount / notificationIds.length : 0; const success = successRate >= 0.8; console.log(`[LEANTIME_ADAPTER] markAllAsRead - Success rate: ${(successRate * 100).toFixed(1)}%, Overall success: ${success}`); console.log(`[LEANTIME_ADAPTER] ===== markAllAsRead END (success: ${success}) =====`); return success; } catch (error) { console.error('[LEANTIME_ADAPTER] markAllAsRead - Exception:', error); console.error('[LEANTIME_ADAPTER] markAllAsRead - Error stack:', error instanceof Error ? error.stack : 'No stack'); console.error(`[LEANTIME_ADAPTER] ===== markAllAsRead END (exception) =====`); return false; } } async isConfigured(): Promise { return !!(this.apiUrl && this.apiToken); } private transformNotifications(data: any[], userId: string): Notification[] { if (!Array.isArray(data)) { return []; } return data.map(notification => { // Determine properties from notification object // Adjust these based on actual structure of Leantime notifications const id = notification.id || notification._id || notification.notificationId; const message = notification.message || notification.text || notification.content || ''; const type = notification.type || 'notification'; const read = notification.read || notification.isRead || 0; const date = notification.date || notification.datetime || notification.createdDate || new Date().toISOString(); const url = notification.url || notification.link || ''; return { id: `${this.sourceName}-${id}`, source: this.sourceName as 'leantime', sourceId: id.toString(), type: type, title: type, // Use type as title if no specific title field message: message, link: url.startsWith('http') ? url : `${this.apiUrl}${url.startsWith('/') ? '' : '/'}${url}`, isRead: read === 1 || read === true, timestamp: new Date(date), priority: 'normal', user: { id: userId, name: notification.username || notification.userName || '' }, metadata: { // Include any other useful fields from notification moduleId: notification.moduleId || notification.module || '', projectId: notification.projectId || '', } }; }); } // Helper function to get user's email with caching private async getUserEmail(): Promise { try { // Get user ID from session first (for cache key) const session = await getServerSession(authOptions); if (!session || !session.user?.id) { return null; } const userId = session.user.id; const emailCacheKey = `${LeantimeAdapter.USER_EMAIL_CACHE_KEY_PREFIX}${userId}`; // Check cache first try { const redis = getRedisClient(); const cachedEmail = await redis.get(emailCacheKey); if (cachedEmail) { console.log(`[LEANTIME_ADAPTER] Found cached email for user ${userId}`); return cachedEmail; } } catch (cacheError) { console.warn('[LEANTIME_ADAPTER] Error checking email cache, will fetch from session:', cacheError); } // Get from session if (!session.user?.email) { return null; } const email = session.user.email; // Cache the email try { const redis = getRedisClient(); await redis.set(emailCacheKey, email, 'EX', LeantimeAdapter.USER_EMAIL_CACHE_TTL); console.log(`[LEANTIME_ADAPTER] Cached email for user ${userId} (TTL: ${LeantimeAdapter.USER_EMAIL_CACHE_TTL}s)`); } catch (cacheError) { console.warn('[LEANTIME_ADAPTER] Failed to cache email (non-fatal):', cacheError); } return email; } catch (error) { console.error('[LEANTIME_ADAPTER] Error getting user email from session:', error); return null; } } // Helper function to get Leantime user ID by email with caching and retry logic private async getLeantimeUserId(email: string, retryCount = 0): Promise { const MAX_RETRIES = 3; const CACHE_KEY = `${LeantimeAdapter.USER_ID_CACHE_KEY_PREFIX}${email.toLowerCase()}`; try { if (!this.apiToken) { console.error('[LEANTIME_ADAPTER] No API token available for getLeantimeUserId'); return null; } // Check Redis cache first try { const redis = getRedisClient(); const cachedUserId = await redis.get(CACHE_KEY); if (cachedUserId) { const userId = parseInt(cachedUserId, 10); if (!isNaN(userId)) { console.log(`[LEANTIME_ADAPTER] Found cached Leantime user ID for ${email}: ${userId}`); return userId; } } } catch (cacheError) { console.warn('[LEANTIME_ADAPTER] Error checking cache for user ID, will fetch from API:', cacheError); } // Fetch from API with retry logic const fetchWithRetry = async (attempt: number): Promise => { try { console.log(`[LEANTIME_ADAPTER] Fetching Leantime user ID for ${email} (attempt ${attempt + 1}/${MAX_RETRIES + 1})`); const response = await fetch(`${this.apiUrl}/api/jsonrpc`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': this.apiToken }, body: JSON.stringify({ jsonrpc: '2.0', method: 'leantime.rpc.users.getAll', id: 1 }), }); if (!response.ok) { const errorText = await response.text(); console.error(`[LEANTIME_ADAPTER] API error (HTTP ${response.status}):`, errorText.substring(0, 200)); // Retry on server errors (5xx) or rate limiting (429) if ((response.status >= 500 || response.status === 429) && attempt < MAX_RETRIES) { const delay = Math.min(1000 * Math.pow(2, attempt), 5000); // Exponential backoff, max 5s console.log(`[LEANTIME_ADAPTER] Retrying in ${delay}ms...`); await new Promise(resolve => setTimeout(resolve, delay)); return fetchWithRetry(attempt + 1); } return null; } const data = await response.json(); if (data.error) { console.error('[LEANTIME_ADAPTER] JSON-RPC error:', data.error); // Retry on certain errors if (attempt < MAX_RETRIES && (data.error.code === -32603 || data.error.code === -32000)) { const delay = Math.min(1000 * Math.pow(2, attempt), 5000); console.log(`[LEANTIME_ADAPTER] Retrying after error in ${delay}ms...`); await new Promise(resolve => setTimeout(resolve, delay)); return fetchWithRetry(attempt + 1); } return null; } if (!data.result || !Array.isArray(data.result)) { console.error('[LEANTIME_ADAPTER] Invalid response format:', data); return null; } const users = data.result; // Find user with matching email (check in both username and email fields) const user = users.find((u: any) => u.username === email || u.email === email || (typeof u.username === 'string' && u.username.toLowerCase() === email.toLowerCase()) ); if (user && user.id) { const userId = typeof user.id === 'number' ? user.id : parseInt(String(user.id), 10); if (!isNaN(userId)) { // Cache the result try { const redis = getRedisClient(); await redis.set(CACHE_KEY, userId.toString(), 'EX', LeantimeAdapter.USER_ID_CACHE_TTL); console.log(`[LEANTIME_ADAPTER] Cached Leantime user ID for ${email}: ${userId} (TTL: ${LeantimeAdapter.USER_ID_CACHE_TTL}s)`); } catch (cacheError) { console.warn('[LEANTIME_ADAPTER] Failed to cache user ID (non-fatal):', cacheError); // Continue even if caching fails } return userId; } } console.warn(`[LEANTIME_ADAPTER] User not found in Leantime for email: ${email}`); return null; } catch (error) { console.error(`[LEANTIME_ADAPTER] Error fetching user ID (attempt ${attempt + 1}):`, error); // Retry on network errors if (attempt < MAX_RETRIES && error instanceof Error) { const delay = Math.min(1000 * Math.pow(2, attempt), 5000); console.log(`[LEANTIME_ADAPTER] Retrying after network error in ${delay}ms...`); await new Promise(resolve => setTimeout(resolve, delay)); return fetchWithRetry(attempt + 1); } return null; } }; return await fetchWithRetry(retryCount); } catch (error) { console.error('[LEANTIME_ADAPTER] Fatal error getting Leantime user ID:', error); return null; } } }