NeahOpti/node_modules/imapflow/lib/handler/imap-stream.js
2025-04-22 12:49:37 +02:00

237 lines
7.3 KiB
JavaScript

'use strict';
const Transform = require('stream').Transform;
const logger = require('../logger');
// Parser states stored in `ImapStream#state`
const LINE = 1; // reading line oriented data
const LITERAL = 2; // reading the raw bytes of a `{N}` literal

// Byte values of the characters the parser looks for
const LF = '\n'.charCodeAt(0);
const CR = '\r'.charCodeAt(0);
const NUM_0 = '0'.charCodeAt(0);
const NUM_9 = '9'.charCodeAt(0);
const CURLY_OPEN = '{'.charCodeAt(0);
const CURLY_CLOSE = '}'.charCodeAt(0);
/**
 * Transform stream that splits raw bytes read from an IMAP connection into
 * complete commands.
 *
 * The writable side accepts byte chunks. The readable side is in object mode and
 * emits `{ payload, literals, next }` objects:
 *   - `payload`  Buffer holding every line of the command; the final LF / CRLF is removed
 *   - `literals` array of Buffers, one for each `{N}` literal announced inside the command
 *   - `next`     function the consumer must call; no further input is processed before that
 *
 * A line that ends with `{N}` (N being decimal digits) followed by LF or CRLF
 * announces N raw bytes. These bytes are collected as a literal and the command
 * continues on the line that follows them.
 */
class ImapStream extends Transform {
    /**
     * @param {Object} [options]
     * @param {*} [options.cid] connection id added to log entries
     * @param {Object} [options.logger] logger to use; without it a child of the module logger is created
     * @param {boolean} [options.logRaw] if truthy then every incoming chunk is trace-logged as base64
     * @param {boolean} [options.secureConnection] copied to `this.secureConnection`, reported in raw logs
     */
    constructor(options) {
        super({
            readableObjectMode: true,
            writableObjectMode: false
        });

        this.options = options || {};
        this.cid = this.options.cid;

        this.log =
            this.options.logger && typeof this.options.logger === 'object'
                ? this.options.logger
                : logger.child({
                      component: 'imap-connection',
                      cid: this.cid
                  });

        // total number of bytes received so far
        this.readBytesCounter = 0;

        this.state = LINE;
        this.literalWaiting = 0; // bytes still missing from the current literal
        this.inputBuffer = []; // complete lines of the current command
        this.lineBuffer = []; // pieces of the current, still unterminated line
        this.literalBuffer = []; // pieces of the current literal
        this.literals = []; // complete literals of the current command

        this.compress = false;
        this.secureConnection = this.options.secureConnection;

        this.processingInput = false; // true while processInput() is draining the queue
        this.inputQueue = []; // unprocessed input chunks
    }

    /**
     * Checks if a line ends with a literal marker, that is `{digits}` followed by
     * LF or CRLF. The opening brace is not accepted as the first byte of the line.
     *
     * Side effect: on a match the stream is switched to LITERAL state and
     * `literalWaiting` is set to the announced byte count.
     *
     * @param {Buffer} line complete line including its terminator
     * @returns {boolean} true if the line announces a literal
     */
    checkLiteralMarker(line) {
        if (!line || !line.length) {
            return false;
        }

        // position of the expected closing brace, found by skipping LF and an optional CR
        let close = line.length - 1;
        if (line[close] !== LF) {
            return false;
        }
        close--;
        if (close >= 0 && line[close] === CR) {
            close--;
        }

        if (close <= 0 || line[close] !== CURLY_CLOSE) {
            return false;
        }

        // walk backwards over the digits, never looking at the first byte of the line
        let open = close - 1;
        while (open > 0 && line[open] >= NUM_0 && line[open] <= NUM_9) {
            open--;
        }

        const hasDigits = open < close - 1;
        if (open === 0 || line[open] !== CURLY_OPEN || !hasDigits) {
            return false;
        }

        this.state = LITERAL;
        this.literalWaiting = Number(Buffer.from(line.slice(open + 1, close)).toString());
        return true;
    }

    /**
     * Consumes literal bytes from the chunk. Once all announced bytes have been
     * received, the literal is stored and the stream switches back to LINE state.
     *
     * @param {Buffer} chunk input chunk
     * @param {number} pos offset of the first unprocessed byte
     * @returns {number} offset of the first byte that was not consumed
     */
    readLiteralBytes(chunk, pos) {
        const take = Math.min(this.literalWaiting, chunk.length - pos);

        this.literalBuffer.push(chunk.subarray(pos, pos + take));
        // only count the bytes that were actually consumed, so the counter ends at exactly 0
        this.literalWaiting -= take;

        if (this.literalWaiting === 0) {
            this.literals.push(Buffer.concat(this.literalBuffer));
            this.literalBuffer = [];
            this.state = LINE;
        }

        return pos + take;
    }

    /**
     * Emits the buffered lines and literals as a single command and waits until
     * the consumer calls `next`. Nothing is emitted if the payload is empty once
     * the line terminator has been removed.
     *
     * @returns {Promise<void>}
     */
    async emitCommand() {
        let payload = this.inputBuffer.length === 1 ? this.inputBuffer[0] : Buffer.concat(this.inputBuffer);
        const literals = this.literals;

        this.inputBuffer = [];
        this.literals = [];

        // remove final line terminator (LF or CRLF)
        let end = payload.length;
        if (end && payload[end - 1] === LF) {
            end--;
            if (end && payload[end - 1] === CR) {
                end--;
            }
        }
        if (end !== payload.length) {
            payload = payload.subarray(0, end);
        }

        if (!payload.length) {
            return;
        }

        await new Promise(resolve => {
            this.push({ payload, literals, next: resolve });
        });
    }

    /**
     * Consumes line oriented data from the chunk. Every complete line that does
     * not announce a literal finishes a command, which is then emitted. Reading
     * stops early when a line announces a literal.
     *
     * @param {Buffer} chunk input chunk
     * @param {number} pos offset of the first unprocessed byte
     * @returns {Promise<number>} offset of the first byte that was not consumed
     */
    async readLines(chunk, pos) {
        let lineStart = pos;

        while (lineStart < chunk.length) {
            const lineEnd = chunk.indexOf(LF, lineStart);
            if (lineEnd < 0) {
                // no terminator yet, keep the partial line for the next chunk
                this.lineBuffer.push(chunk.subarray(lineStart));
                return chunk.length;
            }

            this.lineBuffer.push(chunk.subarray(lineStart, lineEnd + 1));
            lineStart = lineEnd + 1;

            const line = Buffer.concat(this.lineBuffer);
            this.lineBuffer = [];
            this.inputBuffer.push(line);

            if (this.checkLiteralMarker(line)) {
                // stream is in LITERAL state now, the bytes that follow are not line data
                return lineStart;
            }

            // reached end of command input, emit it
            await this.emitCommand();
        }

        return lineStart;
    }

    /**
     * Processes one input chunk, alternating between line and literal parsing
     * until the chunk is used up.
     *
     * @param {Buffer} chunk input chunk
     * @param {number} [startPos=0] offset to start processing from
     * @returns {Promise<void>}
     */
    async processInputChunk(chunk, startPos) {
        let pos = startPos || 0;

        // a loop instead of recursion keeps the stack flat for chunks with many literals
        while (pos < chunk.length) {
            if (this.state === LITERAL) {
                pos = this.readLiteralBytes(chunk, pos);
            } else {
                pos = await this.readLines(chunk, pos);
            }
        }
    }

    /**
     * Drains the input queue one chunk at a time and calls the chunk's
     * `next` callback once the chunk has been processed.
     *
     * @returns {Promise<void>}
     */
    async processInput() {
        try {
            let data;
            while ((data = this.inputQueue.shift())) {
                await this.processInputChunk(data.chunk);
                // mark chunk as processed
                data.next();
            }
        } finally {
            // Clear the flag in the same tick the queue runs dry. If this happened in
            // a later microtask, a chunk written in between would be queued without
            // anything left to process it.
            this.processingInput = false;
        }
    }

    /**
     * Transform implementation. Queues the chunk and starts the queue processor
     * unless it is already running. Processing errors are emitted as 'error' events.
     *
     * @param {Buffer|string} chunk incoming data
     * @param {string} encoding encoding of `chunk` if it is a string
     * @param {Function} next called once the chunk has been processed
     */
    _transform(chunk, encoding, next) {
        const data = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;

        if (!data || !data.length) {
            return next();
        }

        this.readBytesCounter += data.length;

        if (this.options.logRaw) {
            this.log.trace({
                src: 's',
                msg: 'read from socket',
                data: data.toString('base64'),
                compress: !!this.compress,
                secure: !!this.secureConnection,
                cid: this.cid
            });
        }

        this.inputQueue.push({ chunk: data, next });

        if (!this.processingInput) {
            this.processingInput = true;
            this.processInput().catch(err => this.emit('error', err));
        }
    }

    /**
     * Nothing to flush. Bytes of an unterminated line or literal are not emitted.
     *
     * @param {Function} next completion callback
     */
    _flush(next) {
        next();
    }
}
// Export the stream class as the `ImapStream` property of this module
module.exports.ImapStream = ImapStream;