343 lines
No EOL
12 KiB
JavaScript
343 lines
No EOL
12 KiB
JavaScript
const net = require('net');
|
|
const { stat, open } = require('fs/promises');
|
|
|
|
const Message = require('./Message.js');
|
|
const constants = require('../libs/constants.js');
|
|
|
|
class AudioServer {
|
|
|
|
constructor(file, progress) {
|
|
this.listen = config?.server?.listen || '0.0.0.0';
|
|
this.port = 0;
|
|
this.buffer = {
|
|
file: file
|
|
};
|
|
this.clients = [];
|
|
this.sockets = [];
|
|
this.broadcasts = {};
|
|
this.progress = progress || 0;
|
|
this.server = net.createServer();
|
|
this.#prepare();
|
|
}
|
|
|
|
async #prepare() {
|
|
if (server?.clients === undefined || server.clients.length === 0) {
|
|
logger.warn('there are currently no clients connected, aborting preparation of audio server...')
|
|
this.aborted = true;
|
|
return;
|
|
}
|
|
await new Promise((resolve, reject) => {
|
|
this.server.listen(this.port, this.listen).on('listening', () => {
|
|
this.port = this.server.address().port;
|
|
logger.info('audio server listening on ' + this.listen + ':' + this.port + '...');
|
|
this.#handleEvents();
|
|
resolve();
|
|
});
|
|
this.server.on('connection', (socket) => {
|
|
this.sockets.push(socket);
|
|
});
|
|
this.server.on('error', (err) => {
|
|
logger.error('ERROR IN AUDIOSERVER ' + err);
|
|
reject('audio server encountered an error: ' + err);
|
|
});
|
|
this.server.on('close', () => {
|
|
logger.info('audio server closed');
|
|
});
|
|
});
|
|
await this.#prepareBuffer();
|
|
this.#announceAudioServer();
|
|
}
|
|
|
|
async #prepareBuffer() {
|
|
if (this.buffer.fd !== undefined) {
|
|
this.buffer.fd.close();
|
|
}
|
|
if (this.buffer.stream?.destroyed === false) {
|
|
this.buffer.stream.close();
|
|
this.buffer.stream.destroy();
|
|
}
|
|
this.buffer.fd = await open(this.buffer.file);
|
|
this.buffer.stream = this.buffer.fd.createReadStream({
|
|
start: this.progress
|
|
});
|
|
const stats = await stat(this.buffer.file);
|
|
this.buffer.size = stats.size - this.progress;
|
|
let percentage = 30;
|
|
if (!(isNaN(config.audio?.threshold))) {
|
|
percentage = config.audio.threshold;
|
|
}
|
|
this.buffer.threshold = (this.buffer.size / 100) * percentage;
|
|
}
|
|
|
|
#handleEvents() {
|
|
eventparser.on('audio:register', (data) => {
|
|
this.#handleRegister(server.getClientById(data?.clientId), data?.port);
|
|
});
|
|
eventparser.on('audio:state', (data) => {
|
|
this.#setClientState(this.#getClientById(data?.clientId), data?.state, data);
|
|
});
|
|
}
|
|
|
|
#handleRegister(client, port) {
|
|
if (client === undefined || port === undefined) {
|
|
return;
|
|
}
|
|
let socket;
|
|
for (let index = 0; index < this.sockets.length; index++) {
|
|
if (this.sockets[index].remotePort === port) {
|
|
socket = this.sockets[index];
|
|
this.sockets.splice(index, 1);
|
|
break;
|
|
}
|
|
}
|
|
if (socket === undefined) {
|
|
return;
|
|
}
|
|
client.audiosocket = socket;
|
|
client.audiosocket.setNoDelay(config?.audio?.nodelay || false);
|
|
this.clients.push(client);
|
|
this.#setClientState(client, constants.CLIENT_STATE_REGISTERED);
|
|
client.audiosocket.on('connect', () => {
|
|
logger.debug(client.getTag() + ' opened audio socket');
|
|
});
|
|
client.audiosocket.on('error', (error) => {
|
|
logger.error(client.getTag() + ' encountered an error: ' + error);
|
|
});
|
|
client.audiosocket.on('close', (hadError) => {
|
|
let msg = client.getTag() + ' closed audio socket';
|
|
if (hadError === true) {
|
|
msg += ' after an error';
|
|
}
|
|
logger.debug(msg);
|
|
});
|
|
client.audiosocket.on('drain', () => {
|
|
if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) {
|
|
return;
|
|
}
|
|
this.#isFileTransmitted(client);
|
|
// logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...');
|
|
this.buffer.stream.resume();
|
|
});
|
|
}
|
|
|
|
#setClientState(client, state, data) {
|
|
if (client === undefined || state === undefined) {
|
|
return;
|
|
}
|
|
logger.debug(client.getTag() + ' state changed to \'' + state + '\'');
|
|
client.state = state;
|
|
this.#handleStateChange(client, data);
|
|
}
|
|
|
|
#allClientsInState(state) {
|
|
if (this.clients === undefined || this.clients.length === 0 || state === undefined) {
|
|
return false;
|
|
}
|
|
const broadcasts = this.broadcasts[state];
|
|
let result = true;
|
|
for (let index = 0; index < this.clients.length; index++) {
|
|
const client = this.clients[index];
|
|
if (client.state !== state) {
|
|
result = false;
|
|
}
|
|
if (broadcasts === undefined) {
|
|
continue;
|
|
}
|
|
const indexOfId = this.broadcasts[state].indexOf(client.id);
|
|
if (indexOfId >= 0) {
|
|
this.broadcasts[state].splice(indexOfId, 1);
|
|
}
|
|
}
|
|
if (broadcasts !== undefined) {
|
|
result = result && this.broadcasts[state].length === 0;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
#handleStateChange(client, data) {
|
|
if (client === undefined) {
|
|
return;
|
|
}
|
|
if (this.progress < data?.progress) {
|
|
this.progress = data.progress;
|
|
}
|
|
switch (client.state) {
|
|
case constants.CLIENT_STATE_REGISTERED:
|
|
return this.#handleStateRegistered(client);
|
|
case constants.CLIENT_STATE_READY:
|
|
return this.#handleStateReady(client);
|
|
case constants.CLIENT_STATE_PLAYING:
|
|
return this.#handleStatePlaying(client);
|
|
case constants.CLIENT_STATE_PAUSED:
|
|
return this.#handleStatePaused(client, data);
|
|
case constants.CLIENT_STATE_STOPPED:
|
|
return this.#handleStateStopped(client, data);
|
|
case constants.CLIENT_STATE_ERROR:
|
|
return this.#handleStateError(client, data);
|
|
}
|
|
}
|
|
|
|
async #handleStateRegistered(client) {
|
|
logger.debug(client.getTag() + ' has registered...');
|
|
if (!this.#allClientsInState(constants.CLIENT_STATE_REGISTERED)) {
|
|
return;
|
|
}
|
|
this.#transmitFile();
|
|
}
|
|
|
|
async #handleStateReady(client) {
|
|
logger.debug(client.getTag() + ' is ready for playback...');
|
|
if (!this.#allClientsInState(constants.CLIENT_STATE_READY)) {
|
|
return;
|
|
}
|
|
this.startPlayback();
|
|
}
|
|
|
|
async #handleStatePlaying(client) {
|
|
logger.debug(client.getTag() + ' has started playback...');
|
|
// TODO: remove - test only
|
|
// await sleep(10000);
|
|
// this.#pausePlayback();
|
|
}
|
|
|
|
async #handleStatePaused(client, data) {
|
|
if (client === undefined || data === undefined) {
|
|
return;
|
|
}
|
|
logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...');
|
|
// TODO: remove - test only
|
|
// await sleep(100);
|
|
// this.#startPlayback();
|
|
}
|
|
|
|
async #handleStateStopped(client, data) {
|
|
logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '/' + this.buffer.size + '...');
|
|
if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) {
|
|
return;
|
|
}
|
|
this.destroy();
|
|
}
|
|
|
|
async #handleStateError(client, data) {
|
|
logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + '/' + this.buffer.size + ': ' + error);
|
|
}
|
|
|
|
#getClientById(clientId) {
|
|
if (clientId === undefined) {
|
|
return;
|
|
}
|
|
for (let index = 0; index < this.clients.length; index++) {
|
|
const client = this.clients[index];
|
|
if (client.id !== clientId) {
|
|
continue;
|
|
}
|
|
return client;
|
|
}
|
|
}
|
|
|
|
async #announceAudioServer() {
|
|
this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
|
|
port: this.server.address().port,
|
|
settings: {
|
|
size: this.buffer.size,
|
|
threshold: this.buffer.threshold,
|
|
// TODO: GET AUDIO INFO FROM DATABASE
|
|
audio: {
|
|
channels: 2,
|
|
bitDepth: 16,
|
|
sampleRate: 44100
|
|
}
|
|
}
|
|
}).broadcast(true);
|
|
logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_REGISTERED] + '\'...');
|
|
}
|
|
|
|
async startPlayback() {
|
|
this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play').broadcast();
|
|
logger.debug('sent broadcast to start playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PLAYING] + '\'...');
|
|
}
|
|
|
|
async pausePlayback() {
|
|
this.broadcasts[constants.CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast();
|
|
logger.debug('sent broadcast to pause playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PAUSED] + '\'...');
|
|
}
|
|
|
|
async stopPlayback() {
|
|
this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast();
|
|
logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...');
|
|
}
|
|
|
|
async #transmitFile() {
|
|
const timestamp = Date.now();
|
|
return new Promise((resolve, reject) => {
|
|
this.buffer.stream.on('data', (data) => {
|
|
for (let index = 0; index < this.clients.length; index++) {
|
|
const client = this.clients[index];
|
|
if (client.audiosocket.destroyed) {
|
|
this.clients.splice(index, 1);
|
|
continue;
|
|
}
|
|
if (client.audiosocket.write(data) !== true) {
|
|
// logger.debug(client.getTag() + ' detected backpressure, pausing read stream...');
|
|
this.buffer.stream.pause();
|
|
continue;
|
|
}
|
|
this.#isFileTransmitted(client);
|
|
}
|
|
});
|
|
this.buffer.stream.on('close', () => {
|
|
logger.debug('reading file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)');
|
|
resolve();
|
|
});
|
|
this.buffer.stream.on('error', (error) => {
|
|
// TODO: handle with try / catch
|
|
logger.debug('STREAM ERROR!');
|
|
reject(error);
|
|
});
|
|
});
|
|
}
|
|
|
|
#isFileTransmitted(client) {
|
|
if (client?.audiosocket === undefined) {
|
|
return;
|
|
}
|
|
if (client.audiosocket.bytesWritten < this.buffer.size) {
|
|
return false;
|
|
}
|
|
logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now() ) + 'ms');
|
|
client.audiosocket.end();
|
|
client.audiosocket.destroy();
|
|
return true;
|
|
}
|
|
|
|
async destroy() {
|
|
eventparser.removeAllListeners('audio:register');
|
|
eventparser.removeAllListeners('audio:state');
|
|
for (let index = 0; index < this.clients.length; index++) {
|
|
const audiosocket = this.clients[index].audiosocket;
|
|
if (audiosocket.destroyed === true) {
|
|
continue;
|
|
}
|
|
audiosocket.destroy();
|
|
}
|
|
this.buffer?.fd?.close();
|
|
this.buffer?.stream?.close();
|
|
this.buffer?.stream?.destroy();
|
|
if (this.server?.listening !== true) {
|
|
return;
|
|
}
|
|
await new Promise((resolve, reject) => {
|
|
this.server.close((error) => {
|
|
if (error !== undefined) {
|
|
logger.error('an error occured closing the audio server: ' + error);
|
|
// TODO: reject and try/catch later?
|
|
// reject(error);
|
|
}
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
|
|
}
|
|
|
|
module.exports = AudioServer; |