diff --git a/classes/AudioBuffer.js b/classes/AudioBuffer.js new file mode 100644 index 0000000..2487eea --- /dev/null +++ b/classes/AudioBuffer.js @@ -0,0 +1,262 @@ +const EventEmitter = require('events'); +const { Readable, Duplex } = require('stream'); + +class StreamBuffer extends EventEmitter { + + constructor(threshold, inputStream, outputStream) { + super(); + this.#setupBuffer(threshold); + this.#setupStreams(inputStream, outputStream); + } + + resume() { + this.streams.buffer.resume(); + } + + pause() { + this.streams.buffer.pause(); + } + + #setupBuffer(threshold) { + this.size = 0; + if (threshold === undefined || isNaN(threshold)) { + // 64 mb + threshold = 67108864; + } + this.threshold = { + value: threshold, + announced: false + }; + this.limit = { + value: config?.buffer?.limit, + announced: false + }; + if (isNaN(this.limit.value) || this.limit.value < this.threshold.value) { + this.limit.value = this.threshold.value; + } + } + + #setupStreams(inputStream, outputStream) { + if (inputStream === undefined || outputStream === undefined) { + return; + } + this.streams = { + input: inputStream, + output: outputStream, + buffer: new Readable() + }; + this.streams.buffer._read = () => { }; + this.streams.buffer.buffered = 0; + this.streams.buffer.tmp = 0; + this.#handleBufferStream(); + this.#handleInputStream(); + this.#handleOutputStream(); + } + + #isThresholdReached() { + return this.streams.buffer.buffered >= this.threshold.value; + } + + #isThresholdAnnounced() { + return this.threshold.announced === true; + } + + #isLimitReached() { + return this.streams.buffer.buffered >= this.limit.value; + } + + #handleBufferStream() { + if (this.streams.buffer === undefined) { + return; + } + this.streams.buffer.pause(); + this.streams.buffer.on('data', (data) => { + const chunkSize = data.length; + const flushed = this.streams.output.write(data); + if (flushed !== true) { + // logger.warn('backpressure detected...'); + this.streams.buffer.tmp = chunkSize; + this.streams.buffer.pause(); + } else { + this.streams.buffer.buffered -= chunkSize; + } + if (this.#isLimitReached() === false && this.streams.input.isPaused() === true) { + // logger.debug('buffer fell below limit of \'' + this.limit.value + '\' bytes, resuming input stream...'); + this.streams.input.resume(); + } else if (this.#isLimitReached() === true && this.streams.input.isPaused() === false) { + // logger.debug('buffer reached limit of \'' + this.limit.value + '\' bytes, pausing input stream...'); + this.streams.input.pause(); + } + }); + } + + #handleInputStream() { + if (this.streams.input === undefined) { + return; + } + this.streams.input.on('data', (data) => { + this.streams.buffer.buffered += data.length; + this.streams.buffer.push(data); + if (this.#isThresholdReached() === true && !this.#isThresholdAnnounced() === true) { + this.threshold.announced = true; + this.emit(constants.BUFFER_THRESHOLD); + logger.debug('buffer reached threshold of ' + this.threshold.value + ' bytes'); + } + if (this.#isLimitReached()) { + // logger.debug('buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); + this.streams.input.pause(); + // this.streams.buffer.resume(); + } + }); + } + + #handleOutputStream() { + if (this.streams.output === undefined) { + return; + } + this.streams.output.on('drain', () => { + // logger.warn('SPEAKER DRAINED - RESUMING DUPLEX STREAM'); + this.streams.buffer.buffered -= this.streams.buffer.tmp; + this.streams.buffer.resume(); + if (this.streams.input.isPaused() && this.streams.buffer.buffered < this.limit.value) { + // logger.warn('RESUME READ STREAM - BUFFER LIMIT NOT REACHED') + this.streams.input.resume(); + } + }); + this.streams.output.on('flush', () => { + logger.debug('speaker flushed'); + }); + this.streams.output.on('close', () => { + logger.debug('speaker closed'); + }); + this.streams.output.on('progress', (progress) => { + // logger.warn('SPEAKER PROGRESS: ' + progress); + }); + this.streams.output.on('hiccup', () => { + if (this.hiccups === undefined) { + this.hiccups = 1; + } else { + this.hiccups++; + } + logger.warn('HICKUP #' + this.hiccups + ' DETECTED'); + }); + } + + // #handleTransferStream() { + // if (this.streams.transfer === undefined) { + // return; + // } + // this.streams.transfer.pause(); + // this.streams.transfer._read = () => { }; + // this.streams.transfer.on('data', (data) => { + // if (data.length === 65483) { + // logger.warn('POSSIBLE ERROR!'); + // } + // const flushed = this.streams.output.write(data, (error) => { + // const derp = this.streams.output; + // if (error !== undefined) { + // logger.error('FUCK MY LIFE'); + // } + // }); + // if (flushed === false) { + // this.streams.transfer.pause(); + // } + // if (this.streams.transfer.readableLength < this.limit.value && this.streams.input.isPaused()) { + // logger.warn('RESUMING READ STREAM - TRANSFER STRAM UNDERCUT LIMIT'); + // this.streams.input.resume(); + // } + // }); + // this.streams.transfer.on('close', () => { + // logger.debug('transfer stream of stream buffer closed'); + // this.streams.transfer.destroy(); + // }); + // this.streams.transfer.on('error', (error) => { + // logger.error('transfer stream of stream buffer encountered an unexpected error: ' + error); + // }); + // this.streams.transfer.on('resume', () => { + // logger.debug('transfer stream of stream buffer is resumed'); + // }); + // this.streams.transfer.on('pause', () => { + // logger.debug('transfer stream of stream buffer is paused'); + // }); + // } + + // #handleInputStream() { + // if (this.streams.input === undefined) { + // return; + // } + // this.streams.input.on('data', (data) => { + // const flushed = this.streams.output.write(data); + // if (this.streams.output.writableCorked > 0) { + // if (this.streams.output.writableLength >= this.threshold.value && this.threshold.announced === false) { + // logger.debug('output stream\'s internal buffer reached threshold of ' + this.limit.value + ' bytes'); + // this.emit(constants.BUFFER_THRESHOLD); + // } + // if (this.streams.output.writableLength >= this.limit.value) { + // logger.debug('output stream\'s internal buffer reached limit of ' + this.limit.value + ' bytes, pausing input stream...'); + // this.streams.input.pause(); + // if (this.limit.announced === false) { + // this.emit(constants.BUFFER_LIMIT); + // } + // } + // return; + // } + // if (flushed === false && !this.streams.input.isPaused()) { + // logger.warn('BACKPRESSURE FROM INPUT STREAM - PAUSING INPUT STREAM'); + // this.streams.input.pause(); + // } + + + + // // this.streams.transfer.push(data); + // // if (this.streams.transfer.readableLength >= this.threshold.value && this.threshold.announced === false) { + // // logger.debug('transfer stream reached threshold of ' + this.limit.value + ' bytes'); + // // this.emit(constants.BUFFER_THRESHOLD); + // // } + // // if (this.streams.transfer.readableLength >= this.limit.value) { + // // logger.debug('transfer stream reached limit of ' + this.limit.value + ' bytes, pausing read stream...'); + // // this.streams.input.pause(); + // // if (this.limit.announced === false) { + // // this.emit(constants.BUFFER_LIMIT); + // // } + // // } + // }); + // this.streams.input.on('close', () => { + // logger.debug('input stream of stream buffer closed'); + // this.streams.input.destroy(); + // }); + // this.streams.input.on('error', (error) => { + // logger.error('input stream of stream buffer encountered an unexpected error: ' + error); + // }); + // this.streams.input.on('resume', () => { + // logger.debug('input stream of stream buffer is resumed'); + // }); + // this.streams.input.on('pause', () => { + // logger.debug('input stream of stream buffer is paused'); + // }); + // } + + // #handleOutputStream() { + // if (this.streams.output === undefined) { + // return; + // } + // this.streams.output.on('drain', () => { + // logger.debug('output stream of stream buffer is drained, resuming input stream...'); + // // this.streams.input.resume(); + // }); + // this.streams.output.on('resume', () => { + // logger.debug('output stream of stream buffer is resumed'); + // }); + // this.streams.output.on('pause', () => { + // logger.debug('output stream of stream buffer is paused'); + // }); + // this.streams.output.on('close', () => { + // logger.debug('output stream of stream buffer closed'); + // }); + // this.streams.output.on('error', (error) => { + // logger.error('output stream of stream buffer encountered an unexpected error: ' + error); + // }); + // } +} + +module.exports = StreamBuffer; \ No newline at end of file diff --git a/classes/Audiostream.js b/classes/Audiostream.js index 5699ef3..ceb7366 100644 --- a/classes/Audiostream.js +++ b/classes/Audiostream.js @@ -40,7 +40,7 @@ class Audiostream { this.eventParser.on('audio:play', (data) => { logger.debug('handling event \'audio:play\'...'); // global.player.play(data?.position); - global.player.speak(); + global.player.play(); }); this.eventParser.on('audio:pause', (data) => { logger.debug('handling event \'audio:pause\'...'); @@ -68,12 +68,9 @@ class Audiostream { socket.on('error', (error) => { logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error); }); - socket.on('end', () => { - logger.info('connection to audio server \'' + this.getTag() + '\' ended'); - }); socket.on('close', () => { logger.info('connection to audio server \'' + this.getTag() + '\' closed'); - global.player.stopFeed(); + // global.player.stopFeed(); }); } } diff --git a/classes/Connection.js b/classes/Connection.js index dbb6c64..4ec5ba8 100644 --- a/classes/Connection.js +++ b/classes/Connection.js @@ -61,9 +61,6 @@ class Connection { socket.on('close', () => { this.#handleEventClose(resolve); }); - socket.on('end', () => { - this.#handleEventEnd(); - }); socket.on('data', (data) => { this.#handleEventData(data); }); @@ -82,10 +79,6 @@ class Connection { return reject('error connecting to communication server \'' + this.getTag() + '\': ' + error); } - #handleEventEnd() { - logger.info('connection to communication server \'' + this.getTag() + '\' ended'); - } - #handleEventClose(resolve) { logger.info('connection to communication server \'' + this.getTag() + '\' closed'); this.destroy(); @@ -114,7 +107,6 @@ class Connection { this.socket.removeAllListeners('error'); this.socket.removeAllListeners('timeout'); this.socket.removeAllListeners('close'); - this.socket.removeAllListeners('end'); this.socket.removeAllListeners('data'); this.socket.end(); this.socket.destroy(); diff --git a/classes/Player.js b/classes/Player.js index bc129e4..f122cb0 100644 --- a/classes/Player.js +++ b/classes/Player.js @@ -1,10 +1,13 @@ -const NodeSpeaker = require('speaker'); +const NodeSpeaker = require('../libs/speaker/index.js'); const EventEmitter = require('events'); const { spawn } = require('child_process'); const createWriteStream = require('fs').createWriteStream; const unlink = require('fs/promises').unlink; const resolve = require('path').resolve; +const AudioBuffer = require('./AudioBuffer.js'); +const { STATE_PLAYING } = require('../libs/constants.js'); + class Player extends EventEmitter { constructor() { @@ -24,106 +27,21 @@ class Player extends EventEmitter { async prepare(threshold, stream) { logger.debug('preparing audio player...'); await this.#reset(); - // await this.#removeTemporaryFile(); - this.buffer.threshold = threshold; - this.stream = stream; - // this.tmp.stream = createWriteStream(this.tmp.file); - // this.speaker = new Speaker(speakeroptions.channel, speakeroptions.bitDepth, speakeroptions.sampleRate); + this.speaker = new NodeSpeaker({ channels: 2, bitDepth: 16, sampleRate: 44100 }); - this.speaker.on('open', () => { - logger.debug('speaker opened...'); - }); - this.speaker.on('flush', () => { - logger.debug('speaker flushed...'); - }); - this.speaker.on('close', () => { - logger.debug('speaker closed...'); - }); - this.speaker.on('drain', () => { - if (this.isPlaying() === false) { - return; - } - if (this.buffer.size < this.buffer.limit) { - logger.debug('fell below buffer limit of ' + this.buffer.limit + ' bytes, resuming read stream...'); - this.stream.resume(); - } - }); - this.buffer.limit = config?.buffer?.limit; - if (isNaN(this.buffer.limit) || this.buffer.limit < this.buffer.threshold) { - this.buffer.limit = this.buffer.threshold; - } - this.#fillBuffer(); - } - - #fillBuffer() { - this.stream.on('data', (data) => { - this.buffer.size += data.length; - this.buffer.elements.push(data); - if (this.buffer.announced === undefined && this.buffer.size >= this.buffer.threshold) { - this.buffer.announced = true; - this.#setState(constants.STATE_READY); - logger.debug('buffer threshold of ' + this.buffer.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms'); - } - if (this.buffer.size >= this.buffer.limit) { - logger.debug('buffer limit of ' + this.buffer.limit + ' bytes reached, pausing read stream...'); - this.stream.pause(); - this.playFromBuffer(); - } + this.audiobuffer = new AudioBuffer(threshold, stream, this.speaker); + this.audiobuffer.on(constants.BUFFER_THRESHOLD, () => { + this.#setState(constants.STATE_READY); }); } - speak() { - this.#setState(constants.STATE_PLAYING); - this.playFromBuffer(); - } - - playFromBuffer() { - const tmp = this.buffer.elements[0]; - if (tmp === undefined) { - return; - } - this.buffer.elements.shift(); - this.buffer.size -= tmp.length; - if (this.speaker.write(tmp) === false) { - this.stream.pause(); - } - } - - // async feed(buffer) { - // this.tmp.stream.write(buffer); - // if (this.tmp.announced === undefined && this.tmp.stream.bytesWritten >= this.threshold) { - // this.tmp.announced = true; - // this.#setState(constants.STATE_READY); - // logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms'); - // } - // } - - stopFeed() { - // logger.debug('finished writing of ' + this.tmp.stream.bytesWritten + ' bytes after ' + (Date.now() - this.timestamp) + 'ms'); - // this.tmp.stream.end(); - // this.tmp.stream.close(); - } - - async play(position) { - if (this.isPlaying()) { - await this.stop(); - } - await this.#spawnProcess(position); - this.process.stderr.on('data', (data) => { - data = data.toString(); - const position = data.toString().trim().split(' ')[0]; - if (position.length === 0 || isNaN(position)) { - return; - } - this.position = position; - }); - this.process.stdin.on('error', (error) => { - this.#setState(constants.STATE_ERROR, error); - }); + play() { + this.#setState(STATE_PLAYING); + this.audiobuffer.resume(); } async pause() { diff --git a/example_config.json b/example_config.json index 71b5513..f1275ff 100644 --- a/example_config.json +++ b/example_config.json @@ -13,7 +13,7 @@ "delay": 1000 }, "buffer": { - "limit": 10 + "limit": 40960 }, "tmp": "/tmp/kannon.tmp" } \ No newline at end of file diff --git a/kannon-client.js b/kannon-client.js index 9a1f992..b02e8c1 100644 --- a/kannon-client.js +++ b/kannon-client.js @@ -3,7 +3,7 @@ const path = require('path'); const Connection = require('./classes/Connection.js'); const Logger = require('./classes/Logger.js'); -const Player = require('./classes/Player'); +const Player = require('./classes/Player.js'); const INTERRUPTS = ['beforeExit', 'SIGINT', 'SIGTERM']; diff --git a/libs/constants.js b/libs/constants.js index b7d6973..1e43403 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -8,5 +8,8 @@ module.exports = { STATE_STOPPED: 'stopped', STATE_ERROR: 'error', + BUFFER_THRESHOLD: 'threshold', + BUFFER_LIMIT: 'limit', + EVENT_DELIMITER: '<<< kannon >>>' } \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 94c50b9..5c4acfc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,58 +9,9 @@ "version": "0.0.1", "license": "MIT", "dependencies": { - "moment": "^2.29.1", - "speaker": "^0.5.4" + "moment": "^2.29.1" } }, - "node_modules/bindings": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", - "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", - "dependencies": { - "file-uri-to-path": "1.0.0" - } - }, - "node_modules/buffer-alloc": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/buffer-alloc/-/buffer-alloc-1.2.0.tgz", - "integrity": "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow==", - "dependencies": { - "buffer-alloc-unsafe": "^1.1.0", - "buffer-fill": "^1.0.0" - } - }, - "node_modules/buffer-alloc-unsafe": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/buffer-alloc-unsafe/-/buffer-alloc-unsafe-1.1.0.tgz", - "integrity": "sha512-TEM2iMIEQdJ2yjPJoSIsldnleVaAk1oW3DBVUykyOLsEsFmEc9kn+SFFPz+gl54KQNxlDnAwCXosOS9Okx2xAg==" - }, - "node_modules/buffer-fill": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/buffer-fill/-/buffer-fill-1.0.0.tgz", - "integrity": "sha1-+PeLdniYiO858gXNY39o5wISKyw=" - }, - "node_modules/debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dependencies": { - "ms": "2.1.2" - }, - "engines": { - "node": ">=6.0" - }, - "peerDependenciesMeta": { - "supports-color": { - "optional": true - } - } - }, - "node_modules/file-uri-to-path": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", - "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==" - }, "node_modules/moment": { "version": "2.29.2", "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz", @@ -68,87 +19,13 @@ "engines": { "node": "*" } - }, - "node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" - }, - "node_modules/speaker": { - "version": "0.5.4", - "resolved": "https://registry.npmjs.org/speaker/-/speaker-0.5.4.tgz", - "integrity": "sha512-0I35CJGgqU1rd/a3qVysR5gLlG+8QlzJcPAEnYvT0BLfuLdJ7JNdlQHwbh7ETNcXDXbzm2O148GEAoAER54Dvw==", - "hasInstallScript": true, - "dependencies": { - "bindings": "^1.3.0", - "buffer-alloc": "^1.1.0", - "debug": "^4.0.0" - }, - "engines": { - "node": ">=8.6" - } } }, "dependencies": { - "bindings": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", - "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", - "requires": { - "file-uri-to-path": "1.0.0" - } - }, - "buffer-alloc": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/buffer-alloc/-/buffer-alloc-1.2.0.tgz", - "integrity": "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow==", - "requires": { - "buffer-alloc-unsafe": "^1.1.0", - "buffer-fill": "^1.0.0" - } - }, - "buffer-alloc-unsafe": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/buffer-alloc-unsafe/-/buffer-alloc-unsafe-1.1.0.tgz", - "integrity": "sha512-TEM2iMIEQdJ2yjPJoSIsldnleVaAk1oW3DBVUykyOLsEsFmEc9kn+SFFPz+gl54KQNxlDnAwCXosOS9Okx2xAg==" - }, - "buffer-fill": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/buffer-fill/-/buffer-fill-1.0.0.tgz", - "integrity": "sha1-+PeLdniYiO858gXNY39o5wISKyw=" - }, - "debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "requires": { - "ms": "2.1.2" - } - }, - "file-uri-to-path": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", - "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==" - }, "moment": { "version": "2.29.2", "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz", "integrity": "sha512-UgzG4rvxYpN15jgCmVJwac49h9ly9NurikMWGPdVxm8GZD6XjkKPxDTjQQ43gtGgnV3X0cAyWDdP2Wexoquifg==" - }, - "ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" - }, - "speaker": { - "version": "0.5.4", - "resolved": "https://registry.npmjs.org/speaker/-/speaker-0.5.4.tgz", - "integrity": "sha512-0I35CJGgqU1rd/a3qVysR5gLlG+8QlzJcPAEnYvT0BLfuLdJ7JNdlQHwbh7ETNcXDXbzm2O148GEAoAER54Dvw==", - "requires": { - "bindings": "^1.3.0", - "buffer-alloc": "^1.1.0", - "debug": "^4.0.0" - } } } } diff --git a/package.json b/package.json index 0c84844..8cb61f5 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,6 @@ "url": "https://git.velvettear.de/velvettear/kannon-client.git" }, "dependencies": { - "moment": "^2.29.1", - "speaker": "^0.5.4" + "moment": "^2.29.1" } -} +} \ No newline at end of file