diff --git a/classes/Api.js b/classes/Api.js index df704b4..559bacd 100644 --- a/classes/Api.js +++ b/classes/Api.js @@ -38,14 +38,14 @@ class Api { if (global.audioserver !== undefined) { await global.audioserver.destroy(); } - global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm'); + global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/YOU_SUFFER.mp3'); }); this.#registerEndpoint(constants.API_RESUME, constants.REQUEST_METHOD_POST, async () => { if (global.audioserver === undefined) { return; } await global.audioserver.destroy(); - global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm', global.audioserver.progress); + global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/YOU_SUFFER.mp3', global.audioserver.progress); }); this.#registerEndpoint(constants.API_PAUSE, constants.REQUEST_METHOD_POST, async () => { if (global.audioserver === undefined) { diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 08c152f..b4d6a32 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -1,17 +1,14 @@ const net = require('net'); const { stat, open } = require('fs/promises'); - const Message = require('./Message.js'); -const constants = require('../libs/constants.js'); +const PCMStream = require('./PCMStream.js'); class AudioServer { constructor(file, progress) { this.listen = config?.server?.listen || '0.0.0.0'; this.port = 0; - this.buffer = { - file: file - }; + this.file = file; this.clients = []; this.sockets = []; this.broadcasts = {}; @@ -37,36 +34,32 @@ class AudioServer { this.sockets.push(socket); }); this.server.on('error', (err) => { - logger.error('ERROR IN AUDIOSERVER ' + err); reject('audio server encountered an error: ' + err); }); this.server.on('close', () => { logger.info('audio server closed'); }); }); - await this.#prepareBuffer(); + // TODO: GET AUDIO INFO FROM FILE AND PASS TO PCMSTREAM FOR CONVERSION + this.pcm = new PCMStream(this.file, this.progress); + try { + await this.pcm.prepare(); + } catch (error) { + logger.error('encountered an error creating pcm stream: ' + error); + this.pcm = undefined; + this.aborted = true; + return; + } + this.#calculateThreshold(); this.#announceAudioServer(); } - async #prepareBuffer() { - if (this.buffer.fd !== undefined) { - this.buffer.fd.close(); + #calculateThreshold() { + this.threshold = config.audio?.threshold; + if (isNaN(this.threshold)) { + this.threshold = 16; } - if (this.buffer.stream?.destroyed === false) { - this.buffer.stream.close(); - this.buffer.stream.destroy(); - } - this.buffer.fd = await open(this.buffer.file); - this.buffer.stream = this.buffer.fd.createReadStream({ - start: this.progress - }); - const stats = await stat(this.buffer.file); - this.buffer.size = stats.size - this.progress; - let percentage = 30; - if (!(isNaN(config.audio?.threshold))) { - percentage = config.audio.threshold; - } - this.buffer.threshold = (this.buffer.size / 100) * percentage; + this.threshold = this.threshold * 1024; } #handleEvents() { @@ -111,12 +104,12 @@ class AudioServer { logger.debug(msg); }); client.audiosocket.on('drain', () => { - if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) { + if (this.pcm === undefined || this.pcm.isPaused() === false) { return; } - this.#isFileTransmitted(client); + this.#checkFileTransmission(client); // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); - this.buffer.stream.resume(); + this.pcm.resume(); }); } @@ -195,23 +188,17 @@ class AudioServer { async #handleStatePlaying(client) { logger.debug(client.getTag() + ' has started playback...'); - // TODO: remove - test only - // await sleep(10000); - // this.#pausePlayback(); } async #handleStatePaused(client, data) { if (client === undefined || data === undefined) { return; } - logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...'); - // TODO: remove - test only - // await sleep(100); - // this.#startPlayback(); + logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '...'); } async #handleStateStopped(client, data) { - logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '/' + this.buffer.size + '...'); + logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '...'); if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) { return; } @@ -219,7 +206,7 @@ class AudioServer { } async #handleStateError(client, data) { - logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + '/' + this.buffer.size + ': ' + error); + logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + ': ' + error); } #getClientById(clientId) { @@ -239,9 +226,8 @@ class AudioServer { this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', { port: this.server.address().port, settings: { - size: this.buffer.size, - threshold: this.buffer.threshold, - // TODO: GET AUDIO INFO FROM DATABASE + threshold: this.threshold, + // TODO: GET AUDIO INFO FROM DATABASE AND PASS TO CLIENT(S) FOR PLAYBACK audio: { channels: 2, bitDepth: 16, @@ -270,7 +256,8 @@ class AudioServer { async #transmitFile() { const timestamp = Date.now(); return new Promise((resolve, reject) => { - this.buffer.stream.on('data', (data) => { + this.pcm.resume(); + this.pcm.on('data', (data) => { for (let index = 0; index < this.clients.length; index++) { const client = this.clients[index]; if (client.audiosocket.destroyed) { @@ -279,32 +266,27 @@ class AudioServer { } if (client.audiosocket.write(data) !== true) { // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); - this.buffer.stream.pause(); + this.pcm.pause(); continue; } - this.#isFileTransmitted(client); + this.#checkFileTransmission(client); } }); - this.buffer.stream.on('close', () => { - logger.debug('reading file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); + this.pcm.on('close', () => { + logger.debug('transmitting to pcm data converted file \'' + this.file + '\' took ' + (Date.now() - timestamp) + 'ms'); resolve(); }); - this.buffer.stream.on('error', (error) => { - // TODO: handle with try / catch - logger.debug('STREAM ERROR!'); - reject(error); - }); }); } - #isFileTransmitted(client) { - if (client?.audiosocket === undefined) { + #checkFileTransmission(client) { + if (client?.audiosocket === undefined || this.pcm === undefined) { return; } - if (client.audiosocket.bytesWritten < this.buffer.size) { + if (this.pcm.destroyed !== true) { return false; } - logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now() ) + 'ms'); + logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now()) + 'ms'); client.audiosocket.end(); client.audiosocket.destroy(); return true; @@ -320,9 +302,6 @@ class AudioServer { } audiosocket.destroy(); } - this.buffer?.fd?.close(); - this.buffer?.stream?.close(); - this.buffer?.stream?.destroy(); if (this.server?.listening !== true) { return; } @@ -336,6 +315,10 @@ class AudioServer { resolve(); }); }); + if (this.pcm.destroyed === false) { + await this.pcm.destroy(); + } + this.pcm = undefined; } } diff --git a/classes/PCMStream.js b/classes/PCMStream.js new file mode 100644 index 0000000..120fb12 --- /dev/null +++ b/classes/PCMStream.js @@ -0,0 +1,195 @@ +const path = require('path'); +const { spawn } = require('child_process'); +const { tmpdir } = require('os'); +const { unlink, open } = require('fs/promises'); +const EventEmitter = require('events'); + +class PCMStream extends EventEmitter { + + constructor(file, start, format, channels, sampleRate) { + super(); + this.file = file; + this.start = start || 0; + this.discarded = 0; + this.ffmpeg = { + format: format || 'pcm_s16le', + channels: channels || 2, + sampleRate: sampleRate || 44100 + }; + this.fifo = {}; + } + + async prepare() { + if (this.file === undefined) { + throw new Error('cannot prepare pcm stream from an undefined file'); + } + this.file = path.resolve(this.file); + await this.#createFifo(); + await this.#spawnFFmpeg(); + await this.#readFifo(); + } + + resume() { + this.fifo?.stream.on('data', async (data) => { + if (this.start === 0 || this.discarded >= this.start) { + this.emit('data', data); + return; + } + let tmp = data.length + this.discarded; + if (tmp < this.start) { + this.discarded = tmp; + return; + } + tmp = this.start - this.discarded; + this.discarded += tmp; + data = data.slice(tmp); + this.emit('data', data); + }); + this.fifo?.stream?.resume(); + } + + pause() { + this.fifo?.stream?.pause(); + this.fifo?.stream?.removeAllListeners('data'); + } + + isPaused() { + return this.fifo?.stream?.isPaused(); + } + + async #spawnFFmpeg() { + if (this.file === undefined) { + throw new Error('can not convert an undefined file to pcm'); + } + const args = [ + '-y', + '-i', + this.file, + '-acodec', + this.ffmpeg.format, + '-ac', + this.ffmpeg.channels, + '-ar', + this.ffmpeg.sampleRate, + '-f', + 's16le', + this.fifo.file + ]; + return new Promise((resolve, reject) => { + this.ffmpeg.process = spawn('ffmpeg', args); + this.ffmpeg.process.on('spawn', () => { + logger.debug('successfully spawned process \'ffmpeg\' (args: ' + args + ') for pcm conversion...'); + this.ffmpeg.timestamp = Date.now(); + resolve(); + }); + this.ffmpeg.process.on('error', async (error) => { + logger.error('encountered an error spawning process \'ffmpeg\' (args: ' + args + ') for pcm conversion: ' + error); + await this.destroy(); + reject(error); + }); + this.ffmpeg.process.on('close', async (code, signal) => { + let msg = 'process \'ffmpeg\' (args: ' + args + ') closed'; + if (code !== undefined) { + msg += ' with code \'' + code + '\''; + } else { + msg += ' with signal \'' + signal + '\''; + } + msg += ' after ' + (Date.now() - this.ffmpeg.timestamp) + 'ms'; + logger.debug(msg); + await this.destroy(); + this.emit('close'); + }); + }); + } + + async #createFifo() { + let fifo = path.join(tmpdir(), 'kannon.fifo'); + try { + await unlink(fifo); + } catch (error) { + logger.debug('theres no fifo file to delete...'); + } + this.fifo.process = spawn('mkfifo', [fifo]); + return new Promise((resolve, reject) => { + this.fifo.process.on('spawn', () => { + logger.debug('successfully spawned process \'mkfifo\' (args: ' + fifo + ')...'); + this.fifo.file = fifo; + }); + this.fifo.process.on('error', async (error) => { + logger.error('encountered an error spawning process \'mkfifo\' (args: \'' + fifo + '\'): ' + error); + await this.destroy(); + reject(error); + }); + this.fifo.process.on('close', (code, signal) => { + let msg = 'process \'mkfifo\' (args: \'' + fifo + '\') closed'; + if (code !== undefined) { + msg += ' with code \'' + code + '\''; + } else { + msg += ' with signal ' + signal + '\''; + } + logger.debug(msg); + resolve(); + }); + }); + } + + async #readFifo() { + if (this.fifo.file === undefined) { + throw new Error('can not read from undefined fifo file'); + } + const timestamp = Date.now(); + this.fifo.fd = await open(this.fifo.file); + this.fifo.stream = this.fifo.fd.createReadStream(); + return new Promise((resolve, reject) => { + this.fifo.stream.on('error', async (error) => { + logger.error('encountered an error reading from fifo file \'' + this.fifo + '\': ' + error); + await this.destroy(); + reject(error); + }); + this.fifo.stream.on('close', () => { + logger.debug('read stream for fifo file \'' + this.fifo.file + '\' closed after ' + (Date.now() - timestamp) + 'ms (read ' + this.fifo.stream.bytesRead + ' bytes)'); + }); + this.fifo.stream.on('readable', () => { + this.fifo.stream.removeAllListeners('readable'); + resolve(); + }); + this.fifo.stream.on('drain', () => { + logger.warn('FIFO STREAM DRAINED'); + }); + }); + } + + async #deleteFifo() { + if (this.fifo.file === undefined) { + return; + } + try { + await unlink(this.fifo.file); + } catch (error) { + logger.error('encountered an error deleting the fifo file \'' + this.fifo.file + '\': ' + error); + } + } + + async destroy() { + if (this.ffmpeg.process.killed != true) { + this.ffmpeg.process.kill(); + this.ffmpeg.process = undefined; + } + if (this.fifo.process.killed !== true) { + this.fifo.process.kill(); + this.fifo.process = undefined; + } + if (this.fifo.stream.destroyed !== true) { + this.fifo.stream.destroy(); + this.fifo.stream = undefined; + } + if (this.fifo.fd.closed !== true) { + this.fifo.fd.close(); + this.fifo.fd = undefined; + } + await this.#deleteFifo(); + this.destroyed = true; + } +} + +module.exports = PCMStream; \ No newline at end of file diff --git a/example_config_server.json b/example_config_server.json index 7da20df..ab2f6c0 100644 --- a/example_config_server.json +++ b/example_config_server.json @@ -28,6 +28,6 @@ }, "audio": { "nodelay": false, - "threshold": 1 + "threshold": 8 } } \ No newline at end of file