diff --git a/classes/Api.js b/classes/Api.js index 40f1ee5..3fe4488 100644 --- a/classes/Api.js +++ b/classes/Api.js @@ -34,8 +34,28 @@ class Api { } #setup() { - this.#registerEndpoint(constants.API_PLAY, constants.REQUEST_METHOD_POST, () => { - new AudioServer('/mnt/kingston/public/LEFTOVER.flac'); + this.#registerEndpoint(constants.API_PLAY, constants.REQUEST_METHOD_POST, async () => { + if (global.audioserver !== undefined) { + await global.audioserver.destroy(); + } + global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm'); + }); + this.#registerEndpoint(constants.API_PAUSE, constants.REQUEST_METHOD_POST, async () => { + if (global.audioserver === undefined) { + return; + } + global.audioserver.pausePlayback(); + }); + this.#registerEndpoint(constants.API_RESUME, constants.REQUEST_METHOD_POST, async () => { + if (global.audioserver === undefined) { + return; + } + }); + this.#registerEndpoint(constants.API_STOP, constants.REQUEST_METHOD_POST, async () => { + if (global.audioserver === undefined) { + return; + } + global.audioserver.stopPlayback(); }); } diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 860e8da..7b0eae4 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -13,13 +13,12 @@ class AudioServer { this.port = 0; this.buffer = { file: file, - stream: fs.createReadStream(file), - limit: (config?.audio.bufferlimit || 256) * 1048576 + stream: fs.createReadStream(file) }; this.clients = []; this.sockets = []; this.broadcasts = {}; - this.position = 0; + this.progress = 0; this.server = net.createServer(); this.#prepare(); } @@ -41,16 +40,17 @@ class AudioServer { this.sockets.push(socket); }); this.server.on('error', (err) => { + logger.error('ERROR IN AUDIOSERVER ' + err); reject('an error occured preparing the audio server for file \'' + this.file + '\' > ' + err); }); }); const stats = await stat(this.buffer.file); this.buffer.size = stats.size; - let divisor = 30; + let percentage = 30; if (!(isNaN(config.audio?.threshold))) { - divisor = config.audio.threshold; + percentage = config.audio.threshold; } - this.buffer.threshold = (this.buffer.size / 100) / divisor; + this.buffer.threshold = (this.buffer.size / 100) * percentage; this.#announceAudioServer(); } @@ -79,6 +79,7 @@ class AudioServer { return; } client.audiosocket = socket; + client.audiosocket.setNoDelay(config?.audio?.nodelay || false); this.clients.push(client); this.#setClientState(client, constants.CLIENT_STATE_REGISTERED); client.audiosocket.on('connect', () => { @@ -98,7 +99,7 @@ class AudioServer { logger.debug(msg); }); client.audiosocket.on('drain', () => { - if (this.buffer.stream === undefined || !this.buffer.stream.isPaused()) { + if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) { return; } // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); @@ -179,26 +180,26 @@ class AudioServer { async #handleStatePlaying(client) { logger.debug(client.getTag() + ' has started playback...'); // TODO: remove - test only - await sleep(5000); - this.#pausePlayback(); + // await sleep(10000); + // this.#pausePlayback(); } async #handleStatePaused(client, data) { if (client === undefined || data === undefined) { return; } - logger.debug(client.getTag() + ' paused playback at position \'' + data.position + '\'...'); + logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...'); // TODO: remove - test only - await sleep(1); - this.#startPlayback(); + // await sleep(100); + // this.#startPlayback(); } async #handleStateStopped(client, data) { - logger.debug(client.getTag() + ' stopped playback at position \'' + data.position + '\'...'); + logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '/' + this.buffer.size + '...'); } async #handleStateError(client, data) { - logger.error(client.getTag() + ' experienced an error during playback at position \'' + data.position + '\': ' + data.error); + logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + '/' + this.buffer.size + ': ' + error); } #getClientById(clientId) { @@ -217,38 +218,51 @@ class AudioServer { async #announceAudioServer() { this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', { port: this.server.address().port, - size: this.buffer.size, - threshold: this.buffer.threshold + settings: { + size: this.buffer.size, + threshold: this.buffer.threshold, + // TODO: GET AUDIO INFO FROM DATABASE + audio: { + channels: 2, + bitDepth: 16, + sampleRate: 44100 + } + } }).broadcast(true); logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_REGISTERED] + '\'...'); } async #startPlayback() { - this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play', { position: this.position }).broadcast(); + this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play').broadcast(); logger.debug('sent broadcast to start playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PLAYING] + '\'...'); } - async #stopPlayback() { - this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast(); - logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...'); - } - - async #pausePlayback() { + async pausePlayback() { this.broadcasts[constants.CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast(); logger.debug('sent broadcast to pause playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PAUSED] + '\'...'); } + async stopPlayback() { + this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast(); + logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...'); + } + async #transmitFile() { const timestamp = Date.now(); return new Promise((resolve, reject) => { this.buffer.stream.on('data', (data) => { for (let index = 0; index < this.clients.length; index++) { const client = this.clients[index]; + if (client.audiosocket.destroyed) { + this.clients.splice(index, 1); + continue; + } if (client.audiosocket.write(data) !== true) { // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); this.buffer.stream.pause(); } if (client.audiosocket.bytesWritten >= this.buffer.size) { + logger.warn(client.getTag() + ' transmitted audio file after ' + (Date.now() - timestamp) + 'ms'); client.audiosocket.end(); client.audiosocket.destroy(); } @@ -260,6 +274,7 @@ class AudioServer { }); this.buffer.stream.on('error', (error) => { // TODO: handle with try / catch + logger.debug('STREAM ERROR!'); reject(error); }); }); @@ -277,6 +292,7 @@ class AudioServer { await new Promise((resolve, reject) => { this.server.close((err) => { if (err !== undefined) { + logger.error('ERROR CLOSING AUDIOSERVER ' + err); reject(err); } resolve(); diff --git a/classes/Client.js b/classes/Client.js index ed79bc2..278211b 100644 --- a/classes/Client.js +++ b/classes/Client.js @@ -30,15 +30,15 @@ class Client { #listenForEvents() { logger.debug(this.getTag() + ' connected to communication server...'); - this.socket.on('timeout', () => { - this.#handleEventTimeout(); - }); this.socket.on('close', () => { this.#handleEventClose() }); this.socket.on('end', () => { this.#handleEventEnd() }); + this.socket.on('error', (error) => { + this.#handleEventError(error); + }); this.socket.on('data', (data) => { this.#handleEventData(data, this.socket) }); @@ -50,20 +50,6 @@ class Client { }); } - async #handleEventData(data) { - eventparser.parse(data); - } - - #handleEventTimeout() { - logger.warn(this.getTag() + ' timed out'); - this.destroy(); - } - - #handleEventHeartbeatTimeout() { - logger.warn(this.getTag() + ' heartbeat timed out'); - this.destroy(); - } - #handleEventClose() { logger.debug(this.getTag() + ' closed socket to communication server'); server.removeClient(this); @@ -74,6 +60,22 @@ class Client { this.destroy(); } + #handleEventError(error) { + if (error === undefined) { + return; + } + logger.debug(this.getTag() + ' encountered an error: ' + error); + } + + async #handleEventData(data) { + eventparser.parse(data); + } + + #handleEventHeartbeatTimeout() { + logger.warn(this.getTag() + ' heartbeat timed out'); + this.destroy(); + } + #handleEventNetworkStatistics(data) { logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data)); } diff --git a/classes/Heartbeat.js b/classes/Heartbeat.js index d82aad8..df0203a 100644 --- a/classes/Heartbeat.js +++ b/classes/Heartbeat.js @@ -24,7 +24,8 @@ class Heartbeat extends EventEmitter { await sleep(this.interval); } this.alive = false; - this.ping = process.hrtime.bigint(); + // this.ping = process.hrtime.bigint(); + this.ping = Date.now(); await new Message('ping').send(this.client); this.timeout = setTimeout(() => { this.#sendPing(); @@ -39,12 +40,13 @@ class Heartbeat extends EventEmitter { eventparser.on('pong', () => { logger.debug(this.client.getTag() + ' handling event \'pong\'...'); this.alive = true; - this.pong = process.hrtime.bigint(); + // this.pong = process.hrtime.bigint(); + this.pong = Date.now(); this.latency = this.pong - this.ping; this.emit('network-statistics', { - ping: this.ping.toString(), - pong: this.pong.toString(), - latency: this.latency.toString() + ping: this.ping, + pong: this.pong, + latency: this.latency }); }); } diff --git a/classes/Server.js b/classes/Server.js index 7704833..39333b0 100644 --- a/classes/Server.js +++ b/classes/Server.js @@ -22,6 +22,7 @@ class Server { this.#addClient(socket); }); this.server.on('error', (err) => { + logger.error('ERROR IN SERVER ' + err); reject('an unexpected error occured: ' + err); }); }); diff --git a/example_config_server.json b/example_config_server.json index 8adcb18..e73549f 100644 --- a/example_config_server.json +++ b/example_config_server.json @@ -27,7 +27,7 @@ "password": "kannon" }, "audio": { - "threshold": 10, - "bufferlimit": 64 + "nodelay": false, + "threshold": 100 } } \ No newline at end of file diff --git a/libs/constants.js b/libs/constants.js index 42f3821..fbcc265 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -15,5 +15,8 @@ module.exports = { REQUEST_METHOD_GET: 'get', REQUEST_METHOD_POST: 'post', - API_PLAY: '/play' + API_PLAY: '/play', + API_PAUSE: '/pause', + API_RESUME: '/resume', + API_STOP: '/stop' } \ No newline at end of file diff --git a/package.json b/package.json index ebb984e..e1ee98a 100644 --- a/package.json +++ b/package.json @@ -24,4 +24,4 @@ "sequelize": "^6.18.0", "sqlite3": "^5.0.2" } -} +} \ No newline at end of file