const sleep = require('../libs/util.js').sleep;
const net = require('net');
const fs = require('fs');
const stat = require('fs/promises').stat;
const Message = require('./Message.js');
const {
    CLIENT_STATE_REGISTERED,
    CLIENT_STATE_READY,
    CLIENT_STATE_PLAYING,
    CLIENT_STATE_PAUSED,
    CLIENT_STATE_STOPPED,
    CLIENT_STATE_ERROR
} = require('../libs/constants.js');

/** Fallback for `config.audio.bufferlimit` (multiplied by BYTES_PER_MEBIBYTE below). */
const DEFAULT_BUFFER_LIMIT = 256;
const BYTES_PER_MEBIBYTE = 1048576;
/** Fallback divisor for the buffer threshold when `config.audio.threshold` is unusable. */
const DEFAULT_THRESHOLD = 30;

/**
 * TCP server that streams one audio file to every registered client and
 * relays play / pause / stop broadcasts based on the states clients report.
 *
 * This file relies on the ambient globals `config`, `logger`, `server` and
 * `eventparser`; none of them is defined or imported here.
 */
class AudioServer {
    /** Promise of the running file transmission; set once so the file is only streamed one time. */
    #transmission;

    /**
     * Opens a read stream on `file`, creates the TCP server and kicks off the
     * asynchronous preparation. Preparation failures are logged, not thrown,
     * because a constructor cannot be awaited.
     *
     * @param {string} file - path of the audio file to stream
     */
    constructor(file) {
        this.listen = config?.server?.listen || '0.0.0.0';
        // port 0 lets the OS choose; the real port is stored once the server is listening
        this.port = 0;
        this.buffer = {
            file: file,
            stream: fs.createReadStream(file),
            limit: (config?.audio?.bufferlimit || DEFAULT_BUFFER_LIMIT) * BYTES_PER_MEBIBYTE
        };
        this.clients = [];
        // accepted connections that have not been matched to a client yet
        this.sockets = [];
        // per state: the value returned by Message#broadcast for the broadcast requesting that state
        this.broadcasts = {};
        this.position = 0;
        this.server = net.createServer();

        // without a listener a failing read stream (e.g. missing file) raises an unhandled 'error' event
        this.buffer.stream.on('error', (error) => {
            logger.error(`reading file '${this.buffer.file}' failed > ${error}`);
        });

        this.#prepare().catch((error) => {
            logger.error(`preparing the audio server for file '${this.buffer.file}' failed > ${error}`);
        });
    }

    /**
     * Starts listening, determines the file size and threshold and announces
     * the server to the clients. Sets `this.aborted` and returns early when
     * the global `server` has no clients.
     *
     * @returns {Promise<void>} rejects when listening, `stat` or the announcement fails
     */
    async #prepare() {
        if (server?.clients === undefined || server.clients.length === 0) {
            logger.warn('there are currently no clients connected, aborting preparation of audio server...');
            this.aborted = true;
            // nothing will ever read the file, so release its descriptor right away
            this.buffer.stream.destroy();
            return;
        }

        await new Promise((resolve, reject) => {
            this.server.on('connection', (socket) => {
                this.sockets.push(socket);
            });
            this.server.on('error', (err) => {
                reject(new Error(
                    `an error occured preparing the audio server for file '${this.buffer.file}' > ${err}`,
                    { cause: err }
                ));
            });
            this.server.on('listening', () => {
                this.port = this.server.address().port;
                logger.info(`audio server listening on ${this.listen}:${this.port}...`);
                this.#handleEvents();
                resolve();
            });
            this.server.listen(this.port, this.listen);
        });

        const stats = await stat(this.buffer.file);
        this.buffer.size = stats.size;

        // use the configured value itself as divisor; fall back when it is missing, non-numeric or not positive
        const configuredThreshold = Number(config?.audio?.threshold);
        const divisor = Number.isFinite(configuredThreshold) && configuredThreshold > 0
            ? configuredThreshold
            : DEFAULT_THRESHOLD;
        this.buffer.threshold = (this.buffer.size / 100) / divisor;

        await this.#announceAudioServer();
    }

    /**
     * Subscribes to the `audio:register` and `audio:state` events of the
     * global `eventparser`.
     */
    #handleEvents() {
        eventparser.on('audio:register', (data) => {
            this.#handleRegister(server.getClientById(data?.clientId), data?.port);
        });
        eventparser.on('audio:state', (data) => {
            this.#setClientState(this.#getClientById(data?.clientId), data?.state, data);
        });
    }

    /**
     * Binds the pending connection whose remote port equals `port` to
     * `client`, wires up the socket logging / backpressure handlers and marks
     * the client as registered. Does nothing when either argument is
     * undefined or no pending connection matches.
     *
     * @param {object} client - client object obtained from the global `server`
     * @param {number} port - remote port the client reported for its audio connection
     */
    #handleRegister(client, port) {
        if (client === undefined || port === undefined) {
            return;
        }

        const socketIndex = this.sockets.findIndex((candidate) => candidate.remotePort === port);
        if (socketIndex < 0) {
            return;
        }
        const [socket] = this.sockets.splice(socketIndex, 1);

        client.audiosocket = socket;
        // a client registering twice must not receive every chunk twice
        if (!this.clients.includes(client)) {
            this.clients.push(client);
        }

        // listeners go on before the state change, which may start the transmission
        socket.on('connect', () => {
            logger.debug(`${client.getTag()} opened audio socket`);
        });
        socket.on('error', (error) => {
            logger.error(`${client.getTag()} encountered an error: ${error}`);
        });
        socket.on('end', () => {
            logger.debug(`${client.getTag()} ended audio socket`);
        });
        socket.on('close', (hadError) => {
            let msg = `${client.getTag()} closed audio socket`;
            if (hadError === true) {
                msg += ' after an error';
            }
            logger.debug(msg);
        });
        socket.on('drain', () => {
            if (this.buffer.stream === undefined || !this.buffer.stream.isPaused()) {
                return;
            }
            logger.debug(`${client.getTag()} backpressure is relieved, resuming read stream...`);
            this.buffer.stream.resume();
        });

        this.#setClientState(client, CLIENT_STATE_REGISTERED);
    }

    /**
     * Stores `state` on `client` and runs the handler for that state.
     *
     * @param {object} client - client whose state changed
     * @param {*} state - one of the CLIENT_STATE_* constants
     * @param {object} [data] - event payload forwarded to the state handler
     */
    #setClientState(client, state, data) {
        if (client === undefined || state === undefined) {
            return;
        }
        logger.debug(`${client.getTag()} state changed to '${state}'`);
        client.state = state;
        this.#handleStateChange(client, data);
    }

    /**
     * Checks whether every registered client is in `state`. If a broadcast
     * result is stored for `state`, the ids of all registered clients are
     * removed from it and the stored list must additionally be empty.
     *
     * NOTE(review): assumes Message#broadcast resolves to an array of client
     * ids (it is used with indexOf / splice / length) - confirm in Message.js.
     *
     * @param {*} state - one of the CLIENT_STATE_* constants
     * @returns {boolean} false when there are no clients or `state` is undefined
     */
    #allClientsInState(state) {
        if (this.clients === undefined || this.clients.length === 0 || state === undefined) {
            return false;
        }

        const pending = this.broadcasts[state];
        let result = true;
        for (const client of this.clients) {
            if (client.state !== state) {
                result = false;
            }
            if (pending === undefined) {
                continue;
            }
            const indexOfId = pending.indexOf(client.id);
            if (indexOfId >= 0) {
                pending.splice(indexOfId, 1);
            }
        }

        if (pending !== undefined) {
            result = result && pending.length === 0;
        }
        return result;
    }

    /**
     * Dispatches to the handler matching `client.state`. A rejection of the
     * handler is logged here so it never becomes an unhandled rejection.
     *
     * @param {object} client - client whose state changed
     * @param {object} [data] - event payload forwarded to the handler
     * @returns {Promise<void>|undefined} settles when the handler is done; undefined for unknown states
     */
    #handleStateChange(client, data) {
        if (client === undefined) {
            return;
        }

        let handling;
        switch (client.state) {
            case CLIENT_STATE_REGISTERED:
                handling = this.#handleStateRegistered(client);
                break;
            case CLIENT_STATE_READY:
                handling = this.#handleStateReady(client);
                break;
            case CLIENT_STATE_PLAYING:
                handling = this.#handleStatePlaying(client);
                break;
            case CLIENT_STATE_PAUSED:
                handling = this.#handleStatePaused(client, data);
                break;
            case CLIENT_STATE_STOPPED:
                handling = this.#handleStateStopped(client, data);
                break;
            case CLIENT_STATE_ERROR:
                handling = this.#handleStateError(client, data);
                break;
            default:
                return;
        }

        return handling.catch((error) => {
            logger.error(`${client.getTag()} failed handling state '${client.state}' > ${error}`);
        });
    }

    /** Starts transmitting the file once all clients have registered. */
    async #handleStateRegistered(client) {
        logger.debug(`${client.getTag()} has registered...`);
        if (!this.#allClientsInState(CLIENT_STATE_REGISTERED)) {
            return;
        }
        await this.#transmitFile();
    }

    /** Starts playback once all clients reported that they are ready. */
    async #handleStateReady(client) {
        logger.debug(`${client.getTag()} is ready for playback...`);
        if (!this.#allClientsInState(CLIENT_STATE_READY)) {
            return;
        }
        await this.#startPlayback();
    }

    /** Logs that the client started playback (plus temporary test code). */
    async #handleStatePlaying(client) {
        logger.debug(`${client.getTag()} has started playback...`);

        // TODO: remove - test only
        await sleep(5000);
        await this.#pausePlayback();
    }

    /** Logs the position the client paused at (plus temporary test code). */
    async #handleStatePaused(client, data) {
        if (client === undefined || data === undefined) {
            return;
        }
        logger.debug(`${client.getTag()} paused playback at position '${data.position}'...`);

        // TODO: remove - test only
        await sleep(1);
        await this.#startPlayback();
    }

    /** Logs the position the client stopped at. */
    async #handleStateStopped(client, data) {
        logger.debug(`${client.getTag()} stopped playback at position '${data?.position}'...`);
    }

    /** Logs the playback error the client reported. */
    async #handleStateError(client, data) {
        logger.error(`${client.getTag()} experienced an error during playback at position '${data?.position}': ${data?.error}`);
    }

    /**
     * Looks up a client registered with this audio server.
     *
     * @param {*} clientId - id to look for
     * @returns {object|undefined} the matching client, or undefined
     */
    #getClientById(clientId) {
        if (clientId === undefined) {
            return undefined;
        }
        return this.clients.find((client) => client.id === clientId);
    }

    /** Broadcasts `audio:initialize` with port, file size and threshold. */
    async #announceAudioServer() {
        this.broadcasts[CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
            port: this.server.address().port,
            size: this.buffer.size,
            threshold: this.buffer.threshold
        }).broadcast(true);
        logger.debug(`sent broadcast for audio server to client(s) '${this.broadcasts[CLIENT_STATE_REGISTERED]}'...`);
    }

    /** Broadcasts `audio:play` with the current position. */
    async #startPlayback() {
        this.broadcasts[CLIENT_STATE_PLAYING] = await new Message('audio:play', {
            position: this.position
        }).broadcast();
        logger.debug(`sent broadcast to start playback to client(s) '${this.broadcasts[CLIENT_STATE_PLAYING]}'...`);
    }

    /** Broadcasts `audio:stop`. */
    async #stopPlayback() {
        this.broadcasts[CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast();
        logger.debug(`sent broadcast to stop playback to client(s) '${this.broadcasts[CLIENT_STATE_STOPPED]}'...`);
    }

    /** Broadcasts `audio:pause`. */
    async #pausePlayback() {
        this.broadcasts[CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast();
        logger.debug(`sent broadcast to pause playback to client(s) '${this.broadcasts[CLIENT_STATE_PAUSED]}'...`);
    }

    /**
     * Pipes the file to the audio socket of every registered client, pausing
     * the read stream while a socket reports backpressure. Repeated calls
     * return the promise of the first call instead of attaching a second set
     * of stream listeners.
     *
     * @returns {Promise<void>} resolves when the read stream closes, rejects on a read error
     */
    #transmitFile() {
        if (this.#transmission !== undefined) {
            return this.#transmission;
        }

        const timestamp = Date.now();
        this.#transmission = new Promise((resolve, reject) => {
            this.buffer.stream.on('data', (data) => {
                for (const client of this.clients) {
                    const socket = client.audiosocket;
                    // a write to a destroyed socket returns false and no 'drain' would ever resume the stream
                    if (socket.destroyed === true) {
                        continue;
                    }
                    if (socket.write(data) !== true) {
                        logger.debug(`${client.getTag()} detected backpressure, pausing read stream...`);
                        this.buffer.stream.pause();
                    }
                    if (socket.bytesWritten >= this.buffer.size) {
                        // end() flushes queued data before closing; an immediate destroy() could drop it
                        socket.end();
                    }
                }
            });
            this.buffer.stream.on('close', () => {
                logger.debug(`reading file '${this.buffer.file}' took ${Date.now() - timestamp}ms (size: ${this.buffer.stream.bytesRead} bytes)`);
                resolve();
            });
            this.buffer.stream.on('error', (error) => {
                reject(error);
            });
        });
        return this.#transmission;
    }

    /**
     * Releases everything this audio server holds: event listeners, the file
     * read stream, all client and pending sockets and the TCP server.
     *
     * NOTE(review): removeAllListeners also drops listeners that other code
     * attached for these events - confirm nothing else subscribes to them.
     *
     * @returns {Promise<void>} rejects with the error reported by `server.close`
     */
    async destroy() {
        // 'audio:ready' is kept from the original cleanup; this class subscribes to the other two
        for (const eventName of ['audio:register', 'audio:state', 'audio:ready']) {
            eventparser.removeAllListeners(eventName);
        }

        this.buffer.stream.destroy();

        for (const client of this.clients) {
            const audiosocket = client.audiosocket;
            if (audiosocket === undefined || audiosocket.destroyed === true) {
                continue;
            }
            audiosocket.destroy();
        }

        // connections that never registered would otherwise keep server.close() waiting
        for (const socket of this.sockets) {
            if (socket.destroyed !== true) {
                socket.destroy();
            }
        }
        this.sockets = [];

        // close() reports an error for a server that never started listening (e.g. aborted preparation)
        if (!this.server.listening) {
            return;
        }

        await new Promise((resolve, reject) => {
            this.server.close((err) => {
                if (err != null) {
                    reject(err);
                    return;
                }
                resolve();
            });
        });
    }
}

module.exports = AudioServer;