const net = require('net'); const { stat, open } = require('fs/promises'); const Message = require('./Message.js'); const PCMStream = require('./PCMStream.js'); class AudioServer { constructor(file, progress) { this.listen = config?.server?.listen || '0.0.0.0'; this.port = 0; this.file = file; this.clients = []; this.sockets = []; this.broadcasts = {}; this.progress = progress || 0; this.server = net.createServer(); this.#prepare(); } async #prepare() { if (server?.clients === undefined || server.clients.length === 0) { logger.warn('there are currently no clients connected, aborting preparation of audio server...') this.aborted = true; return; } await new Promise((resolve, reject) => { this.server.listen(this.port, this.listen).on('listening', () => { this.port = this.server.address().port; logger.info('audio server listening on ' + this.listen + ':' + this.port + '...'); this.#handleEvents(); resolve(); }); this.server.on('connection', (socket) => { this.sockets.push(socket); }); this.server.on('error', (err) => { reject('audio server encountered an error: ' + err); }); this.server.on('close', () => { logger.info('audio server closed'); }); }); // TODO: GET AUDIO INFO FROM FILE AND PASS TO PCMSTREAM FOR CONVERSION this.pcm = new PCMStream(this.file, this.progress); try { await this.pcm.prepare(); } catch (error) { logger.error('encountered an error creating pcm stream: ' + error); this.pcm = undefined; this.aborted = true; return; } this.#calculateThreshold(); this.#announceAudioServer(); } #calculateThreshold() { this.threshold = config.audio?.threshold; if (isNaN(this.threshold)) { this.threshold = 16; } this.threshold = this.threshold * 1024; } #handleEvents() { eventparser.on('audio:register', (data) => { this.#handleRegister(server.getClientById(data?.clientId), data?.port); }); eventparser.on('audio:state', (data) => { this.#setClientState(this.#getClientById(data?.clientId), data?.state, data); }); } #handleRegister(client, port) { if (client === undefined || port === undefined) { return; } let socket; for (let index = 0; index < this.sockets.length; index++) { if (this.sockets[index].remotePort === port) { socket = this.sockets[index]; this.sockets.splice(index, 1); break; } } if (socket === undefined) { return; } client.audiosocket = socket; client.audiosocket.setNoDelay(config?.audio?.nodelay || false); this.clients.push(client); this.#setClientState(client, constants.CLIENT_STATE_REGISTERED); client.audiosocket.on('connect', () => { logger.debug(client.getTag() + ' opened audio socket'); }); client.audiosocket.on('error', (error) => { logger.error(client.getTag() + ' encountered an error: ' + error); }); client.audiosocket.on('close', (hadError) => { let msg = client.getTag() + ' closed audio socket'; if (hadError === true) { msg += ' after an error'; } logger.debug(msg); }); client.audiosocket.on('drain', () => { if (this.pcm === undefined || this.pcm.isPaused() === false) { return; } this.#checkFileTransmission(client); // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); this.pcm.resume(); }); } #setClientState(client, state, data) { if (client === undefined || state === undefined) { return; } logger.debug(client.getTag() + ' state changed to \'' + state + '\''); client.state = state; this.#handleStateChange(client, data); } #allClientsInState(state) { if (this.clients === undefined || this.clients.length === 0 || state === undefined) { return false; } const broadcasts = this.broadcasts[state]; let result = true; for (let index = 0; index < this.clients.length; index++) { const client = this.clients[index]; if (client.state !== state) { result = false; } if (broadcasts === undefined) { continue; } const indexOfId = this.broadcasts[state].indexOf(client.id); if (indexOfId >= 0) { this.broadcasts[state].splice(indexOfId, 1); } } if (broadcasts !== undefined) { result = result && this.broadcasts[state].length === 0; } return result; } #handleStateChange(client, data) { if (client === undefined) { return; } if (this.progress < data?.progress) { this.progress = data.progress; } switch (client.state) { case constants.CLIENT_STATE_REGISTERED: return this.#handleStateRegistered(client); case constants.CLIENT_STATE_READY: return this.#handleStateReady(client); case constants.CLIENT_STATE_PLAYING: return this.#handleStatePlaying(client); case constants.CLIENT_STATE_PAUSED: return this.#handleStatePaused(client, data); case constants.CLIENT_STATE_STOPPED: return this.#handleStateStopped(client, data); case constants.CLIENT_STATE_ERROR: return this.#handleStateError(client, data); } } async #handleStateRegistered(client) { logger.debug(client.getTag() + ' has registered...'); if (!this.#allClientsInState(constants.CLIENT_STATE_REGISTERED)) { return; } this.#transmitFile(); } async #handleStateReady(client) { logger.debug(client.getTag() + ' is ready for playback...'); if (!this.#allClientsInState(constants.CLIENT_STATE_READY)) { return; } this.startPlayback(); } async #handleStatePlaying(client) { logger.debug(client.getTag() + ' has started playback...'); } async #handleStatePaused(client, data) { if (client === undefined || data === undefined) { return; } logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '...'); } async #handleStateStopped(client, data) { logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '...'); if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) { return; } this.destroy(); } async #handleStateError(client, data) { logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + ': ' + error); } #getClientById(clientId) { if (clientId === undefined) { return; } for (let index = 0; index < this.clients.length; index++) { const client = this.clients[index]; if (client.id !== clientId) { continue; } return client; } } async #announceAudioServer() { this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', { port: this.server.address().port, settings: { threshold: this.threshold, // TODO: GET AUDIO INFO FROM DATABASE AND PASS TO CLIENT(S) FOR PLAYBACK audio: { channels: 2, bitDepth: 16, sampleRate: 44100 } } }).broadcast(true); logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_REGISTERED] + '\'...'); } async startPlayback() { this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play').broadcast(); logger.debug('sent broadcast to start playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PLAYING] + '\'...'); } async pausePlayback() { this.broadcasts[constants.CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast(); logger.debug('sent broadcast to pause playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PAUSED] + '\'...'); } async stopPlayback() { this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast(); logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...'); } async #transmitFile() { const timestamp = Date.now(); return new Promise((resolve, reject) => { this.pcm.resume(); this.pcm.on('data', (data) => { for (let index = 0; index < this.clients.length; index++) { const client = this.clients[index]; if (client.audiosocket.destroyed) { this.clients.splice(index, 1); continue; } if (client.audiosocket.write(data) !== true) { // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); this.pcm.pause(); continue; } this.#checkFileTransmission(client); } }); this.pcm.on('close', () => { logger.debug('transmitting to pcm data converted file \'' + this.file + '\' took ' + (Date.now() - timestamp) + 'ms'); resolve(); }); }); } #checkFileTransmission(client) { if (client?.audiosocket === undefined || this.pcm === undefined) { return; } if (this.pcm.destroyed !== true) { return false; } logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now()) + 'ms'); client.audiosocket.end(); client.audiosocket.destroy(); return true; } async destroy() { eventparser.removeAllListeners('audio:register'); eventparser.removeAllListeners('audio:state'); for (let index = 0; index < this.clients.length; index++) { const audiosocket = this.clients[index].audiosocket; if (audiosocket.destroyed === true) { continue; } audiosocket.destroy(); } if (this.server?.listening !== true) { return; } await new Promise((resolve, reject) => { this.server.close((error) => { if (error !== undefined) { logger.error('an error occured closing the audio server: ' + error); // TODO: reject and try/catch later? // reject(error); } resolve(); }); }); if (this.pcm.destroyed === false) { await this.pcm.destroy(); } this.pcm = undefined; } } module.exports = AudioServer;