diff --git a/classes/AudioBuffer.js b/classes/AudioBuffer.js index 2487eea..42da365 100644 --- a/classes/AudioBuffer.js +++ b/classes/AudioBuffer.js @@ -1,27 +1,50 @@ +const NodeSpeaker = require('../libs/speaker/index.js'); const EventEmitter = require('events'); -const { Readable, Duplex } = require('stream'); +const { Readable } = require('stream'); class StreamBuffer extends EventEmitter { - constructor(threshold, inputStream, outputStream) { + constructor(stream, settings) { super(); - this.#setupBuffer(threshold); - this.#setupStreams(inputStream, outputStream); + this.times = { + start: Date.now() + }; + this.playback = { + paused: false, + progress: 0, + hiccups: 0 + }; + this.#setupBuffer(settings?.threshold); + this.#setupStreams(stream, settings?.audio); } resume() { - this.streams.buffer.resume(); + this.#writeBufferedChunk(4096, true); } pause() { - this.streams.buffer.pause(); + this.playback.paused = true; + } + + stop() { + this.#destroy(); + } + + getProgress() { + return this.playback.progress; + } + + getHiccups() { + return this.playback.hiccups; + } + + getTimes() { + return this.times; } #setupBuffer(threshold) { - this.size = 0; if (threshold === undefined || isNaN(threshold)) { - // 64 mb - threshold = 67108864; + threshold = 4096; } this.threshold = { value: threshold, @@ -36,25 +59,27 @@ class StreamBuffer extends EventEmitter { } } - #setupStreams(inputStream, outputStream) { - if (inputStream === undefined || outputStream === undefined) { + #setupStreams(stream, audioSettings) { + if (stream === undefined) { return; } this.streams = { - input: inputStream, - output: outputStream, + input: stream, + output: new NodeSpeaker({ + channels: audioSettings?.channels || 2, + bitDepth: audioSettings?.bitDepth || 16, + sampleRate: audioSettings?.sampleRate || 44100, + }), buffer: new Readable() }; this.streams.buffer._read = () => { }; - this.streams.buffer.buffered = 0; - this.streams.buffer.tmp = 0; this.#handleBufferStream(); this.#handleInputStream(); this.#handleOutputStream(); } #isThresholdReached() { - return this.streams.buffer.buffered >= this.threshold.value; + return this.streams.buffer.readableLength >= this.threshold.value; } #isThresholdAnnounced() { @@ -62,31 +87,76 @@ class StreamBuffer extends EventEmitter { } #isLimitReached() { - return this.streams.buffer.buffered >= this.limit.value; + return this.streams.buffer.readableLength >= this.limit.value; + } + + #writeBufferedChunk(chunkSize, resume) { + if (resume === true) { + this.playback.paused = false; + this.emit('play'); + } else if (this.playback.paused) { + this.emit('pause'); + return; + } + if (chunkSize === undefined || isNaN(chunkSize)) { + chunkSize = 4096; + } + if (this.#isThresholdAnnounced === false) { + return; + } + let data; + if (this.streams.buffer.readableLength >= chunkSize) { + data = this.streams.buffer.read(chunkSize); + } else { + data = this.streams.buffer.read(); + } + if (data === undefined || data == null) { + return; + } + this.streams.output.write(data); + } + + #fillBuffer() { + const rebuffered = this.#rebufferChunk(this.threshold.value); + if (rebuffered === undefined || this.#isThresholdAnnounced() === true) { + return; + } + if (this.#isThresholdReached()) { + this.times.threshold = Date.now(); + this.threshold.announced = true; + this.emit(constants.THRESHOLD); + logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes after ' + (this.times.threshold - this.times.start) + 'ms...'); + return; + } + this.#fillBuffer(); + } + + #rebufferChunk(chunkSize) { + if (chunkSize === undefined || isNaN(chunkSize)) { + chunkSize = 4096; + } + if (this.#isLimitReached()) { + return; + } + let data; + if (this.streams.input.readableLength >= chunkSize) { + data = this.streams.input.read(chunkSize); + } else { + data = this.streams.input.read(); + } + if (data === undefined || data === null) { + return; + } + this.streams.buffer.push(data); + return data.length; } #handleBufferStream() { if (this.streams.buffer === undefined) { return; } - this.streams.buffer.pause(); - this.streams.buffer.on('data', (data) => { - const chunkSize = data.length; - const flushed = this.streams.output.write(data); - if (flushed !== true) { - // logger.warn('backpressure detected...'); - this.streams.buffer.tmp = chunkSize; - this.streams.buffer.pause(); - } else { - this.streams.buffer.buffered -= chunkSize; - } - if (this.#isLimitReached() === false && this.streams.input.isPaused() === true) { - // logger.debug('buffer fell below limit of \'' + this.limit.value + '\' bytes, resuming input stream...'); - this.streams.input.resume(); - } else if (this.#isLimitReached() === true && this.streams.input.isPaused() === false) { - // logger.debug('buffer reached limit of \'' + this.limit.value + '\' bytes, pausing input stream...'); - this.streams.input.pause(); - } + this.streams.buffer.on('error', (error) => { + logger.error('buffer stream encountered an error: ' + error); }); } @@ -94,19 +164,15 @@ class StreamBuffer extends EventEmitter { if (this.streams.input === undefined) { return; } - this.streams.input.on('data', (data) => { - this.streams.buffer.buffered += data.length; - this.streams.buffer.push(data); - if (this.#isThresholdReached() === true && !this.#isThresholdAnnounced() === true) { - this.threshold.announced = true; - this.emit(constants.BUFFER_THRESHOLD); - logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes'); - } - if (this.#isLimitReached()) { - // logger.debug('buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); - this.streams.input.pause(); - // this.streams.buffer.resume(); - } + this.streams.input.on('error', (error) => { + logger.error('input stream encountered an error: ' + error); + }); + this.streams.input.on('readable', () => { + this.#fillBuffer(); + }); + this.streams.input.on('close', () => { + this.times.transmitted = Date.now(); + logger.debug('input stream closed, transmitting file took ' + (this.times.transmitted - this.times.start) + 'ms'); }); } @@ -114,149 +180,38 @@ class StreamBuffer extends EventEmitter { if (this.streams.output === undefined) { return; } + this.streams.output.on('error', () => { + logger.error('output stream encountered an error: ' + error); + }); + this.streams.output.on('written', (bytes) => { + this.#writeBufferedChunk(bytes); + this.#rebufferChunk(bytes); + }); this.streams.output.on('drain', () => { - // logger.warn('SPEAKER DRAINED - RESUMING DUPLEX STREAM'); - this.streams.buffer.buffered -= this.streams.buffer.tmp; - this.streams.buffer.resume(); - if (this.streams.input.isPaused() && this.streams.buffer.buffered < this.limit.value) { - // logger.warn('RESUME READ STREAM - BUFFER LIMIT NOT REACHED') - this.streams.input.resume(); - } - }); - this.streams.output.on('flush', () => { - logger.debug('speaker flushed'); - }); - this.streams.output.on('close', () => { - logger.debug('speaker closed'); + this.times.drained = Date.now(); + logger.debug('output stream is drained, file should be completely written to the speaker after ' + (this.times.drained - this.times.start) + 'ms'); + this.#destroy(); }); this.streams.output.on('progress', (progress) => { - // logger.warn('SPEAKER PROGRESS: ' + progress); + this.playback.progress = progress; }); this.streams.output.on('hiccup', () => { - if (this.hiccups === undefined) { - this.hiccups = 1; - } else { - this.hiccups++; - } - logger.warn('HICKUP #' + this.hiccups + ' DETECTED'); + this.playback.hiccups++; + logger.warn('hiccup ' + this.playback.hiccups + ' detected...'); }); } - // #handleTransferStream() { - // if (this.streams.transfer === undefined) { - // return; - // } - // this.streams.transfer.pause(); - // this.streams.transfer._read = () => { }; - // this.streams.transfer.on('data', (data) => { - // if (data.length === 65483) { - // logger.warn('POSSIBLE ERROR!'); - // } - // const flushed = this.streams.output.write(data, (error) => { - // const derp = this.streams.output; - // if (error !== undefined) { - // logger.error('FUCK MY LIFE'); - // } - // }); - // if (flushed === false) { - // this.streams.transfer.pause(); - // } - // if (this.streams.transfer.readableLength < this.limit.value && this.streams.input.isPaused()) { - // logger.warn('RESUMING READ STREAM - TRANSFER STRAM UNDERCUT LIMIT'); - // this.streams.input.resume(); - // } - // }); - // this.streams.transfer.on('close', () => { - // logger.debug('transfer stream of stream buffer closed'); - // this.streams.transfer.destroy(); - // }); - // this.streams.transfer.on('error', (error) => { - // logger.error('transfer stream of stream buffer encountered an unexpected error: ' + error); - // }); - // this.streams.transfer.on('resume', () => { - // logger.debug('transfer stream of stream buffer is resumed'); - // }); - // this.streams.transfer.on('pause', () => { - // logger.debug('transfer stream of stream buffer is paused'); - // }); - // } - - // #handleInputStream() { - // if (this.streams.input === undefined) { - // return; - // } - // this.streams.input.on('data', (data) => { - // const flushed = this.streams.output.write(data); - // if (this.streams.output.writableCorked > 0) { - // if (this.streams.output.writableLength >= this.threshold.value && this.threshold.announced === false) { - // logger.debug('output stream\'s internal buffer reached threshold of ' + this.limit.value + ' bytes'); - // this.emit(constants.BUFFER_THRESHOLD); - // } - // if (this.streams.output.writableLength >= this.limit.value) { - // logger.debug('output stream\'s internal buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); - // this.streams.input.pause(); - // if (this.limit.announced === false) { - // this.emit(constants.BUFFER_LIMIT); - // } - // } - // return; - // } - // if (flushed === false && !this.streams.input.isPaused()) { - // logger.warn('BACKPRESSURE FROM INPUT STREAM - PAUSING INPUT STREAM'); - // this.streams.input.pause(); - // } - - - - // // this.streams.transfer.push(data); - // // if (this.streams.transfer.readableLength >= this.threshold.value && this.threshold.announced === false) { - // // logger.debug('transfer stream reached threshold of ' + this.limit.value + ' bytes'); - // // this.emit(constants.BUFFER_THRESHOLD); - // // } - // // if (this.streams.transfer.readableLength >= this.limit.value) { - // // logger.debug('transfer stream reached limit of ' + this.limit.value + ' bytes, pausing read stream...'); - // // this.streams.input.pause(); - // // if (this.limit.announced === false) { - // // this.emit(constants.BUFFER_LIMIT); - // // } - // // } - // }); - // this.streams.input.on('close', () => { - // logger.debug('input stream of stream buffer closed'); - // this.streams.input.destroy(); - // }); - // this.streams.input.on('error', (error) => { - // logger.error('input stream of stream buffer encountered an unexpected error: ' + error); - // }); - // this.streams.input.on('resume', () => { - // logger.debug('input stream of stream buffer is resumed'); - // }); - // this.streams.input.on('pause', () => { - // logger.debug('input stream of stream buffer is paused'); - // }); - // } - - // #handleOutputStream() { - // if (this.streams.output === undefined) { - // return; - // } - // this.streams.output.on('drain', () => { - // logger.debug('output stream of stream buffer is drained, resuming input stream...'); - // // this.streams.input.resume(); - // }); - // this.streams.output.on('resume', () => { - // logger.debug('output stream of stream buffer is resumed'); - // }); - // this.streams.output.on('pause', () => { - // logger.debug('output stream of stream buffer is paused'); - // }); - // this.streams.output.on('close', () => { - // logger.debug('output stream of stream buffer closed'); - // }); - // this.streams.output.on('error', (error) => { - // logger.error('output stream of stream buffer encountered an unexpected error: ' + error); - // }); - // } + #destroy() { + for (const key in this.streams) { + const stream = this.streams[key]; + if (stream.destroyed === true) { + continue; + } + stream.destroy(); + } + this.times.stop = Date.now(); + this.emit('close'); + } } module.exports = StreamBuffer; \ No newline at end of file diff --git a/classes/Audiostream.js b/classes/Audiostream.js index ceb7366..7a1f2fd 100644 --- a/classes/Audiostream.js +++ b/classes/Audiostream.js @@ -1,5 +1,5 @@ const net = require('net'); -const { sleep } = require('../libs/util'); +const constants = require('../libs/constants'); const Message = require('./Message'); class Audiostream { @@ -25,7 +25,7 @@ class Audiostream { this.host = config?.server?.host || "127.0.0.1"; this.port = data.port; this.clientId = data.clientId; - this.threshold = data.threshold; + this.settings = data.settings; this.#handleSocket(net.connect({ host: this.getHost(), port: this.getPort() @@ -37,24 +37,23 @@ class Audiostream { logger.debug('handling event \'audio:initialize\'...'); this.#initialize(data); }); - this.eventParser.on('audio:play', (data) => { - logger.debug('handling event \'audio:play\'...'); - // global.player.play(data?.position); + this.eventParser.on(constants.AUDIO_PLAY, (data) => { + logger.debug('handling event \'' + constants.AUDIO_PLAY + '\'...'); global.player.play(); }); - this.eventParser.on('audio:pause', (data) => { - logger.debug('handling event \'audio:pause\'...'); + this.eventParser.on(constants.AUDIO_PAUSE, (data) => { + logger.debug('handling event \'' + constants.AUDIO_PAUSE + '\'...'); global.player.pause(); }); - this.eventParser.on('audio:stop', () => { - logger.debug('handling event \'audio:stop\'...'); + this.eventParser.on(constants.AUDIO_STOP, () => { + logger.debug('handling event \'' + constants.AUDIO_STOP + '\'...'); global.player.stop(); }); - global.player.on('statechange', (data) => { - new Message('audio:state', { + global.player.on(constants.STATECHANGE, (data) => { + new Message(constants.AUDIO_STATE, { clientId: this.clientId, state: data.state, - position: global.player.getPosition() + progress: data.progress }).send(); }); } @@ -62,7 +61,7 @@ class Audiostream { #handleSocket(socket) { socket.on('connect', async () => { logger.debug('connected to audio server \'' + this.getTag() + '\'...'); - await global.player.prepare(this.threshold, socket); + await global.player.prepare(socket, this.settings); new Message('audio:register', { clientId: this.clientId, port: socket.localPort }).send(); }); socket.on('error', (error) => { @@ -70,7 +69,6 @@ class Audiostream { }); socket.on('close', () => { logger.info('connection to audio server \'' + this.getTag() + '\' closed'); - // global.player.stopFeed(); }); } } diff --git a/classes/Player.js b/classes/Player.js index f122cb0..30d7fc7 100644 --- a/classes/Player.js +++ b/classes/Player.js @@ -1,106 +1,68 @@ -const NodeSpeaker = require('../libs/speaker/index.js'); const EventEmitter = require('events'); -const { spawn } = require('child_process'); -const createWriteStream = require('fs').createWriteStream; -const unlink = require('fs/promises').unlink; -const resolve = require('path').resolve; const AudioBuffer = require('./AudioBuffer.js'); -const { STATE_PLAYING } = require('../libs/constants.js'); class Player extends EventEmitter { constructor() { super(); - this.timestamp = Date.now(); - this.position = 0; this.events = []; - this.tmp = { - file: resolve(global.config?.tmp || '/tmp/kannon.tmp') - }; - this.buffer = { - size: 0, - elements: [] - }; } - async prepare(threshold, stream) { + async prepare(stream, settings) { logger.debug('preparing audio player...'); - await this.#reset(); - - this.speaker = new NodeSpeaker({ - channels: 2, - bitDepth: 16, - sampleRate: 44100 + this.#reset(); + this.audiobuffer = new AudioBuffer(stream, settings); + this.audiobuffer.on(constants.THRESHOLD, () => { + this.#setState(constants.READY); }); - this.audiobuffer = new AudioBuffer(threshold, stream, this.speaker); - this.audiobuffer.on(constants.BUFFER_THRESHOLD, () => { - this.#setState(constants.STATE_READY); + this.audiobuffer.on('close', () => { + this.#setState(constants.STOPPED); }); + this.audiobuffer.on('play', () => { + this.#setState(constants.PLAYING); + }); + this.audiobuffer.on('pause', () => { + this.#setState(constants.PAUSED); + }) } play() { - this.#setState(STATE_PLAYING); this.audiobuffer.resume(); } - async pause() { - await this.#reset(true); - this.#setState(constants.STATE_PAUSED); + pause() { + this.audiobuffer.pause(); } async stop() { - await this.#reset(); - this.#setState(constants.STATE_STOPPED); + this.audiobuffer.stop(); + this.#reset(); + this.#setState(constants.STOPPED); } isReady() { - return this.state === constants.STATE_READY; + return this.state === constants.READY; } isPlaying() { - return this.state === constants.STATE_PLAYING; + return this.state === constants.PLAYING; } isPaused() { - return this.state === constants.STATE_PAUSED; + return this.state === constants.PAUSED; } isFinished() { - return this.state === constants.STATE_STOPPED; + return this.state === constants.STOPPED; } hasError() { - return this.state === constants.STATE_ERROR; + return this.state === constants.ERROR; } - getPosition() { - return this.position; - } - - async #spawnProcess(position) { - return new Promise((resolve, reject) => { - const args = [ - '-vn', - '-nodisp' - ]; - if (this.isPaused() && !isNaN(position)) { - args.unshift('-ss', position); - } - args.push(this.tmp.file); - this.process = spawn("ffplay", args); - this.process.on('error', (error) => { - this.#reset(); - // TODO: try/catch error - reject('error spawning process \'ffplay\': ' + error); - }); - this.process.on('spawn', () => { - logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...'); - this.#setState(constants.STATE_PLAYING); - resolve(); - }); - }); - + getProgress() { + return this.audiobuffer?.getProgress(); } #setState(state, data) { @@ -114,63 +76,12 @@ class Player extends EventEmitter { } logger.debug('emitting state \'' + state + '\' of audio player...'); this.emit(this.state, { data: data }); - this.emit('statechange', { state: this.state }); + this.emit(constants.STATECHANGE, { state: this.state, progress: this.getProgress() }); this.events.push(state); } - async #killProcess() { - if (this.process === undefined) { - return; - } - const pid = this.process.pid; - this.#closeStdIO(); - if (this.process?.killed === false) { - this.process.kill('SIGTERM'); - } - await new Promise((resolve, reject) => { - this.process.on('close', (code, signal) => { - let msg = 'process \'ffplay\' (pid: ' + pid + ') closed with'; - if (code !== undefined) { - msg += ' code \'' + code + '\''; - } else { - msg += ' signal \'' + signal + '\''; - } - logger.debug(msg); - this.process = undefined; - resolve(); - }); - }); - } - - #closeStdIO() { - if (this.process?.stdio === undefined) { - return; - } - logger.debug('closing all stdio streams of process \'ffplay\' (pid: ' + this.process.pid + ')...'); - for (let index = 0; index < this.process.stdio.length; index++) { - this.process.stdio[index].destroy(); - } - } - - async #removeTemporaryFile() { - try { - await unlink(this.tmp.file); - } catch (error) { - if (error?.code === 'ENOENT') { - return; - } - logger.error('error removing temporary file \'' + this.tmp.file + '\': ' + error); - } - } - - async #reset(paused) { - await this.#killProcess(); - this.timestamp = Date.now(); + #reset() { this.events = []; - if (paused === true) { - return; - } - this.position = 0; } } diff --git a/libs/constants.js b/libs/constants.js index 1e43403..ef66372 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -1,15 +1,24 @@ module.exports = { - SOCKET_EVENT_PING: 'ping', - SOCKET_EVENT_PONG: 'pong', + PING: 'ping', + PONG: 'pong', - STATE_READY: 'ready', - STATE_PLAYING: 'playing', - STATE_PAUSED: 'paused', - STATE_STOPPED: 'stopped', - STATE_ERROR: 'error', + READY: 'ready', + PLAYING: 'playing', + PAUSED: 'paused', + STOPPED: 'stopped', + ERROR: 'error', + + STATECHANGE: 'statechange', + + THRESHOLD: 'threshold', + HICCUP: 'hiccup', + PROGRESS: 'progress', + + AUDIO_PLAY: 'audio:play', + AUDIO_PAUSE: 'audio:pause', + AUDIO_STOP: 'audio:stop', + AUDIO_STATE: 'audio:state', - BUFFER_THRESHOLD: 'threshold', - BUFFER_LIMIT: 'limit', EVENT_DELIMITER: '<<< kannon >>>' } \ No newline at end of file diff --git a/libs/speaker/index.js b/libs/speaker/index.js index 4a04b62..857a316 100644 --- a/libs/speaker/index.js +++ b/libs/speaker/index.js @@ -207,6 +207,7 @@ class Speaker extends Writable { const onwrite = (r) => { this.bytesWritten += r; + this.emit('written', r); this.emit('progress', this.bytesWritten); debug('wrote %o bytes', r) if (r === 0) {