diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 87854ca..d8326df 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -5,7 +5,6 @@ const fs = require('fs'); const stat = require('fs/promises').stat; const Message = require('./Message.js'); -const EventParser = require('./EventParser.js'); const { CLIENT_STATE_READY, CLIENT_STATE_PLAYING, CLIENT_STATE_PAUSED, CLIENT_STATE_STOPPED, CLIENT_STATE_ERROR } = require('../libs/constants.js'); @@ -18,10 +17,11 @@ class AudioServer { file: file }; this.clients = []; - this.broadcastClients = []; - this.position = 0; + this.sockets = []; + this.playback = { + position: 0 + }; this.server = net.createServer(); - this.eventParser = new EventParser(); this.#prepare(); } @@ -35,10 +35,11 @@ class AudioServer { this.server.listen(this.port, this.listen).on('listening', () => { this.port = this.server.address().port; logger.info('audio server listening on ' + this.listen + ':' + this.port + '...'); + this.#handleEvents(); resolve(); }); this.server.on('connection', (socket) => { - this.#handleConnection(socket); + this.sockets.push(socket); }); this.server.on('error', (err) => { reject('an error occured preparing the audio server for file \'' + this.file + '\' > ' + err); @@ -47,65 +48,104 @@ class AudioServer { const stats = await stat(this.buffer.file); this.buffer.size = stats.size; this.buffer.threshold = (this.buffer.size / 100) / (!isNaN(config.audio?.threshold) || 30); - this.broadcastClients = await new Message('audio:initialize', { - port: this.server.address().port, - size: this.buffer.size, - threshold: this.buffer.threshold - }).broadcast(true); - // const broadcastedTo = - // for (let index = 0; index < broadcastedTo.length; index++) { - // if (broadcastedTo[index]?.status !== 'fulfilled') { - // continue; - // } - // this.broadcastClients.push(broadcastedTo[index].value); - // } - logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcastClients.toString() + '\'...'); + this.#announceAudioServer(); this.#bufferFile(); } - #handleConnection(socket) { - socket.on('data', (data) => { - this.eventParser.parse(data, socket); - }); - this.eventParser.on('audio:register', (clientId, socket) => { - let client = server.getClientById(clientId); + #handleEvents(socket) { + eventparser.on('audio:register', (data) => { + if (data?.clientId === undefined || data?.socket === undefined) { + return; + } + let client = server.getClientById(data.clientId); if (client === undefined) { return; } + let socket; + for (let index = 0; index < this.sockets.length; index++) { + if (this.sockets[index].remotePort === data.socket) { + socket = this.sockets[index]; + this.sockets.splice(index, 1); + break; + } + } + if (socket === undefined) { + return; + } client.audiosocket = socket; this.clients.push(client); logger.debug(client.getTag() + ' connected to audio server...'); - this.broadcastClients.splice(this.broadcastClients.indexOf(clientId), 1); this.#sendData(client); }); - this.eventParser.on('audio:ready', async (clientId) => { - let allClientsReady = true; - for (let index = 0; index < this.clients.length; index++) { - const client = this.clients[index]; - if (client.id === clientId) { - client.state = CLIENT_STATE_READY; - logger.debug(client.getTag() + ' is ready for playback...'); - } - if (client.state !== CLIENT_STATE_READY) { - allClientsReady = false; - } - } - if (allClientsReady !== true) { + eventparser.on('audio:state', (data) => { + this.#handleStateChange(data); + }); + } + + #handleStateChange(data) { + if (data?.clientId === undefined || data?.state === undefined) { + return; + } + let client = this.#getClientById(data.clientId); + if (client === undefined) { + return; + } + logger.debug(client.getTag() + ' state changed to \'' + data.state + '\''); + client.state = data.state; + switch (client.state) { + case CLIENT_STATE_READY: + return this.#handleStateReady(client); + case CLIENT_STATE_PLAYING: + return this.#handleStatePlaying(client); + case CLIENT_STATE_PAUSED: + return this.#handleStatePaused(client, data); + case CLIENT_STATE_STOPPED: + return this.#handleStateStopped(client, data); + case CLIENT_STATE_ERROR: + return this.#handleStateError(client, data); + } + } + + async #handleStateReady(client) { + logger.debug(client.getTag() + ' is ready for playback...'); + for (let index = 0; index < this.clients.length; index++) { + if (this.clients[index].state !== CLIENT_STATE_READY) { return; } - const broadcastedTo = await new Message('audio:play', { position: this.position }).broadcast(); - logger.debug('sent broadcast for playback to client(s) \'' + broadcastedTo + '\'...'); - }); - this.eventParser.on('audio:paused', (position) => { - if (!isNaN(position) && positon > this.position) { - this.position = position; + } + this.#startPlayback(); + } + + async #handleStatePlaying(client) { + logger.debug(client.getTag() + ' has started playback...'); + // TODO: remove - test only + await sleep(5000); + this.#pausePlayback(); + } + + async #handleStatePaused(client, data) { + if (client === undefined || data === undefined) { + return; + } + logger.debug(client.getTag() + ' paused playback at position \'' + data.position + '\'...'); + for (let index = 0; index < this.playback.paused.length; index++) { + if (this.playback.paused[index] === client.id) { + if (this.playback.position === 0 || this.playback.position > data.position) { + this.playback.position = data.position; + } } - let client = this.#getClientById(clientId); - if (client === undefined) { - return; - } - client.state = CLIENT_STATE_PAUSED; - }); + } + // TODO: remove - test only + await sleep(1); + this.#startPlayback(); + } + + async #handleStateStopped(client, data) { + logger.debug(client.getTag() + ' stopped playback at position \'' + data.position + '\'...'); + } + + async #handleStateError(client, data) { + logger.error(client.getTag() + ' experienced an error during playback at position \'' + data.position + '\': ' + data.error); } async #sendData(client) { @@ -134,56 +174,6 @@ class AudioServer { }); } - async #sendAudio() { - if (this.aborted === true) { - return; - } - const buffer = await this.#waitForBuffer(); - await this.#waitForAllClients(); - this.#handleClientConnections(); - const promises = []; - for (let index = 0; index < this.clients.length; index++) { - const client = this.clients[index]; - client.audiostart = Date.now(); - promises.push(new Promise((resolve, reject) => { - client.audiosocket.end(buffer, () => { - logger.debug(client.getTag() + ' sent audio file \'' + this.file + '\' after ' + (Date.now() - client.audiostart) + 'ms...'); - resolve(); - }); - })); - } - await Promise.allSettled(promises); - await this.destroy(); - } - - - async #waitForAllClients() { - while (this.broadcastClients.length > 0) { - await sleep(1); - } - return; - } - - #handleClientConnections() { - for (let index = 0; index < this.clients.length; index++) { - const client = this.clients[index]; - client.audiosocket.on('error', (error) => { - logger.error(client.getTag() + ' encountered an error: ' + error); - }); - client.audiosocket.on('end', () => { - logger.debug(client.getTag() + ' ended audio socket'); - }); - client.audiosocket.on('close', (hadError) => { - let msg = client.getTag() + ' closed audio socket'; - if (hadError === true) { - msg += ' after an error'; - } - logger.debug(msg); - }); - - } - } - async #waitForBuffer() { while (this.buffer.data === undefined || this.buffer.data.length < this.buffer.size) { await sleep(1); @@ -224,8 +214,35 @@ class AudioServer { } } + async #announceAudioServer() { + const broadcasted = await new Message('audio:initialize', { + port: this.server.address().port, + size: this.buffer.size, + threshold: this.buffer.threshold + }).broadcast(true); + logger.debug('sent broadcast for audio server to client(s) \'' + broadcasted + '\'...'); + } + + async #startPlayback() { + const broadcasted = await new Message('audio:play', { position: this.playback.position }).broadcast(); + logger.debug('sent broadcast to start playback to client(s) \'' + broadcasted + '\'...'); + this.playback.started = broadcasted; + } + + async #stopPlayback() { + const broadcasted = await new Message('audio:stop').broadcast(); + logger.debug('sent broadcast to stop playback to client(s) \'' + broadcasted + '\'...'); + this.playback.stopped = broadcasted; + } + + async #pausePlayback() { + const broadcasted = await new Message('audio:pause').broadcast(); + logger.debug('sent broadcast to pause playback to client(s) \'' + broadcasted + '\'...'); + this.playback.paused = broadcasted; + } + async destroy() { - this.eventParser.removeAllListeners('audio:ready'); + eventparser.removeAllListeners('audio:ready'); for (let index = 0; index < this.clients.length; index++) { const audiosocket = this.clients[index].audiosocket; if (audiosocket.destroyed === true) { diff --git a/classes/Client.js b/classes/Client.js index e8394d5..33d4fe6 100644 --- a/classes/Client.js +++ b/classes/Client.js @@ -1,5 +1,4 @@ const Heartbeat = require('./Heartbeat.js'); -const EventParser = require('./EventParser.js'); let clientId = -1; @@ -9,7 +8,6 @@ class Client { clientId++; this.id = clientId; this.socket = socket; - this.eventParser = new EventParser(); this.heartbeat = new Heartbeat(this); this.#listenForEvents(); } @@ -42,7 +40,7 @@ class Client { this.#handleEventEnd() }); this.socket.on('data', (data) => { - this.#handleEventData(data) + this.#handleEventData(data, this.socket) }); this.heartbeat.on('timeout', () => { this.#handleEventHeartbeatTimeout(); @@ -52,8 +50,8 @@ class Client { }); } - async #handleEventData(data) { - this.eventParser.parse(data); + async #handleEventData(data, socket) { + eventparser.parse(data); } #handleEventTimeout() { diff --git a/classes/EventParser.js b/classes/EventParser.js index 0377249..9bd1671 100644 --- a/classes/EventParser.js +++ b/classes/EventParser.js @@ -8,7 +8,7 @@ class EventParser extends EventEmitter { this.buffer = ''; } - parse(data, socket) { + parse(data) { if (data === undefined) { return; } @@ -23,7 +23,7 @@ class EventParser extends EventEmitter { return; } const eventId = event.id.toLowerCase(); - this.emit(eventId, event.data, socket); + this.emit(eventId, event.data); } } diff --git a/classes/Heartbeat.js b/classes/Heartbeat.js index 83104b6..59a7809 100644 --- a/classes/Heartbeat.js +++ b/classes/Heartbeat.js @@ -7,7 +7,7 @@ class Heartbeat extends EventEmitter { constructor(client) { super(); - this.interval = config?.server?.heartbeat || 10000; + this.interval = config?.heartbeat || 10000; this.client = client; this.#listenForPingPong(); this.#sendPing(); @@ -31,11 +31,11 @@ class Heartbeat extends EventEmitter { } async #listenForPingPong() { - this.client.eventParser.on('ping', () => { + eventparser.on('ping', () => { logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...'); new Message('pong').send(this.client); }); - this.client.eventParser.on('pong', (data) => { + eventparser.on('pong', (data) => { logger.debug(this.client.getTag() + ' handling event \'pong\'...'); const now = Date.now(); this.alive = true; @@ -51,8 +51,8 @@ class Heartbeat extends EventEmitter { if (this.timeout !== undefined) { clearTimeout(this.timeout); } - this.client.eventParser.removeAllListeners('ping'); - this.client.eventParser.removeAllListeners('pong'); + eventparser.removeAllListeners('ping'); + eventparser.removeAllListeners('pong'); } } diff --git a/example_config_library.json b/example_config_library.json index 99a74fc..5e54c6a 100644 --- a/example_config_library.json +++ b/example_config_library.json @@ -19,7 +19,7 @@ "database": { "dialect": "postgres", "storage": "/tmp/kannon.sqlite", - "host": "192.168.104.135", + "host": "192.168.104.136", "port": 5432, "database": "kannon", "username": "postgres", diff --git a/example_config_server.json b/example_config_server.json index f3823d4..40a8941 100644 --- a/example_config_server.json +++ b/example_config_server.json @@ -15,11 +15,11 @@ "database": { "dialect": "postgres", "storage": "/tmp/kannon.sqlite", - "host": "192.168.104.135", + "host": "192.168.104.136", "port": 5432, "database": "kannon", - "username": "postgres", - "password": "$Velvet90" + "username": "kannon", + "password": "kannon" }, "audio": { "threshold": 10 diff --git a/kannon.js b/kannon.js index 326939a..9ed536b 100644 --- a/kannon.js +++ b/kannon.js @@ -7,6 +7,7 @@ const Server = require('./classes/Server.js'); const Database = require('./classes/Database.js'); const Queue = require('./classes/Queue.js'); const Watcher = require('./classes/Watcher.js'); +const EventParser = require('./classes/EventParser'); const INTERRUPTS = ['beforeExit', 'SIGINT', 'SIGTERM']; @@ -30,6 +31,7 @@ async function main() { await global.database.initialize(); // socket server if (util.isEnabled(global.config.server)) { + global.eventparser = new EventParser(); global.server = new Server(); await global.server.start(); }