const EventEmitter = require('events'); const { Readable, Duplex } = require('stream'); class StreamBuffer extends EventEmitter { constructor(threshold, inputStream, outputStream) { super(); this.#setupBuffer(threshold); this.#setupStreams(inputStream, outputStream); } resume() { this.streams.buffer.resume(); } pause() { this.streams.buffer.pause(); } #setupBuffer(threshold) { this.size = 0; if (threshold === undefined || isNaN(threshold)) { // 64 mb threshold = 67108864; } this.threshold = { value: threshold, announced: false }; this.limit = { value: config?.buffer?.limit, announced: false }; if (isNaN(this.limit.value) || this.limit.value < this.threshold.value) { this.limit.value = this.threshold.value; } } #setupStreams(inputStream, outputStream) { if (inputStream === undefined || outputStream === undefined) { return; } this.streams = { input: inputStream, output: outputStream, buffer: new Readable() }; this.streams.buffer._read = () => { }; this.streams.buffer.buffered = 0; this.streams.buffer.tmp = 0; this.#handleBufferStream(); this.#handleInputStream(); this.#handleOutputStream(); } #isThresholdReached() { return this.streams.buffer.buffered >= this.threshold.value; } #isThresholdAnnounced() { return this.threshold.announced === true; } #isLimitReached() { return this.streams.buffer.buffered >= this.limit.value; } #handleBufferStream() { if (this.streams.buffer === undefined) { return; } this.streams.buffer.pause(); this.streams.buffer.on('data', (data) => { const chunkSize = data.length; const flushed = this.streams.output.write(data); if (flushed !== true) { // logger.warn('backpressure detected...'); this.streams.buffer.tmp = chunkSize; this.streams.buffer.pause(); } else { this.streams.buffer.buffered -= chunkSize; } if (this.#isLimitReached() === false && this.streams.input.isPaused() === true) { // logger.debug('buffer fell below limit of \'' + this.limit.value + '\' bytes, resuming input stream...'); this.streams.input.resume(); } else if (this.#isLimitReached() === true && this.streams.input.isPaused() === false) { // logger.debug('buffer reached limit of \'' + this.limit.value + '\' bytes, pausing input stream...'); this.streams.input.pause(); } }); } #handleInputStream() { if (this.streams.input === undefined) { return; } this.streams.input.on('data', (data) => { this.streams.buffer.buffered += data.length; this.streams.buffer.push(data); if (this.#isThresholdReached() === true && !this.#isThresholdAnnounced() === true) { this.threshold.announced = true; this.emit(constants.BUFFER_THRESHOLD); logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes'); } if (this.#isLimitReached()) { // logger.debug('buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); this.streams.input.pause(); // this.streams.buffer.resume(); } }); } #handleOutputStream() { if (this.streams.output === undefined) { return; } this.streams.output.on('drain', () => { // logger.warn('SPEAKER DRAINED - RESUMING DUPLEX STREAM'); this.streams.buffer.buffered -= this.streams.buffer.tmp; this.streams.buffer.resume(); if (this.streams.input.isPaused() && this.streams.buffer.buffered < this.limit.value) { // logger.warn('RESUME READ STREAM - BUFFER LIMIT NOT REACHED') this.streams.input.resume(); } }); this.streams.output.on('flush', () => { logger.debug('speaker flushed'); }); this.streams.output.on('close', () => { logger.debug('speaker closed'); }); this.streams.output.on('progress', (progress) => { // logger.warn('SPEAKER PROGRESS: ' + progress); }); this.streams.output.on('hiccup', () => { if (this.hiccups === undefined) { this.hiccups = 1; } else { this.hiccups++; } logger.warn('HICKUP #' + this.hiccups + ' DETECTED'); }); } // #handleTransferStream() { // if (this.streams.transfer === undefined) { // return; // } // this.streams.transfer.pause(); // this.streams.transfer._read = () => { }; // this.streams.transfer.on('data', (data) => { // if (data.length === 65483) { // logger.warn('POSSIBLE ERROR!'); // } // const flushed = this.streams.output.write(data, (error) => { // const derp = this.streams.output; // if (error !== undefined) { // logger.error('FUCK MY LIFE'); // } // }); // if (flushed === false) { // this.streams.transfer.pause(); // } // if (this.streams.transfer.readableLength < this.limit.value && this.streams.input.isPaused()) { // logger.warn('RESUMING READ STREAM - TRANSFER STRAM UNDERCUT LIMIT'); // this.streams.input.resume(); // } // }); // this.streams.transfer.on('close', () => { // logger.debug('transfer stream of stream buffer closed'); // this.streams.transfer.destroy(); // }); // this.streams.transfer.on('error', (error) => { // logger.error('transfer stream of stream buffer encountered an unexpected error: ' + error); // }); // this.streams.transfer.on('resume', () => { // logger.debug('transfer stream of stream buffer is resumed'); // }); // this.streams.transfer.on('pause', () => { // logger.debug('transfer stream of stream buffer is paused'); // }); // } // #handleInputStream() { // if (this.streams.input === undefined) { // return; // } // this.streams.input.on('data', (data) => { // const flushed = this.streams.output.write(data); // if (this.streams.output.writableCorked > 0) { // if (this.streams.output.writableLength >= this.threshold.value && this.threshold.announced === false) { // logger.debug('output stream\'s internal buffer reached threshold of ' + this.limit.value + ' bytes'); // this.emit(constants.BUFFER_THRESHOLD); // } // if (this.streams.output.writableLength >= this.limit.value) { // logger.debug('output stream\'s internal buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); // this.streams.input.pause(); // if (this.limit.announced === false) { // this.emit(constants.BUFFER_LIMIT); // } // } // return; // } // if (flushed === false && !this.streams.input.isPaused()) { // logger.warn('BACKPRESSURE FROM INPUT STREAM - PAUSING INPUT STREAM'); // this.streams.input.pause(); // } // // this.streams.transfer.push(data); // // if (this.streams.transfer.readableLength >= this.threshold.value && this.threshold.announced === false) { // // logger.debug('transfer stream reached threshold of ' + this.limit.value + ' bytes'); // // this.emit(constants.BUFFER_THRESHOLD); // // } // // if (this.streams.transfer.readableLength >= this.limit.value) { // // logger.debug('transfer stream reached limit of ' + this.limit.value + ' bytes, pausing read stream...'); // // this.streams.input.pause(); // // if (this.limit.announced === false) { // // this.emit(constants.BUFFER_LIMIT); // // } // // } // }); // this.streams.input.on('close', () => { // logger.debug('input stream of stream buffer closed'); // this.streams.input.destroy(); // }); // this.streams.input.on('error', (error) => { // logger.error('input stream of stream buffer encountered an unexpected error: ' + error); // }); // this.streams.input.on('resume', () => { // logger.debug('input stream of stream buffer is resumed'); // }); // this.streams.input.on('pause', () => { // logger.debug('input stream of stream buffer is paused'); // }); // } // #handleOutputStream() { // if (this.streams.output === undefined) { // return; // } // this.streams.output.on('drain', () => { // logger.debug('output stream of stream buffer is drained, resuming input stream...'); // // this.streams.input.resume(); // }); // this.streams.output.on('resume', () => { // logger.debug('output stream of stream buffer is resumed'); // }); // this.streams.output.on('pause', () => { // logger.debug('output stream of stream buffer is paused'); // }); // this.streams.output.on('close', () => { // logger.debug('output stream of stream buffer closed'); // }); // this.streams.output.on('error', (error) => { // logger.error('output stream of stream buffer encountered an unexpected error: ' + error); // }); // } } module.exports = StreamBuffer;