const sleep = require('../libs/util.js').sleep;
const net = require('net');
const fs = require('fs');
const stat = require('fs/promises').stat;
const Message = require('./Message.js');
const EventParser = require('./EventParser.js');
const {
    CLIENT_STATE_READY,
    CLIENT_STATE_PLAYING,
    CLIENT_STATE_PAUSED,
    CLIENT_STATE_STOPPED,
    CLIENT_STATE_ERROR
} = require('../libs/constants.js');

// Divisor used for the buffer threshold when config.audio.threshold is missing or not a positive number.
const DEFAULT_AUDIO_THRESHOLD = 30;

/**
 * Buffers one audio file in memory and sends it to every client that registers
 * on a dedicated TCP server.
 *
 * This file reads `config`, `server` and `logger` without importing them, so
 * they have to be resolvable as globals at runtime.
 */
class AudioServer {
    /**
     * Creates the TCP server and event parser and starts the asynchronous
     * preparation. A failed preparation is logged and sets `this.aborted`.
     *
     * @param {string} file path of the audio file to serve
     */
    constructor(file) {
        this.listen = config?.server?.listen || '0.0.0.0';
        this.port = 0;
        this.buffer = { file: file };
        this.clients = [];
        this.broadcastClients = [];
        this.position = 0;
        this.aborted = false;
        this.server = net.createServer();
        this.eventParser = new EventParser();

        // Listeners are attached exactly once; attaching them per connection
        // would run every handler once per connected socket.
        this.#registerEvents();

        // A constructor cannot await, so failures are handled here instead of
        // surfacing as unhandled rejections.
        this.#prepare().catch((error) => {
            this.#abort(error);
        });
    }

    /**
     * Marks the server as aborted, logs the reason and releases the sockets.
     *
     * @param {*} error reason the preparation failed
     */
    #abort(error) {
        this.aborted = true;
        logger.error(error instanceof Error ? error.message : String(error));
        this.destroy().catch((err) => {
            logger.error('an error occured shutting down the audio server > ' + err);
        });
    }

    /**
     * Starts listening, determines file size and threshold, announces the
     * server to the clients via an 'audio:initialize' broadcast and buffers
     * the file. Does nothing but set `this.aborted` when no client is
     * connected to the global `server`.
     *
     * @returns {Promise<void>}
     */
    async #prepare() {
        if (server?.clients === undefined || server.clients.length === 0) {
            logger.warn('there are currently no clients connected, aborting preparation of audio server...');
            this.aborted = true;
            return;
        }

        await new Promise((resolve, reject) => {
            let started = false;
            this.server.on('connection', (socket) => {
                this.#handleConnection(socket);
            });
            this.server.on('error', (err) => {
                if (started === true) {
                    // The start-up promise is already settled, so the error can only be logged.
                    logger.error('audio server for file \'' + this.buffer.file + '\' encountered an error > ' + err);
                    return;
                }
                reject(new Error('an error occured preparing the audio server for file \'' + this.buffer.file + '\' > ' + err, { cause: err }));
            });
            this.server.on('listening', () => {
                started = true;
                // Port 0 was requested, so the actual port is only known now.
                this.port = this.server.address().port;
                logger.info('audio server listening on ' + this.listen + ':' + this.port + '...');
                resolve();
            });
            this.server.listen(this.port, this.listen);
        });

        const stats = await stat(this.buffer.file);
        this.buffer.size = stats.size;

        // Use the configured value itself as divisor; fall back to the default
        // when it is absent, not numeric or not positive.
        const configured = Number(config?.audio?.threshold);
        const divisor = Number.isFinite(configured) && configured > 0 ? configured : DEFAULT_AUDIO_THRESHOLD;
        this.buffer.threshold = (this.buffer.size / 100) / divisor;

        this.broadcastClients = await new Message('audio:initialize', {
            port: this.port,
            size: this.buffer.size,
            threshold: this.buffer.threshold
        }).broadcast(true);
        logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcastClients.toString() + '\'...');

        await this.#bufferFile();
    }

    /**
     * Attaches the handlers for the events emitted by the event parser.
     */
    #registerEvents() {
        this.eventParser.on('audio:register', (clientId, socket) => {
            this.#onRegister(clientId, socket);
        });
        this.eventParser.on('audio:ready', (clientId) => {
            this.#onReady(clientId).catch((error) => {
                logger.error('an error occured broadcasting playback > ' + error);
            });
        });
        // NOTE(review): assumes the parser passes the originating socket as
        // trailing argument, as it does for 'audio:register' - confirm against EventParser.
        this.eventParser.on('audio:paused', (position, socket) => {
            this.#onPaused(position, socket);
        });
    }

    /**
     * Feeds incoming socket data into the event parser and logs socket errors.
     *
     * @param {net.Socket} socket newly accepted connection
     */
    #handleConnection(socket) {
        socket.on('data', (data) => {
            this.eventParser.parse(data, socket);
        });
        // Without a listener an 'error' event on the socket would be thrown.
        socket.on('error', (error) => {
            const client = this.#getClientBySocket(socket);
            const tag = client === undefined ? 'unregistered audio client' : client.getTag();
            logger.error(tag + ' encountered an error: ' + error);
        });
    }

    /**
     * Binds the socket to the client known to the global `server`, removes the
     * client from the list of pending broadcast receivers and sends the file.
     *
     * @param {*} clientId id reported by the client
     * @param {net.Socket} socket socket the registration arrived on
     */
    #onRegister(clientId, socket) {
        const client = server.getClientById(clientId);
        if (client === undefined) {
            return;
        }
        client.audiosocket = socket;
        if (!this.clients.includes(client)) {
            this.clients.push(client);
        }
        logger.debug(client.getTag() + ' connected to audio server...');

        // splice(-1, 1) would drop the last pending client, so only remove on a hit.
        const pending = this.broadcastClients.indexOf(clientId);
        if (pending !== -1) {
            this.broadcastClients.splice(pending, 1);
        }

        this.#sendData(client).catch((error) => {
            logger.error(client.getTag() + ' did not receive audio file \'' + this.buffer.file + '\' > ' + error.message);
        });
    }

    /**
     * Marks the client as ready and broadcasts 'audio:play' once every
     * registered client is ready.
     *
     * @param {*} clientId id of the client that became ready
     * @returns {Promise<void>}
     */
    async #onReady(clientId) {
        let allClientsReady = true;
        for (const client of this.clients) {
            if (client.id === clientId) {
                client.state = CLIENT_STATE_READY;
                logger.debug(client.getTag() + ' is ready for playback...');
            }
            if (client.state !== CLIENT_STATE_READY) {
                allClientsReady = false;
            }
        }
        if (allClientsReady !== true) {
            return;
        }
        const broadcastedTo = await new Message('audio:play', { position: this.position }).broadcast();
        logger.debug('sent broadcast for playback to client(s) \'' + broadcastedTo + '\'...');
    }

    /**
     * Remembers the furthest reported playback position and marks the client
     * owning the socket as paused.
     *
     * @param {*} position playback position reported by the client
     * @param {net.Socket} [socket] socket the event arrived on
     */
    #onPaused(position, socket) {
        const reported = Number(position);
        if (Number.isFinite(reported) && reported > this.position) {
            this.position = reported;
        }
        const client = this.#getClientBySocket(socket);
        if (client === undefined) {
            return;
        }
        client.state = CLIENT_STATE_PAUSED;
    }

    /**
     * Sends the buffered file to one client and ends its audio socket.
     *
     * @param {object} client registered client with an `audiosocket`
     * @returns {Promise<string>} resolves with a log message once the socket
     *     closed cleanly; rejects when it closed after an error, was already
     *     destroyed, or the file could not be buffered
     */
    async #sendData(client) {
        const timestamp = Date.now();
        const buffer = await this.#waitForBuffer();
        const socket = client.audiosocket;
        if (socket.destroyed === true) {
            // 'close' was already emitted, so waiting for it would never settle.
            throw new Error(client.getTag() + ' closed audio socket before the audio file was sent');
        }
        return new Promise((resolve, reject) => {
            socket.on('end', () => {
                logger.debug(client.getTag() + ' ended audio socket');
            });
            socket.on('close', (hadError) => {
                let msg = client.getTag() + ' closed audio socket';
                if (hadError === true) {
                    msg += ' after an error';
                    logger.debug(msg);
                    reject(new Error(msg));
                    return;
                }
                logger.debug(msg);
                resolve(msg);
            });
            socket.end(buffer, () => {
                logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...');
            });
        });
    }

    /**
     * Sends the buffered file to all registered clients at once and destroys
     * the server afterwards.
     *
     * NOTE(review): nothing in this class calls this method at the moment.
     *
     * @returns {Promise<void>}
     */
    async #sendAudio() {
        if (this.aborted === true) {
            return;
        }
        const buffer = await this.#waitForBuffer();
        await this.#waitForAllClients();
        this.#handleClientConnections();
        const promises = this.clients.map((client) => {
            client.audiostart = Date.now();
            return new Promise((resolve) => {
                client.audiosocket.end(buffer, () => {
                    logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - client.audiostart) + 'ms...');
                    resolve();
                });
            });
        });
        await Promise.allSettled(promises);
        await this.destroy();
    }

    /**
     * Polls until every client that received the initial broadcast registered.
     * Only used by #sendAudio().
     *
     * @returns {Promise<void>}
     */
    async #waitForAllClients() {
        while (this.broadcastClients.length > 0) {
            await sleep(1);
        }
    }

    /**
     * Attaches logging listeners to the audio socket of every registered
     * client. Only used by #sendAudio().
     */
    #handleClientConnections() {
        for (const client of this.clients) {
            client.audiosocket.on('error', (error) => {
                logger.error(client.getTag() + ' encountered an error: ' + error);
            });
            client.audiosocket.on('end', () => {
                logger.debug(client.getTag() + ' ended audio socket');
            });
            client.audiosocket.on('close', (hadError) => {
                let msg = client.getTag() + ' closed audio socket';
                if (hadError === true) {
                    msg += ' after an error';
                }
                logger.debug(msg);
            });
        }
    }

    /**
     * Resolves with the complete file contents, starting the read if needed.
     *
     * @returns {Promise<Buffer>} rejects when the file cannot be read
     */
    async #waitForBuffer() {
        return this.#bufferFile();
    }

    /**
     * Reads the file into `this.buffer.data`. The read is started once; every
     * further call returns the same promise.
     *
     * @returns {Promise<Buffer>} rejects when the file cannot be read
     */
    #bufferFile() {
        if (this.buffer.loading === undefined) {
            const timestamp = Date.now();
            this.buffer.loading = fs.promises.readFile(this.buffer.file).then((data) => {
                this.buffer.data = data;
                logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.data.length + ' bytes)');
                return data;
            });
        }
        return this.buffer.loading;
    }

    /**
     * Looks up a registered client by id.
     *
     * NOTE(review): currently not called inside this class.
     *
     * @param {*} clientId id to look for
     * @returns {object|undefined} the client, or undefined when unknown
     */
    #getClientById(clientId) {
        if (clientId === undefined) {
            return undefined;
        }
        return this.clients.find((client) => client.id === clientId);
    }

    /**
     * Looks up a registered client by its audio socket.
     *
     * @param {net.Socket} [socket] socket to look for
     * @returns {object|undefined} the client, or undefined when unknown
     */
    #getClientBySocket(socket) {
        if (socket === undefined) {
            return undefined;
        }
        return this.clients.find((client) => client.audiosocket === socket);
    }

    /**
     * Removes the event listeners, destroys the audio sockets of all
     * registered clients and closes the TCP server.
     *
     * @returns {Promise<void>} rejects with the error reported by `server.close()`
     */
    async destroy() {
        this.eventParser.removeAllListeners('audio:register');
        this.eventParser.removeAllListeners('audio:ready');
        this.eventParser.removeAllListeners('audio:paused');

        for (const client of this.clients) {
            const audiosocket = client.audiosocket;
            if (audiosocket === undefined || audiosocket.destroyed === true) {
                continue;
            }
            audiosocket.destroy();
        }

        // Closing a server that is not listening reports an error, e.g. after
        // an aborted preparation or a second destroy() call.
        if (this.server.listening !== true) {
            return;
        }
        await new Promise((resolve, reject) => {
            this.server.close((err) => {
                if (err != null) {
                    reject(err);
                    return;
                }
                resolve();
            });
        });
    }
}

module.exports = AudioServer;