kannon/classes/AudioServer.js
2022-04-14 14:23:41 +02:00

174 lines
No EOL
6 KiB
JavaScript

const sleep = require('../libs/util.js').sleep;
const net = require('net');
const fs = require('fs');
const stat = require('fs/promises').stat;
const Message = require('./Message.js');
const EventParser = require('./EventParser.js');
/**
 * One-shot TCP server that pushes a single audio file to the connected clients.
 *
 * Lifecycle:
 *  1. The constructor kicks off preparation in the background: listen on an
 *     OS-assigned port, announce port and file size through an
 *     'audiostream-initialize' message, then buffer the file in memory.
 *  2. start() waits for the preparation and for every announced client to
 *     report 'audiostream-ready' on its own connection, sends the buffered
 *     file to each of them and finally tears the server down.
 *
 * If preparation is skipped (no clients) or fails, `aborted` is set to true
 * and start() becomes a no-op.
 *
 * Relies on the process-wide `config`, `logger` and `server` globals.
 */
class AudioServer {
    /** Settles once the background preparation has finished; never rejects. */
    #ready;

    /**
     * @param {string} file - Path of the audio file to send.
     */
    constructor(file) {
        this.listen = config?.server?.listen || '0.0.0.0';
        // Port 0 lets the OS choose a free port; replaced by the real port once listening.
        this.port = 0;
        this.file = file;
        this.aborted = false;
        this.clients = [];
        // Ids of clients that received the announcement but have not connected yet.
        this.broadcastClients = [];
        this.server = net.createServer();
        this.eventParser = new EventParser();
        // Registered exactly once: the parser is shared by all connections, so a
        // listener per connection would handle every ready event multiple times.
        this.eventParser.on('audiostream-ready', (clientId, socket) => {
            this.#registerClient(clientId, socket);
        });
        // #prepare() runs synchronously up to its first await, so `aborted` is
        // already accurate for the "no clients" case when the constructor returns.
        this.#ready = this.#prepare().catch((err) => {
            this.#abort(err);
        });
    }

    /**
     * Sends the buffered file to every client that connected to this server and
     * shuts the server down afterwards. Resolves without doing anything when the
     * server was aborted.
     *
     * NOTE(review): there is no timeout; an announced client that never connects
     * keeps this method waiting.
     *
     * @returns {Promise<void>}
     */
    async start() {
        await this.#ready;
        if (this.aborted === true) {
            return;
        }
        await this.#waitForAllClients();
        this.#handleClientConnections();
        const buffer = this.buffer;
        const transfers = this.clients.map((client) => this.#sendTo(client, buffer));
        await Promise.allSettled(transfers);
        await this.destroy();
    }

    /**
     * Opens the listening socket, announces it to the clients and buffers the file.
     * Marks the server as aborted (without throwing) when no clients are connected.
     *
     * @returns {Promise<void>} Rejects when listening, stat, broadcast or buffering fails.
     */
    async #prepare() {
        if (server?.clients === undefined || server.clients.length === 0) {
            logger.warn('there are currently no clients connected, aborting preparation of audio server...');
            this.aborted = true;
            return;
        }
        await this.#listen();
        const stats = await stat(this.file);
        const announcement = new Message('audiostream-initialize', { port: this.port, size: stats.size });
        // Presumably resolves to Promise.allSettled-style results whose values are client ids — confirm in Message.js.
        const broadcastedTo = await announcement.broadcast(true);
        const recipients = [];
        for (const result of broadcastedTo) {
            if (result?.status !== 'fulfilled') {
                continue;
            }
            recipients.push(result.value);
            // A client may already have registered while the broadcast was still
            // pending; queueing it again would keep start() waiting forever.
            const alreadyConnected = this.clients.some((client) => client.id === result.value);
            if (!alreadyConnected) {
                this.broadcastClients.push(result.value);
            }
        }
        logger.debug(`sent broadcast for audio server to client(s) '${recipients.toString()}'...`);
        await this.#bufferFile();
    }

    /**
     * Starts listening on `this.listen` and stores the OS-assigned port in `this.port`.
     *
     * @returns {Promise<void>} Rejects with an Error when the server fails before it is listening.
     */
    #listen() {
        return new Promise((resolve, reject) => {
            let listening = false;
            this.server.on('connection', (socket) => {
                this.#handleConnection(socket);
            });
            // Stays attached after startup so late server errors are logged
            // instead of crashing the process as an unhandled 'error' event.
            this.server.on('error', (err) => {
                const message = `an error occured preparing the audio server for file '${this.file}' > ${err}`;
                if (listening) {
                    logger.error(message);
                    return;
                }
                reject(new Error(message, { cause: err }));
            });
            this.server.once('listening', () => {
                listening = true;
                this.port = this.server.address().port;
                logger.info(`audio server listening on ${this.listen}:${this.port}...`);
                resolve();
            });
            this.server.listen(this.port, this.listen);
        });
    }

    /**
     * Records a failed preparation: logs it, marks the server as aborted and
     * releases whatever was already set up.
     *
     * @param {*} err - Reason the preparation failed.
     */
    #abort(err) {
        this.aborted = true;
        logger.error(`aborting audio server > ${err?.message ?? err}`);
        if (this.server.listening !== true) {
            this.eventParser.removeAllListeners('audiostream-ready');
            return;
        }
        // Deliberately not awaited: close() only completes once every open
        // connection has ended, which must not block start() from returning.
        this.destroy().catch((closeErr) => {
            logger.error(`closing the aborted audio server failed > ${closeErr}`);
        });
    }

    /**
     * Wires a freshly accepted socket to the event parser.
     *
     * @param {net.Socket} socket - Incoming client connection.
     */
    #handleConnection(socket) {
        socket.on('data', (data) => {
            this.eventParser.parse(data, socket);
        });
        // Attached right away: an 'error' event without a listener is thrown as
        // an uncaught exception, and clients may wait a long time before start().
        socket.on('error', (error) => {
            const owner = this.clients.find((client) => client.audiosocket === socket);
            const tag = owner === undefined ? 'unregistered audio client' : owner.getTag();
            logger.error(`${tag} encountered an error: ${error}`);
        });
    }

    /**
     * Handles an 'audiostream-ready' event: binds the socket to the matching
     * client of the global server and marks that client as no longer pending.
     * Events carrying an unknown client id are ignored.
     *
     * @param {*} clientId - Id reported by the connecting client.
     * @param {net.Socket} socket - Socket the event arrived on.
     */
    #registerClient(clientId, socket) {
        const client = server.clients.find((candidate) => candidate.id === clientId);
        if (client === undefined) {
            return;
        }
        client.audiosocket = socket;
        if (!this.clients.includes(client)) {
            this.clients.push(client);
        }
        logger.debug(`${client.getTag()} connected to audio server...`);
        // indexOf() yields -1 for ids that are not pending, and splice(-1, 1)
        // would then drop the last pending client instead.
        const pendingIndex = this.broadcastClients.indexOf(clientId);
        if (pendingIndex !== -1) {
            this.broadcastClients.splice(pendingIndex, 1);
        }
    }

    /**
     * Adds lifecycle logging ('end' / 'close') to the audio socket of every
     * registered client. Socket errors are already logged from connection time.
     */
    #handleClientConnections() {
        for (const client of this.clients) {
            client.audiosocket.on('end', () => {
                logger.debug(`${client.getTag()} ended audio socket`);
            });
            client.audiosocket.on('close', (hadError) => {
                const suffix = hadError === true ? ' after an error' : '';
                logger.debug(`${client.getTag()} closed audio socket${suffix}`);
            });
        }
    }

    /**
     * Writes the whole buffer to one client and half-closes its audio socket.
     *
     * @param {object} client - Registered client; its `audiostart` timestamp is set here.
     * @param {Buffer} buffer - Audio data to send.
     * @returns {Promise<void>} Resolves once the write finished or failed; never rejects.
     */
    #sendTo(client, buffer) {
        client.audiostart = Date.now();
        return new Promise((resolve) => {
            client.audiosocket.end(buffer, (err) => {
                const elapsed = Date.now() - client.audiostart;
                if (err == null) {
                    logger.debug(`${client.getTag()} sent audio file '${this.file}' after ${elapsed}ms...`);
                } else {
                    logger.error(`${client.getTag()} failed to receive audio file '${this.file}' after ${elapsed}ms > ${err}`);
                }
                resolve();
            });
        });
    }

    /**
     * Polls until every announced client has registered.
     * The argument of sleep() is passed through unchanged; its unit is defined in libs/util.js.
     *
     * @returns {Promise<void>}
     */
    async #waitForAllClients() {
        while (this.broadcastClients.length > 0) {
            await sleep(1);
        }
    }

    /**
     * Reads the complete audio file into `this.buffer`.
     *
     * @returns {Promise<void>} Rejects when the file cannot be read.
     */
    async #bufferFile() {
        const timestamp = Date.now();
        this.buffer = await fs.promises.readFile(this.file);
        logger.debug(`buffering file '${this.file}' took ${Date.now() - timestamp}ms (length: ${this.buffer.length} bytes)`);
    }

    /**
     * Stops handling ready events, destroys every client audio socket that is
     * still open and closes the listening server.
     *
     * @returns {Promise<void>} Rejects with the error reported by server.close(),
     *     e.g. when the server was not listening.
     */
    async destroy() {
        this.eventParser.removeAllListeners('audiostream-ready');
        for (const { audiosocket } of this.clients) {
            if (audiosocket.destroyed !== true) {
                audiosocket.destroy();
            }
        }
        await new Promise((resolve, reject) => {
            this.server.close((err) => {
                if (err != null) {
                    reject(err);
                    return;
                }
                resolve();
            });
        });
    }
}
// CommonJS export: the AudioServer class itself is the module's export.
module.exports = AudioServer;