optimized internal buffering of audio streams

This commit is contained in:
Daniel Sommer 2022-04-25 16:15:55 +02:00
parent d546344194
commit 4449b8cc9c
3 changed files with 42 additions and 114 deletions

View file

@ -68,12 +68,6 @@ class Audiostream {
socket.on('error', (error) => {
logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error);
});
socket.on('timeout', () => {
logger.warn('connection to audio server \'' + this.getTag() + '\' timed out');
});
// socket.on('data', (data) => {
// global.player.speaker.feed(data);
// });
socket.on('end', () => {
logger.info('connection to audio server \'' + this.getTag() + '\' ended');
});
@ -81,7 +75,6 @@ class Audiostream {
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
global.player.stopFeed();
});
// global.player.speaker.feed(socket);
}
}

View file

@ -1,4 +1,4 @@
const Speaker = require('./Speaker.js');
const NodeSpeaker = require('speaker');
const EventEmitter = require('events');
const { spawn } = require('child_process');
const createWriteStream = require('fs').createWriteStream;
@ -24,15 +24,34 @@ class Player extends EventEmitter {
async prepare(threshold, stream) {
logger.debug('preparing audio player...');
await this.#reset();
await this.#removeTemporaryFile();
this.threshold = threshold;
// await this.#removeTemporaryFile();
this.buffer.threshold = threshold;
this.stream = stream;
// this.tmp.stream = createWriteStream(this.tmp.file);
// this.speaker = new Speaker(speakeroptions.channel, speakeroptions.bitDepth, speakeroptions.sampleRate);
this.speaker = new Speaker(2, 16, 44100);
this.speaker = new NodeSpeaker({
channels: 2,
bitDepth: 16,
sampleRate: 44100
});
this.speaker.on('open', () => {
logger.debug('speaker opened...');
});
this.speaker.on('flush', () => {
logger.debug('speaker flushed...');
});
this.speaker.on('close', () => {
logger.debug('speaker closed...');
});
this.speaker.on('drain', () => {
if (this.isPlaying() === false) {
return;
}
this.playFromBuffer();
});
this.buffer.limit = config?.buffer?.limit;
if (isNaN(this.buffer.limit) || this.buffer.limit < this.threshold) {
this.buffer.limit = this.threshold;
if (isNaN(this.buffer.limit) || this.buffer.limit < this.buffer.threshold) {
this.buffer.limit = this.buffer.threshold;
}
this.#fillBuffer();
}
@ -41,33 +60,31 @@ class Player extends EventEmitter {
this.stream.on('data', (data) => {
this.buffer.size += data.length;
this.buffer.elements.push(data);
if (this.buffer.announced === undefined && this.buffer.size >= this.threshold) {
if (this.buffer.announced === undefined && this.buffer.size >= this.buffer.threshold) {
this.buffer.announced = true;
this.#setState(constants.STATE_READY);
logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms');
logger.debug('threshold of ' + this.buffer.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms');
}
if (this.buffer.size >= this.buffer.limit) {
this.stream.pause();
logger.warn('BUFFER LIMIT REACHED - PAUSING STREAM');
}
this.speak(true);
});
}
speak(checkState) {
if (checkState === true && this.isPlaying() !== true) {
return;
}
speak() {
this.#setState(constants.STATE_PLAYING);
while (this.buffer.elements.length > 0) {
this.playFromBuffer();
}
playFromBuffer() {
const tmp = this.buffer.elements[0];
this.buffer.elements.shift();
this.speaker.pipe(tmp);
this.buffer.size -= tmp.length;
this.speaker.write(tmp);
if (this.buffer.size < this.buffer.limit) {
logger.warn('BUFFER UNDERRUN - RESUMING STREAM');
this.stream.resume();
logger.warn('RESUMING STREAM - BUFFER NOT FILLED');
}
}
}

View file

@ -1,82 +0,0 @@
const NodeSpeaker = require('speaker');
const createReadStream = require('fs').createReadStream;
class Speaker {
constructor(channels, bitDepth, sampleRate) {
this.#handlePlayer(channels, bitDepth, sampleRate);
this.playback = {
played: 0,
tmp: 0
};
}
pipe(data) {
return this.speaker.write(data);
}
// pipe(buffer, position) {
// if (buffer === undefined) {
// return;
// }
// this.playback.stream = createReadStream(file);
// if (isNaN(position) || position < 0) {
// position = 0;
// }
// position = 65537* 100;
// this.playback.stream.on('data', (data) => {
// if (position > 0 && this.playback.played <= position) {
// const offset = position - (this.playback.played + data.length);
// if (offset >= 0) {
// this.playback.played += data.length;
// return;
// }
// data = data.subarray(data.length + offset);
// }
// if (this.speaker.write(data) === true) {
// this.playback.played += data.length;
// } else {
// this.playback.tmp = data.length;
// this.playback.stream.pause();
// }
// });
// this.playback.stream.on('end', () => {
// logger.debug('read stream ended');
// });
// this.playback.stream.on('close', () => {
// logger.debug('read stream closed');
// });
// this.playback.stream.on('drain', () => {
// logger.debug('read stream drained');
// });
// this.playback.stream.on('error', (error) => {
// logger.debug('read stream encountered an error: ' + error);
// });
// }
#handlePlayer(channels, bitDepth, sampleRate) {
this.speaker = new NodeSpeaker({
channels: channels,
bitDepth: bitDepth,
sampleRate: sampleRate
});
this.speaker.on('open', () => {
logger.debug('speaker opened...');
});
this.speaker.on('flush', () => {
logger.debug('speaker flushed...');
});
this.speaker.on('close', () => {
logger.debug('speaker closed...');
});
this.speaker.on('drain', () => {
// handle backpressure
// this.playback.played += this.tmp;
// this.playback.tmp = 0;
// this.playback.stream.resume();
});
}
}
module.exports = Speaker;