NeahNew/lib/services/email-service.ts
2026-01-07 10:32:42 +01:00

1684 lines
51 KiB
TypeScript

'use server';
import 'server-only';
import { ImapFlow } from 'imapflow';
import nodemailer from 'nodemailer';
import { prisma } from '@/lib/prisma';
import { simpleParser } from 'mailparser';
import {
cacheEmailCredentials,
getCachedEmailCredentials,
cacheEmailList,
getCachedEmailList,
cacheEmailContent,
getCachedEmailContent,
cacheImapSession,
getCachedImapSession,
invalidateFolderCache,
invalidateEmailContentCache
} from '@/lib/redis';
import { EmailCredentials, EmailMessage, EmailAddress, EmailAttachment } from '@/lib/types';
import { ensureFreshToken } from './token-refresh';
import { createXOAuth2Token, refreshAccessToken as refreshMicrosoftAccessToken } from './microsoft-oauth';
import { MailCredentials } from '@prisma/client';
import Redis from 'ioredis';
import { getRedisClient } from '../redis';
import { logger } from '@/lib/logger';
// Define EmailCredentials interface with OAuth properties
interface EmailCredentialsExtended extends EmailCredentials {
useOAuth?: boolean;
accessToken?: string;
refreshToken?: string;
tokenExpiry?: number;
}
// Define the extended MailCredentials type that includes OAuth fields
interface MailCredentialsWithOAuth extends MailCredentials {
useOAuth?: boolean;
accessToken?: string | null;
refreshToken?: string | null;
tokenExpiry?: Date | null;
}
// Types specific to this service
export interface EmailListResult {
emails: EmailMessage[];
totalEmails: number;
page: number;
perPage: number;
totalPages: number;
folder: string;
mailboxes: string[];
newestEmailId: number;
}
// Connection pool to reuse IMAP clients
const connectionPool: Record<string, {
client: ImapFlow;
lastUsed: number;
isConnecting: boolean;
connectionPromise?: Promise<ImapFlow>;
connectionAttempts?: number;
}> = {};
// Track overall connection metrics
let totalConnectionRequests = 0;
let totalNewConnections = 0;
let totalReuseConnections = 0;
let totalConnectionErrors = 0;
let lastMetricsReset = Date.now();
// CRITICAL PERFORMANCE FIX: Increase idle timeout from 15 minutes to 30 minutes
// This will keep connections alive longer and reduce reconnection delays
const CONNECTION_TIMEOUT = 30 * 60 * 1000; // Increased to 30 minutes (was 15 minutes)
const MAX_POOL_SIZE = 20; // Maximum number of connections to keep in the pool
const CONNECTION_CHECK_INTERVAL = 60 * 1000; // Check every minute
const MIN_POOL_SIZE = 2; // Keep at least this many active connections per user
// Clean up idle connections periodically
setInterval(() => {
const now = Date.now();
const connectionKeys = Object.keys(connectionPool);
// If we've been collecting metrics for more than an hour, log and reset
if (now - lastMetricsReset > 60 * 60 * 1000) {
logger.debug('[IMAP METRICS] Summary', {
totalRequests: totalConnectionRequests,
newConnections: totalNewConnections,
reusedConnections: totalReuseConnections,
errors: totalConnectionErrors,
});
totalConnectionRequests = 0;
totalNewConnections = 0;
totalReuseConnections = 0;
totalConnectionErrors = 0;
lastMetricsReset = now;
}
// PERFORMANCE FIX: Group connections by user for better management
const connectionsByUser: Record<string, string[]> = {};
connectionKeys.forEach(key => {
const userId = key.split(':')[0];
if (!connectionsByUser[userId]) {
connectionsByUser[userId] = [];
}
connectionsByUser[userId].push(key);
});
// PERFORMANCE FIX: Manage pool size per user
Object.entries(connectionsByUser).forEach(([userId, userConnections]) => {
// Sort connections by last used (oldest first)
const sortedConnections = userConnections
.map(key => ({ key, lastUsed: connectionPool[key].lastUsed }))
.sort((a, b) => a.lastUsed - b.lastUsed);
// Keep the most recently used connections up to the min pool size
const connectionsToKeep = sortedConnections.slice(-MIN_POOL_SIZE);
const keepKeys = new Set(connectionsToKeep.map(conn => conn.key));
// Check the rest for idle timeout
sortedConnections.forEach(({ key, lastUsed }) => {
// Skip connections to keep and those that are in the process of connecting
if (keepKeys.has(key) || connectionPool[key].isConnecting) {
return;
}
// Only close connections idle for too long
if (now - lastUsed > CONNECTION_TIMEOUT) {
logger.debug('[IMAP] Closing idle connection', {
key,
idleSeconds: Math.round((now - lastUsed) / 1000),
});
try {
if (connectionPool[key].client.usable) {
connectionPool[key].client.logout().catch(err => {
logger.error('[IMAP] Error closing idle connection', {
key,
error: err instanceof Error ? err.message : String(err),
});
});
}
} catch (error) {
logger.error('[IMAP] Error checking connection status', {
key,
error: error instanceof Error ? error.message : String(error),
});
} finally {
delete connectionPool[key];
logger.debug('[IMAP] Removed idle connection from pool', {
key,
poolSize: Object.keys(connectionPool).length,
});
}
}
});
});
// Log connection pool status with more details
const activeCount = connectionKeys.filter(key => {
const conn = connectionPool[key];
return !conn.isConnecting && (conn.client?.usable || false);
}).length;
const connectingCount = connectionKeys.filter(key => connectionPool[key].isConnecting).length;
logger.debug('[IMAP POOL] Status', {
size: connectionKeys.length,
active: activeCount,
connecting: connectingCount,
max: MAX_POOL_SIZE,
});
}, CONNECTION_CHECK_INTERVAL);
/**
* Get IMAP connection for a user, reusing existing connections when possible
* with improved connection handling and error recovery
*/
export async function getImapConnection(
userId: string,
accountId?: string
): Promise<ImapFlow> {
const startTime = Date.now();
totalConnectionRequests++;
logger.debug('[IMAP] getImapConnection called', {
userId,
accountId: accountId ?? 'default',
});
// Special handling for 'default' accountId - find the first available account
if (!accountId || accountId === 'default') {
logger.debug('[IMAP] Resolving default accountId', { userId });
// Try getting the account ID from cache to avoid database query
const sessionData = await getCachedImapSession(userId);
if (sessionData && sessionData.defaultAccountId) {
accountId = sessionData.defaultAccountId;
logger.debug('[IMAP] Using cached default account ID', {
userId,
accountId,
});
} else {
// Query to find all accounts for this user
const accounts = await prisma.mailCredentials.findMany({
where: { userId },
orderBy: { createdAt: 'asc' },
take: 1
});
if (accounts && accounts.length > 0) {
const firstAccount = accounts[0];
logger.debug('[IMAP] Using first available account from DB', {
userId,
accountId: firstAccount.id,
email: firstAccount.email,
});
accountId = firstAccount.id;
// Cache default account ID for future use
if (sessionData) {
await cacheImapSession(userId, {
...sessionData,
defaultAccountId: accountId,
lastActive: Date.now()
});
} else {
await cacheImapSession(userId, {
lastActive: Date.now(),
defaultAccountId: accountId
});
}
} else {
totalConnectionErrors++;
throw new Error('No email accounts configured for this user');
}
}
}
// Use accountId in connection key to ensure different accounts get different connections
const connectionKey = `${userId}:${accountId}`;
// If we already have a connection for this key
if (connectionPool[connectionKey]) {
const connection = connectionPool[connectionKey];
// If a connection is being established, wait for it
if (connection.isConnecting && connection.connectionPromise) {
logger.debug('[IMAP] Waiting for existing connection', { connectionKey });
try {
const client = await connection.connectionPromise;
connection.lastUsed = Date.now();
totalReuseConnections++;
logger.debug('[IMAP] Reused pending connection', {
connectionKey,
durationMs: Date.now() - startTime,
});
return client;
} catch (error) {
logger.error('[IMAP] Error waiting for existing connection', {
connectionKey,
error: error instanceof Error ? error.message : String(error),
});
// Fall through to create new connection
}
}
// Try to use existing connection if it's usable
try {
// PERFORMANCE FIX: More robust connection status checking
if (connection.client && connection.client.usable) {
// Touch the connection to mark it as recently used
connection.lastUsed = Date.now();
logger.debug('[IMAP] Reusing existing connection', { connectionKey });
// Update session data in Redis
await updateSessionData(userId, accountId);
totalReuseConnections++;
logger.debug('[IMAP] Successfully reused connection', {
connectionKey,
durationMs: Date.now() - startTime,
});
return connection.client;
} else {
logger.debug('[IMAP] Existing connection not usable, recreating', {
connectionKey,
});
// Will create a new connection below
}
} catch (error) {
logger.warn('[IMAP] Error checking existing connection', {
connectionKey,
error: error instanceof Error ? error.message : String(error),
});
// Will create a new connection below
}
}
// If we get here, we need a new connection
logger.debug('[IMAP] Creating new connection', { connectionKey });
// First try to get credentials from Redis cache
let credentials = await getCachedEmailCredentials(userId, accountId);
logger.debug('[IMAP] Retrieved credentials from Redis cache', {
userId,
accountId,
found: !!credentials,
});
// If not in cache, get from database and cache them
if (!credentials) {
logger.debug('[IMAP] Credentials not found in cache, querying database', {
userId,
accountId,
});
// Fetch directly from database
const dbCredentials = await prisma.mailCredentials.findFirst({
where: {
AND: [
{ userId },
accountId ? { id: accountId } : {}
]
}
});
if (!dbCredentials) {
logger.error('[IMAP] No credentials found for user', {
userId,
accountId,
});
totalConnectionErrors++;
throw new Error('Email account credentials not found');
}
logger.debug('[IMAP] Database lookup returned credentials', {
userId,
accountId,
email: dbCredentials.email,
hasPassword: !!dbCredentials.password,
});
// Create our credentials object from database data
// Include OAuth tokens from database if available
credentials = {
email: dbCredentials.email,
password: dbCredentials.password || '',
host: dbCredentials.host,
port: dbCredentials.port,
secure: dbCredentials.secure,
smtp_host: dbCredentials.smtp_host || undefined,
smtp_port: dbCredentials.smtp_port || undefined,
smtp_secure: dbCredentials.smtp_secure ?? false,
display_name: dbCredentials.display_name || undefined,
color: dbCredentials.color || undefined,
// Include OAuth fields from database
useOAuth: dbCredentials.use_oauth || false,
accessToken: dbCredentials.access_token || undefined,
refreshToken: dbCredentials.refresh_token || undefined,
tokenExpiry: dbCredentials.token_expiry ? dbCredentials.token_expiry.getTime() : undefined
};
}
// Cast to extended type
const extendedCreds = credentials as EmailCredentialsExtended;
// MICROSOFT FIX: Detect Microsoft accounts by hostname and set OAuth flag
if (extendedCreds.host === 'outlook.office365.com') {
logger.debug('[IMAP] Microsoft account detected, enabling OAuth', {
email: extendedCreds.email,
});
extendedCreds.useOAuth = true;
// If we have no password but useOAuth is true, we need to make sure refresh token exists in Redis
if (!extendedCreds.password && !extendedCreds.accessToken) {
// If running in browser edge environment (serverless), try to refresh our tokens from Redis
try {
const cachedCreds = await getCachedEmailCredentials(userId, accountId);
if (cachedCreds && cachedCreds.refreshToken) {
logger.debug('[IMAP] Found refresh token in Redis for account', {
email: extendedCreds.email,
});
extendedCreds.refreshToken = cachedCreds.refreshToken;
extendedCreds.accessToken = cachedCreds.accessToken;
extendedCreds.tokenExpiry = cachedCreds.tokenExpiry;
// Make sure we cache these credentials again with the tokens
await cacheEmailCredentials(userId, accountId, extendedCreds);
} else {
logger.warn('[IMAP] No refresh token found in Redis cache', {
email: extendedCreds.email,
});
}
} catch (err) {
logger.error('[IMAP] Error retrieving cached credentials', {
email: extendedCreds.email,
error: err instanceof Error ? err.message : String(err),
});
}
}
}
// If using OAuth, ensure we have a fresh token
if (extendedCreds.useOAuth) {
logger.debug('[IMAP] Account configured to use OAuth', {
email: extendedCreds.email,
});
if (!extendedCreds.accessToken) {
logger.error('[IMAP] OAuth enabled but no access token', {
email: extendedCreds.email,
});
}
try {
logger.debug('[IMAP] Ensuring fresh token for OAuth account', {
email: extendedCreds.email,
});
const { accessToken, success } = await ensureFreshToken(userId, extendedCreds.email);
if (success && accessToken) {
extendedCreds.accessToken = accessToken;
logger.debug('[IMAP] Successfully refreshed token', {
email: extendedCreds.email,
});
} else {
logger.error('[IMAP] Failed to refresh token', {
email: extendedCreds.email,
});
}
} catch (err) {
logger.error('[IMAP] Error refreshing token', {
email: extendedCreds.email,
error: err instanceof Error ? err.message : String(err),
});
}
}
// Initialize connection tracking
connectionPool[connectionKey] = {
client: null as any,
lastUsed: Date.now(),
isConnecting: true,
connectionAttempts: (connectionPool[connectionKey]?.connectionAttempts || 0) + 1
};
// PERFORMANCE FIX: Add connection timeout to prevent hanging connections
let connectionTimeout: NodeJS.Timeout | null = setTimeout(() => {
logger.error('[IMAP] Connection timed out', {
connectionKey,
timeoutMs: 60000,
});
if (connectionPool[connectionKey]?.isConnecting) {
delete connectionPool[connectionKey];
totalConnectionErrors++;
}
}, 60 * 1000); // 60 seconds timeout
// Create connection promise using the extended credentials
const connectionPromise = createImapConnection(extendedCreds, connectionKey)
.then(client => {
// Update connection pool entry
connectionPool[connectionKey].client = client;
connectionPool[connectionKey].isConnecting = false;
connectionPool[connectionKey].lastUsed = Date.now();
// Clear timeout since connection was successful
if (connectionTimeout) {
clearTimeout(connectionTimeout);
connectionTimeout = null;
}
// Update session data
updateSessionData(userId, accountId).catch(err => {
logger.error('[IMAP] Failed to update session data', {
error: err instanceof Error ? err.message : String(err),
});
});
totalNewConnections++;
logger.debug('[IMAP] Created new connection', {
connectionKey,
durationMs: Date.now() - startTime,
attempts: connectionPool[connectionKey].connectionAttempts,
});
return client;
})
.catch(error => {
// Clear timeout to prevent double errors
if (connectionTimeout) {
clearTimeout(connectionTimeout);
connectionTimeout = null;
}
// Handle connection error
logger.error('[IMAP] Failed to create connection', {
connectionKey,
error: error instanceof Error ? error.message : String(error),
});
delete connectionPool[connectionKey];
totalConnectionErrors++;
throw error;
});
// Save the promise to allow other requests to wait for this connection
connectionPool[connectionKey].connectionPromise = connectionPromise;
return connectionPromise;
}
/**
* Helper function to create a new IMAP connection
*/
async function createImapConnection(credentials: EmailCredentials, connectionKey: string): Promise<ImapFlow> {
// Cast to extended type
const extendedCreds = credentials as EmailCredentialsExtended;
logger.debug('[IMAP] Creating ImapFlow client with credentials metadata', {
email: extendedCreds.email,
host: extendedCreds.host,
port: extendedCreds.port,
hasPassword: !!extendedCreds.password,
useOAuth: !!extendedCreds.useOAuth,
hasAccessToken: !!extendedCreds.accessToken,
hasRefreshToken: !!extendedCreds.refreshToken,
hasTokenExpiry: !!extendedCreds.tokenExpiry,
});
let authParams: any;
// Check if we have valid OAuth tokens
if (extendedCreds.useOAuth && extendedCreds.accessToken) {
logger.debug('[IMAP] Using XOAUTH2 authentication', { connectionKey });
// Set auth parameters for ImapFlow
authParams = {
user: extendedCreds.email,
accessToken: extendedCreds.accessToken
};
logger.debug('[IMAP] XOAUTH2 auth configured', { connectionKey });
} else if (extendedCreds.password) {
// Use regular password authentication
logger.debug('[IMAP] Using password authentication', { connectionKey });
authParams = {
user: extendedCreds.email,
pass: extendedCreds.password
};
} else {
// No authentication method available
logger.error('[IMAP] No authentication method found for connection', {
connectionKey,
hasPassword: !!extendedCreds.password,
useOAuth: !!extendedCreds.useOAuth,
hasAccessToken: !!extendedCreds.accessToken,
});
throw new Error(`No authentication method available for ${connectionKey} - need either password or OAuth token`);
}
logger.debug('[IMAP] Creating ImapFlow client', {
connectionKey,
authType: extendedCreds.useOAuth ? 'OAuth' : 'Password',
});
const client = new ImapFlow({
host: extendedCreds.host,
port: extendedCreds.port,
secure: extendedCreds.secure ?? true,
auth: authParams,
logger: false,
emitLogs: false,
tls: {
rejectUnauthorized: false
},
disableAutoIdle: false
});
try {
logger.debug('[IMAP] Connecting to server', {
host: extendedCreds.host,
port: extendedCreds.port,
});
await client.connect();
logger.debug('[IMAP] Connected to server', { connectionKey });
} catch (error) {
logger.error('[IMAP] Failed to connect to server', {
connectionKey,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
// Add error handler
client.on('error', (err) => {
logger.error('[IMAP] Connection error', {
connectionKey,
error: err instanceof Error ? err.message : String(err),
});
// Remove from pool on error
if (connectionPool[connectionKey]) {
delete connectionPool[connectionKey];
}
});
return client;
}
/**
* Update session data in Redis
*/
async function updateSessionData(userId: string, accountId?: string): Promise<void> {
const sessionData = await getCachedImapSession(userId);
if (sessionData) {
await cacheImapSession(userId, {
...sessionData,
lastActive: Date.now(),
...(accountId && { defaultAccountId: accountId })
});
} else {
await cacheImapSession(userId, {
lastActive: Date.now(),
...(accountId && { defaultAccountId: accountId })
});
}
}
/**
* Get user's email credentials from database
*/
export async function getUserEmailCredentials(userId: string, accountId?: string): Promise<EmailCredentials | null> {
const credentials = await prisma.mailCredentials.findFirst({
where: {
AND: [
{ userId },
accountId ? { id: accountId } : {}
]
}
});
if (!credentials) return null;
const mailCredentials = credentials as unknown as {
email: string;
password: string;
host: string;
port: number;
secure: boolean;
smtp_host: string | null;
smtp_port: number | null;
smtp_secure: boolean | null;
display_name: string | null;
color: string | null;
};
return {
email: mailCredentials.email,
password: mailCredentials.password,
host: mailCredentials.host,
port: mailCredentials.port,
secure: mailCredentials.secure,
smtp_host: mailCredentials.smtp_host || undefined,
smtp_port: mailCredentials.smtp_port || undefined,
smtp_secure: mailCredentials.smtp_secure ?? false,
display_name: mailCredentials.display_name || undefined,
color: mailCredentials.color || undefined
};
}
/**
* Save or update user's email credentials
*/
export async function saveUserEmailCredentials(
userId: string,
accountId: string,
credentials: EmailCredentials
): Promise<void> {
logger.debug('[EMAIL] Saving credentials for user', {
userId,
accountId,
});
if (!credentials) {
throw new Error('No credentials provided');
}
// Cast to extended type to access OAuth properties
const extendedCreds = credentials as EmailCredentialsExtended;
// Store OAuth information in a separate object for caching
const oauthData = {
useOAuth: extendedCreds.useOAuth,
accessToken: extendedCreds.accessToken,
refreshToken: extendedCreds.refreshToken,
tokenExpiry: extendedCreds.tokenExpiry
};
// Extract fields for database schema
// OAuth tokens are now persisted to Prisma for long-term storage
const dbCredentials: any = {
email: credentials.email,
password: credentials.password ?? '', // Required field in the DB schema
host: credentials.host,
port: credentials.port,
secure: credentials.secure ?? true,
smtp_host: credentials.smtp_host || null,
smtp_port: credentials.smtp_port || null,
smtp_secure: credentials.smtp_secure ?? false,
display_name: credentials.display_name || null,
color: credentials.color || null,
// Persist OAuth tokens to database for long-term storage
use_oauth: oauthData.useOAuth || false,
refresh_token: oauthData.refreshToken || null,
access_token: oauthData.accessToken || null,
token_expiry: oauthData.tokenExpiry ? new Date(oauthData.tokenExpiry) : null
};
try {
logger.debug('[EMAIL] Saving credentials to database', {
userId,
accountId,
email: dbCredentials.email,
hasPassword: !!dbCredentials.password,
hasOAuth: !!oauthData.useOAuth,
hasAccessToken: !!oauthData.accessToken,
hasRefreshToken: !!oauthData.refreshToken,
});
// Save to database using the unique constraint on [userId, email]
await prisma.mailCredentials.upsert({
where: {
id: await prisma.mailCredentials.findFirst({
where: {
AND: [
{ userId },
{ email: accountId }
]
},
select: { id: true }
}).then(result => result?.id ?? '')
},
update: dbCredentials,
create: {
userId,
...dbCredentials
}
});
// Create a combined credentials object for caching
const fullCreds = {
...dbCredentials,
...oauthData
} as EmailCredentialsExtended; // Cast to the expected type
// Cache the full credentials including OAuth tokens
await cacheEmailCredentials(userId, accountId, fullCreds);
logger.debug('[EMAIL] Saved credentials and cached full data', {
userId,
accountId,
});
} catch (error) {
logger.error('[EMAIL] Error saving credentials', {
userId,
accountId,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
// Helper type for IMAP fetch options
interface FetchOptions {
envelope: boolean;
flags: boolean;
bodyStructure: boolean;
internalDate: boolean;
size: boolean;
bodyParts: { part: string; query: any; limit?: number }[];
}
/**
* Get list of emails for a user
*/
export async function getEmails(
userId: string,
folder: string,
page: number = 1,
perPage: number = 20,
accountId?: string,
checkOnly: boolean = false
): Promise<EmailListResult> {
// Normalize folder name and handle account ID
logger.debug('[EMAIL] getEmails called', {
userId,
folder,
page,
perPage,
accountId: accountId || 'default',
checkOnly,
});
try {
// The getImapConnection function already handles 'default' accountId by finding the first available account
const client = await getImapConnection(userId, accountId);
// At this point, accountId has been resolved to an actual account ID by getImapConnection
// Store the resolved accountId in a variable that is guaranteed to be a string
const resolvedAccountId = accountId || 'default';
// Attempt to select the mailbox
try {
const mailboxInfo = await client.mailboxOpen(folder);
logger.debug('[EMAIL] Opened mailbox', {
folder,
totalMessages: mailboxInfo.exists,
});
// Get list of all mailboxes for UI
const mailboxes = await getMailboxes(client, resolvedAccountId);
// Calculate pagination
const totalEmails = mailboxInfo.exists || 0;
const totalPages = Math.ceil(totalEmails / perPage);
// Check if mailbox is empty
if (totalEmails === 0) {
// Cache the empty result
const emptyResult = {
emails: [],
totalEmails: 0,
page,
perPage,
totalPages: 0,
folder,
mailboxes,
newestEmailId: 0
};
// Only cache if not in checkOnly mode
if (!checkOnly) {
await cacheEmailList(
userId,
resolvedAccountId, // Use the guaranteed string account ID
folder,
page,
perPage,
emptyResult
);
}
return emptyResult;
}
// If checkOnly mode, we just fetch the most recent email's ID to compare
if (checkOnly) {
logger.debug('[EMAIL] getEmails in checkOnly mode', {
folder,
accountId: resolvedAccountId,
});
// Get the most recent message (highest sequence number)
const lastMessageSequence = totalEmails.toString();
const messages = await client.fetch(lastMessageSequence, {
uid: true
});
let newestEmailId = 0;
for await (const message of messages) {
newestEmailId = message.uid;
}
logger.debug('[EMAIL] Latest email UID', {
folder,
accountId: resolvedAccountId,
newestEmailId,
});
// Return minimal result with just the newest email ID
return {
emails: [],
totalEmails,
page,
perPage,
totalPages,
folder,
mailboxes,
newestEmailId
};
}
// Calculate message range for pagination
const start = Math.max(1, totalEmails - (page * perPage) + 1);
const end = Math.max(1, totalEmails - ((page - 1) * perPage));
logger.debug('[EMAIL] Fetching messages range', {
folder,
accountId: resolvedAccountId,
start,
end,
});
// Fetch messages
const messages = await client.fetch(`${start}:${end}`, {
envelope: true,
flags: true,
bodyStructure: true,
uid: true
});
const emails: EmailMessage[] = [];
let newestEmailId = 0;
for await (const message of messages) {
// Track the newest email ID (highest UID)
if (message.uid > newestEmailId) {
newestEmailId = message.uid;
}
const email: EmailMessage = {
id: message.uid.toString(),
from: message.envelope.from?.map(addr => ({
name: addr.name || '',
address: addr.address || ''
})) || [],
to: message.envelope.to?.map(addr => ({
name: addr.name || '',
address: addr.address || ''
})) || [],
subject: message.envelope.subject || '',
date: message.envelope.date || new Date(),
flags: {
seen: message.flags.has('\\Seen'),
flagged: message.flags.has('\\Flagged'),
answered: message.flags.has('\\Answered'),
draft: message.flags.has('\\Draft'),
deleted: message.flags.has('\\Deleted')
},
size: message.size || 0,
hasAttachments: message.bodyStructure?.childNodes?.some(node => node.disposition === 'attachment') || false,
folder: folder,
contentFetched: false,
accountId: resolvedAccountId,
content: {
text: '',
html: '',
isHtml: false,
direction: 'ltr'
}
};
emails.push(email);
}
// Prepare the result
const result = {
emails,
totalEmails,
page,
perPage,
totalPages: Math.ceil(totalEmails / perPage),
folder,
mailboxes,
newestEmailId
};
// Cache the result with the effective account ID (only if not in checkOnly mode)
if (!checkOnly) {
await cacheEmailList(
userId,
resolvedAccountId, // Use the guaranteed string account ID
folder,
page,
perPage,
result
);
}
return result;
} catch (error) {
logger.error('[EMAIL] Error fetching emails (inner)', {
userId,
folder,
accountId: resolvedAccountId,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
} catch (error) {
logger.error('[EMAIL] Error fetching emails (outer)', {
userId,
folder,
accountId: accountId || 'default',
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
// Map email addresses safely with null checks
function mapAddresses(addresses: any[] | undefined): Array<{ name: string; address: string }> {
if (!addresses || !Array.isArray(addresses)) {
return [];
}
return addresses.map((addr: any) => ({
name: addr.name || addr.address || '',
address: addr.address || ''
}));
}
/**
* Get a single email with full content
*/
export async function getEmailContent(
userId: string,
emailId: string,
folder: string = 'INBOX',
accountId?: string
): Promise<EmailMessage> {
// Validate parameters
if (!userId || !emailId || !folder) {
throw new Error('Missing required parameters');
}
// Validate UID format
if (!/^\d+$/.test(emailId)) {
throw new Error('Invalid email ID format: must be a numeric UID');
}
// Convert to number for IMAP
const numericId = parseInt(emailId, 10);
if (isNaN(numericId)) {
throw new Error('Email ID must be a number');
}
// Extract account ID from folder name if present and none was explicitly provided
const folderAccountId = folder.includes(':') ? folder.split(':')[0] : accountId;
// Use the most specific account ID available
const effectiveAccountId = folderAccountId || accountId || 'default';
// Normalize folder name by removing account prefix if present
const normalizedFolder = folder.includes(':') ? folder.split(':')[1] : folder;
logger.debug('[EMAIL] getEmailContent called', {
userId,
emailId,
folder: normalizedFolder,
accountId: effectiveAccountId,
});
// Use normalized folder name and effective account ID for cache key
const cachedEmail = await getCachedEmailContent(userId, effectiveAccountId, emailId);
if (cachedEmail) {
logger.debug('[EMAIL] Using cached email content', {
userId,
accountId: effectiveAccountId,
emailId,
});
return cachedEmail;
}
logger.debug('[EMAIL] Cache miss for email content, fetching from IMAP', {
userId,
accountId: effectiveAccountId,
emailId,
});
const client = await getImapConnection(userId, effectiveAccountId);
try {
await client.mailboxOpen(normalizedFolder);
// Log connection details with account context
logger.debug('[EMAIL] Fetching email from folder', {
emailId,
folder: normalizedFolder,
accountId: effectiveAccountId,
});
// Open mailbox with error handling
const mailbox = await client.mailboxOpen(normalizedFolder);
if (!mailbox || typeof mailbox === 'boolean') {
throw new Error(`Failed to open mailbox: ${normalizedFolder} for account ${effectiveAccountId}`);
}
// Log mailbox status with account context
logger.debug('[EMAIL] Mailbox opened', {
folder: normalizedFolder,
accountId: effectiveAccountId,
totalMessages: mailbox.exists,
});
// Get the UIDVALIDITY and UIDNEXT values
const uidValidity = mailbox.uidValidity;
const uidNext = mailbox.uidNext;
logger.debug('[EMAIL] Mailbox UID metadata', {
folder: normalizedFolder,
accountId: effectiveAccountId,
uidValidity,
uidNext,
});
// Validate UID exists in mailbox
if (numericId >= uidNext) {
throw new Error(`Email ID ${numericId} is greater than or equal to the highest UID in mailbox (${uidNext}) for account ${effectiveAccountId}`);
}
// First, try to get the sequence number for this UID
const searchResult = await client.search({ uid: numericId.toString() });
if (!searchResult || searchResult.length === 0) {
throw new Error(`Email with UID ${numericId} not found in folder ${normalizedFolder} for account ${effectiveAccountId}`);
}
const sequenceNumber = searchResult[0];
logger.debug('[EMAIL] Found sequence number for UID', {
accountId: effectiveAccountId,
uid: numericId,
sequenceNumber,
});
// Now fetch using the sequence number with error handling
let message;
try {
message = await client.fetchOne(sequenceNumber.toString(), {
source: true,
envelope: true,
flags: true,
size: true
});
} catch (fetchError) {
logger.error('[EMAIL] Error fetching message by sequence', {
sequenceNumber,
error: fetchError instanceof Error ? fetchError.message : String(fetchError),
});
throw new Error(`Failed to fetch email: ${fetchError instanceof Error ? fetchError.message : 'Unknown error'}`);
}
if (!message) {
throw new Error(`Email not found with sequence number ${sequenceNumber} in folder ${normalizedFolder} for account ${effectiveAccountId}`);
}
// Check if message has required fields
if (!message.source || !message.envelope) {
throw new Error(`Invalid email data received: missing source or envelope data`);
}
const { source, envelope, flags, size } = message;
// Validate envelope data
if (!envelope) {
throw new Error('Email envelope data is missing');
}
// Parse the email content, ensuring all styles and structure are preserved
let parsedEmail;
try {
parsedEmail = await simpleParser(source.toString(), {
skipHtmlToText: true,
keepCidLinks: true
});
} catch (parseError) {
logger.error('[EMAIL] Error parsing email content', {
emailId,
error: parseError instanceof Error ? parseError.message : String(parseError),
});
throw new Error(`Failed to parse email content: ${parseError instanceof Error ? parseError.message : 'Unknown error'}`);
}
// Convert flags from Set to boolean checks
const flagsArray = Array.from(flags as Set<string>);
// Preserve the raw HTML exactly as it was in the original email
const rawHtml = parsedEmail.html || '';
const email: EmailMessage = {
id: emailId,
messageId: envelope.messageId,
subject: envelope.subject || "(No Subject)",
from: mapAddresses(envelope.from),
to: mapAddresses(envelope.to),
cc: mapAddresses(envelope.cc),
bcc: mapAddresses(envelope.bcc),
date: envelope.date || new Date(),
flags: {
seen: flagsArray.includes("\\Seen"),
flagged: flagsArray.includes("\\Flagged"),
answered: flagsArray.includes("\\Answered"),
deleted: flagsArray.includes("\\Deleted"),
draft: flagsArray.includes("\\Draft"),
},
hasAttachments: parsedEmail.attachments?.length > 0,
attachments: parsedEmail.attachments?.map(att => ({
filename: att.filename || 'attachment',
contentType: att.contentType,
size: att.size || 0
})),
content: {
text: parsedEmail.text || '',
html: rawHtml || '',
isHtml: !!rawHtml,
direction: 'ltr' // Default to left-to-right
},
folder: normalizedFolder,
contentFetched: true,
size: size || 0,
accountId: effectiveAccountId
};
// Cache the email content with effective account ID
await cacheEmailContent(userId, effectiveAccountId, emailId, email);
return email;
} catch (error) {
logger.error('[EMAIL] Email fetch failed', {
userId,
emailId,
folder: normalizedFolder,
accountId: effectiveAccountId,
error: error instanceof Error ? error.message : 'Unknown error',
});
throw error;
} finally {
try {
await client.mailboxClose();
} catch (error) {
console.error('Error closing mailbox:', error);
}
}
}
/**
* Mark an email as read or unread
*/
export async function markEmailReadStatus(
userId: string,
emailId: string,
isRead: boolean,
folder: string = 'INBOX',
accountId?: string
): Promise<boolean> {
// Extract account ID from folder name if present and none was explicitly provided
const folderAccountId = folder.includes(':') ? folder.split(':')[0] : accountId;
// Use the most specific account ID available
const effectiveAccountId = folderAccountId || accountId || 'default';
// Normalize folder name by removing account prefix if present
const normalizedFolder = folder.includes(':') ? folder.split(':')[1] : folder;
logger.debug('[EMAIL] markEmailReadStatus called', {
userId,
emailId,
isRead,
folder: normalizedFolder,
accountId: effectiveAccountId,
});
const client = await getImapConnection(userId, effectiveAccountId);
try {
await client.mailboxOpen(normalizedFolder);
if (isRead) {
await client.messageFlagsAdd(emailId, ['\\Seen']);
} else {
await client.messageFlagsRemove(emailId, ['\\Seen']);
}
// Invalidate content cache since the flags changed
await invalidateEmailContentCache(userId, effectiveAccountId, emailId);
// Also invalidate folder cache because unread counts may have changed
await invalidateFolderCache(userId, effectiveAccountId, normalizedFolder);
return true;
} catch (error) {
logger.error('[EMAIL] Error marking email read status', {
userId,
emailId,
isRead,
folder: normalizedFolder,
accountId: effectiveAccountId,
error: error instanceof Error ? error.message : String(error),
});
return false;
} finally {
try {
await client.mailboxClose();
} catch (error) {
logger.error('[EMAIL] Error closing mailbox after markEmailReadStatus', {
error: error instanceof Error ? error.message : String(error),
});
}
}
}
/**
* Toggle an email's flagged (starred) status
*/
export async function toggleEmailFlag(
userId: string,
emailId: string,
flagged: boolean,
folder: string = 'INBOX',
accountId?: string
): Promise<boolean> {
// Extract account ID from folder name if present and none was explicitly provided
const folderAccountId = folder.includes(':') ? folder.split(':')[0] : accountId;
// Use the most specific account ID available
const effectiveAccountId = folderAccountId || accountId || 'default';
// Normalize folder name by removing account prefix if present
const normalizedFolder = folder.includes(':') ? folder.split(':')[1] : folder;
logger.debug('[EMAIL] toggleEmailFlag called', {
userId,
emailId,
flagged,
folder: normalizedFolder,
accountId: effectiveAccountId,
});
const client = await getImapConnection(userId, effectiveAccountId);
try {
await client.mailboxOpen(normalizedFolder);
if (flagged) {
await client.messageFlagsAdd(emailId, ['\\Flagged']);
} else {
await client.messageFlagsRemove(emailId, ['\\Flagged']);
}
// Invalidate content cache since the flags changed
await invalidateEmailContentCache(userId, effectiveAccountId, emailId);
return true;
} catch (error) {
logger.error('[EMAIL] Error toggling email flag', {
userId,
emailId,
flagged,
folder: normalizedFolder,
accountId: effectiveAccountId,
error: error instanceof Error ? error.message : String(error),
});
return false;
} finally {
try {
await client.mailboxClose();
} catch (error) {
logger.error('[EMAIL] Error closing mailbox after toggleEmailFlag', {
error: error instanceof Error ? error.message : String(error),
});
}
}
}
// Define EmailContent interface
interface EmailContent {
to: string;
cc?: string;
bcc?: string;
subject: string;
plainText: string;
htmlContent: string;
attachments?: Array<{
filename: string;
content: string;
contentType: string;
}>;
}
export async function sendEmail(
userId: string,
emailData: {
to: string;
cc?: string;
bcc?: string;
subject: string;
body: string;
attachments?: Array<{
name: string;
content: string;
type: string;
}>;
}
): Promise<{ success: boolean; messageId?: string; error?: string }> {
const credentials = await getUserEmailCredentials(userId);
if (!credentials) {
return {
success: false,
error: 'No email credentials found'
};
}
// Cast to extended type
const extendedCreds = credentials as EmailCredentialsExtended;
// Configure SMTP auth based on OAuth or password
const smtpAuth = extendedCreds.useOAuth && extendedCreds.accessToken
? {
type: 'OAuth2',
user: extendedCreds.email,
accessToken: extendedCreds.accessToken
}
: {
user: extendedCreds.email,
pass: extendedCreds.password
};
// Create SMTP transporter with user's SMTP settings
const transporter = nodemailer.createTransport({
host: extendedCreds.smtp_host || 'smtp.infomaniak.com',
port: extendedCreds.smtp_port || 587,
secure: extendedCreds.smtp_secure || false,
auth: smtpAuth,
tls: {
rejectUnauthorized: false
}
} as nodemailer.TransportOptions);
try {
const info = await transporter.sendMail({
from: extendedCreds.email,
to: emailData.to,
cc: emailData.cc,
bcc: emailData.bcc,
subject: emailData.subject,
text: emailData.body,
html: emailData.body,
attachments: emailData.attachments?.map(att => ({
filename: att.name,
content: att.content,
contentType: att.type
})),
});
return {
success: true,
messageId: info.messageId
};
} catch (error) {
console.error('Failed to send email:', error);
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
}
/**
* Get list of mailboxes from an IMAP connection
*/
export async function getMailboxes(client: ImapFlow, accountId?: string): Promise<string[]> {
try {
const mailboxes = await client.list();
// If we have an accountId, prefix the folder names to prevent namespace collisions
if (accountId) {
return mailboxes.map(mailbox => `${accountId}:${mailbox.path}`);
}
// For backward compatibility, return unprefixed names when no accountId
return mailboxes.map(mailbox => mailbox.path);
} catch (error) {
logger.error('[EMAIL] Error fetching mailboxes', {
error: error instanceof Error ? error.message : String(error),
});
// Return empty array on error to avoid showing incorrect folders
return [];
}
}
/**
* Test IMAP and SMTP connections for an email account
*/
export async function testEmailConnection(credentials: EmailCredentials): Promise<{
imap: boolean;
smtp?: boolean;
error?: string;
folders?: string[];
}> {
// Cast to extended type to use OAuth properties
const extendedCreds = credentials as EmailCredentialsExtended;
logger.debug('[EMAIL] Testing connection with credentials metadata', {
email: extendedCreds.email,
host: extendedCreds.host,
port: extendedCreds.port,
hasPassword: !!extendedCreds.password,
useOAuth: !!extendedCreds.useOAuth,
hasAccessToken: !!extendedCreds.accessToken,
hasRefreshToken: !!extendedCreds.refreshToken,
});
// Test IMAP connection
try {
logger.debug('[EMAIL] Testing IMAP connection', {
email: extendedCreds.email,
host: extendedCreds.host,
port: extendedCreds.port,
});
// Configure auth based on whether we're using OAuth or password
let authParams: any;
if (extendedCreds.useOAuth && extendedCreds.accessToken) {
logger.debug('[EMAIL] Using XOAUTH2 authentication mechanism', {
email: extendedCreds.email,
});
// For OAuth, pass the accessToken directly to ImapFlow
authParams = {
user: extendedCreds.email,
accessToken: extendedCreds.accessToken
};
// Log the token length to verify it exists (without value)
logger.debug('[EMAIL] Access token present for test', {
email: extendedCreds.email,
});
} else {
logger.debug('[EMAIL] Using password authentication mechanism', {
email: extendedCreds.email,
});
authParams = {
user: extendedCreds.email,
pass: extendedCreds.password
};
}
const client = new ImapFlow({
host: extendedCreds.host,
port: extendedCreds.port,
secure: extendedCreds.secure ?? true,
auth: authParams,
logger: false,
tls: {
rejectUnauthorized: false
}
});
logger.debug('[EMAIL] Attempting to connect to IMAP server for test', {
email: extendedCreds.email,
});
await client.connect();
logger.debug('[EMAIL] IMAP connection successful! Getting mailboxes...', {
email: extendedCreds.email,
});
const folders = await getMailboxes(client);
await client.logout();
logger.debug('[EMAIL] IMAP connection successful for test', {
email: extendedCreds.email,
folderCount: folders.length,
});
// Test SMTP connection if SMTP settings are provided
let smtpSuccess = false;
if (extendedCreds.smtp_host && extendedCreds.smtp_port) {
try {
logger.debug('[EMAIL] Testing SMTP connection', {
email: extendedCreds.email,
host: extendedCreds.smtp_host,
port: extendedCreds.smtp_port,
});
// Configure SMTP auth based on OAuth or password
const smtpAuth = extendedCreds.useOAuth && extendedCreds.accessToken
? {
type: 'OAuth2',
user: extendedCreds.email,
accessToken: extendedCreds.accessToken
}
: {
user: extendedCreds.email,
pass: extendedCreds.password,
};
const transporter = nodemailer.createTransport({
host: extendedCreds.smtp_host,
port: extendedCreds.smtp_port,
secure: extendedCreds.smtp_secure ?? false,
auth: smtpAuth,
tls: {
rejectUnauthorized: false
}
} as nodemailer.TransportOptions);
await transporter.verify();
logger.debug('[EMAIL] SMTP connection successful', {
email: extendedCreds.email,
});
smtpSuccess = true;
} catch (smtpError) {
logger.error('[EMAIL] SMTP connection failed', {
email: extendedCreds.email,
error: smtpError instanceof Error ? smtpError.message : String(smtpError),
});
return {
imap: true,
smtp: false,
error: `SMTP connection failed: ${smtpError instanceof Error ? smtpError.message : 'Unknown error'}`,
folders
};
}
}
return {
imap: true,
smtp: smtpSuccess,
folders
};
} catch (error) {
logger.error('[EMAIL] IMAP connection failed', {
email: extendedCreds.email,
error: error instanceof Error ? error.message : String(error),
});
return {
imap: false,
error: `IMAP connection failed: ${error instanceof Error ? error.message : 'Unknown error'}`
};
}
}
/**
* Force recaching of user credentials
* This function is used to refresh credentials in Redis cache
*/
export async function forceRecacheUserCredentials(userId: string): Promise<boolean> {
try {
// Get credentials from database
const accounts = await prisma.mailCredentials.findMany({
where: { userId }
});
if (!accounts || accounts.length === 0) {
logger.debug('[EMAIL] No email accounts found for user when recaching credentials', {
userId,
});
return false;
}
// Recache each account's credentials
for (const account of accounts) {
const credentials: EmailCredentials = {
host: account.host,
port: account.port,
email: account.email,
password: account.password || undefined,
secure: account.secure,
smtp_host: account.smtp_host || undefined,
smtp_port: account.smtp_port || undefined,
smtp_secure: account.smtp_secure || undefined,
display_name: account.display_name || undefined,
color: account.color || undefined
};
await cacheEmailCredentials(userId, account.id, credentials);
logger.debug('[EMAIL] Recached credentials for account', {
userId,
accountId: account.id,
});
// Invalidate other caches related to this account
await invalidateFolderCache(userId, account.id, '*');
await invalidateEmailContentCache(userId, account.id, '*');
}
return true;
} catch (error) {
logger.error('[EMAIL] Error recaching credentials for user', {
userId,
error: error instanceof Error ? error.message : String(error),
});
return false;
}
}