further 'random' development

This commit is contained in:
Daniel Sommer 2022-04-20 16:15:33 +02:00
parent 8cdb4ca0da
commit 424f9c4d82
7 changed files with 134 additions and 117 deletions

View file

@ -5,7 +5,6 @@ const fs = require('fs');
const stat = require('fs/promises').stat;
const Message = require('./Message.js');
const EventParser = require('./EventParser.js');
const { CLIENT_STATE_READY, CLIENT_STATE_PLAYING, CLIENT_STATE_PAUSED, CLIENT_STATE_STOPPED, CLIENT_STATE_ERROR } = require('../libs/constants.js');
@ -18,10 +17,11 @@ class AudioServer {
file: file
};
this.clients = [];
this.broadcastClients = [];
this.position = 0;
this.sockets = [];
this.playback = {
position: 0
};
this.server = net.createServer();
this.eventParser = new EventParser();
this.#prepare();
}
@ -35,10 +35,11 @@ class AudioServer {
this.server.listen(this.port, this.listen).on('listening', () => {
this.port = this.server.address().port;
logger.info('audio server listening on ' + this.listen + ':' + this.port + '...');
this.#handleEvents();
resolve();
});
this.server.on('connection', (socket) => {
this.#handleConnection(socket);
this.sockets.push(socket);
});
this.server.on('error', (err) => {
reject('an error occured preparing the audio server for file \'' + this.file + '\' > ' + err);
@ -47,65 +48,104 @@ class AudioServer {
const stats = await stat(this.buffer.file);
this.buffer.size = stats.size;
this.buffer.threshold = (this.buffer.size / 100) / (!isNaN(config.audio?.threshold) || 30);
this.broadcastClients = await new Message('audio:initialize', {
port: this.server.address().port,
size: this.buffer.size,
threshold: this.buffer.threshold
}).broadcast(true);
// const broadcastedTo =
// for (let index = 0; index < broadcastedTo.length; index++) {
// if (broadcastedTo[index]?.status !== 'fulfilled') {
// continue;
// }
// this.broadcastClients.push(broadcastedTo[index].value);
// }
logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcastClients.toString() + '\'...');
this.#announceAudioServer();
this.#bufferFile();
}
#handleConnection(socket) {
socket.on('data', (data) => {
this.eventParser.parse(data, socket);
});
this.eventParser.on('audio:register', (clientId, socket) => {
let client = server.getClientById(clientId);
#handleEvents(socket) {
eventparser.on('audio:register', (data) => {
if (data?.clientId === undefined || data?.socket === undefined) {
return;
}
let client = server.getClientById(data.clientId);
if (client === undefined) {
return;
}
let socket;
for (let index = 0; index < this.sockets.length; index++) {
if (this.sockets[index].remotePort === data.socket) {
socket = this.sockets[index];
this.sockets.splice(index, 1);
break;
}
}
if (socket === undefined) {
return;
}
client.audiosocket = socket;
this.clients.push(client);
logger.debug(client.getTag() + ' connected to audio server...');
this.broadcastClients.splice(this.broadcastClients.indexOf(clientId), 1);
this.#sendData(client);
});
this.eventParser.on('audio:ready', async (clientId) => {
let allClientsReady = true;
for (let index = 0; index < this.clients.length; index++) {
const client = this.clients[index];
if (client.id === clientId) {
client.state = CLIENT_STATE_READY;
logger.debug(client.getTag() + ' is ready for playback...');
}
if (client.state !== CLIENT_STATE_READY) {
allClientsReady = false;
}
}
if (allClientsReady !== true) {
eventparser.on('audio:state', (data) => {
this.#handleStateChange(data);
});
}
#handleStateChange(data) {
if (data?.clientId === undefined || data?.state === undefined) {
return;
}
let client = this.#getClientById(data.clientId);
if (client === undefined) {
return;
}
logger.debug(client.getTag() + ' state changed to \'' + data.state + '\'');
client.state = data.state;
switch (client.state) {
case CLIENT_STATE_READY:
return this.#handleStateReady(client);
case CLIENT_STATE_PLAYING:
return this.#handleStatePlaying(client);
case CLIENT_STATE_PAUSED:
return this.#handleStatePaused(client, data);
case CLIENT_STATE_STOPPED:
return this.#handleStateStopped(client, data);
case CLIENT_STATE_ERROR:
return this.#handleStateError(client, data);
}
}
async #handleStateReady(client) {
logger.debug(client.getTag() + ' is ready for playback...');
for (let index = 0; index < this.clients.length; index++) {
if (this.clients[index].state !== CLIENT_STATE_READY) {
return;
}
const broadcastedTo = await new Message('audio:play', { position: this.position }).broadcast();
logger.debug('sent broadcast for playback to client(s) \'' + broadcastedTo + '\'...');
});
this.eventParser.on('audio:paused', (position) => {
if (!isNaN(position) && positon > this.position) {
this.position = position;
}
this.#startPlayback();
}
async #handleStatePlaying(client) {
logger.debug(client.getTag() + ' has started playback...');
// TODO: remove - test only
await sleep(5000);
this.#pausePlayback();
}
async #handleStatePaused(client, data) {
if (client === undefined || data === undefined) {
return;
}
logger.debug(client.getTag() + ' paused playback at position \'' + data.position + '\'...');
for (let index = 0; index < this.playback.paused.length; index++) {
if (this.playback.paused[index] === client.id) {
if (this.playback.position === 0 || this.playback.position > data.position) {
this.playback.position = data.position;
}
}
let client = this.#getClientById(clientId);
if (client === undefined) {
return;
}
client.state = CLIENT_STATE_PAUSED;
});
}
// TODO: remove - test only
await sleep(1);
this.#startPlayback();
}
async #handleStateStopped(client, data) {
logger.debug(client.getTag() + ' stopped playback at position \'' + data.position + '\'...');
}
async #handleStateError(client, data) {
logger.error(client.getTag() + ' experienced an error during playback at position \'' + data.position + '\': ' + data.error);
}
async #sendData(client) {
@ -134,56 +174,6 @@ class AudioServer {
});
}
async #sendAudio() {
if (this.aborted === true) {
return;
}
const buffer = await this.#waitForBuffer();
await this.#waitForAllClients();
this.#handleClientConnections();
const promises = [];
for (let index = 0; index < this.clients.length; index++) {
const client = this.clients[index];
client.audiostart = Date.now();
promises.push(new Promise((resolve, reject) => {
client.audiosocket.end(buffer, () => {
logger.debug(client.getTag() + ' sent audio file \'' + this.file + '\' after ' + (Date.now() - client.audiostart) + 'ms...');
resolve();
});
}));
}
await Promise.allSettled(promises);
await this.destroy();
}
async #waitForAllClients() {
while (this.broadcastClients.length > 0) {
await sleep(1);
}
return;
}
#handleClientConnections() {
for (let index = 0; index < this.clients.length; index++) {
const client = this.clients[index];
client.audiosocket.on('error', (error) => {
logger.error(client.getTag() + ' encountered an error: ' + error);
});
client.audiosocket.on('end', () => {
logger.debug(client.getTag() + ' ended audio socket');
});
client.audiosocket.on('close', (hadError) => {
let msg = client.getTag() + ' closed audio socket';
if (hadError === true) {
msg += ' after an error';
}
logger.debug(msg);
});
}
}
async #waitForBuffer() {
while (this.buffer.data === undefined || this.buffer.data.length < this.buffer.size) {
await sleep(1);
@ -224,8 +214,35 @@ class AudioServer {
}
}
async #announceAudioServer() {
const broadcasted = await new Message('audio:initialize', {
port: this.server.address().port,
size: this.buffer.size,
threshold: this.buffer.threshold
}).broadcast(true);
logger.debug('sent broadcast for audio server to client(s) \'' + broadcasted + '\'...');
}
async #startPlayback() {
const broadcasted = await new Message('audio:play', { position: this.playback.position }).broadcast();
logger.debug('sent broadcast to start playback to client(s) \'' + broadcasted + '\'...');
this.playback.started = broadcasted;
}
async #stopPlayback() {
const broadcasted = await new Message('audio:stop').broadcast();
logger.debug('sent broadcast to stop playback to client(s) \'' + broadcasted + '\'...');
this.playback.stopped = broadcasted;
}
async #pausePlayback() {
const broadcasted = await new Message('audio:pause').broadcast();
logger.debug('sent broadcast to pause playback to client(s) \'' + broadcasted + '\'...');
this.playback.paused = broadcasted;
}
async destroy() {
this.eventParser.removeAllListeners('audio:ready');
eventparser.removeAllListeners('audio:ready');
for (let index = 0; index < this.clients.length; index++) {
const audiosocket = this.clients[index].audiosocket;
if (audiosocket.destroyed === true) {

View file

@ -1,5 +1,4 @@
const Heartbeat = require('./Heartbeat.js');
const EventParser = require('./EventParser.js');
let clientId = -1;
@ -9,7 +8,6 @@ class Client {
clientId++;
this.id = clientId;
this.socket = socket;
this.eventParser = new EventParser();
this.heartbeat = new Heartbeat(this);
this.#listenForEvents();
}
@ -42,7 +40,7 @@ class Client {
this.#handleEventEnd()
});
this.socket.on('data', (data) => {
this.#handleEventData(data)
this.#handleEventData(data, this.socket)
});
this.heartbeat.on('timeout', () => {
this.#handleEventHeartbeatTimeout();
@ -52,8 +50,8 @@ class Client {
});
}
async #handleEventData(data) {
this.eventParser.parse(data);
async #handleEventData(data, socket) {
eventparser.parse(data);
}
#handleEventTimeout() {

View file

@ -8,7 +8,7 @@ class EventParser extends EventEmitter {
this.buffer = '';
}
parse(data, socket) {
parse(data) {
if (data === undefined) {
return;
}
@ -23,7 +23,7 @@ class EventParser extends EventEmitter {
return;
}
const eventId = event.id.toLowerCase();
this.emit(eventId, event.data, socket);
this.emit(eventId, event.data);
}
}

View file

@ -7,7 +7,7 @@ class Heartbeat extends EventEmitter {
constructor(client) {
super();
this.interval = config?.server?.heartbeat || 10000;
this.interval = config?.heartbeat || 10000;
this.client = client;
this.#listenForPingPong();
this.#sendPing();
@ -31,11 +31,11 @@ class Heartbeat extends EventEmitter {
}
async #listenForPingPong() {
this.client.eventParser.on('ping', () => {
eventparser.on('ping', () => {
logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...');
new Message('pong').send(this.client);
});
this.client.eventParser.on('pong', (data) => {
eventparser.on('pong', (data) => {
logger.debug(this.client.getTag() + ' handling event \'pong\'...');
const now = Date.now();
this.alive = true;
@ -51,8 +51,8 @@ class Heartbeat extends EventEmitter {
if (this.timeout !== undefined) {
clearTimeout(this.timeout);
}
this.client.eventParser.removeAllListeners('ping');
this.client.eventParser.removeAllListeners('pong');
eventparser.removeAllListeners('ping');
eventparser.removeAllListeners('pong');
}
}

View file

@ -19,7 +19,7 @@
"database": {
"dialect": "postgres",
"storage": "/tmp/kannon.sqlite",
"host": "192.168.104.135",
"host": "192.168.104.136",
"port": 5432,
"database": "kannon",
"username": "postgres",

View file

@ -15,11 +15,11 @@
"database": {
"dialect": "postgres",
"storage": "/tmp/kannon.sqlite",
"host": "192.168.104.135",
"host": "192.168.104.136",
"port": 5432,
"database": "kannon",
"username": "postgres",
"password": "$Velvet90"
"username": "kannon",
"password": "kannon"
},
"audio": {
"threshold": 10

View file

@ -7,6 +7,7 @@ const Server = require('./classes/Server.js');
const Database = require('./classes/Database.js');
const Queue = require('./classes/Queue.js');
const Watcher = require('./classes/Watcher.js');
const EventParser = require('./classes/EventParser');
const INTERRUPTS = ['beforeExit', 'SIGINT', 'SIGTERM'];
@ -30,6 +31,7 @@ async function main() {
await global.database.initialize();
// socket server
if (util.isEnabled(global.config.server)) {
global.eventparser = new EventParser();
global.server = new Server();
await global.server.start();
}