kannon-client/classes/AudioBuffer.js

223 lines
6.5 KiB
JavaScript
Raw Normal View History

const NodeSpeaker = require('../libs/speaker/index.js');
const EventEmitter = require('events');
const { Readable } = require('stream');
/**
 * Buffers data read from an input stream in an intermediate Readable and
 * feeds it chunk by chunk to a NodeSpeaker output stream.
 *
 * Events emitted by instances:
 *  - 'play'               every time play() is called
 *  - 'pause'              when a follow-up write is attempted while paused
 *  - constants.THRESHOLD  once the intermediate buffer holds at least
 *                         `threshold` bytes
 *  - 'close'              after the streams have been destroyed
 *
 * NOTE(review): `config`, `logger` and `constants` are not defined in this
 * class; presumably they are globals provided elsewhere — confirm.
 */
class StreamBuffer extends EventEmitter {
  /**
   * @param {import('stream').Readable} [stream] - Input stream to read from.
   *   When omitted no streams are set up and the playback methods are no-ops.
   * @param {object} [settings]
   * @param {number} [settings.size] - Expected total number of bytes; compared
   *   with the output's `bytesWritten` on 'drain' to decide when to shut down.
   * @param {number} [settings.threshold=4096] - Bytes that must be buffered
   *   before the threshold event is emitted.
   * @param {object} [settings.audio] - `channels`, `bitDepth` and `sampleRate`
   *   passed to NodeSpeaker (defaults: 2, 16, 44100).
   */
  constructor(stream, settings) {
    super();
    // `settings` is optional everywhere else, so read `size` defensively too.
    this.size = settings?.size;
    this.times = {
      start: Date.now(),
    };
    this.playback = {
      paused: false,
      hiccups: 0,
    };
    this.#setupBuffer(settings?.threshold);
    this.#setupStreams(stream, settings?.audio);
  }

  /** Resumes playback by writing the next buffered chunk to the output. */
  play() {
    this.#writeBufferedChunk(4096, true);
  }

  /**
   * Pauses playback, discards everything currently buffered and resets the
   * threshold announcement so it is emitted again once the buffer refills.
   */
  pause() {
    this.playback.paused = true;
    this.threshold.announced = false;
    const buffer = this.streams?.buffer;
    if (buffer === undefined) {
      return;
    }
    // Drain through the public API instead of poking at `_readableState`,
    // whose internal layout is not stable across Node.js versions.
    while (buffer.read() !== null) {
      // Discard buffered data.
    }
  }

  /** Destroys all streams and emits 'close'. */
  stop() {
    this.#destroy();
  }

  /** @returns {number} Bytes written to the output so far (0 without output). */
  getProgress() {
    return this.streams?.output?.bytesWritten || 0;
  }

  /** @returns {number} Number of 'hiccup' events seen on the output. */
  getHiccups() {
    return this.playback.hiccups;
  }

  /** @returns {object} Timestamps (ms since epoch) recorded so far. */
  getTimes() {
    return this.times;
  }

  /**
   * Initialises the threshold and limit bookkeeping.
   * @param {number} [threshold] - Falls back to 4096 when missing or NaN.
   */
  #setupBuffer(threshold) {
    const thresholdValue = threshold === undefined || isNaN(threshold) ? 4096 : threshold;
    this.threshold = {
      value: thresholdValue,
      announced: false,
    };
    // Optional chaining does not guard against an undeclared identifier,
    // hence the explicit typeof check.
    const configuredLimit = typeof config === 'undefined' ? undefined : config?.buffer?.limit;
    this.limit = {
      value: configuredLimit,
      announced: false,
    };
    // The limit may never be lower than the threshold, otherwise the
    // threshold could not be reached.
    if (isNaN(this.limit.value) || this.limit.value < this.threshold.value) {
      this.limit.value = this.threshold.value;
    }
  }

  /**
   * Creates the output and intermediate buffer streams and wires all handlers.
   * Does nothing when no input stream is given.
   * @param {import('stream').Readable} [stream]
   * @param {object} [audioSettings]
   */
  #setupStreams(stream, audioSettings) {
    if (stream === undefined) {
      return;
    }
    this.streams = {
      input: stream,
      output: new NodeSpeaker({
        channels: audioSettings?.channels || 2,
        bitDepth: audioSettings?.bitDepth || 16,
        sampleRate: audioSettings?.sampleRate || 44100,
      }),
      buffer: new Readable(),
    };
    // The buffer is filled manually via push(), so _read has nothing to do.
    this.streams.buffer._read = () => { };
    this.#handleBufferStream();
    this.#handleInputStream();
    this.#handleOutputStream();
  }

  /** @returns {boolean} Whether the buffer holds at least `threshold` bytes. */
  #isThresholdReached() {
    return this.streams.buffer.readableLength >= this.threshold.value;
  }

  /** @returns {boolean} Whether the threshold event was already emitted. */
  #isThresholdAnnounced() {
    return this.threshold.announced === true;
  }

  /** @returns {boolean} Whether the buffer holds at least `limit` bytes. */
  #isLimitReached() {
    return this.streams.buffer.readableLength >= this.limit.value;
  }

  /**
   * Moves up to `chunkSize` bytes from the buffer to the output.
   * @param {number} [chunkSize=4096]
   * @param {boolean} [resume] - `true` clears the paused flag and emits 'play'.
   */
  #writeBufferedChunk(chunkSize, resume) {
    if (resume === true) {
      this.playback.paused = false;
      this.emit('play');
    } else if (this.playback.paused) {
      this.emit('pause');
      return;
    }
    if (this.streams === undefined) {
      return;
    }
    // Nothing is written until the threshold has been announced. The method
    // must be *called* here; comparing the method reference itself to
    // `false` never matches.
    if (!this.#isThresholdAnnounced()) {
      return;
    }
    const size = chunkSize === undefined || isNaN(chunkSize) ? 4096 : chunkSize;
    const { buffer, output } = this.streams;
    // read(size) returns null when fewer than `size` bytes are buffered, so
    // fall back to reading whatever is left.
    const data = buffer.readableLength >= size ? buffer.read(size) : buffer.read();
    if (data == null) {
      return;
    }
    output.write(data);
  }

  /**
   * Pulls threshold-sized chunks from the input until the input has nothing
   * more to offer, the limit is hit, or the threshold is reached (which is
   * then announced exactly once).
   */
  #fillBuffer() {
    for (;;) {
      const rebuffered = this.#rebufferChunk(this.threshold.value);
      if (rebuffered === undefined || this.#isThresholdAnnounced()) {
        return;
      }
      if (this.#isThresholdReached()) {
        this.times.threshold = Date.now();
        this.threshold.announced = true;
        this.emit(constants.THRESHOLD);
        logger.debug(`buffer reached threshold of ${this.threshold.value} bytes after ${this.times.threshold - this.times.start}ms...`);
        return;
      }
    }
  }

  /**
   * Moves up to `chunkSize` bytes from the input into the buffer.
   * @param {number} [chunkSize=4096]
   * @returns {number|undefined} Bytes moved, or undefined when the limit is
   *   reached or the input had no data.
   */
  #rebufferChunk(chunkSize) {
    const size = chunkSize === undefined || isNaN(chunkSize) ? 4096 : chunkSize;
    if (this.#isLimitReached()) {
      return undefined;
    }
    const { input, buffer } = this.streams;
    const data = input.readableLength >= size ? input.read(size) : input.read();
    if (data == null) {
      return undefined;
    }
    buffer.push(data);
    return data.length;
  }

  /** Logs errors raised by the intermediate buffer stream. */
  #handleBufferStream() {
    if (this.streams.buffer === undefined) {
      return;
    }
    this.streams.buffer.on('error', (error) => {
      logger.error(`buffer stream encountered an error: ${error}`);
    });
  }

  /** Wires error logging, buffering and close bookkeeping for the input. */
  #handleInputStream() {
    if (this.streams.input === undefined) {
      return;
    }
    this.streams.input.on('error', (error) => {
      logger.error(`input stream encountered an error: ${error}`);
    });
    this.streams.input.on('readable', () => {
      this.#fillBuffer();
    });
    this.streams.input.on('close', () => {
      this.times.transmitted = Date.now();
      logger.debug(`input stream closed, transmitting file took ${this.times.transmitted - this.times.start}ms`);
    });
  }

  /** Wires error logging, the write loop, drain and hiccup handling. */
  #handleOutputStream() {
    if (this.streams.output === undefined) {
      return;
    }
    // The handler must declare `error`; without the parameter the log line
    // itself throws a ReferenceError.
    this.streams.output.on('error', (error) => {
      logger.error(`output stream encountered an error: ${error}`);
    });
    // Each 'written' event keeps the loop going: write the next chunk and
    // refill the buffer by the same amount.
    this.streams.output.on('written', (bytes) => {
      this.#writeBufferedChunk(bytes);
      this.#rebufferChunk(bytes);
    });
    this.streams.output.on('drain', () => {
      this.times.drained = Date.now();
      logger.debug(`output stream is drained, wrote ${this.streams.output.bytesWritten}/${this.size} bytes to the speaker after ${this.times.drained - this.times.start}ms`);
      if (this.streams.output.bytesWritten < this.size) {
        return;
      }
      this.#destroy();
    });
    this.streams.output.on('hiccup', () => {
      this.playback.hiccups++;
      logger.warn(`hiccup ${this.playback.hiccups} detected...`);
    });
  }

  /** Destroys every stream that is not destroyed yet and emits 'close'. */
  #destroy() {
    for (const stream of Object.values(this.streams ?? {})) {
      if (stream.destroyed === true) {
        continue;
      }
      stream.destroy();
    }
    this.times.stop = Date.now();
    this.emit('close');
  }
}
// Expose the StreamBuffer class as this module's sole CommonJS export.
module.exports = StreamBuffer;