const NodeSpeaker = require('../libs/speaker/index.js'); const EventEmitter = require('events'); const { Readable } = require('stream'); class StreamBuffer extends EventEmitter { constructor(stream, settings) { super(); this.size = settings.size; this.times = { start: Date.now() }; this.playback = { paused: false, hiccups: 0 }; this.audiosettings = settings?.audio; this.#setupBuffer(settings?.threshold); this.#setupStreams(stream); } play() { this.#writeBufferedChunk(4096, true); } pause() { this.playback.paused = true; this.threshold.announced = false; this.streams.buffer._readableState.buffer.clear(); this.streams.buffer._readableState.length = 0; } stop() { this.#destroy(); } getProgress() { return this.streams.output?.bytesWritten || 0; } getHiccups() { return this.playback.hiccups; } getTimes() { return this.times; } #setupBuffer(threshold) { if (threshold === undefined || isNaN(threshold)) { threshold = 4096; } this.threshold = { value: threshold, announced: false }; this.limit = { value: config?.buffer?.limit * 1048576, announced: false }; if (isNaN(this.limit.value) || this.limit.value < this.threshold.value) { this.limit.value = this.threshold.value; } } #setupStreams(stream) { if (stream === undefined) { return; } this.streams = { input: stream, buffer: new Readable() }; this.streams.buffer._read = () => { }; this.#createOutputStream(); this.#handleBufferStream(); this.#handleInputStream(); this.#handleOutputStream(); } #createOutputStream() { if (this.streams.output !== undefined) { this.streams.output.destroy(); } this.streams.output = new NodeSpeaker({ channels: this.audiosettings?.channels || 2, bitDepth: this.audiosettings?.bitDepth || 16, sampleRate: this.audiosettings?.sampleRate || 44100, }); } #isThresholdReached() { return this.streams.buffer.readableLength >= this.threshold.value; } #isThresholdAnnounced() { return this.threshold.announced === true; } #isLimitReached() { return this.streams.buffer.readableLength >= this.limit.value; } #writeBufferedChunk(chunkSize, resume) { if (resume === true) { this.playback.paused = false; this.emit('play'); } else if (this.playback.paused) { this.emit('pause'); return; } if (chunkSize === undefined || isNaN(chunkSize)) { chunkSize = 4096; } if (this.#isThresholdAnnounced === false) { return; } let data; if (this.streams.buffer.readableLength >= chunkSize) { data = this.streams.buffer.read(chunkSize); } else { data = this.streams.buffer.read(); } if (data === undefined || data == null) { return; } this.streams.output.write(data); } #fillBuffer() { const rebuffered = this.#rebufferChunk(this.threshold.value); if (rebuffered === undefined || this.#isThresholdAnnounced() === true) { return; } if (this.#isThresholdReached()) { this.times.threshold = Date.now(); this.threshold.announced = true; this.emit(constants.THRESHOLD); logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes after ' + (this.times.threshold - this.times.start) + 'ms...'); return; } this.#fillBuffer(); } #rebufferChunk(chunkSize) { if (chunkSize === undefined || isNaN(chunkSize)) { chunkSize = 4096; } if (this.#isLimitReached()) { return; } let data; if (this.streams.input.readableLength >= chunkSize) { data = this.streams.input.read(chunkSize); } else { data = this.streams.input.read(); } if (data === undefined || data === null) { return; } this.streams.buffer.push(data); return data.length; } #handleBufferStream() { if (this.streams.buffer === undefined) { return; } this.streams.buffer.on('error', (error) => { logger.error('buffer stream encountered an error: ' + error); }); } #handleInputStream() { if (this.streams.input === undefined) { return; } this.streams.input.on('error', (error) => { logger.error('input stream encountered an error: ' + error); }); this.streams.input.on('readable', () => { this.#fillBuffer(); }); this.streams.input.on('close', () => { this.times.transmitted = Date.now(); logger.debug('input stream closed, transmitting file took ' + (this.times.transmitted - this.times.start) + 'ms'); }); } #handleOutputStream() { if (this.streams.output === undefined) { return; } this.streams.output.on('error', (error) => { logger.error('output stream encountered an error: ' + error); this.#createOutputStream(); }); this.streams.output.on('written', (bytes) => { this.#writeBufferedChunk(bytes); this.#rebufferChunk(bytes); }); this.streams.output.on('drain', () => { this.times.drained = Date.now(); logger.debug('output stream is drained, wrote ' + this.streams.output.bytesWritten + '/' + this.size + ' bytes to the speaker after ' + (this.times.drained - this.times.start) + 'ms'); if (this.streams.output.bytesWritten < this.size) { return; } this.#destroy(); }); this.streams.output.on('hiccup', () => { this.playback.hiccups++; logger.warn('hiccup ' + this.playback.hiccups + ' detected...'); }); } #destroy() { for (const key in this.streams) { const stream = this.streams[key]; if (stream.destroyed === true) { continue; } stream.destroy(); } this.times.stop = Date.now(); this.emit('close'); } } module.exports = StreamBuffer;