implemented memory efficient streaming/buffering
This commit is contained in:
parent
ff97f8d7bc
commit
35681ffede
5 changed files with 196 additions and 322 deletions
|
@ -1,27 +1,50 @@
|
||||||
|
const NodeSpeaker = require('../libs/speaker/index.js');
|
||||||
const EventEmitter = require('events');
|
const EventEmitter = require('events');
|
||||||
const { Readable, Duplex } = require('stream');
|
const { Readable } = require('stream');
|
||||||
|
|
||||||
class StreamBuffer extends EventEmitter {
|
class StreamBuffer extends EventEmitter {
|
||||||
|
|
||||||
constructor(threshold, inputStream, outputStream) {
|
constructor(stream, settings) {
|
||||||
super();
|
super();
|
||||||
this.#setupBuffer(threshold);
|
this.times = {
|
||||||
this.#setupStreams(inputStream, outputStream);
|
start: Date.now()
|
||||||
|
};
|
||||||
|
this.playback = {
|
||||||
|
paused: false,
|
||||||
|
progress: 0,
|
||||||
|
hiccups: 0
|
||||||
|
};
|
||||||
|
this.#setupBuffer(settings?.threshold);
|
||||||
|
this.#setupStreams(stream, settings?.audio);
|
||||||
}
|
}
|
||||||
|
|
||||||
resume() {
|
resume() {
|
||||||
this.streams.buffer.resume();
|
this.#writeBufferedChunk(4096, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
pause() {
|
pause() {
|
||||||
this.streams.buffer.pause();
|
this.playback.paused = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
stop() {
|
||||||
|
this.#destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
getProgress() {
|
||||||
|
return this.playback.progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
getHiccups() {
|
||||||
|
return this.playback.hiccups;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTimes() {
|
||||||
|
return this.times;
|
||||||
}
|
}
|
||||||
|
|
||||||
#setupBuffer(threshold) {
|
#setupBuffer(threshold) {
|
||||||
this.size = 0;
|
|
||||||
if (threshold === undefined || isNaN(threshold)) {
|
if (threshold === undefined || isNaN(threshold)) {
|
||||||
// 64 mb
|
threshold = 4096;
|
||||||
threshold = 67108864;
|
|
||||||
}
|
}
|
||||||
this.threshold = {
|
this.threshold = {
|
||||||
value: threshold,
|
value: threshold,
|
||||||
|
@ -36,25 +59,27 @@ class StreamBuffer extends EventEmitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#setupStreams(inputStream, outputStream) {
|
#setupStreams(stream, audioSettings) {
|
||||||
if (inputStream === undefined || outputStream === undefined) {
|
if (stream === undefined) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.streams = {
|
this.streams = {
|
||||||
input: inputStream,
|
input: stream,
|
||||||
output: outputStream,
|
output: new NodeSpeaker({
|
||||||
|
channels: audioSettings?.channels || 2,
|
||||||
|
bitDepth: audioSettings?.bitDepth || 16,
|
||||||
|
sampleRate: audioSettings?.sampleRate || 44100,
|
||||||
|
}),
|
||||||
buffer: new Readable()
|
buffer: new Readable()
|
||||||
};
|
};
|
||||||
this.streams.buffer._read = () => { };
|
this.streams.buffer._read = () => { };
|
||||||
this.streams.buffer.buffered = 0;
|
|
||||||
this.streams.buffer.tmp = 0;
|
|
||||||
this.#handleBufferStream();
|
this.#handleBufferStream();
|
||||||
this.#handleInputStream();
|
this.#handleInputStream();
|
||||||
this.#handleOutputStream();
|
this.#handleOutputStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
#isThresholdReached() {
|
#isThresholdReached() {
|
||||||
return this.streams.buffer.buffered >= this.threshold.value;
|
return this.streams.buffer.readableLength >= this.threshold.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
#isThresholdAnnounced() {
|
#isThresholdAnnounced() {
|
||||||
|
@ -62,31 +87,76 @@ class StreamBuffer extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
#isLimitReached() {
|
#isLimitReached() {
|
||||||
return this.streams.buffer.buffered >= this.limit.value;
|
return this.streams.buffer.readableLength >= this.limit.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
#writeBufferedChunk(chunkSize, resume) {
|
||||||
|
if (resume === true) {
|
||||||
|
this.playback.paused = false;
|
||||||
|
this.emit('play');
|
||||||
|
} else if (this.playback.paused) {
|
||||||
|
this.emit('pause');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (chunkSize === undefined || isNaN(chunkSize)) {
|
||||||
|
chunkSize = 4096;
|
||||||
|
}
|
||||||
|
if (this.#isThresholdAnnounced === false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let data;
|
||||||
|
if (this.streams.buffer.readableLength >= chunkSize) {
|
||||||
|
data = this.streams.buffer.read(chunkSize);
|
||||||
|
} else {
|
||||||
|
data = this.streams.buffer.read();
|
||||||
|
}
|
||||||
|
if (data === undefined || data == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.streams.output.write(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
#fillBuffer() {
|
||||||
|
const rebuffered = this.#rebufferChunk(this.threshold.value);
|
||||||
|
if (rebuffered === undefined || this.#isThresholdAnnounced() === true) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (this.#isThresholdReached()) {
|
||||||
|
this.times.threshold = Date.now();
|
||||||
|
this.threshold.announced = true;
|
||||||
|
this.emit(constants.THRESHOLD);
|
||||||
|
logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes after ' + (this.times.threshold - this.times.start) + 'ms...');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.#fillBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
#rebufferChunk(chunkSize) {
|
||||||
|
if (chunkSize === undefined || isNaN(chunkSize)) {
|
||||||
|
chunkSize = 4096;
|
||||||
|
}
|
||||||
|
if (this.#isLimitReached()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let data;
|
||||||
|
if (this.streams.input.readableLength >= chunkSize) {
|
||||||
|
data = this.streams.input.read(chunkSize);
|
||||||
|
} else {
|
||||||
|
data = this.streams.input.read();
|
||||||
|
}
|
||||||
|
if (data === undefined || data === null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.streams.buffer.push(data);
|
||||||
|
return data.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
#handleBufferStream() {
|
#handleBufferStream() {
|
||||||
if (this.streams.buffer === undefined) {
|
if (this.streams.buffer === undefined) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.streams.buffer.pause();
|
this.streams.buffer.on('error', (error) => {
|
||||||
this.streams.buffer.on('data', (data) => {
|
logger.error('buffer stream encountered an error: ' + error);
|
||||||
const chunkSize = data.length;
|
|
||||||
const flushed = this.streams.output.write(data);
|
|
||||||
if (flushed !== true) {
|
|
||||||
// logger.warn('backpressure detected...');
|
|
||||||
this.streams.buffer.tmp = chunkSize;
|
|
||||||
this.streams.buffer.pause();
|
|
||||||
} else {
|
|
||||||
this.streams.buffer.buffered -= chunkSize;
|
|
||||||
}
|
|
||||||
if (this.#isLimitReached() === false && this.streams.input.isPaused() === true) {
|
|
||||||
// logger.debug('buffer fell below limit of \'' + this.limit.value + '\' bytes, resuming input stream...');
|
|
||||||
this.streams.input.resume();
|
|
||||||
} else if (this.#isLimitReached() === true && this.streams.input.isPaused() === false) {
|
|
||||||
// logger.debug('buffer reached limit of \'' + this.limit.value + '\' bytes, pausing input stream...');
|
|
||||||
this.streams.input.pause();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,19 +164,15 @@ class StreamBuffer extends EventEmitter {
|
||||||
if (this.streams.input === undefined) {
|
if (this.streams.input === undefined) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.streams.input.on('data', (data) => {
|
this.streams.input.on('error', (error) => {
|
||||||
this.streams.buffer.buffered += data.length;
|
logger.error('input stream encountered an error: ' + error);
|
||||||
this.streams.buffer.push(data);
|
});
|
||||||
if (this.#isThresholdReached() === true && !this.#isThresholdAnnounced() === true) {
|
this.streams.input.on('readable', () => {
|
||||||
this.threshold.announced = true;
|
this.#fillBuffer();
|
||||||
this.emit(constants.BUFFER_THRESHOLD);
|
});
|
||||||
logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes');
|
this.streams.input.on('close', () => {
|
||||||
}
|
this.times.transmitted = Date.now();
|
||||||
if (this.#isLimitReached()) {
|
logger.debug('input stream closed, transmitting file took ' + (this.times.transmitted - this.times.start) + 'ms');
|
||||||
// logger.debug('buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...');
|
|
||||||
this.streams.input.pause();
|
|
||||||
// this.streams.buffer.resume();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,149 +180,38 @@ class StreamBuffer extends EventEmitter {
|
||||||
if (this.streams.output === undefined) {
|
if (this.streams.output === undefined) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
this.streams.output.on('error', () => {
|
||||||
|
logger.error('output stream encountered an error: ' + error);
|
||||||
|
});
|
||||||
|
this.streams.output.on('written', (bytes) => {
|
||||||
|
this.#writeBufferedChunk(bytes);
|
||||||
|
this.#rebufferChunk(bytes);
|
||||||
|
});
|
||||||
this.streams.output.on('drain', () => {
|
this.streams.output.on('drain', () => {
|
||||||
// logger.warn('SPEAKER DRAINED - RESUMING DUPLEX STREAM');
|
this.times.drained = Date.now();
|
||||||
this.streams.buffer.buffered -= this.streams.buffer.tmp;
|
logger.debug('output stream is drained, file should be completely written to the speaker after ' + (this.times.drained - this.times.start) + 'ms');
|
||||||
this.streams.buffer.resume();
|
this.#destroy();
|
||||||
if (this.streams.input.isPaused() && this.streams.buffer.buffered < this.limit.value) {
|
|
||||||
// logger.warn('RESUME READ STREAM - BUFFER LIMIT NOT REACHED')
|
|
||||||
this.streams.input.resume();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
this.streams.output.on('flush', () => {
|
|
||||||
logger.debug('speaker flushed');
|
|
||||||
});
|
|
||||||
this.streams.output.on('close', () => {
|
|
||||||
logger.debug('speaker closed');
|
|
||||||
});
|
});
|
||||||
this.streams.output.on('progress', (progress) => {
|
this.streams.output.on('progress', (progress) => {
|
||||||
// logger.warn('SPEAKER PROGRESS: ' + progress);
|
this.playback.progress = progress;
|
||||||
});
|
});
|
||||||
this.streams.output.on('hiccup', () => {
|
this.streams.output.on('hiccup', () => {
|
||||||
if (this.hiccups === undefined) {
|
this.playback.hiccups++;
|
||||||
this.hiccups = 1;
|
logger.warn('hiccup ' + this.playback.hiccups + ' detected...');
|
||||||
} else {
|
|
||||||
this.hiccups++;
|
|
||||||
}
|
|
||||||
logger.warn('HICKUP #' + this.hiccups + ' DETECTED');
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// #handleTransferStream() {
|
#destroy() {
|
||||||
// if (this.streams.transfer === undefined) {
|
for (const key in this.streams) {
|
||||||
// return;
|
const stream = this.streams[key];
|
||||||
// }
|
if (stream.destroyed === true) {
|
||||||
// this.streams.transfer.pause();
|
continue;
|
||||||
// this.streams.transfer._read = () => { };
|
}
|
||||||
// this.streams.transfer.on('data', (data) => {
|
stream.destroy();
|
||||||
// if (data.length === 65483) {
|
}
|
||||||
// logger.warn('POSSIBLE ERROR!');
|
this.times.stop = Date.now();
|
||||||
// }
|
this.emit('close');
|
||||||
// const flushed = this.streams.output.write(data, (error) => {
|
}
|
||||||
// const derp = this.streams.output;
|
|
||||||
// if (error !== undefined) {
|
|
||||||
// logger.error('FUCK MY LIFE');
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// if (flushed === false) {
|
|
||||||
// this.streams.transfer.pause();
|
|
||||||
// }
|
|
||||||
// if (this.streams.transfer.readableLength < this.limit.value && this.streams.input.isPaused()) {
|
|
||||||
// logger.warn('RESUMING READ STREAM - TRANSFER STRAM UNDERCUT LIMIT');
|
|
||||||
// this.streams.input.resume();
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// this.streams.transfer.on('close', () => {
|
|
||||||
// logger.debug('transfer stream of stream buffer closed');
|
|
||||||
// this.streams.transfer.destroy();
|
|
||||||
// });
|
|
||||||
// this.streams.transfer.on('error', (error) => {
|
|
||||||
// logger.error('transfer stream of stream buffer encountered an unexpected error: ' + error);
|
|
||||||
// });
|
|
||||||
// this.streams.transfer.on('resume', () => {
|
|
||||||
// logger.debug('transfer stream of stream buffer is resumed');
|
|
||||||
// });
|
|
||||||
// this.streams.transfer.on('pause', () => {
|
|
||||||
// logger.debug('transfer stream of stream buffer is paused');
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #handleInputStream() {
|
|
||||||
// if (this.streams.input === undefined) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// this.streams.input.on('data', (data) => {
|
|
||||||
// const flushed = this.streams.output.write(data);
|
|
||||||
// if (this.streams.output.writableCorked > 0) {
|
|
||||||
// if (this.streams.output.writableLength >= this.threshold.value && this.threshold.announced === false) {
|
|
||||||
// logger.debug('output stream\'s internal buffer reached threshold of ' + this.limit.value + ' bytes');
|
|
||||||
// this.emit(constants.BUFFER_THRESHOLD);
|
|
||||||
// }
|
|
||||||
// if (this.streams.output.writableLength >= this.limit.value) {
|
|
||||||
// logger.debug('output stream\'s internal buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...');
|
|
||||||
// this.streams.input.pause();
|
|
||||||
// if (this.limit.announced === false) {
|
|
||||||
// this.emit(constants.BUFFER_LIMIT);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// if (flushed === false && !this.streams.input.isPaused()) {
|
|
||||||
// logger.warn('BACKPRESSURE FROM INPUT STREAM - PAUSING INPUT STREAM');
|
|
||||||
// this.streams.input.pause();
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// // this.streams.transfer.push(data);
|
|
||||||
// // if (this.streams.transfer.readableLength >= this.threshold.value && this.threshold.announced === false) {
|
|
||||||
// // logger.debug('transfer stream reached threshold of ' + this.limit.value + ' bytes');
|
|
||||||
// // this.emit(constants.BUFFER_THRESHOLD);
|
|
||||||
// // }
|
|
||||||
// // if (this.streams.transfer.readableLength >= this.limit.value) {
|
|
||||||
// // logger.debug('transfer stream reached limit of ' + this.limit.value + ' bytes, pausing read stream...');
|
|
||||||
// // this.streams.input.pause();
|
|
||||||
// // if (this.limit.announced === false) {
|
|
||||||
// // this.emit(constants.BUFFER_LIMIT);
|
|
||||||
// // }
|
|
||||||
// // }
|
|
||||||
// });
|
|
||||||
// this.streams.input.on('close', () => {
|
|
||||||
// logger.debug('input stream of stream buffer closed');
|
|
||||||
// this.streams.input.destroy();
|
|
||||||
// });
|
|
||||||
// this.streams.input.on('error', (error) => {
|
|
||||||
// logger.error('input stream of stream buffer encountered an unexpected error: ' + error);
|
|
||||||
// });
|
|
||||||
// this.streams.input.on('resume', () => {
|
|
||||||
// logger.debug('input stream of stream buffer is resumed');
|
|
||||||
// });
|
|
||||||
// this.streams.input.on('pause', () => {
|
|
||||||
// logger.debug('input stream of stream buffer is paused');
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #handleOutputStream() {
|
|
||||||
// if (this.streams.output === undefined) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// this.streams.output.on('drain', () => {
|
|
||||||
// logger.debug('output stream of stream buffer is drained, resuming input stream...');
|
|
||||||
// // this.streams.input.resume();
|
|
||||||
// });
|
|
||||||
// this.streams.output.on('resume', () => {
|
|
||||||
// logger.debug('output stream of stream buffer is resumed');
|
|
||||||
// });
|
|
||||||
// this.streams.output.on('pause', () => {
|
|
||||||
// logger.debug('output stream of stream buffer is paused');
|
|
||||||
// });
|
|
||||||
// this.streams.output.on('close', () => {
|
|
||||||
// logger.debug('output stream of stream buffer closed');
|
|
||||||
// });
|
|
||||||
// this.streams.output.on('error', (error) => {
|
|
||||||
// logger.error('output stream of stream buffer encountered an unexpected error: ' + error);
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = StreamBuffer;
|
module.exports = StreamBuffer;
|
|
@ -1,5 +1,5 @@
|
||||||
const net = require('net');
|
const net = require('net');
|
||||||
const { sleep } = require('../libs/util');
|
const constants = require('../libs/constants');
|
||||||
const Message = require('./Message');
|
const Message = require('./Message');
|
||||||
|
|
||||||
class Audiostream {
|
class Audiostream {
|
||||||
|
@ -25,7 +25,7 @@ class Audiostream {
|
||||||
this.host = config?.server?.host || "127.0.0.1";
|
this.host = config?.server?.host || "127.0.0.1";
|
||||||
this.port = data.port;
|
this.port = data.port;
|
||||||
this.clientId = data.clientId;
|
this.clientId = data.clientId;
|
||||||
this.threshold = data.threshold;
|
this.settings = data.settings;
|
||||||
this.#handleSocket(net.connect({
|
this.#handleSocket(net.connect({
|
||||||
host: this.getHost(),
|
host: this.getHost(),
|
||||||
port: this.getPort()
|
port: this.getPort()
|
||||||
|
@ -37,24 +37,23 @@ class Audiostream {
|
||||||
logger.debug('handling event \'audio:initialize\'...');
|
logger.debug('handling event \'audio:initialize\'...');
|
||||||
this.#initialize(data);
|
this.#initialize(data);
|
||||||
});
|
});
|
||||||
this.eventParser.on('audio:play', (data) => {
|
this.eventParser.on(constants.AUDIO_PLAY, (data) => {
|
||||||
logger.debug('handling event \'audio:play\'...');
|
logger.debug('handling event \'' + constants.AUDIO_PLAY + '\'...');
|
||||||
// global.player.play(data?.position);
|
|
||||||
global.player.play();
|
global.player.play();
|
||||||
});
|
});
|
||||||
this.eventParser.on('audio:pause', (data) => {
|
this.eventParser.on(constants.AUDIO_PAUSE, (data) => {
|
||||||
logger.debug('handling event \'audio:pause\'...');
|
logger.debug('handling event \'' + constants.AUDIO_PAUSE + '\'...');
|
||||||
global.player.pause();
|
global.player.pause();
|
||||||
});
|
});
|
||||||
this.eventParser.on('audio:stop', () => {
|
this.eventParser.on(constants.AUDIO_STOP, () => {
|
||||||
logger.debug('handling event \'audio:stop\'...');
|
logger.debug('handling event \'' + constants.AUDIO_STOP + '\'...');
|
||||||
global.player.stop();
|
global.player.stop();
|
||||||
});
|
});
|
||||||
global.player.on('statechange', (data) => {
|
global.player.on(constants.STATECHANGE, (data) => {
|
||||||
new Message('audio:state', {
|
new Message(constants.AUDIO_STATE, {
|
||||||
clientId: this.clientId,
|
clientId: this.clientId,
|
||||||
state: data.state,
|
state: data.state,
|
||||||
position: global.player.getPosition()
|
progress: data.progress
|
||||||
}).send();
|
}).send();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -62,7 +61,7 @@ class Audiostream {
|
||||||
#handleSocket(socket) {
|
#handleSocket(socket) {
|
||||||
socket.on('connect', async () => {
|
socket.on('connect', async () => {
|
||||||
logger.debug('connected to audio server \'' + this.getTag() + '\'...');
|
logger.debug('connected to audio server \'' + this.getTag() + '\'...');
|
||||||
await global.player.prepare(this.threshold, socket);
|
await global.player.prepare(socket, this.settings);
|
||||||
new Message('audio:register', { clientId: this.clientId, port: socket.localPort }).send();
|
new Message('audio:register', { clientId: this.clientId, port: socket.localPort }).send();
|
||||||
});
|
});
|
||||||
socket.on('error', (error) => {
|
socket.on('error', (error) => {
|
||||||
|
@ -70,7 +69,6 @@ class Audiostream {
|
||||||
});
|
});
|
||||||
socket.on('close', () => {
|
socket.on('close', () => {
|
||||||
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
|
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
|
||||||
// global.player.stopFeed();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,106 +1,68 @@
|
||||||
const NodeSpeaker = require('../libs/speaker/index.js');
|
|
||||||
const EventEmitter = require('events');
|
const EventEmitter = require('events');
|
||||||
const { spawn } = require('child_process');
|
|
||||||
const createWriteStream = require('fs').createWriteStream;
|
|
||||||
const unlink = require('fs/promises').unlink;
|
|
||||||
const resolve = require('path').resolve;
|
|
||||||
|
|
||||||
const AudioBuffer = require('./AudioBuffer.js');
|
const AudioBuffer = require('./AudioBuffer.js');
|
||||||
const { STATE_PLAYING } = require('../libs/constants.js');
|
|
||||||
|
|
||||||
class Player extends EventEmitter {
|
class Player extends EventEmitter {
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
super();
|
super();
|
||||||
this.timestamp = Date.now();
|
|
||||||
this.position = 0;
|
|
||||||
this.events = [];
|
this.events = [];
|
||||||
this.tmp = {
|
|
||||||
file: resolve(global.config?.tmp || '/tmp/kannon.tmp')
|
|
||||||
};
|
|
||||||
this.buffer = {
|
|
||||||
size: 0,
|
|
||||||
elements: []
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async prepare(threshold, stream) {
|
async prepare(stream, settings) {
|
||||||
logger.debug('preparing audio player...');
|
logger.debug('preparing audio player...');
|
||||||
await this.#reset();
|
this.#reset();
|
||||||
|
this.audiobuffer = new AudioBuffer(stream, settings);
|
||||||
this.speaker = new NodeSpeaker({
|
this.audiobuffer.on(constants.THRESHOLD, () => {
|
||||||
channels: 2,
|
this.#setState(constants.READY);
|
||||||
bitDepth: 16,
|
|
||||||
sampleRate: 44100
|
|
||||||
});
|
});
|
||||||
this.audiobuffer = new AudioBuffer(threshold, stream, this.speaker);
|
this.audiobuffer.on('close', () => {
|
||||||
this.audiobuffer.on(constants.BUFFER_THRESHOLD, () => {
|
this.#setState(constants.STOPPED);
|
||||||
this.#setState(constants.STATE_READY);
|
|
||||||
});
|
});
|
||||||
|
this.audiobuffer.on('play', () => {
|
||||||
|
this.#setState(constants.PLAYING);
|
||||||
|
});
|
||||||
|
this.audiobuffer.on('pause', () => {
|
||||||
|
this.#setState(constants.PAUSED);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
play() {
|
play() {
|
||||||
this.#setState(STATE_PLAYING);
|
|
||||||
this.audiobuffer.resume();
|
this.audiobuffer.resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
async pause() {
|
pause() {
|
||||||
await this.#reset(true);
|
this.audiobuffer.pause();
|
||||||
this.#setState(constants.STATE_PAUSED);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop() {
|
async stop() {
|
||||||
await this.#reset();
|
this.audiobuffer.stop();
|
||||||
this.#setState(constants.STATE_STOPPED);
|
this.#reset();
|
||||||
|
this.#setState(constants.STOPPED);
|
||||||
}
|
}
|
||||||
|
|
||||||
isReady() {
|
isReady() {
|
||||||
return this.state === constants.STATE_READY;
|
return this.state === constants.READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
isPlaying() {
|
isPlaying() {
|
||||||
return this.state === constants.STATE_PLAYING;
|
return this.state === constants.PLAYING;
|
||||||
}
|
}
|
||||||
|
|
||||||
isPaused() {
|
isPaused() {
|
||||||
return this.state === constants.STATE_PAUSED;
|
return this.state === constants.PAUSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
isFinished() {
|
isFinished() {
|
||||||
return this.state === constants.STATE_STOPPED;
|
return this.state === constants.STOPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
hasError() {
|
hasError() {
|
||||||
return this.state === constants.STATE_ERROR;
|
return this.state === constants.ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
getPosition() {
|
getProgress() {
|
||||||
return this.position;
|
return this.audiobuffer?.getProgress();
|
||||||
}
|
|
||||||
|
|
||||||
async #spawnProcess(position) {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
const args = [
|
|
||||||
'-vn',
|
|
||||||
'-nodisp'
|
|
||||||
];
|
|
||||||
if (this.isPaused() && !isNaN(position)) {
|
|
||||||
args.unshift('-ss', position);
|
|
||||||
}
|
|
||||||
args.push(this.tmp.file);
|
|
||||||
this.process = spawn("ffplay", args);
|
|
||||||
this.process.on('error', (error) => {
|
|
||||||
this.#reset();
|
|
||||||
// TODO: try/catch error
|
|
||||||
reject('error spawning process \'ffplay\': ' + error);
|
|
||||||
});
|
|
||||||
this.process.on('spawn', () => {
|
|
||||||
logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...');
|
|
||||||
this.#setState(constants.STATE_PLAYING);
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#setState(state, data) {
|
#setState(state, data) {
|
||||||
|
@ -114,63 +76,12 @@ class Player extends EventEmitter {
|
||||||
}
|
}
|
||||||
logger.debug('emitting state \'' + state + '\' of audio player...');
|
logger.debug('emitting state \'' + state + '\' of audio player...');
|
||||||
this.emit(this.state, { data: data });
|
this.emit(this.state, { data: data });
|
||||||
this.emit('statechange', { state: this.state });
|
this.emit(constants.STATECHANGE, { state: this.state, progress: this.getProgress() });
|
||||||
this.events.push(state);
|
this.events.push(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
async #killProcess() {
|
#reset() {
|
||||||
if (this.process === undefined) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const pid = this.process.pid;
|
|
||||||
this.#closeStdIO();
|
|
||||||
if (this.process?.killed === false) {
|
|
||||||
this.process.kill('SIGTERM');
|
|
||||||
}
|
|
||||||
await new Promise((resolve, reject) => {
|
|
||||||
this.process.on('close', (code, signal) => {
|
|
||||||
let msg = 'process \'ffplay\' (pid: ' + pid + ') closed with';
|
|
||||||
if (code !== undefined) {
|
|
||||||
msg += ' code \'' + code + '\'';
|
|
||||||
} else {
|
|
||||||
msg += ' signal \'' + signal + '\'';
|
|
||||||
}
|
|
||||||
logger.debug(msg);
|
|
||||||
this.process = undefined;
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#closeStdIO() {
|
|
||||||
if (this.process?.stdio === undefined) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logger.debug('closing all stdio streams of process \'ffplay\' (pid: ' + this.process.pid + ')...');
|
|
||||||
for (let index = 0; index < this.process.stdio.length; index++) {
|
|
||||||
this.process.stdio[index].destroy();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async #removeTemporaryFile() {
|
|
||||||
try {
|
|
||||||
await unlink(this.tmp.file);
|
|
||||||
} catch (error) {
|
|
||||||
if (error?.code === 'ENOENT') {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logger.error('error removing temporary file \'' + this.tmp.file + '\': ' + error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async #reset(paused) {
|
|
||||||
await this.#killProcess();
|
|
||||||
this.timestamp = Date.now();
|
|
||||||
this.events = [];
|
this.events = [];
|
||||||
if (paused === true) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.position = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,24 @@
|
||||||
module.exports = {
|
module.exports = {
|
||||||
SOCKET_EVENT_PING: 'ping',
|
PING: 'ping',
|
||||||
SOCKET_EVENT_PONG: 'pong',
|
PONG: 'pong',
|
||||||
|
|
||||||
STATE_READY: 'ready',
|
READY: 'ready',
|
||||||
STATE_PLAYING: 'playing',
|
PLAYING: 'playing',
|
||||||
STATE_PAUSED: 'paused',
|
PAUSED: 'paused',
|
||||||
STATE_STOPPED: 'stopped',
|
STOPPED: 'stopped',
|
||||||
STATE_ERROR: 'error',
|
ERROR: 'error',
|
||||||
|
|
||||||
|
STATECHANGE: 'statechange',
|
||||||
|
|
||||||
|
THRESHOLD: 'threshold',
|
||||||
|
HICCUP: 'hiccup',
|
||||||
|
PROGRESS: 'progress',
|
||||||
|
|
||||||
|
AUDIO_PLAY: 'audio:play',
|
||||||
|
AUDIO_PAUSE: 'audio:pause',
|
||||||
|
AUDIO_STOP: 'audio:stop',
|
||||||
|
AUDIO_STATE: 'audio:state',
|
||||||
|
|
||||||
BUFFER_THRESHOLD: 'threshold',
|
|
||||||
BUFFER_LIMIT: 'limit',
|
|
||||||
|
|
||||||
EVENT_DELIMITER: '<<< kannon >>>'
|
EVENT_DELIMITER: '<<< kannon >>>'
|
||||||
}
|
}
|
|
@ -207,6 +207,7 @@ class Speaker extends Writable {
|
||||||
|
|
||||||
const onwrite = (r) => {
|
const onwrite = (r) => {
|
||||||
this.bytesWritten += r;
|
this.bytesWritten += r;
|
||||||
|
this.emit('written', r);
|
||||||
this.emit('progress', this.bytesWritten);
|
this.emit('progress', this.bytesWritten);
|
||||||
debug('wrote %o bytes', r)
|
debug('wrote %o bytes', r)
|
||||||
if (r === 0) {
|
if (r === 0) {
|
||||||
|
|
Loading…
Reference in a new issue