kannon/classes/AudioServer.js

325 lines
12 KiB
JavaScript
Raw Permalink Normal View History

2022-04-14 14:23:41 +02:00
const net = require('net');
const Message = require('./Message.js');
const PCMStream = require('./PCMStream.js');
2022-04-14 14:23:41 +02:00
/**
 * TCP server that streams the PCM data of a single audio file to every
 * registered client and tracks the playback state those clients report.
 *
 * Relies on the project globals `config`, `logger`, `server`, `eventparser`
 * and `constants` being defined before an instance is created.
 *
 * Preparation is asynchronous and is started by the constructor. `ready`
 * settles once preparation has finished; `aborted` is set to `true` when
 * preparation could not be completed.
 */
class AudioServer {
    /** Timestamp (ms since epoch) at which the file transmission was started. */
    #transmissionStart;

    /** Guards against attaching the pcm stream listeners more than once. */
    #transmitting = false;

    /**
     * @param {string} file - audio file handed to PCMStream for conversion
     * @param {number} [progress] - playback progress to start from, defaults to 0
     */
    constructor(file, progress) {
        this.listen = config?.audio?.listen || '0.0.0.0';
        this.port = config?.audio?.port;
        this.file = file;
        // clients that registered an audio socket
        this.clients = [];
        // accepted sockets that have not been claimed by a client yet
        this.sockets = [];
        // pending broadcast answers, keyed by the client state they lead to
        this.broadcasts = {};
        this.progress = progress || 0;
        this.server = net.createServer();
        // A constructor cannot await, so keep the promise and never leave it
        // floating: a failed preparation is logged and flagged as aborted.
        this.ready = this.#prepare().catch((error) => {
            logger.error(`failed to prepare audio server: ${error}`);
            this.aborted = true;
        });
    }

    /**
     * Starts listening, prepares the pcm stream and announces the server to
     * the clients. Sets `aborted` when there are no clients or the pcm stream
     * could not be prepared.
     */
    async #prepare() {
        if (server?.clients === undefined || server.clients.length === 0) {
            logger.warn('there are currently no clients connected, aborting preparation of audio server...');
            this.aborted = true;
            return;
        }
        await this.#startListening();
        // TODO: GET AUDIO INFO FROM FILE AND PASS TO PCMSTREAM FOR CONVERSION
        this.pcm = new PCMStream(this.file, this.progress);
        try {
            await this.pcm.prepare();
        } catch (error) {
            logger.error(`encountered an error creating pcm stream: ${error}`);
            this.pcm = undefined;
            this.aborted = true;
            return;
        }
        this.#calculateThreshold();
        await this.#announceAudioServer();
    }

    /**
     * Binds the tcp server and resolves once it is listening.
     * Rejects with an Error when the server fails before it is listening;
     * errors emitted afterwards are logged.
     */
    #startListening() {
        return new Promise((resolve, reject) => {
            let listening = false;
            this.server.on('connection', (socket) => this.#trackSocket(socket));
            this.server.on('error', (error) => {
                const message = `audio server encountered an error: ${error}`;
                if (listening) {
                    // the promise is already settled, rejecting would be a silent no-op
                    logger.error(message);
                    return;
                }
                reject(new Error(message, { cause: error }));
            });
            this.server.on('close', () => {
                logger.info('audio server closed');
            });
            this.server.once('listening', () => {
                listening = true;
                // read back the port, it is assigned by the OS when none was configured
                this.port = this.server.address().port;
                logger.info(`audio server listening on ${this.listen}:${this.port}...`);
                this.#handleEvents();
                resolve();
            });
            this.server.listen(this.port, this.listen);
        });
    }

    /**
     * Remembers an accepted socket until a client claims it via 'audio:register'.
     */
    #trackSocket(socket) {
        this.sockets.push(socket);
        // A socket 'error' without any listener would crash the process.
        // Once a client claimed the socket, its own handler does the logging.
        socket.on('error', (error) => {
            if (this.sockets.includes(socket)) {
                logger.error(`unregistered audio socket encountered an error: ${error}`);
            }
        });
        // drop sockets that were closed before any client registered them
        socket.once('close', () => {
            const position = this.sockets.indexOf(socket);
            if (position >= 0) {
                this.sockets.splice(position, 1);
            }
        });
    }

    /**
     * Derives the threshold in bytes from `config.audio.threshold` (KiB),
     * falling back to 16 KiB when the value is missing or not a finite number.
     */
    #calculateThreshold() {
        const configured = Number(config?.audio?.threshold ?? NaN);
        const kibibytes = Number.isFinite(configured) ? configured : 16;
        this.threshold = kibibytes * 1024;
    }

    /** Subscribes to the client events this audio server reacts to. */
    #handleEvents() {
        eventparser.on('audio:register', (data) => {
            this.#handleRegister(server.getClientById(data?.clientId), data?.port);
        });
        eventparser.on('audio:state', (data) => {
            this.#setClientState(this.#getClientById(data?.clientId), data?.state, data);
        });
    }

    /**
     * Attaches the pending socket whose remote port equals `port` to `client`
     * and marks the client as registered.
     *
     * NOTE(review): sockets are matched by remote port only; two clients on
     * different hosts could use the same port - confirm this is acceptable.
     */
    #handleRegister(client, port) {
        if (client === undefined || port === undefined) {
            return;
        }
        const position = this.sockets.findIndex((candidate) => candidate.remotePort === port);
        if (position < 0) {
            return;
        }
        const [socket] = this.sockets.splice(position, 1);
        client.audiosocket = socket;
        socket.setNoDelay(config?.audio?.nodelay || false);
        if (!this.clients.includes(client)) {
            this.clients.push(client);
        }
        // Sockets accepted by a server never emit 'connect', they are already open.
        logger.debug(`${client.getTag()} opened audio socket`);
        socket.on('error', (error) => {
            logger.error(`${client.getTag()} encountered an error: ${error}`);
        });
        socket.on('close', (hadError) => {
            let msg = `${client.getTag()} closed audio socket`;
            if (hadError === true) {
                msg += ' after an error';
            }
            logger.debug(msg);
        });
        socket.on('drain', () => {
            if (this.pcm === undefined || this.pcm.isPaused() === false) {
                return;
            }
            this.#checkFileTransmission(client);
            // backpressure is relieved, continue reading the pcm stream
            this.pcm.resume();
        });
        this.#setClientState(client, constants.CLIENT_STATE_REGISTERED);
    }

    /** Stores the new state on the client and runs the matching handler. */
    #setClientState(client, state, data) {
        if (client === undefined || state === undefined) {
            return;
        }
        logger.debug(`${client.getTag()} state changed to '${state}'`);
        client.state = state;
        this.#handleStateChange(client, data);
    }

    /**
     * Checks whether every registered client is in `state` and, when a
     * broadcast is pending for that state, whether every addressed client
     * has shown up. Ids of registered clients are removed from the pending
     * broadcast list as a side effect.
     *
     * @returns {boolean}
     */
    #allClientsInState(state) {
        if (this.clients === undefined || this.clients.length === 0 || state === undefined) {
            return false;
        }
        const pending = this.broadcasts[state];
        let everyoneInState = true;
        for (const client of this.clients) {
            if (client.state !== state) {
                everyoneInState = false;
            }
            if (pending === undefined) {
                continue;
            }
            const position = pending.indexOf(client.id);
            if (position >= 0) {
                pending.splice(position, 1);
            }
        }
        if (pending !== undefined) {
            return everyoneInState && pending.length === 0;
        }
        return everyoneInState;
    }

    /**
     * Keeps track of the furthest reported progress and dispatches to the
     * handler of the client's current state. Failures inside a handler are
     * logged instead of surfacing as unhandled rejections.
     */
    #handleStateChange(client, data) {
        if (client === undefined) {
            return undefined;
        }
        if (this.progress < data?.progress) {
            this.progress = data.progress;
        }
        let handled;
        switch (client.state) {
            case constants.CLIENT_STATE_REGISTERED:
                handled = this.#handleStateRegistered(client);
                break;
            case constants.CLIENT_STATE_READY:
                handled = this.#handleStateReady(client);
                break;
            case constants.CLIENT_STATE_PLAYING:
                handled = this.#handleStatePlaying(client);
                break;
            case constants.CLIENT_STATE_PAUSED:
                handled = this.#handleStatePaused(client, data);
                break;
            case constants.CLIENT_STATE_STOPPED:
                handled = this.#handleStateStopped(client, data);
                break;
            case constants.CLIENT_STATE_ERROR:
                handled = this.#handleStateError(client, data);
                break;
            default:
                return undefined;
        }
        return handled.catch((error) => {
            logger.error(`${client.getTag()} failed to handle state '${client.state}': ${error}`);
        });
    }

    /** Starts the file transmission once every client has registered. */
    async #handleStateRegistered(client) {
        logger.debug(`${client.getTag()} has registered...`);
        if (!this.#allClientsInState(constants.CLIENT_STATE_REGISTERED)) {
            return;
        }
        // Deliberately not awaited: the returned promise only resolves when the
        // whole file has been transmitted and it never rejects.
        this.#transmitFile();
    }

    /** Starts playback once every client is ready. */
    async #handleStateReady(client) {
        logger.debug(`${client.getTag()} is ready for playback...`);
        if (!this.#allClientsInState(constants.CLIENT_STATE_READY)) {
            return;
        }
        await this.startPlayback();
    }

    async #handleStatePlaying(client) {
        logger.debug(`${client.getTag()} has started playback...`);
    }

    async #handleStatePaused(client, data) {
        if (client === undefined || data === undefined) {
            return;
        }
        logger.debug(`${client.getTag()} paused playback, progress: '${data.progress}'...`);
    }

    /** Tears the audio server down once every client has stopped. */
    async #handleStateStopped(client, data) {
        logger.debug(`${client.getTag()} stopped playback, progress: ${data?.progress}...`);
        if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) {
            return;
        }
        await this.destroy();
    }

    async #handleStateError(client, data) {
        // NOTE(review): assumes the client reports its error as `data.error` - confirm the event payload.
        logger.error(`${client.getTag()} experienced an error during playback, progress: '${data?.progress}': ${data?.error}`);
    }

    /** @returns the registered client with the given id, or undefined */
    #getClientById(clientId) {
        if (clientId === undefined) {
            return undefined;
        }
        return this.clients.find((client) => client.id === clientId);
    }

    /**
     * Broadcasts port and audio settings of this server to the clients.
     * The broadcast result is stored as the list of clients expected to register
     * (presumably an array of client ids, it is matched against `client.id`).
     */
    async #announceAudioServer() {
        this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
            port: this.server.address().port,
            settings: {
                threshold: this.threshold,
                // TODO: GET AUDIO INFO FROM DATABASE AND PASS TO CLIENT(S) FOR PLAYBACK
                audio: {
                    channels: 2,
                    bitDepth: 16,
                    sampleRate: 44100
                }
            }
        }).broadcast(true);
        logger.debug(`sent broadcast for audio server to client(s) '${this.broadcasts[constants.CLIENT_STATE_REGISTERED]}'...`);
    }

    /** Broadcasts 'audio:play' and remembers which clients are expected to answer. */
    async startPlayback() {
        this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play').broadcast();
        logger.debug(`sent broadcast to start playback to client(s) '${this.broadcasts[constants.CLIENT_STATE_PLAYING]}'...`);
    }

    /** Broadcasts 'audio:pause' and remembers which clients are expected to answer. */
    async pausePlayback() {
        this.broadcasts[constants.CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast();
        logger.debug(`sent broadcast to pause playback to client(s) '${this.broadcasts[constants.CLIENT_STATE_PAUSED]}'...`);
    }

    /** Broadcasts 'audio:stop' and remembers which clients are expected to answer. */
    async stopPlayback() {
        this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast();
        logger.debug(`sent broadcast to stop playback to client(s) '${this.broadcasts[constants.CLIENT_STATE_STOPPED]}'...`);
    }

    /**
     * Pipes every chunk of the pcm stream to all registered clients and pauses
     * the stream while any client signals backpressure.
     *
     * @returns {Promise<void>} resolves when the pcm stream has been closed
     */
    async #transmitFile() {
        if (this.pcm === undefined) {
            logger.warn('pcm stream is not available, unable to transmit file...');
            return;
        }
        if (this.#transmitting) {
            // listeners are attached already, a second set would send every chunk twice
            return;
        }
        this.#transmitting = true;
        this.#transmissionStart = Date.now();
        await new Promise((resolve) => {
            this.pcm.resume();
            this.pcm.on('data', (data) => {
                // Iterate over a copy: removing from the live array while
                // indexing into it would skip the client after a removed one.
                for (const client of [...this.clients]) {
                    if (client.audiosocket.destroyed) {
                        const position = this.clients.indexOf(client);
                        if (position >= 0) {
                            this.clients.splice(position, 1);
                        }
                        continue;
                    }
                    if (client.audiosocket.write(data) !== true) {
                        // backpressure detected, the 'drain' handler resumes the stream
                        this.pcm.pause();
                        continue;
                    }
                    this.#checkFileTransmission(client);
                }
            });
            this.pcm.on('close', () => {
                logger.debug(`transmitting to pcm data converted file '${this.file}' took ${Date.now() - this.#transmissionStart}ms`);
                resolve();
            });
        });
    }

    /**
     * Closes the audio socket of `client` once the pcm stream has been destroyed.
     *
     * @returns {boolean|undefined} true when the socket is being closed, false
     *   while the pcm stream is still alive, undefined when socket or stream are missing
     */
    #checkFileTransmission(client) {
        if (client?.audiosocket === undefined || this.pcm === undefined) {
            return undefined;
        }
        if (this.pcm.destroyed !== true) {
            return false;
        }
        const { audiosocket } = client;
        if (audiosocket.destroyed || audiosocket.writableEnded) {
            // already closing, do not log or end twice
            return true;
        }
        const elapsed = this.#transmissionStart === undefined ? 0 : Date.now() - this.#transmissionStart;
        logger.debug(`${client.getTag()} transmitted ${audiosocket.bytesWritten} bytes after ${elapsed}ms`);
        // Destroy only after end() has flushed: destroying right away may drop
        // data that is still queued for writing.
        audiosocket.end(() => audiosocket.destroy());
        return true;
    }

    /**
     * Releases everything this audio server holds: event listeners, client
     * and pending sockets, the tcp server and the pcm stream.
     *
     * NOTE(review): removeAllListeners also drops 'audio:register' and
     * 'audio:state' listeners that were registered by anyone else on `eventparser`.
     */
    async destroy() {
        eventparser.removeAllListeners('audio:register');
        eventparser.removeAllListeners('audio:state');
        for (const { audiosocket } of this.clients) {
            if (audiosocket !== undefined && audiosocket.destroyed !== true) {
                audiosocket.destroy();
            }
        }
        // sockets nobody registered would otherwise keep server.close() waiting
        for (const socket of [...this.sockets]) {
            if (socket.destroyed !== true) {
                socket.destroy();
            }
        }
        this.sockets.length = 0;
        if (this.server?.listening === true) {
            await new Promise((resolve) => {
                this.server.close((error) => {
                    if (error) {
                        logger.error(`an error occurred closing the audio server: ${error}`);
                        // TODO: reject and try/catch later?
                    }
                    resolve();
                });
            });
        }
        // the pcm stream has to be released even when the server never listened
        if (this.pcm !== undefined) {
            if (this.pcm.destroyed === false) {
                await this.pcm.destroy();
            }
            this.pcm = undefined;
        }
    }
}
// CommonJS export: the AudioServer class is the module's only export.
module.exports = AudioServer;