courrier multi account restore compose
This commit is contained in:
parent
363e999dcd
commit
a7b023e359
@ -5,14 +5,18 @@ import { getImapConnection } from '@/lib/services/email-service';
|
||||
import { prisma } from '@/lib/prisma';
|
||||
import { getRedisClient } from '@/lib/redis';
|
||||
|
||||
// Cache TTL for unread counts (30 seconds)
|
||||
const UNREAD_COUNTS_CACHE_TTL = 30;
|
||||
// Cache TTL for unread counts (increased to 2 minutes for better performance)
|
||||
const UNREAD_COUNTS_CACHE_TTL = 120;
|
||||
// Key for unread counts cache
|
||||
const UNREAD_COUNTS_CACHE_KEY = (userId: string) => `email:unread:${userId}`;
|
||||
// Refresh lock key to prevent parallel refreshes
|
||||
const REFRESH_LOCK_KEY = (userId: string) => `email:unread-refresh:${userId}`;
|
||||
// Lock TTL to prevent stuck locks (30 seconds)
|
||||
const REFRESH_LOCK_TTL = 30;
|
||||
|
||||
/**
|
||||
* API route for fetching unread counts for email folders
|
||||
* Optimized with proper caching and connection reuse
|
||||
* Optimized with proper caching, connection reuse, and background refresh
|
||||
*/
|
||||
export async function GET(request: Request) {
|
||||
try {
|
||||
@ -33,88 +37,72 @@ export async function GET(request: Request) {
|
||||
if (cachedCounts) {
|
||||
// Use cached results if available
|
||||
console.log(`[UNREAD_API] Using cached unread counts for user ${userId}`);
|
||||
|
||||
// If the cache is about to expire, schedule a background refresh
|
||||
const ttl = await redis.ttl(UNREAD_COUNTS_CACHE_KEY(userId));
|
||||
if (ttl < UNREAD_COUNTS_CACHE_TTL / 2) {
|
||||
// Only refresh if not already refreshing (use a lock)
|
||||
const lockAcquired = await redis.set(
|
||||
REFRESH_LOCK_KEY(userId),
|
||||
Date.now().toString(),
|
||||
'EX',
|
||||
REFRESH_LOCK_TTL,
|
||||
'NX' // Set only if key doesn't exist
|
||||
);
|
||||
|
||||
if (lockAcquired) {
|
||||
console.log(`[UNREAD_API] Scheduling background refresh for user ${userId}`);
|
||||
// Use Promise to run in background
|
||||
setTimeout(() => {
|
||||
refreshUnreadCounts(userId, redis)
|
||||
.catch(err => console.error(`[UNREAD_API] Background refresh error: ${err}`))
|
||||
.finally(() => {
|
||||
// Release lock regardless of outcome
|
||||
redis.del(REFRESH_LOCK_KEY(userId)).catch(() => {});
|
||||
});
|
||||
}, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return NextResponse.json(JSON.parse(cachedCounts));
|
||||
}
|
||||
|
||||
console.log(`[UNREAD_API] Cache miss for user ${userId}, fetching unread counts`);
|
||||
|
||||
// Get all accounts from the database directly
|
||||
const accounts = await prisma.mailCredentials.findMany({
|
||||
where: { userId },
|
||||
select: {
|
||||
id: true,
|
||||
email: true
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[UNREAD_API] Found ${accounts.length} accounts for user ${userId}`);
|
||||
|
||||
if (accounts.length === 0) {
|
||||
return NextResponse.json({ default: {} });
|
||||
}
|
||||
|
||||
// Mapping to hold the unread counts
|
||||
const unreadCounts: Record<string, Record<string, number>> = {};
|
||||
|
||||
// For each account, get the unread counts for standard folders
|
||||
for (const account of accounts) {
|
||||
const accountId = account.id;
|
||||
try {
|
||||
// Get IMAP connection for this account
|
||||
console.log(`[UNREAD_API] Processing account ${accountId} (${account.email})`);
|
||||
const client = await getImapConnection(userId, accountId);
|
||||
unreadCounts[accountId] = {};
|
||||
|
||||
// Standard folders to check
|
||||
const standardFolders = ['INBOX', 'Sent', 'Drafts', 'Trash', 'Junk', 'Spam', 'Archive', 'Sent Items', 'Archives', 'Notes', 'Éléments supprimés'];
|
||||
|
||||
// Get mailboxes for this account to check if folders exist
|
||||
const mailboxes = await client.list();
|
||||
const availableFolders = mailboxes.map(mb => mb.path);
|
||||
|
||||
// Check each standard folder if it exists
|
||||
for (const folder of standardFolders) {
|
||||
// Skip if folder doesn't exist in this account
|
||||
if (!availableFolders.includes(folder) &&
|
||||
!availableFolders.some(f => f.toLowerCase() === folder.toLowerCase())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Check folder status without opening it (more efficient)
|
||||
const status = await client.status(folder, { unseen: true });
|
||||
|
||||
if (status && typeof status.unseen === 'number') {
|
||||
// Store the unread count
|
||||
unreadCounts[accountId][folder] = status.unseen;
|
||||
|
||||
// Also store with prefixed version for consistency
|
||||
unreadCounts[accountId][`${accountId}:${folder}`] = status.unseen;
|
||||
|
||||
console.log(`[UNREAD_API] Account ${accountId}, folder ${folder}: ${status.unseen} unread`);
|
||||
}
|
||||
} catch (folderError) {
|
||||
console.error(`[UNREAD_API] Error getting unread count for ${accountId}:${folder}:`, folderError);
|
||||
// Continue to next folder even if this one fails
|
||||
}
|
||||
}
|
||||
|
||||
// Don't close the connection - let the connection pool handle it
|
||||
// This avoids opening and closing connections repeatedly
|
||||
} catch (accountError) {
|
||||
console.error(`[UNREAD_API] Error processing account ${accountId}:`, accountError);
|
||||
}
|
||||
}
|
||||
|
||||
// Save to cache for 30 seconds to avoid hammering the IMAP server
|
||||
await redis.set(
|
||||
UNREAD_COUNTS_CACHE_KEY(userId),
|
||||
JSON.stringify(unreadCounts),
|
||||
// Try to acquire lock to prevent parallel refreshes
|
||||
const lockAcquired = await redis.set(
|
||||
REFRESH_LOCK_KEY(userId),
|
||||
Date.now().toString(),
|
||||
'EX',
|
||||
UNREAD_COUNTS_CACHE_TTL
|
||||
REFRESH_LOCK_TTL,
|
||||
'NX' // Set only if key doesn't exist
|
||||
);
|
||||
|
||||
return NextResponse.json(unreadCounts);
|
||||
|
||||
if (!lockAcquired) {
|
||||
console.log(`[UNREAD_API] Another process is refreshing unread counts for ${userId}`);
|
||||
|
||||
// Return empty counts with short cache time if we can't acquire lock
|
||||
// The next request will likely get cached data
|
||||
return NextResponse.json({ _status: 'pending_refresh' });
|
||||
}
|
||||
|
||||
try {
|
||||
// Fetch new counts
|
||||
const unreadCounts = await fetchUnreadCounts(userId);
|
||||
|
||||
// Save to cache with longer TTL (2 minutes)
|
||||
await redis.set(
|
||||
UNREAD_COUNTS_CACHE_KEY(userId),
|
||||
JSON.stringify(unreadCounts),
|
||||
'EX',
|
||||
UNREAD_COUNTS_CACHE_TTL
|
||||
);
|
||||
|
||||
return NextResponse.json(unreadCounts);
|
||||
} finally {
|
||||
// Always release lock
|
||||
await redis.del(REFRESH_LOCK_KEY(userId));
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error("[UNREAD_API] Error fetching unread counts:", error);
|
||||
return NextResponse.json(
|
||||
@ -124,6 +112,103 @@ export async function GET(request: Request) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Background refresh function to update cache without blocking the API response
|
||||
*/
|
||||
async function refreshUnreadCounts(userId: string, redis: any): Promise<void> {
|
||||
try {
|
||||
console.log(`[UNREAD_API] Background refresh started for user ${userId}`);
|
||||
const unreadCounts = await fetchUnreadCounts(userId);
|
||||
|
||||
// Save to cache
|
||||
await redis.set(
|
||||
UNREAD_COUNTS_CACHE_KEY(userId),
|
||||
JSON.stringify(unreadCounts),
|
||||
'EX',
|
||||
UNREAD_COUNTS_CACHE_TTL
|
||||
);
|
||||
|
||||
console.log(`[UNREAD_API] Background refresh completed for user ${userId}`);
|
||||
} catch (error) {
|
||||
console.error(`[UNREAD_API] Background refresh failed for user ${userId}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Core function to fetch unread counts from IMAP
|
||||
*/
|
||||
async function fetchUnreadCounts(userId: string): Promise<Record<string, Record<string, number>>> {
|
||||
// Get all accounts from the database directly
|
||||
const accounts = await prisma.mailCredentials.findMany({
|
||||
where: { userId },
|
||||
select: {
|
||||
id: true,
|
||||
email: true
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[UNREAD_API] Found ${accounts.length} accounts for user ${userId}`);
|
||||
|
||||
if (accounts.length === 0) {
|
||||
return { default: {} };
|
||||
}
|
||||
|
||||
// Mapping to hold the unread counts
|
||||
const unreadCounts: Record<string, Record<string, number>> = {};
|
||||
|
||||
// For each account, get the unread counts for standard folders
|
||||
for (const account of accounts) {
|
||||
const accountId = account.id;
|
||||
try {
|
||||
// Get IMAP connection for this account
|
||||
console.log(`[UNREAD_API] Processing account ${accountId} (${account.email})`);
|
||||
const client = await getImapConnection(userId, accountId);
|
||||
unreadCounts[accountId] = {};
|
||||
|
||||
// Standard folders to check
|
||||
const standardFolders = ['INBOX', 'Sent', 'Drafts', 'Trash', 'Junk', 'Spam', 'Archive', 'Sent Items', 'Archives', 'Notes', 'Éléments supprimés'];
|
||||
|
||||
// Get mailboxes for this account to check if folders exist
|
||||
const mailboxes = await client.list();
|
||||
const availableFolders = mailboxes.map(mb => mb.path);
|
||||
|
||||
// Check each standard folder if it exists
|
||||
for (const folder of standardFolders) {
|
||||
// Skip if folder doesn't exist in this account
|
||||
if (!availableFolders.includes(folder) &&
|
||||
!availableFolders.some(f => f.toLowerCase() === folder.toLowerCase())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Check folder status without opening it (more efficient)
|
||||
const status = await client.status(folder, { unseen: true });
|
||||
|
||||
if (status && typeof status.unseen === 'number') {
|
||||
// Store the unread count
|
||||
unreadCounts[accountId][folder] = status.unseen;
|
||||
|
||||
// Also store with prefixed version for consistency
|
||||
unreadCounts[accountId][`${accountId}:${folder}`] = status.unseen;
|
||||
|
||||
console.log(`[UNREAD_API] Account ${accountId}, folder ${folder}: ${status.unseen} unread`);
|
||||
}
|
||||
} catch (folderError) {
|
||||
console.error(`[UNREAD_API] Error getting unread count for ${accountId}:${folder}:`, folderError);
|
||||
// Continue to next folder even if this one fails
|
||||
}
|
||||
}
|
||||
|
||||
// Don't close the connection - let the connection pool handle it
|
||||
} catch (accountError) {
|
||||
console.error(`[UNREAD_API] Error processing account ${accountId}:`, accountError);
|
||||
}
|
||||
}
|
||||
|
||||
return unreadCounts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to get all account IDs for a user
|
||||
*/
|
||||
|
||||
@ -625,19 +625,44 @@ export const useEmailState = () => {
|
||||
// Skip fetching if an email was viewed recently (within last 5 seconds)
|
||||
const now = Date.now();
|
||||
const lastViewedTimestamp = (window as any).__lastViewedEmailTimestamp || 0;
|
||||
if (lastViewedTimestamp && now - lastViewedTimestamp < 5000) { // Increased from 2000ms for better performance
|
||||
if (lastViewedTimestamp && now - lastViewedTimestamp < 5000) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset failure tracking if it's been more than 1 minute since last failure
|
||||
if ((window as any).__unreadCountFailures && now - (window as any).__unreadCountFailures > 60000) {
|
||||
(window as any).__unreadCountFailures = 0;
|
||||
// Try to get from sessionStorage first for faster response
|
||||
try {
|
||||
const storageKey = `unread_counts_${session.user.id}`;
|
||||
const storedData = sessionStorage.getItem(storageKey);
|
||||
|
||||
if (storedData) {
|
||||
const { data, timestamp } = JSON.parse(storedData);
|
||||
// Use stored data if it's less than 30 seconds old
|
||||
if (now - timestamp < 30000) {
|
||||
logEmailOp('FETCH_UNREAD', 'Using sessionStorage data', { age: Math.round((now - timestamp)/1000) + 's' });
|
||||
dispatch({ type: 'SET_UNREAD_COUNTS', payload: data });
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
// Ignore storage errors
|
||||
}
|
||||
|
||||
// Exponential backoff for failures
|
||||
if ((window as any).__unreadCountFailures > 0) {
|
||||
const backoffMs = Math.min(30000, 1000 * Math.pow(2, (window as any).__unreadCountFailures - 1));
|
||||
if ((window as any).__unreadCountFailures && now - (window as any).__unreadCountFailures < backoffMs) {
|
||||
// Reset failure tracking if it's been more than 1 minute since last failure
|
||||
if ((window as any).__unreadCountFailures?.lastFailureTime &&
|
||||
now - (window as any).__unreadCountFailures.lastFailureTime > 60000) {
|
||||
(window as any).__unreadCountFailures = { count: 0, lastFailureTime: 0 };
|
||||
}
|
||||
|
||||
// Exponential backoff for failures with proper tracking object
|
||||
if (!(window as any).__unreadCountFailures) {
|
||||
(window as any).__unreadCountFailures = { count: 0, lastFailureTime: 0 };
|
||||
}
|
||||
|
||||
if ((window as any).__unreadCountFailures.count > 0) {
|
||||
const failures = (window as any).__unreadCountFailures.count;
|
||||
const backoffMs = Math.min(30000, 1000 * Math.pow(2, failures - 1));
|
||||
if (now - (window as any).__unreadCountFailures.lastFailureTime < backoffMs) {
|
||||
logEmailOp('BACKOFF', `Skipping unread fetch, in backoff period (${backoffMs}ms)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -650,13 +675,19 @@ export const useEmailState = () => {
|
||||
|
||||
const response = await fetch('/api/courrier/unread-counts', {
|
||||
method: 'GET',
|
||||
headers: { 'Content-Type': 'application/json' }
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
// Add cache control headers
|
||||
cache: 'no-cache',
|
||||
next: { revalidate: 0 }
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
// If request failed, increment failure count but cap it
|
||||
(window as any).__unreadCountFailures = Math.min((window as any).__unreadCountFailures || 0 + 1, 10);
|
||||
const failures = (window as any).__unreadCountFailures;
|
||||
// If request failed, track failures properly
|
||||
(window as any).__unreadCountFailures.count =
|
||||
Math.min((window as any).__unreadCountFailures.count + 1, 10);
|
||||
(window as any).__unreadCountFailures.lastFailureTime = now;
|
||||
|
||||
const failures = (window as any).__unreadCountFailures.count;
|
||||
|
||||
if (failures > 3) {
|
||||
// After 3 failures, slow down requests with exponential backoff
|
||||
@ -676,14 +707,40 @@ export const useEmailState = () => {
|
||||
}
|
||||
} else {
|
||||
// Reset failure counter on success
|
||||
(window as any).__unreadCountFailures = 0;
|
||||
(window as any).__unreadCountFailures = { count: 0, lastFailureTime: 0 };
|
||||
|
||||
const data = await response.json();
|
||||
const timeAfterCall = performance.now();
|
||||
logEmailOp('FETCH_UNREAD', `Received unread counts in ${(timeAfterCall - timeBeforeCall).toFixed(2)}ms`, data);
|
||||
|
||||
// Skip if we got the "pending_refresh" status
|
||||
if (data._status === 'pending_refresh') {
|
||||
logEmailOp('FETCH_UNREAD', 'Server is refreshing counts, will try again soon');
|
||||
|
||||
// Retry after a short delay
|
||||
setTimeout(() => {
|
||||
fetchUnreadCounts();
|
||||
}, 2000);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
logEmailOp('FETCH_UNREAD', `Received unread counts in ${(timeAfterCall - timeBeforeCall).toFixed(2)}ms`);
|
||||
|
||||
if (data && typeof data === 'object') {
|
||||
dispatch({ type: 'SET_UNREAD_COUNTS', payload: data });
|
||||
|
||||
// Store in sessionStorage for faster future access
|
||||
try {
|
||||
sessionStorage.setItem(
|
||||
`unread_counts_${session.user.id}`,
|
||||
JSON.stringify({
|
||||
data,
|
||||
timestamp: now
|
||||
})
|
||||
);
|
||||
} catch (err) {
|
||||
// Ignore storage errors
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@ -705,14 +762,13 @@ export const useEmailState = () => {
|
||||
|
||||
const now = Date.now();
|
||||
const lastUpdate = (window as any).__lastUnreadUpdate;
|
||||
const MIN_UPDATE_INTERVAL = 2000; // 2 seconds minimum between updates
|
||||
const MIN_UPDATE_INTERVAL = 10000; // 10 seconds minimum between updates (increased from 2s)
|
||||
|
||||
if (now - lastUpdate.timestamp < MIN_UPDATE_INTERVAL) {
|
||||
return; // Skip if updated too recently
|
||||
}
|
||||
|
||||
// Rather than calculating locally, let's fetch from the API
|
||||
// This ensures we get accurate server-side counts
|
||||
// Rather than calculating locally, fetch from the API
|
||||
fetchUnreadCounts();
|
||||
|
||||
// Update timestamp of last update
|
||||
@ -726,7 +782,7 @@ export const useEmailState = () => {
|
||||
// Debounce unread count updates to prevent rapid multiple updates
|
||||
let updateTimeoutId: ReturnType<typeof setTimeout>;
|
||||
|
||||
const debounceMs = 2000; // Increase debounce to 2 seconds
|
||||
const debounceMs = 5000; // Increase debounce to 5 seconds (from 2s)
|
||||
|
||||
// Function to call after debounce period
|
||||
const debouncedUpdate = () => {
|
||||
@ -738,9 +794,17 @@ export const useEmailState = () => {
|
||||
// Clear any existing timeout and start a new one
|
||||
debouncedUpdate();
|
||||
|
||||
// Also set up a periodic refresh every minute if the tab is active
|
||||
const periodicRefreshId = setInterval(() => {
|
||||
if (document.visibilityState === 'visible') {
|
||||
updateUnreadCounts();
|
||||
}
|
||||
}, 60000); // 1 minute
|
||||
|
||||
// Cleanup timeout on unmount or state change
|
||||
return () => {
|
||||
clearTimeout(updateTimeoutId);
|
||||
clearInterval(periodicRefreshId);
|
||||
};
|
||||
// Deliberately exclude unreadCountMap to prevent infinite loops
|
||||
}, [state.emails, updateUnreadCounts]);
|
||||
@ -800,12 +864,4 @@ export const useEmailState = () => {
|
||||
fetchUnreadCounts,
|
||||
viewEmail
|
||||
};
|
||||
};
|
||||
|
||||
async function cacheEmails(userId, folder, accountId, page, perPage, emails) {
|
||||
const client = await getRedisClient();
|
||||
const key = `${userId}:${accountId}:${folder}:${page}:${perPage}`;
|
||||
|
||||
// Cache with TTL of 15 minutes (900 seconds)
|
||||
await client.setEx(key, 900, JSON.stringify(emails));
|
||||
}
|
||||
};
|
||||
53
lib/redis.ts
53
lib/redis.ts
@ -3,23 +3,52 @@ import CryptoJS from 'crypto-js';
|
||||
|
||||
// Initialize Redis client
|
||||
let redisClient: Redis | null = null;
|
||||
let isConnecting = false;
|
||||
let connectionAttempts = 0;
|
||||
const MAX_RECONNECT_ATTEMPTS = 5;
|
||||
|
||||
/**
|
||||
* Get a Redis client instance (singleton pattern)
|
||||
* Get a Redis client instance (singleton pattern) with improved connection management
|
||||
*/
|
||||
export function getRedisClient(): Redis {
|
||||
if (redisClient && redisClient.status === 'ready') {
|
||||
return redisClient;
|
||||
}
|
||||
|
||||
if (isConnecting) {
|
||||
// If we're already trying to connect, return the existing client
|
||||
// This prevents multiple simultaneous connection attempts
|
||||
if (redisClient) return redisClient;
|
||||
|
||||
// This is a fallback in case we're connecting but don't have a client yet
|
||||
console.warn('Redis connection in progress, creating temporary client');
|
||||
}
|
||||
|
||||
if (!redisClient) {
|
||||
isConnecting = true;
|
||||
connectionAttempts = 0;
|
||||
|
||||
// Set Redis connection parameters from environment variables only
|
||||
const redisOptions = {
|
||||
host: process.env.REDIS_HOST,
|
||||
port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined,
|
||||
password: process.env.REDIS_PASSWORD,
|
||||
retryStrategy: (times: number) => {
|
||||
connectionAttempts = times;
|
||||
if (times > MAX_RECONNECT_ATTEMPTS) {
|
||||
console.error(`Redis connection failed after ${times} attempts, giving up`);
|
||||
return null; // Stop trying to reconnect
|
||||
}
|
||||
const delay = Math.min(times * 100, 5000);
|
||||
console.log(`Redis reconnect attempt ${times}, retrying in ${delay}ms`);
|
||||
return delay;
|
||||
},
|
||||
maxRetriesPerRequest: 5,
|
||||
enableOfflineQueue: true
|
||||
enableOfflineQueue: true,
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
disconnectTimeout: 2000, // 2 seconds
|
||||
keepAlive: 10000, // 10 seconds
|
||||
keyPrefix: '' // No prefix to keep keys clean
|
||||
};
|
||||
|
||||
console.log('Connecting to Redis using environment variables');
|
||||
@ -27,14 +56,34 @@ export function getRedisClient(): Redis {
|
||||
|
||||
redisClient.on('error', (err) => {
|
||||
console.error('Redis connection error:', err);
|
||||
|
||||
// Only set to null if we've exceeded max attempts
|
||||
if (connectionAttempts > MAX_RECONNECT_ATTEMPTS) {
|
||||
console.error('Redis connection failed permanently, will create new client on next request');
|
||||
redisClient = null;
|
||||
isConnecting = false;
|
||||
}
|
||||
});
|
||||
|
||||
redisClient.on('connect', () => {
|
||||
console.log('Successfully connected to Redis');
|
||||
isConnecting = false;
|
||||
connectionAttempts = 0;
|
||||
});
|
||||
|
||||
redisClient.on('reconnecting', () => {
|
||||
console.log('Reconnecting to Redis...');
|
||||
isConnecting = true;
|
||||
});
|
||||
|
||||
redisClient.on('ready', () => {
|
||||
console.log('Redis connection warmed up');
|
||||
isConnecting = false;
|
||||
});
|
||||
|
||||
redisClient.on('end', () => {
|
||||
console.log('Redis connection ended');
|
||||
// Don't set to null here - let the error handler decide
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -31,33 +31,74 @@ export interface EmailListResult {
|
||||
}
|
||||
|
||||
// Connection pool to reuse IMAP clients
|
||||
const connectionPool: Record<string, { client: ImapFlow; lastUsed: number }> = {};
|
||||
const CONNECTION_TIMEOUT = 5 * 60 * 1000; // 5 minutes
|
||||
const connectionPool: Record<string, {
|
||||
client: ImapFlow;
|
||||
lastUsed: number;
|
||||
isConnecting: boolean;
|
||||
connectionPromise?: Promise<ImapFlow>;
|
||||
}> = {};
|
||||
|
||||
const CONNECTION_TIMEOUT = 15 * 60 * 1000; // Increased to 15 minutes for long-lived connections
|
||||
const MAX_POOL_SIZE = 20; // Maximum number of connections to keep in the pool
|
||||
const CONNECTION_CHECK_INTERVAL = 60 * 1000; // Check every minute
|
||||
|
||||
// Clean up idle connections periodically
|
||||
setInterval(() => {
|
||||
const now = Date.now();
|
||||
const connectionKeys = Object.keys(connectionPool);
|
||||
|
||||
Object.entries(connectionPool).forEach(([key, { client, lastUsed }]) => {
|
||||
if (now - lastUsed > CONNECTION_TIMEOUT) {
|
||||
console.log(`Closing idle IMAP connection for ${key}`);
|
||||
// If we're over the pool size limit, sort by last used and remove oldest
|
||||
if (connectionKeys.length > MAX_POOL_SIZE) {
|
||||
const sortedConnections = connectionKeys
|
||||
.map(key => ({ key, lastUsed: connectionPool[key].lastUsed }))
|
||||
.sort((a, b) => a.lastUsed - b.lastUsed);
|
||||
|
||||
// Keep the most recently used connections up to the max pool size
|
||||
const connectionsToRemove = sortedConnections.slice(0, sortedConnections.length - MAX_POOL_SIZE);
|
||||
|
||||
connectionsToRemove.forEach(({ key }) => {
|
||||
const connection = connectionPool[key];
|
||||
try {
|
||||
if (client.usable) {
|
||||
client.logout().catch(err => {
|
||||
console.error(`Error closing connection for ${key}:`, err);
|
||||
if (connection.client.usable) {
|
||||
connection.client.logout().catch(err => {
|
||||
console.error(`Error closing excess connection for ${key}:`, err);
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Error checking excess connection status for ${key}:`, error);
|
||||
} finally {
|
||||
delete connectionPool[key];
|
||||
console.log(`Removed excess connection for ${key} from pool (pool size: ${Object.keys(connectionPool).length})`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Close idle connections
|
||||
Object.entries(connectionPool).forEach(([key, connection]) => {
|
||||
// Skip connections that are currently being established
|
||||
if (connection.isConnecting) return;
|
||||
|
||||
if (now - connection.lastUsed > CONNECTION_TIMEOUT) {
|
||||
console.log(`Closing idle IMAP connection for ${key} (idle for ${Math.round((now - connection.lastUsed)/1000)}s)`);
|
||||
try {
|
||||
if (connection.client.usable) {
|
||||
connection.client.logout().catch(err => {
|
||||
console.error(`Error closing idle connection for ${key}:`, err);
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Error checking connection status for ${key}:`, error);
|
||||
} finally {
|
||||
delete connectionPool[key];
|
||||
console.log(`Removed idle connection for ${key} from pool (pool size: ${Object.keys(connectionPool).length})`);
|
||||
}
|
||||
}
|
||||
});
|
||||
}, 60 * 1000); // Check every minute
|
||||
}, CONNECTION_CHECK_INTERVAL);
|
||||
|
||||
/**
|
||||
* Get IMAP connection for a user, reusing existing connections when possible
|
||||
* with improved connection handling and error recovery
|
||||
*/
|
||||
export async function getImapConnection(
|
||||
userId: string,
|
||||
@ -69,22 +110,87 @@ export async function getImapConnection(
|
||||
if (!accountId || accountId === 'default') {
|
||||
console.log(`No specific account provided or 'default' requested, trying to find first account for user ${userId}`);
|
||||
|
||||
// Query to find all accounts for this user
|
||||
const accounts = await prisma.mailCredentials.findMany({
|
||||
where: { userId },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: 1
|
||||
});
|
||||
|
||||
if (accounts && accounts.length > 0) {
|
||||
const firstAccount = accounts[0];
|
||||
console.log(`Using first available account: ${firstAccount.id} (${firstAccount.email})`);
|
||||
accountId = firstAccount.id;
|
||||
// Try getting the account ID from cache to avoid database query
|
||||
const sessionData = await getCachedImapSession(userId);
|
||||
if (sessionData && sessionData.defaultAccountId) {
|
||||
accountId = sessionData.defaultAccountId;
|
||||
console.log(`Using cached default account ID: ${accountId}`);
|
||||
} else {
|
||||
throw new Error('No email accounts configured for this user');
|
||||
// Query to find all accounts for this user
|
||||
const accounts = await prisma.mailCredentials.findMany({
|
||||
where: { userId },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: 1
|
||||
});
|
||||
|
||||
if (accounts && accounts.length > 0) {
|
||||
const firstAccount = accounts[0];
|
||||
console.log(`Using first available account: ${firstAccount.id} (${firstAccount.email})`);
|
||||
accountId = firstAccount.id;
|
||||
|
||||
// Cache default account ID for future use
|
||||
if (sessionData) {
|
||||
await cacheImapSession(userId, {
|
||||
...sessionData,
|
||||
defaultAccountId: accountId,
|
||||
lastActive: Date.now()
|
||||
});
|
||||
} else {
|
||||
await cacheImapSession(userId, {
|
||||
lastActive: Date.now(),
|
||||
defaultAccountId: accountId
|
||||
});
|
||||
}
|
||||
} else {
|
||||
throw new Error('No email accounts configured for this user');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Use accountId in connection key to ensure different accounts get different connections
|
||||
const connectionKey = `${userId}:${accountId}`;
|
||||
|
||||
// If we already have a connection for this key
|
||||
if (connectionPool[connectionKey]) {
|
||||
const connection = connectionPool[connectionKey];
|
||||
|
||||
// If a connection is being established, wait for it
|
||||
if (connection.isConnecting && connection.connectionPromise) {
|
||||
console.log(`Connection in progress for ${connectionKey}, waiting for existing connection`);
|
||||
try {
|
||||
const client = await connection.connectionPromise;
|
||||
connection.lastUsed = Date.now();
|
||||
return client;
|
||||
} catch (error) {
|
||||
console.error(`Error waiting for connection for ${connectionKey}:`, error);
|
||||
// Fall through to create new connection
|
||||
}
|
||||
}
|
||||
|
||||
// Try to use existing connection if it's usable
|
||||
try {
|
||||
if (connection.client.usable) {
|
||||
// Touch the connection to mark it as recently used
|
||||
connection.lastUsed = Date.now();
|
||||
console.log(`Reusing existing IMAP connection for ${connectionKey}`);
|
||||
|
||||
// Update session data in Redis
|
||||
await updateSessionData(userId, accountId);
|
||||
|
||||
return connection.client;
|
||||
} else {
|
||||
console.log(`Existing connection for ${connectionKey} not usable, recreating`);
|
||||
// Will create a new connection below
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(`Error checking existing connection for ${connectionKey}:`, error);
|
||||
// Will create a new connection below
|
||||
}
|
||||
}
|
||||
|
||||
// If we get here, we need a new connection
|
||||
console.log(`Creating new IMAP connection for ${connectionKey}`);
|
||||
|
||||
// First try to get credentials from Redis cache
|
||||
let credentials = await getCachedEmailCredentials(userId, accountId);
|
||||
|
||||
@ -111,43 +217,51 @@ export async function getImapConnection(
|
||||
throw new Error('Invalid email credentials configuration');
|
||||
}
|
||||
|
||||
// Use accountId in connection key to ensure different accounts get different connections
|
||||
const connectionKey = `${userId}:${accountId}`;
|
||||
const existingConnection = connectionPool[connectionKey];
|
||||
// Create connection record with connecting state
|
||||
connectionPool[connectionKey] = {
|
||||
client: null as any, // Will be set once connected
|
||||
lastUsed: Date.now(),
|
||||
isConnecting: true
|
||||
};
|
||||
|
||||
// Try to get session data from Redis
|
||||
const sessionData = await getCachedImapSession(userId);
|
||||
// Create the connection promise
|
||||
const connectionPromise = createImapConnection(credentials, connectionKey);
|
||||
connectionPool[connectionKey].connectionPromise = connectionPromise;
|
||||
|
||||
// Return existing connection if available and connected
|
||||
if (existingConnection) {
|
||||
try {
|
||||
if (existingConnection.client.usable) {
|
||||
existingConnection.lastUsed = Date.now();
|
||||
console.log(`Reusing existing IMAP connection for ${connectionKey}`);
|
||||
|
||||
// Update session data in Redis
|
||||
if (sessionData) {
|
||||
await cacheImapSession(userId, {
|
||||
...sessionData,
|
||||
lastActive: Date.now()
|
||||
});
|
||||
}
|
||||
|
||||
return existingConnection.client;
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(`Existing connection for ${connectionKey} is not usable, creating new connection`);
|
||||
// Will create a new connection below
|
||||
}
|
||||
try {
|
||||
const client = await connectionPromise;
|
||||
console.log(`Successfully connected to IMAP server for ${connectionKey}`);
|
||||
|
||||
// Update connection record
|
||||
connectionPool[connectionKey] = {
|
||||
client,
|
||||
lastUsed: Date.now(),
|
||||
isConnecting: false
|
||||
};
|
||||
|
||||
// Update session data in Redis
|
||||
await updateSessionData(userId, accountId);
|
||||
|
||||
return client;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
console.error(`IMAP connection error for ${connectionKey}:`, errorMessage);
|
||||
|
||||
// Clean up failed connection
|
||||
delete connectionPool[connectionKey];
|
||||
|
||||
throw new Error(`Failed to connect to IMAP server: ${errorMessage}`);
|
||||
}
|
||||
|
||||
console.log(`Creating new IMAP connection for ${connectionKey}`);
|
||||
|
||||
// Create new connection
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to create a new IMAP connection
|
||||
*/
|
||||
async function createImapConnection(credentials: EmailCredentials, connectionKey: string): Promise<ImapFlow> {
|
||||
const client = new ImapFlow({
|
||||
host: credentials.host,
|
||||
port: credentials.port,
|
||||
secure: true,
|
||||
secure: credentials.secure ?? true,
|
||||
auth: {
|
||||
user: credentials.email,
|
||||
pass: credentials.password,
|
||||
@ -156,24 +270,45 @@ export async function getImapConnection(
|
||||
emitLogs: false,
|
||||
tls: {
|
||||
rejectUnauthorized: false
|
||||
},
|
||||
// Connection timeout settings
|
||||
disableAutoIdle: false, // Keep idle to auto-refresh connection
|
||||
idleTimeout: 60000, // 1 minute
|
||||
idleRefreshTimeout: 30000, // 30 seconds
|
||||
idleRefreshIntervalMs: 30 * 1000, // 30 seconds
|
||||
});
|
||||
|
||||
await client.connect();
|
||||
|
||||
// Add error handler
|
||||
client.on('error', (err) => {
|
||||
console.error(`IMAP connection error for ${connectionKey}:`, err);
|
||||
// Remove from pool on error
|
||||
if (connectionPool[connectionKey]) {
|
||||
delete connectionPool[connectionKey];
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
await client.connect();
|
||||
console.log(`Successfully connected to IMAP server for ${connectionKey}`);
|
||||
|
||||
// Store in connection pool
|
||||
connectionPool[connectionKey] = {
|
||||
client,
|
||||
lastUsed: Date.now()
|
||||
};
|
||||
|
||||
return client;
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
console.error(`IMAP connection error for ${connectionKey}:`, errorMessage);
|
||||
throw new Error(`Failed to connect to IMAP server: ${errorMessage}`);
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update session data in Redis
|
||||
*/
|
||||
async function updateSessionData(userId: string, accountId?: string): Promise<void> {
|
||||
const sessionData = await getCachedImapSession(userId);
|
||||
|
||||
if (sessionData) {
|
||||
await cacheImapSession(userId, {
|
||||
...sessionData,
|
||||
lastActive: Date.now(),
|
||||
...(accountId && { lastAccountId: accountId })
|
||||
});
|
||||
} else {
|
||||
await cacheImapSession(userId, {
|
||||
lastActive: Date.now(),
|
||||
...(accountId && { lastAccountId: accountId })
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user