223 lines
No EOL
6.5 KiB
JavaScript
223 lines
No EOL
6.5 KiB
JavaScript
const NodeSpeaker = require('../libs/speaker/index.js');
|
|
const EventEmitter = require('events');
|
|
const { Readable } = require('stream');
|
|
|
|
/**
 * Buffers an incoming audio stream in memory and feeds it to a speaker in
 * chunks.
 *
 * Data moves through three streams held in `this.streams`:
 *   input  -> the stream passed to the constructor,
 *   buffer -> an in-memory Readable that chunks from `input` are pushed into,
 *   output -> a NodeSpeaker instance that chunks from `buffer` are written to.
 *
 * Events emitted by this class: 'play', 'pause', 'close' and
 * `constants.THRESHOLD` (once the buffer holds at least `threshold` bytes).
 *
 * NOTE(review): `config`, `constants` and `logger` are not imported in this
 * file; presumably they are provided as globals by the application - confirm.
 */
class StreamBuffer extends EventEmitter {
    /**
     * @param {import('stream').Readable} [stream] - Input stream to play. When
     *   omitted, no streams are created and the instance stays idle.
     * @param {object} [settings]
     * @param {number} [settings.size] - Expected total number of bytes; used
     *   by the 'drain' handler to decide when playback is complete.
     * @param {number} [settings.threshold] - Bytes that must be buffered
     *   before the threshold event is announced (defaults to 4096).
     * @param {object} [settings.audio] - Optional `channels`, `bitDepth` and
     *   `sampleRate` forwarded to the speaker.
     */
    constructor(stream, settings) {
        super();

        // `settings` is optional everywhere else in the constructor, so read
        // `size` the same way instead of throwing on a missing object.
        this.size = settings?.size;
        this.times = {
            start: Date.now()
        };
        this.playback = {
            paused: false,
            hiccups: 0
        };

        this.#setupBuffer(settings?.threshold);
        this.#setupStreams(stream, settings?.audio);
    }

    /**
     * Starts or resumes playback: clears the paused flag, emits 'play' and
     * writes the first chunk (up to 4096 bytes) to the speaker, provided the
     * buffer threshold has been announced.
     */
    play() {
        this.#writeBufferedChunk(4096, true);
    }

    /**
     * Pauses playback, resets the threshold announcement and discards
     * everything currently held in the buffer stream.
     */
    pause() {
        this.playback.paused = true;
        this.threshold.announced = false;

        const buffer = this.streams?.buffer;

        if (buffer === undefined) {
            return;
        }

        // Drain through the public API rather than mutating the private
        // `_readableState` internals; read() returns null once empty.
        while (buffer.read() !== null) {
            // discard buffered data
        }
    }

    /**
     * Stops playback by destroying all streams and emitting 'close'.
     */
    stop() {
        this.#destroy();
    }

    /**
     * @returns {number} The output stream's `bytesWritten`, or 0 when there
     *   is no output stream or nothing has been written yet.
     */
    getProgress() {
        return this.streams?.output?.bytesWritten || 0;
    }

    /**
     * @returns {number} Number of 'hiccup' events received from the output
     *   stream so far.
     */
    getHiccups() {
        return this.playback.hiccups;
    }

    /**
     * @returns {object} The timestamps object (`start`, and - once they
     *   occurred - `threshold`, `transmitted`, `drained`, `stop`), each a
     *   `Date.now()` value.
     */
    getTimes() {
        return this.times;
    }

    /**
     * Initialises `this.threshold` and `this.limit`.
     *
     * @param {number} [threshold] - Threshold in bytes; falls back to 4096
     *   when undefined or NaN.
     */
    #setupBuffer(threshold) {
        if (threshold === undefined || isNaN(threshold)) {
            threshold = 4096;
        }

        this.threshold = {
            value: threshold,
            announced: false
        };

        this.limit = {
            // config.buffer.limit is multiplied by 1048576 (bytes per MiB).
            value: config?.buffer?.limit * 1048576,
            announced: false
        };

        // A missing/invalid limit, or one below the threshold, is raised to
        // the threshold so the threshold can always be reached.
        if (isNaN(this.limit.value) || this.limit.value < this.threshold.value) {
            this.limit.value = this.threshold.value;
        }
    }

    /**
     * Creates the input/output/buffer stream triple and attaches the event
     * handlers. Does nothing when no input stream is given.
     *
     * @param {import('stream').Readable} [stream] - Input stream.
     * @param {object} [audioSettings] - Optional speaker settings; defaults
     *   are 2 channels, 16 bit depth and 44100 sample rate.
     */
    #setupStreams(stream, audioSettings) {
        if (stream === undefined) {
            return;
        }

        this.streams = {
            input: stream,
            output: new NodeSpeaker({
                channels: audioSettings?.channels || 2,
                bitDepth: audioSettings?.bitDepth || 16,
                sampleRate: audioSettings?.sampleRate || 44100,
            }),
            buffer: new Readable()
        };

        // The buffer is fed exclusively through push(), so _read is a no-op.
        this.streams.buffer._read = () => { };

        this.#handleBufferStream();
        this.#handleInputStream();
        this.#handleOutputStream();
    }

    /** @returns {boolean} True when the buffer holds at least `threshold` bytes. */
    #isThresholdReached() {
        return this.streams.buffer.readableLength >= this.threshold.value;
    }

    /** @returns {boolean} True when the threshold event has already been emitted. */
    #isThresholdAnnounced() {
        return this.threshold.announced === true;
    }

    /** @returns {boolean} True when the buffer holds at least `limit` bytes. */
    #isLimitReached() {
        return this.streams.buffer.readableLength >= this.limit.value;
    }

    /**
     * Moves one chunk from the buffer stream to the output stream.
     *
     * @param {number} [chunkSize] - Bytes to write; falls back to 4096 when
     *   undefined or NaN. If fewer bytes are buffered, everything is written.
     * @param {boolean} [resume] - When exactly `true`, clears the paused flag
     *   and emits 'play' first. Otherwise, while paused, emits 'pause' and
     *   writes nothing.
     */
    #writeBufferedChunk(chunkSize, resume) {
        if (resume === true) {
            this.playback.paused = false;
            this.emit('play');
        } else if (this.playback.paused) {
            this.emit('pause');
            return;
        }

        if (chunkSize === undefined || isNaN(chunkSize)) {
            chunkSize = 4096;
        }

        // Must CALL the private method: the bare reference is a function and
        // never equals `false`, which silently disabled this guard.
        if (this.#isThresholdAnnounced() === false) {
            return;
        }

        const buffer = this.streams.buffer;
        const data = buffer.readableLength >= chunkSize
            ? buffer.read(chunkSize)
            : buffer.read();

        if (data == null) {
            return;
        }

        this.streams.output.write(data);
    }

    /**
     * Pulls chunks from the input into the buffer until the threshold is
     * reached (then announces it) or no further chunk can be moved. When the
     * threshold was already announced, a single chunk is moved at most.
     */
    #fillBuffer() {
        for (;;) {
            const rebuffered = this.#rebufferChunk(this.threshold.value);

            if (rebuffered === undefined || this.#isThresholdAnnounced() === true) {
                return;
            }

            if (this.#isThresholdReached()) {
                this.times.threshold = Date.now();
                this.threshold.announced = true;
                this.emit(constants.THRESHOLD);
                logger.debug(`buffer reached threshold of ${this.threshold.value} bytes after ${this.times.threshold - this.times.start}ms...`);
                return;
            }
        }
    }

    /**
     * Moves one chunk from the input stream into the buffer stream.
     *
     * @param {number} [chunkSize] - Bytes to move; falls back to 4096 when
     *   undefined or NaN. If fewer bytes are readable, everything is moved.
     * @returns {number|undefined} Length of the moved chunk, or undefined
     *   when the buffer limit is reached or the input had nothing to read.
     */
    #rebufferChunk(chunkSize) {
        if (chunkSize === undefined || isNaN(chunkSize)) {
            chunkSize = 4096;
        }

        if (this.#isLimitReached()) {
            return;
        }

        const input = this.streams.input;
        const data = input.readableLength >= chunkSize
            ? input.read(chunkSize)
            : input.read();

        if (data == null) {
            return;
        }

        this.streams.buffer.push(data);

        return data.length;
    }

    /** Logs errors raised by the buffer stream. */
    #handleBufferStream() {
        if (this.streams.buffer === undefined) {
            return;
        }

        this.streams.buffer.on('error', (error) => {
            logger.error(`buffer stream encountered an error: ${error}`);
        });
    }

    /**
     * Wires the input stream: logs errors, refills the buffer whenever the
     * input becomes readable and records the time at which it closed.
     */
    #handleInputStream() {
        if (this.streams.input === undefined) {
            return;
        }

        this.streams.input.on('error', (error) => {
            logger.error(`input stream encountered an error: ${error}`);
        });

        this.streams.input.on('readable', () => {
            this.#fillBuffer();
        });

        this.streams.input.on('close', () => {
            this.times.transmitted = Date.now();
            logger.debug(`input stream closed, transmitting file took ${this.times.transmitted - this.times.start}ms`);
        });
    }

    /**
     * Wires the output (speaker) stream: logs errors, keeps the
     * write/rebuffer cycle going on 'written', tears everything down once the
     * expected number of bytes has drained, and counts hiccups.
     */
    #handleOutputStream() {
        if (this.streams.output === undefined) {
            return;
        }

        // The listener must accept the error argument; without the parameter
        // the log line itself raised a ReferenceError.
        this.streams.output.on('error', (error) => {
            logger.error(`output stream encountered an error: ${error}`);
        });

        // NOTE(review): 'written' and 'hiccup' are not standard Writable
        // events; presumably emitted by NodeSpeaker - confirm.
        this.streams.output.on('written', (bytes) => {
            // Write the next chunk of the same size, then top the buffer up
            // by the amount just consumed.
            this.#writeBufferedChunk(bytes);
            this.#rebufferChunk(bytes);
        });

        this.streams.output.on('drain', () => {
            this.times.drained = Date.now();
            logger.debug(`output stream is drained, wrote ${this.streams.output.bytesWritten}/${this.size} bytes to the speaker after ${this.times.drained - this.times.start}ms`);

            // Drained before all expected bytes were written: keep going.
            if (this.streams.output.bytesWritten < this.size) {
                return;
            }

            this.#destroy();
        });

        // this.streams.output.on('progress', (progress) => {
        //     this.playback.progress = progress;
        // });

        this.streams.output.on('hiccup', () => {
            this.playback.hiccups++;
            logger.warn(`hiccup ${this.playback.hiccups} detected...`);
        });
    }

    /**
     * Destroys every stream that is not already destroyed, records the stop
     * time and emits 'close'.
     */
    #destroy() {
        for (const stream of Object.values(this.streams ?? {})) {
            if (stream.destroyed === true) {
                continue;
            }

            stream.destroy();
        }

        this.times.stop = Date.now();
        this.emit('close');
    }
}
|
|
|
|
// Expose the StreamBuffer class as this module's CommonJS export.
module.exports = StreamBuffer;