const net = require('net');
const { stat, open } = require('fs/promises');
const Message = require('./Message.js');
const constants = require('../libs/constants.js');

/**
 * TCP server that sends one audio file to every registered client and reacts
 * to the playback states the clients report back.
 *
 * NOTE: this class relies on the globals `config`, `logger`, `server` and
 * `eventparser`, which are not defined in this file and must be provided by
 * the host application.
 *
 * Public properties:
 *  - listen     address the server binds to (`config.server.listen` or '0.0.0.0')
 *  - port       bound port (0 until the server is listening)
 *  - buffer     `{ file, fd, stream, size, threshold }` describing the audio file
 *  - clients    clients that registered an audio socket
 *  - sockets    accepted sockets that have not been claimed by a client yet
 *  - broadcasts per-state lists returned by `Message#broadcast()`
 *               (presumably the ids of the addressed clients - verify against Message)
 *  - progress   byte offset the file is read from / highest progress reported
 *  - server     the underlying `net.Server`
 *  - aborted    set to true when preparation was skipped or failed
 */
class AudioServer {
    // Listener references are kept so destroy() can remove exactly the
    // listeners this instance installed on the shared event parser.
    #onRegister = (data) => {
        this.#handleRegister(server.getClientById(data?.clientId), data?.port);
    };

    #onState = (data) => {
        this.#setClientState(this.#getClientById(data?.clientId), data?.state, data);
    };

    // Temporary 'error'/'close' listeners of sockets that are not registered yet.
    #pendingHandlers = new WeakMap();

    // Promise of the running file transmission; prevents attaching the stream
    // listeners a second time.
    #transmission;

    /**
     * @param {string} file - path of the audio file to transmit
     * @param {number} [progress=0] - byte offset to start reading the file at
     */
    constructor(file, progress) {
        this.listen = config?.server?.listen || '0.0.0.0';
        this.port = 0;
        this.buffer = { file: file };
        this.clients = [];
        this.sockets = [];
        this.broadcasts = {};
        this.progress = progress || 0;
        this.server = net.createServer();
        // A constructor cannot await, so failures are logged here instead of
        // surfacing as an unhandled promise rejection.
        this.#prepare().catch((error) => {
            this.aborted = true;
            logger.error('audio server could not be prepared: ' + error);
        });
    }

    /**
     * Starts listening, opens the audio file and announces the server to the
     * clients. Does nothing (and sets `aborted`) when the global `server` has
     * no connected clients.
     */
    async #prepare() {
        if (server?.clients === undefined || server.clients.length === 0) {
            logger.warn('there are currently no clients connected, aborting preparation of audio server...')
            this.aborted = true;
            return;
        }

        await new Promise((resolve, reject) => {
            this.server.on('listening', () => {
                this.port = this.server.address().port;
                logger.info('audio server listening on ' + this.listen + ':' + this.port + '...');
                this.#handleEvents();
                resolve();
            });
            this.server.on('connection', (socket) => {
                this.#trackPendingSocket(socket);
            });
            this.server.on('error', (err) => {
                logger.error('ERROR IN AUDIOSERVER ' + err);
                // Has no effect once the promise is settled (errors after 'listening').
                reject(new Error('audio server encountered an error: ' + err, { cause: err }));
            });
            this.server.on('close', () => {
                logger.info('audio server closed');
            });
            this.server.listen(this.port, this.listen);
        });

        await this.#prepareBuffer();
        this.#announceAudioServer();
    }

    /**
     * Remembers an accepted socket until a client claims it via
     * 'audio:register'. While pending, errors are logged (an 'error' event
     * without a listener would be thrown) and a closed socket is dropped from
     * `this.sockets` again.
     *
     * @param {net.Socket} socket - freshly accepted connection
     */
    #trackPendingSocket(socket) {
        const onError = (error) => {
            logger.error('unregistered audio socket encountered an error: ' + error);
        };
        const onClose = () => {
            this.#pendingHandlers.delete(socket);
            const position = this.sockets.indexOf(socket);
            if (position >= 0) {
                this.sockets.splice(position, 1);
            }
        };

        socket.on('error', onError);
        socket.on('close', onClose);
        this.#pendingHandlers.set(socket, { onError, onClose });
        this.sockets.push(socket);
    }

    /**
     * (Re)opens the audio file, creates the read stream starting at
     * `this.progress` and derives `buffer.size` and `buffer.threshold`.
     */
    async #prepareBuffer() {
        this.#releaseBuffer();

        this.buffer.fd = await open(this.buffer.file);
        this.buffer.stream = this.buffer.fd.createReadStream({ start: this.progress });

        const stats = await stat(this.buffer.file);
        // Only the part of the file after the start offset is transmitted.
        this.buffer.size = stats.size - this.progress;

        // Threshold is configured as a percentage of the transmitted size; 30 is the default.
        const configured = Number(config?.audio?.threshold ?? NaN);
        const percentage = Number.isFinite(configured) ? configured : 30;
        this.buffer.threshold = (this.buffer.size / 100) * percentage;
    }

    /**
     * Closes the file handle and read stream if they exist. Safe to call when
     * the buffer was never prepared.
     */
    #releaseBuffer() {
        const { fd, stream } = this.buffer;
        if (fd !== undefined) {
            fd.close().catch((error) => {
                logger.error('could not close audio file \'' + this.buffer.file + '\': ' + error);
            });
        }
        if (stream !== undefined && stream.destroyed === false) {
            stream.close();
            stream.destroy();
        }
    }

    /** Subscribes to the client events this server reacts to. */
    #handleEvents() {
        eventparser.on('audio:register', this.#onRegister);
        eventparser.on('audio:state', this.#onState);
    }

    /**
     * Binds the pending socket whose remote port equals `port` to `client`
     * and marks the client as registered.
     * NOTE(review): sockets are matched by remote port only; two hosts using
     * the same source port would be indistinguishable - confirm this is acceptable.
     *
     * @param {object} client - client object from the global `server`
     * @param {number} port - remote port the client connected from
     */
    #handleRegister(client, port) {
        if (client === undefined || port === undefined) {
            return;
        }

        const position = this.sockets.findIndex((candidate) => candidate.remotePort === port);
        if (position < 0) {
            return;
        }
        const [socket] = this.sockets.splice(position, 1);

        // The client-specific listeners below take over from the pending ones.
        const pending = this.#pendingHandlers.get(socket);
        if (pending !== undefined) {
            socket.removeListener('error', pending.onError);
            socket.removeListener('close', pending.onClose);
            this.#pendingHandlers.delete(socket);
        }

        client.audiosocket = socket;
        client.audiosocket.setNoDelay(config?.audio?.nodelay || false);
        this.clients.push(client);
        this.#setClientState(client, constants.CLIENT_STATE_REGISTERED);

        client.audiosocket.on('connect', () => {
            logger.debug(client.getTag() + ' opened audio socket');
        });
        client.audiosocket.on('error', (error) => {
            logger.error(client.getTag() + ' encountered an error: ' + error);
        });
        client.audiosocket.on('close', (hadError) => {
            let msg = client.getTag() + ' closed audio socket';
            if (hadError === true) {
                msg += ' after an error';
            }
            logger.debug(msg);
        });
        client.audiosocket.on('drain', () => {
            // Back-pressure on this socket is relieved: resume a read stream
            // that #transmitFile paused.
            if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) {
                return;
            }
            this.buffer.stream.resume();
        });
    }

    /**
     * Stores the new state on the client and dispatches the state handler.
     *
     * @param {object} client - registered client
     * @param {*} state - one of the `constants.CLIENT_STATE_*` values
     * @param {object} [data] - payload of the 'audio:state' event, if any
     */
    #setClientState(client, state, data) {
        if (client === undefined || state === undefined) {
            return;
        }
        logger.debug(client.getTag() + ' state changed to \'' + state + '\'');
        client.state = state;
        this.#handleStateChange(client, data);
    }

    /**
     * Checks whether every registered client is in `state`. When a broadcast
     * list exists for that state, the ids of registered clients are removed
     * from it (the list is mutated) and it must be empty afterwards as well.
     *
     * @param {*} state - state to check for
     * @returns {boolean} false when there are no clients or `state` is undefined
     */
    #allClientsInState(state) {
        if (this.clients === undefined || this.clients.length === 0 || state === undefined) {
            return false;
        }

        const outstanding = this.broadcasts[state];
        let allInState = true;
        for (const client of this.clients) {
            if (client.state !== state) {
                allInState = false;
            }
            if (outstanding === undefined) {
                continue;
            }
            const position = outstanding.indexOf(client.id);
            if (position >= 0) {
                outstanding.splice(position, 1);
            }
        }

        if (outstanding !== undefined) {
            return allInState && outstanding.length === 0;
        }
        return allInState;
    }

    /**
     * Updates the known progress and runs the handler matching the client's
     * current state. Handler failures are logged instead of being left as
     * unhandled rejections.
     *
     * @param {object} client - client whose state changed
     * @param {object} [data] - payload of the 'audio:state' event, if any
     * @returns {Promise<void>|undefined} settles when the handler has finished
     */
    #handleStateChange(client, data) {
        if (client === undefined) {
            return;
        }
        if (this.progress < data?.progress) {
            this.progress = data.progress;
        }

        let handled;
        switch (client.state) {
            case constants.CLIENT_STATE_REGISTERED:
                handled = this.#handleStateRegistered(client);
                break;
            case constants.CLIENT_STATE_READY:
                handled = this.#handleStateReady(client);
                break;
            case constants.CLIENT_STATE_PLAYING:
                handled = this.#handleStatePlaying(client);
                break;
            case constants.CLIENT_STATE_PAUSED:
                handled = this.#handleStatePaused(client, data);
                break;
            case constants.CLIENT_STATE_STOPPED:
                handled = this.#handleStateStopped(client, data);
                break;
            case constants.CLIENT_STATE_ERROR:
                handled = this.#handleStateError(client, data);
                break;
            default:
                return;
        }

        return handled.catch((error) => {
            logger.error(client.getTag() + ' state \'' + client.state + '\' could not be handled: ' + error);
        });
    }

    /** Starts transmitting the file once every client has registered. */
    async #handleStateRegistered(client) {
        logger.debug(client.getTag() + ' has registered...');
        if (!this.#allClientsInState(constants.CLIENT_STATE_REGISTERED)) {
            return;
        }
        await this.#transmitFile();
    }

    /** Starts playback once every client reported that it is ready. */
    async #handleStateReady(client) {
        logger.debug(client.getTag() + ' is ready for playback...');
        if (!this.#allClientsInState(constants.CLIENT_STATE_READY)) {
            return;
        }
        await this.startPlayback();
    }

    /** Logs that a client started playback. */
    async #handleStatePlaying(client) {
        logger.debug(client.getTag() + ' has started playback...');
    }

    /** Logs that a client paused playback, including its reported progress. */
    async #handleStatePaused(client, data) {
        if (client === undefined || data === undefined) {
            return;
        }
        logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...');
    }

    /** Tears the server down once every client has stopped playback. */
    async #handleStateStopped(client, data) {
        logger.debug(client.getTag() + ' stopped playback, progress: ' + data?.progress + '/' + this.buffer.size + '...');
        if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) {
            return;
        }
        await this.destroy();
    }

    /** Logs a playback error reported by a client. */
    async #handleStateError(client, data) {
        // assumes the client reports the failure reason as `data.error` - TODO confirm
        // against the client protocol (the previous code read an undeclared `error`).
        logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data?.progress + '/' + this.buffer.size + ': ' + data?.error);
    }

    /**
     * @param {*} clientId - id to look up
     * @returns {object|undefined} the registered client with that id, if any
     */
    #getClientById(clientId) {
        if (clientId === undefined) {
            return;
        }
        return this.clients.find((client) => client.id === clientId);
    }

    /**
     * Broadcasts 'audio:initialize' with the server port and stream settings
     * and stores the result as the list to wait on for the REGISTERED state.
     */
    async #announceAudioServer() {
        this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
            port: this.server.address().port,
            settings: {
                size: this.buffer.size,
                threshold: this.buffer.threshold,
                // TODO: GET AUDIO INFO FROM DATABASE
                audio: {
                    channels: 2,
                    bitDepth: 16,
                    sampleRate: 44100
                }
            }
        }).broadcast(true);
        logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_REGISTERED] + '\'...');
    }

    /**
     * Broadcasts a playback command and stores the result of the broadcast as
     * the list to wait on for `state`.
     *
     * @param {string} type - message type, e.g. 'audio:play'
     * @param {*} state - state the clients are expected to report afterwards
     * @param {string} action - verb used in the log line ('start', 'pause', 'stop')
     */
    async #broadcastCommand(type, state, action) {
        this.broadcasts[state] = await new Message(type).broadcast();
        logger.debug('sent broadcast to ' + action + ' playback to client(s) \'' + this.broadcasts[state] + '\'...');
    }

    /** Asks all clients to start playback. */
    async startPlayback() {
        await this.#broadcastCommand('audio:play', constants.CLIENT_STATE_PLAYING, 'start');
    }

    /** Asks all clients to pause playback. */
    async pausePlayback() {
        await this.#broadcastCommand('audio:pause', constants.CLIENT_STATE_PAUSED, 'pause');
    }

    /** Asks all clients to stop playback. */
    async stopPlayback() {
        await this.#broadcastCommand('audio:stop', constants.CLIENT_STATE_STOPPED, 'stop');
    }

    /**
     * Pipes every chunk of the read stream to all registered audio sockets.
     * The read stream is paused when a socket reports back-pressure and is
     * resumed by the socket's 'drain' listener. Calling this again while a
     * transmission exists returns the same promise.
     *
     * @returns {Promise<void>} resolves when the read stream closes, rejects on a stream error
     */
    #transmitFile() {
        if (this.#transmission !== undefined) {
            return this.#transmission;
        }

        const timestamp = Date.now();
        this.#transmission = new Promise((resolve, reject) => {
            this.buffer.stream.on('data', (data) => {
                let index = 0;
                while (index < this.clients.length) {
                    const client = this.clients[index];
                    const socket = client.audiosocket;

                    if (socket.destroyed) {
                        // Do not advance: the next client moved into this slot.
                        this.clients.splice(index, 1);
                        continue;
                    }
                    index++;

                    if (socket.writableEnded) {
                        // Already received the complete file and is flushing.
                        continue;
                    }

                    if (socket.write(data) !== true) {
                        this.buffer.stream.pause();
                    }

                    if (socket.bytesWritten >= this.buffer.size) {
                        logger.warn(client.getTag() + ' transmitted audio file after ' + (Date.now() - timestamp) + 'ms');
                        // Destroy only after queued data was flushed; destroying
                        // right away could drop the tail of the file.
                        socket.end(() => {
                            socket.destroy();
                        });
                    }
                }
            });
            this.buffer.stream.on('close', () => {
                logger.debug('reading file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)');
                resolve();
            });
            this.buffer.stream.on('error', (error) => {
                logger.debug('STREAM ERROR!');
                reject(error);
            });
        });

        return this.#transmission;
    }

    /**
     * Releases everything this instance holds: event listeners, client and
     * pending sockets, the file handle / read stream and the TCP server.
     * Safe to call when preparation was aborted.
     *
     * @returns {Promise<void>} resolves once the server has been closed (or was not listening)
     */
    async destroy() {
        // Kept from the previous implementation; this class does not register
        // 'audio:ready' itself. NOTE(review): confirm other modules expect this.
        eventparser.removeAllListeners('audio:ready');
        // assumes eventparser is an EventEmitter (it is used with on/removeAllListeners)
        eventparser.removeListener('audio:register', this.#onRegister);
        eventparser.removeListener('audio:state', this.#onState);

        for (const client of this.clients) {
            const audiosocket = client.audiosocket;
            if (audiosocket.destroyed === true) {
                continue;
            }
            audiosocket.destroy();
        }

        // Connections that never registered would keep server.close() from completing.
        for (const socket of [...this.sockets]) {
            if (socket.destroyed !== true) {
                socket.destroy();
            }
        }
        this.sockets.length = 0;

        this.#releaseBuffer();

        if (this.server?.listening !== true) {
            return;
        }

        await new Promise((resolve) => {
            this.server.close((error) => {
                if (error !== undefined) {
                    // TODO: reject and try/catch later?
                    logger.error('an error occured closing the audio server: ' + error);
                }
                resolve();
            });
        });
    }
}

module.exports = AudioServer;