extended api, optimized some stuff

This commit is contained in:
Daniel Sommer 2022-04-29 16:50:56 +02:00
parent 0fa7ca78a6
commit 7dc3fb6ece
8 changed files with 95 additions and 51 deletions

View file

@ -34,8 +34,28 @@ class Api {
}
#setup() {
this.#registerEndpoint(constants.API_PLAY, constants.REQUEST_METHOD_POST, () => {
new AudioServer('/mnt/kingston/public/LEFTOVER.flac');
this.#registerEndpoint(constants.API_PLAY, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver !== undefined) {
await global.audioserver.destroy();
}
global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm');
});
this.#registerEndpoint(constants.API_PAUSE, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver === undefined) {
return;
}
global.audioserver.pausePlayback();
});
this.#registerEndpoint(constants.API_RESUME, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver === undefined) {
return;
}
});
this.#registerEndpoint(constants.API_STOP, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver === undefined) {
return;
}
global.audioserver.stopPlayback();
});
}

View file

@ -13,13 +13,12 @@ class AudioServer {
this.port = 0;
this.buffer = {
file: file,
stream: fs.createReadStream(file),
limit: (config?.audio.bufferlimit || 256) * 1048576
stream: fs.createReadStream(file)
};
this.clients = [];
this.sockets = [];
this.broadcasts = {};
this.position = 0;
this.progress = 0;
this.server = net.createServer();
this.#prepare();
}
@ -41,16 +40,17 @@ class AudioServer {
this.sockets.push(socket);
});
this.server.on('error', (err) => {
logger.error('ERROR IN AUDIOSERVER ' + err);
reject('an error occured preparing the audio server for file \'' + this.file + '\' > ' + err);
});
});
const stats = await stat(this.buffer.file);
this.buffer.size = stats.size;
let divisor = 30;
let percentage = 30;
if (!(isNaN(config.audio?.threshold))) {
divisor = config.audio.threshold;
percentage = config.audio.threshold;
}
this.buffer.threshold = (this.buffer.size / 100) / divisor;
this.buffer.threshold = (this.buffer.size / 100) * percentage;
this.#announceAudioServer();
}
@ -79,6 +79,7 @@ class AudioServer {
return;
}
client.audiosocket = socket;
client.audiosocket.setNoDelay(config?.audio?.nodelay || false);
this.clients.push(client);
this.#setClientState(client, constants.CLIENT_STATE_REGISTERED);
client.audiosocket.on('connect', () => {
@ -98,7 +99,7 @@ class AudioServer {
logger.debug(msg);
});
client.audiosocket.on('drain', () => {
if (this.buffer.stream === undefined || !this.buffer.stream.isPaused()) {
if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) {
return;
}
// logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...');
@ -179,26 +180,26 @@ class AudioServer {
async #handleStatePlaying(client) {
logger.debug(client.getTag() + ' has started playback...');
// TODO: remove - test only
await sleep(5000);
this.#pausePlayback();
// await sleep(10000);
// this.#pausePlayback();
}
async #handleStatePaused(client, data) {
if (client === undefined || data === undefined) {
return;
}
logger.debug(client.getTag() + ' paused playback at position \'' + data.position + '\'...');
logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...');
// TODO: remove - test only
await sleep(1);
this.#startPlayback();
// await sleep(100);
// this.#startPlayback();
}
async #handleStateStopped(client, data) {
logger.debug(client.getTag() + ' stopped playback at position \'' + data.position + '\'...');
logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '/' + this.buffer.size + '...');
}
async #handleStateError(client, data) {
logger.error(client.getTag() + ' experienced an error during playback at position \'' + data.position + '\': ' + data.error);
logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + '/' + this.buffer.size + ': ' + error);
}
#getClientById(clientId) {
@ -217,38 +218,51 @@ class AudioServer {
async #announceAudioServer() {
this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
port: this.server.address().port,
size: this.buffer.size,
threshold: this.buffer.threshold
settings: {
size: this.buffer.size,
threshold: this.buffer.threshold,
// TODO: GET AUDIO INFO FROM DATABASE
audio: {
channels: 2,
bitDepth: 16,
sampleRate: 44100
}
}
}).broadcast(true);
logger.debug('sent broadcast for audio server to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_REGISTERED] + '\'...');
}
async #startPlayback() {
this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play', { position: this.position }).broadcast();
this.broadcasts[constants.CLIENT_STATE_PLAYING] = await new Message('audio:play').broadcast();
logger.debug('sent broadcast to start playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PLAYING] + '\'...');
}
async #stopPlayback() {
this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast();
logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...');
}
async #pausePlayback() {
async pausePlayback() {
this.broadcasts[constants.CLIENT_STATE_PAUSED] = await new Message('audio:pause').broadcast();
logger.debug('sent broadcast to pause playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_PAUSED] + '\'...');
}
async stopPlayback() {
this.broadcasts[constants.CLIENT_STATE_STOPPED] = await new Message('audio:stop').broadcast();
logger.debug('sent broadcast to stop playback to client(s) \'' + this.broadcasts[constants.CLIENT_STATE_STOPPED] + '\'...');
}
async #transmitFile() {
const timestamp = Date.now();
return new Promise((resolve, reject) => {
this.buffer.stream.on('data', (data) => {
for (let index = 0; index < this.clients.length; index++) {
const client = this.clients[index];
if (client.audiosocket.destroyed) {
this.clients.splice(index, 1);
continue;
}
if (client.audiosocket.write(data) !== true) {
// logger.debug(client.getTag() + ' detected backpressure, pausing read stream...');
this.buffer.stream.pause();
}
if (client.audiosocket.bytesWritten >= this.buffer.size) {
logger.warn(client.getTag() + ' transmitted audio file after ' + (Date.now() - timestamp) + 'ms');
client.audiosocket.end();
client.audiosocket.destroy();
}
@ -260,6 +274,7 @@ class AudioServer {
});
this.buffer.stream.on('error', (error) => {
// TODO: handle with try / catch
logger.debug('STREAM ERROR!');
reject(error);
});
});
@ -277,6 +292,7 @@ class AudioServer {
await new Promise((resolve, reject) => {
this.server.close((err) => {
if (err !== undefined) {
logger.error('ERROR CLOSING AUDIOSERVER ' + err);
reject(err);
}
resolve();

View file

@ -30,15 +30,15 @@ class Client {
#listenForEvents() {
logger.debug(this.getTag() + ' connected to communication server...');
this.socket.on('timeout', () => {
this.#handleEventTimeout();
});
this.socket.on('close', () => {
this.#handleEventClose()
});
this.socket.on('end', () => {
this.#handleEventEnd()
});
this.socket.on('error', (error) => {
this.#handleEventError(error);
});
this.socket.on('data', (data) => {
this.#handleEventData(data, this.socket)
});
@ -50,20 +50,6 @@ class Client {
});
}
async #handleEventData(data) {
eventparser.parse(data);
}
#handleEventTimeout() {
logger.warn(this.getTag() + ' timed out');
this.destroy();
}
#handleEventHeartbeatTimeout() {
logger.warn(this.getTag() + ' heartbeat timed out');
this.destroy();
}
#handleEventClose() {
logger.debug(this.getTag() + ' closed socket to communication server');
server.removeClient(this);
@ -74,6 +60,22 @@ class Client {
this.destroy();
}
#handleEventError(error) {
if (error === undefined) {
return;
}
logger.debug(this.getTag() + ' encountered an error: ' + error);
}
async #handleEventData(data) {
eventparser.parse(data);
}
#handleEventHeartbeatTimeout() {
logger.warn(this.getTag() + ' heartbeat timed out');
this.destroy();
}
#handleEventNetworkStatistics(data) {
logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data));
}

View file

@ -24,7 +24,8 @@ class Heartbeat extends EventEmitter {
await sleep(this.interval);
}
this.alive = false;
this.ping = process.hrtime.bigint();
// this.ping = process.hrtime.bigint();
this.ping = Date.now();
await new Message('ping').send(this.client);
this.timeout = setTimeout(() => {
this.#sendPing();
@ -39,12 +40,13 @@ class Heartbeat extends EventEmitter {
eventparser.on('pong', () => {
logger.debug(this.client.getTag() + ' handling event \'pong\'...');
this.alive = true;
this.pong = process.hrtime.bigint();
// this.pong = process.hrtime.bigint();
this.pong = Date.now();
this.latency = this.pong - this.ping;
this.emit('network-statistics', {
ping: this.ping.toString(),
pong: this.pong.toString(),
latency: this.latency.toString()
ping: this.ping,
pong: this.pong,
latency: this.latency
});
});
}

View file

@ -22,6 +22,7 @@ class Server {
this.#addClient(socket);
});
this.server.on('error', (err) => {
logger.error('ERROR IN SERVER ' + err);
reject('an unexpected error occured: ' + err);
});
});

View file

@ -27,7 +27,7 @@
"password": "kannon"
},
"audio": {
"threshold": 10,
"bufferlimit": 64
"nodelay": false,
"threshold": 100
}
}

View file

@ -15,5 +15,8 @@ module.exports = {
REQUEST_METHOD_GET: 'get',
REQUEST_METHOD_POST: 'post',
API_PLAY: '/play'
API_PLAY: '/play',
API_PAUSE: '/pause',
API_RESUME: '/resume',
API_STOP: '/stop'
}