From 232f8c6a75f09276517b11fa71e9ab06c62af1cf Mon Sep 17 00:00:00 2001 From: velvettear Date: Thu, 21 Apr 2022 13:09:31 +0200 Subject: [PATCH] fixed and optimized a lot of stuff --- classes/AudioServer.js | 326 ++++++++++++++++------------------------- classes/Client.js | 11 +- classes/Heartbeat.js | 16 +- libs/constants.js | 1 + 4 files changed, 144 insertions(+), 210 deletions(-) diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 7bd8e5a..3721799 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -6,7 +6,7 @@ const stat = require('fs/promises').stat; const Message = require('./Message.js'); -const { CLIENT_STATE_READY, CLIENT_STATE_PLAYING, CLIENT_STATE_PAUSED, CLIENT_STATE_STOPPED, CLIENT_STATE_ERROR } = require('../libs/constants.js'); +const { CLIENT_STATE_REGISTERED, CLIENT_STATE_READY, CLIENT_STATE_PLAYING, CLIENT_STATE_PAUSED, CLIENT_STATE_STOPPED, CLIENT_STATE_ERROR } = require('../libs/constants.js'); class AudioServer { @@ -20,9 +20,8 @@ class AudioServer { }; this.clients = []; this.sockets = []; - this.playback = { - position: undefined - }; + this.broadcasts = {}; + this.position = 0; this.server = net.createServer(); this.#prepare(); } @@ -55,75 +54,97 @@ class AudioServer { #handleEvents() { eventparser.on('audio:register', (data) => { - if (data?.clientId === undefined || data?.socket === undefined) { - return; - } - let client = server.getClientById(data.clientId); - if (client === undefined) { - return; - } - let socket; - for (let index = 0; index < this.sockets.length; index++) { - if (this.sockets[index].remotePort === data.socket) { - socket = this.sockets[index]; - this.sockets.splice(index, 1); - break; - } - } - if (socket === undefined) { - return; - } - client.audiosocket = socket; - this.clients.push(client); - logger.debug(client.getTag() + ' connected to audio server...'); - // this.#sendData(client, data); - // TEST ONLY - const timestamp = Date.now(); - const stream = fs.createReadStream(this.buffer.file); - stream.on('data', (data) => { - const backpressure = false; - for (let index = 0; index < this.clients.length; index++) { - if (!this.clients[index].audiosocket.write(data)) { - backpressure = true; - } - } - if (backpressure === true) { - logger.debug('BACKPRESSURE DETECTED - PAUSING STREAM!'); - stream.pause(); - } - }); - stream.on('drain', () => { - logger.debug('STREAM IS DRAINED - RESUMING!'); - stream.resume(); - }); - stream.on('close', () => { - for (let index = 0; index < this.clients.length; index++) { - this.clients[index].audiosocket.end(); - } - logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + stream.bytesRead + ' bytes)'); - }); - stream.on('error', (error) => { - logger.error('') - }); - - + this.#handleRegister(server.getClientById(data?.clientId), data?.port); }); eventparser.on('audio:state', (data) => { - this.#handleStateChange(data); + this.#setClientState(this.#getClientById(data?.clientId), data?.state, data); }); } - #handleStateChange(data) { - if (data?.clientId === undefined || data?.state === undefined) { + #handleRegister(client, port) { + if (client === undefined || port === undefined) { return; } - let client = this.#getClientById(data.clientId); + let socket; + for (let index = 0; index < this.sockets.length; index++) { + if (this.sockets[index].remotePort === port) { + socket = this.sockets[index]; + this.sockets.splice(index, 1); + break; + } + } + if (socket === undefined) { + return; + } + client.audiosocket = socket; + this.clients.push(client); + this.#setClientState(client, CLIENT_STATE_REGISTERED); + client.audiosocket.on('connect', () => { + logger.debug(client.getTag() + ' opened audio socket'); + }); + client.audiosocket.on('error', (error) => { + logger.error(client.getTag() + ' encountered an error: ' + error); + }); + client.audiosocket.on('end', () => { + logger.debug(client.getTag() + ' ended audio socket'); + }); + client.audiosocket.on('close', (hadError) => { + let msg = client.getTag() + ' closed audio socket'; + if (hadError === true) { + msg += ' after an error'; + } + logger.debug(msg); + }); + client.audiosocket.on('drain', () => { + if (this.buffer.stream === undefined || !this.buffer.stream.isPaused()) { + return; + } + logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); + this.buffer.stream.resume(); + }); + } + + #setClientState(client, state, data) { + if (client === undefined || state === undefined) { + return; + } + logger.debug(client.getTag() + ' state changed to \'' + state + '\''); + client.state = state; + this.#handleStateChange(client, data); + } + + #allClientsInState(state) { + if (this.clients === undefined || this.clients.length === 0 || state === undefined) { + return false; + } + const broadcasts = this.broadcasts[state]; + let result = true; + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + if (client.state !== state) { + result = false; + } + if (broadcasts === undefined) { + continue; + } + const indexOfId = this.broadcasts[state].indexOf(client.id); + if (indexOfId >= 0) { + this.broadcasts[state].splice(indexOfId, 1); + } + } + if (broadcasts !== undefined) { + result = result && this.broadcasts[state].length === 0; + } + return result; + } + + #handleStateChange(client, data) { if (client === undefined) { return; } - logger.debug(client.getTag() + ' state changed to \'' + data.state + '\''); - client.state = data.state; switch (client.state) { + case CLIENT_STATE_REGISTERED: + return this.#handleStateRegistered(client); case CLIENT_STATE_READY: return this.#handleStateReady(client); case CLIENT_STATE_PLAYING: @@ -137,12 +158,18 @@ class AudioServer { } } + async #handleStateRegistered(client) { + logger.debug(client.getTag() + ' has registered...'); + if (!this.#allClientsInState(CLIENT_STATE_REGISTERED)) { + return; + } + this.#transmitFile(); + } + async #handleStateReady(client) { logger.debug(client.getTag() + ' is ready for playback...'); - for (let index = 0; index < this.clients.length; index++) { - if (this.clients[index].state !== CLIENT_STATE_READY) { - return; - } + if (!this.#allClientsInState(CLIENT_STATE_READY)) { + return; } this.#startPlayback(); } @@ -159,13 +186,6 @@ class AudioServer { return; } logger.debug(client.getTag() + ' paused playback at position \'' + data.position + '\'...'); - for (let index = 0; index < this.playback.paused.length; index++) { - if (this.playback.paused[index] === client.id) { - if (this.playback.position === 0 || this.playback.position > data.position) { - this.playback.position = data.position; - } - } - } // TODO: remove - test only await sleep(1); this.#startPlayback(); @@ -179,118 +199,6 @@ class AudioServer { logger.error(client.getTag() + ' experienced an error during playback at position \'' + data.position + '\': ' + data.error); } - async #sendData(client) { - const timestamp = Date.now(); - // const buffer = await this.#bufferFile(); - return new Promise(async (resolve, reject) => { - this.buffer.stream.on('data', (data) => { - this.buffer.buffered += data.length; - buffer.push(data); - if (resolved !== true && this.buffer.stream.bytesRead >= this.buffer.threshold) { - resolved = true; - logger.debug('buffering threshold of ' + this.buffer.threshold + ' bytes for file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms'); - resolve(buffer); - } - if (this.buffer.buffered >= this.buffer.limit) { - logger.debug('pausing read stream'); - this.buffer.stream.pause(); - if (resolved !== true) { - resolved = true; - resolve(buffer); - } - } - }); - this.buffer.stream.on('close', () => { - // this.buffer.data = Buffer.concat(buffer); - logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); - // resolve(); - }); - this.buffer.stream.on('error', (error) => { - // TODO: handle with try/catch - reject(error); - }); - // this.buffer.written = 0; - // while (true) { - // if (buffer[0] === undefined) { - // await sleep(1); - // continue; - // } - // const tmp = buffer[0]; - // buffer.shift(); - // if ((client.audiosocket.bytesWritten + tmp.length) >= this.buffer.size) { - // logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...'); - // client.audiosocket.end(tmp); - // break; - // } - // client.audiosocket.write(tmp); - // this.buffer.buffered -= tmp.length; - // if (this.buffer.stream.isPaused() && this.buffer.buffered < this.buffer.limit) { - // this.buffer.stream.resume(); - // logger.debug('resuming read stream'); - // } - // } - client.audiosocket.on('error', (error) => { - logger.error(client.getTag() + ' encountered an error: ' + error); - }); - client.audiosocket.on('end', () => { - logger.debug(client.getTag() + ' ended audio socket'); - }); - client.audiosocket.on('close', (hadError) => { - let fn = resolve; - let msg = client.getTag() + ' closed audio socket'; - if (hadError === true) { - msg += ' after an error'; - fn = reject; - } - logger.debug(msg); - fn(msg); - }); - }); - } - - // async #waitForBuffer() { - // while (this.buffer.data === undefined || this.buffer.data.length < this.buffer.size) { - // await sleep(1); - // } - // return this.buffer.data; - // } - - // async #bufferFile() { - // // const stream = fs.createReadStream(this.buffer.file); - // return new Promise((resolve, reject) => { - // const timestamp = Date.now(); - // const buffer = []; - // let resolved = false; - // this.buffer.buffered = 0; - // this.buffer.stream.on('data', (data) => { - // this.buffer.buffered += data.length; - // buffer.push(data); - // if (resolved !== true && this.buffer.stream.bytesRead >= this.buffer.threshold) { - // resolved = true; - // logger.debug('buffering threshold of ' + this.buffer.threshold + ' bytes for file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms'); - // resolve(buffer); - // } - // if (this.buffer.buffered >= this.buffer.limit) { - // logger.debug('pausing read stream'); - // this.buffer.stream.pause(); - // if (resolved !== true) { - // resolved = true; - // resolve(buffer); - // } - // } - // }); - // this.buffer.stream.on('close', () => { - // // this.buffer.data = Buffer.concat(buffer); - // logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); - // // resolve(); - // }); - // this.buffer.stream.on('error', (error) => { - // // TODO: handle with try/catch - // reject(error); - // }); - // }); - // } - #getClientById(clientId) { if (clientId === undefined) { return; @@ -305,30 +213,54 @@ class AudioServer { } async #announceAudioServer() { - const broadcasted = await new Message('audio:initialize', { + this.broadcasts[CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', { port: this.server.address().port, size: this.buffer.size, threshold: this.buffer.threshold }).broadcast(true); - logger.debug('sent broadcast for audio server to client(s) \'' + broadcasted + '\'...'); + logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[CLIENT_STATE_REGISTERED] + '\'...'); } async #startPlayback() { - const broadcasted = await new Message('audio:play', { position: this.playback.position }).broadcast(); - logger.debug('sent broadcast to start playback to client(s) \'' + broadcasted + '\'...'); - this.playback.started = broadcasted; + this.broadcasts[CLIENT_STATE_PLAYING] = await new Message('audio:play', { position: this.position }).broadcast(); + logger.debug('sent broadcast to start playback to client(s) \'' + this.broadcasts[CLIENT_STATE_PLAYING] + '\'...'); } - async #stopPlayback() { - const broadcasted = await new Message('audio:stop').broadcast(); - logger.debug('sent broadcast to stop playback to client(s) \'' + broadcasted + '\'...'); - this.playback.stopped = broadcasted; + async #stopPlayback() { + this.broadcasts[CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast(); + logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[CLIENT_STATE_STOPPED] + '\'...'); } async #pausePlayback() { - const broadcasted = await new Message('audio:pause').broadcast(); - logger.debug('sent broadcast to pause playback to client(s) \'' + broadcasted + '\'...'); - this.playback.paused = broadcasted; + this.broadcasts[CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast(); + logger.debug('sent broadcast to pause playback to client(s) \'' + this.broadcasts[CLIENT_STATE_PAUSED] + '\'...'); + } + + async #transmitFile() { + const timestamp = Date.now(); + return new Promise((resolve, reject) => { + this.buffer.stream.on('data', (data) => { + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + if (client.audiosocket.write(data) !== true) { + logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); + this.buffer.stream.pause(); + } + if (client.audiosocket.bytesWritten >= this.buffer.size) { + client.audiosocket.end(); + client.audiosocket.destroy(); + } + } + }); + this.buffer.stream.on('close', () => { + logger.debug('reading file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); + resolve(); + }); + this.buffer.stream.on('error', (error) => { + // TODO: handle with try / catch + reject(error); + }); + }); } async destroy() { diff --git a/classes/Client.js b/classes/Client.js index 33d4fe6..ed79bc2 100644 --- a/classes/Client.js +++ b/classes/Client.js @@ -45,12 +45,12 @@ class Client { this.heartbeat.on('timeout', () => { this.#handleEventHeartbeatTimeout(); }); - this.heartbeat.on('latency', (data) => { - this.#handleEventLatency(data); + this.heartbeat.on('network-statistics', (data) => { + this.#handleEventNetworkStatistics(data); }); } - async #handleEventData(data, socket) { + async #handleEventData(data) { eventparser.parse(data); } @@ -74,9 +74,8 @@ class Client { this.destroy(); } - #handleEventLatency(data) { - this.latency = data; - logger.debug(this.getTag() + ' latency: ' + JSON.stringify(data)); + #handleEventNetworkStatistics(data) { + logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data)); } destroy() { diff --git a/classes/Heartbeat.js b/classes/Heartbeat.js index 59a7809..d82aad8 100644 --- a/classes/Heartbeat.js +++ b/classes/Heartbeat.js @@ -24,7 +24,8 @@ class Heartbeat extends EventEmitter { await sleep(this.interval); } this.alive = false; - await new Message('ping', { server: Date.now() }).send(this.client); + this.ping = process.hrtime.bigint(); + await new Message('ping').send(this.client); this.timeout = setTimeout(() => { this.#sendPing(); }, this.interval); @@ -35,14 +36,15 @@ class Heartbeat extends EventEmitter { logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...'); new Message('pong').send(this.client); }); - eventparser.on('pong', (data) => { + eventparser.on('pong', () => { logger.debug(this.client.getTag() + ' handling event \'pong\'...'); - const now = Date.now(); this.alive = true; - this.emit('latency', { - toClient: (data.client - data.server), - fromClient: (now - data.client), - roundtrip: (now - data.server) + this.pong = process.hrtime.bigint(); + this.latency = this.pong - this.ping; + this.emit('network-statistics', { + ping: this.ping.toString(), + pong: this.pong.toString(), + latency: this.latency.toString() }); }); } diff --git a/libs/constants.js b/libs/constants.js index 0441678..b00f324 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -5,6 +5,7 @@ module.exports = { EVENT_DELIMITER: '<<< kannon >>>', + CLIENT_STATE_REGISTERED: 'registered', CLIENT_STATE_READY: 'ready', CLIENT_STATE_PLAYING: 'playing', CLIENT_STATE_PAUSED: 'paused',