From 8cdb4ca0dad93e70854057715eafddfc038b47b3 Mon Sep 17 00:00:00 2001 From: velvettear Date: Tue, 19 Apr 2022 15:34:10 +0200 Subject: [PATCH] moved around a lot of stuff --- classes/AudioServer.js | 179 ++++++++++++++++++++++++++----------- classes/Message.js | 10 ++- classes/Server.js | 20 +++-- example_config_server.json | 3 + libs/constants.js | 11 ++- 5 files changed, 160 insertions(+), 63 deletions(-) diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 94ca754..87854ca 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -7,41 +7,24 @@ const stat = require('fs/promises').stat; const Message = require('./Message.js'); const EventParser = require('./EventParser.js'); +const { CLIENT_STATE_READY, CLIENT_STATE_PLAYING, CLIENT_STATE_PAUSED, CLIENT_STATE_STOPPED, CLIENT_STATE_ERROR } = require('../libs/constants.js'); + class AudioServer { constructor(file) { this.listen = config?.server?.listen || '0.0.0.0'; this.port = 0; - this.file = file; + this.buffer = { + file: file + }; this.clients = []; this.broadcastClients = []; + this.position = 0; this.server = net.createServer(); this.eventParser = new EventParser(); this.#prepare(); } - async start() { - if (this.aborted === true) { - return; - } - const buffer = await this.#waitForBuffer(); - await this.#waitForAllClients(); - this.#handleClientConnections(); - const promises = []; - for (let index = 0; index < this.clients.length; index++) { - const client = this.clients[index]; - client.audiostart = Date.now(); - promises.push(new Promise((resolve, reject) => { - client.audiosocket.end(buffer, () => { - logger.debug(client.getTag() + ' sent audio file \'' + this.file + '\' after ' + (Date.now() - client.audiostart) + 'ms...'); - resolve(); - }); - })); - } - await Promise.allSettled(promises); - await this.destroy(); - } - async #prepare() { if (server?.clients === undefined || server.clients.length === 0) { logger.warn('there are currently no clients connected, aborting preparation of audio server...') @@ -61,14 +44,21 @@ class AudioServer { reject('an error occured preparing the audio server for file \'' + this.file + '\' > ' + err); }); }); - const stats = await stat(this.file); - const broadcastedTo = await new Message('audiostream-initialize', { port: this.server.address().port, size: stats.size }).broadcast(true); - for (let index = 0; index < broadcastedTo.length; index++) { - if (broadcastedTo[index]?.status !== 'fulfilled') { - continue; - } - this.broadcastClients.push(broadcastedTo[index].value); - } + const stats = await stat(this.buffer.file); + this.buffer.size = stats.size; + this.buffer.threshold = (this.buffer.size / 100) / (!isNaN(config.audio?.threshold) || 30); + this.broadcastClients = await new Message('audio:initialize', { + port: this.server.address().port, + size: this.buffer.size, + threshold: this.buffer.threshold + }).broadcast(true); + // const broadcastedTo = + // for (let index = 0; index < broadcastedTo.length; index++) { + // if (broadcastedTo[index]?.status !== 'fulfilled') { + // continue; + // } + // this.broadcastClients.push(broadcastedTo[index].value); + // } logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcastClients.toString() + '\'...'); this.#bufferFile(); } @@ -77,14 +67,8 @@ class AudioServer { socket.on('data', (data) => { this.eventParser.parse(data, socket); }); - this.eventParser.on('audiostream-ready', (clientId, socket) => { - let client; - for (let index = 0; index < server.clients.length; index++) { - if (server.clients[index].id === clientId) { - client = server.clients[index]; - break; - } - } + this.eventParser.on('audio:register', (clientId, socket) => { + let client = server.getClientById(clientId); if (client === undefined) { return; } @@ -92,7 +76,92 @@ class AudioServer { this.clients.push(client); logger.debug(client.getTag() + ' connected to audio server...'); this.broadcastClients.splice(this.broadcastClients.indexOf(clientId), 1); + this.#sendData(client); }); + this.eventParser.on('audio:ready', async (clientId) => { + let allClientsReady = true; + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + if (client.id === clientId) { + client.state = CLIENT_STATE_READY; + logger.debug(client.getTag() + ' is ready for playback...'); + } + if (client.state !== CLIENT_STATE_READY) { + allClientsReady = false; + } + } + if (allClientsReady !== true) { + return; + } + const broadcastedTo = await new Message('audio:play', { position: this.position }).broadcast(); + logger.debug('sent broadcast for playback to client(s) \'' + broadcastedTo + '\'...'); + }); + this.eventParser.on('audio:paused', (position) => { + if (!isNaN(position) && positon > this.position) { + this.position = position; + } + let client = this.#getClientById(clientId); + if (client === undefined) { + return; + } + client.state = CLIENT_STATE_PAUSED; + }); + } + + async #sendData(client) { + const timestamp = Date.now(); + const buffer = await this.#waitForBuffer(); + return new Promise((resolve, reject) => { + client.audiosocket.end(buffer, () => { + logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...'); + }); + client.audiosocket.on('error', (error) => { + logger.error(client.getTag() + ' encountered an error: ' + error); + }); + client.audiosocket.on('end', () => { + logger.debug(client.getTag() + ' ended audio socket'); + }); + client.audiosocket.on('close', (hadError) => { + let fn = resolve; + let msg = client.getTag() + ' closed audio socket'; + if (hadError === true) { + msg += ' after an error'; + fn = reject; + } + logger.debug(msg); + fn(msg); + }); + }); + } + + async #sendAudio() { + if (this.aborted === true) { + return; + } + const buffer = await this.#waitForBuffer(); + await this.#waitForAllClients(); + this.#handleClientConnections(); + const promises = []; + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + client.audiostart = Date.now(); + promises.push(new Promise((resolve, reject) => { + client.audiosocket.end(buffer, () => { + logger.debug(client.getTag() + ' sent audio file \'' + this.file + '\' after ' + (Date.now() - client.audiostart) + 'ms...'); + resolve(); + }); + })); + } + await Promise.allSettled(promises); + await this.destroy(); + } + + + async #waitForAllClients() { + while (this.broadcastClients.length > 0) { + await sleep(1); + } + return; } #handleClientConnections() { @@ -111,35 +180,28 @@ class AudioServer { } logger.debug(msg); }); - - } - } - async #waitForAllClients() { - while (this.broadcastClients.length > 0) { - await sleep(1); } - return; } async #waitForBuffer() { - while (this.buffer === undefined) { + while (this.buffer.data === undefined || this.buffer.data.length < this.buffer.size) { await sleep(1); } - return this.buffer; + return this.buffer.data; } async #bufferFile() { return new Promise((resolve, reject) => { const timestamp = Date.now(); const buffer = []; - const stream = fs.createReadStream(this.file); + const stream = fs.createReadStream(this.buffer.file); stream.on('data', (data) => { buffer.push(data); }); stream.on('close', () => { - this.buffer = Buffer.concat(buffer); - logger.debug('buffering file \'' + this.file + '\' took ' + (Date.now() - timestamp) + 'ms (length: ' + this.buffer.length + ' bytes)'); + this.buffer.data = Buffer.concat(buffer); + logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.data.length + ' bytes)'); resolve(); }); stream.on('error', (error) => { @@ -149,8 +211,21 @@ class AudioServer { }); } + #getClientById(clientId) { + if (clientId === undefined) { + return; + } + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + if (client.id !== clientId) { + continue; + } + return client; + } + } + async destroy() { - this.eventParser.removeAllListeners('audiostream-ready'); + this.eventParser.removeAllListeners('audio:ready'); for (let index = 0; index < this.clients.length; index++) { const audiosocket = this.clients[index].audiosocket; if (audiosocket.destroyed === true) { diff --git a/classes/Message.js b/classes/Message.js index 1417bb8..bb454ba 100644 --- a/classes/Message.js +++ b/classes/Message.js @@ -39,7 +39,15 @@ class Message { for (let index = 0; index < server.clients.length; index++) { promises.push(this.send(server.clients[index], addClientId)); } - return await Promise.allSettled(promises); + const reached = []; + const result = await Promise.allSettled(promises); + for (let index = 0; index < result.length; index++) { + if (result[index]?.status !== 'fulfilled') { + continue; + } + reached.push(result[index].value); + } + return reached; } } diff --git a/classes/Server.js b/classes/Server.js index 4865530..fd9015f 100644 --- a/classes/Server.js +++ b/classes/Server.js @@ -14,10 +14,6 @@ class Server { } start() { - // setInterval(() => { - // const audioServer = new AudioServer('/mnt/kingston/downloads/DOPESMOKER.flac'); - // audioServer.start(); - // }, 10000); return new Promise((resolve, reject) => { this.server.listen(this.port, this.listen).on('listening', () => { this.port = this.server.address().port; @@ -48,8 +44,7 @@ class Server { #addClient(socket) { this.clients.push(new Client(socket)); - const audioServer = new AudioServer('/mnt/kingston/downloads/DOPESMOKER.flac'); - audioServer.start(); + new AudioServer('/mnt/kingston/public/DOPESMOKER.flac'); } removeClient(client) { @@ -57,6 +52,19 @@ class Server { this.clients.splice(this.clients.indexOf(client), 1); } + getClientById(clientId) { + if (clientId === undefined) { + return; + } + for (let index = 0; index < this.clients.length; index++) { + const client = this.clients[index]; + if (client.id !== clientId) { + continue; + } + return client; + } + } + } module.exports = Server; \ No newline at end of file diff --git a/example_config_server.json b/example_config_server.json index 94671cc..f3823d4 100644 --- a/example_config_server.json +++ b/example_config_server.json @@ -20,5 +20,8 @@ "database": "kannon", "username": "postgres", "password": "$Velvet90" + }, + "audio": { + "threshold": 10 } } \ No newline at end of file diff --git a/libs/constants.js b/libs/constants.js index f50b6aa..0441678 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -2,9 +2,12 @@ module.exports = { FS_EVENT_ADD: 'add', FS_EVENT_UNLINK: 'unlink', FS_EVENT_CHANGE: 'change', - - SOCKET_EVENT_PING: 'ping', - SOCKET_EVENT_PONG: 'pong', - EVENT_DELIMITER: '<<< kannon >>>' + EVENT_DELIMITER: '<<< kannon >>>', + + CLIENT_STATE_READY: 'ready', + CLIENT_STATE_PLAYING: 'playing', + CLIENT_STATE_PAUSED: 'paused', + CLIENT_STATE_STOPPED: 'stopped', + CLIENT_STATE_ERROR: 'error' } \ No newline at end of file