From a826a38e377d3de81342e523cdb2b7052a43e530 Mon Sep 17 00:00:00 2001 From: velvettear Date: Tue, 3 May 2022 12:25:54 +0200 Subject: [PATCH] possible fixes for the blocking heartbeat problem --- classes/AudioServer.js | 30 +++++++++++++++++++++--------- classes/Client.js | 4 ++-- classes/Heartbeat.js | 23 +++++++++++------------ classes/Message.js | 2 +- classes/Server.js | 7 +++---- 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/classes/AudioServer.js b/classes/AudioServer.js index 5fe0c2b..a32109e 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -114,6 +114,7 @@ class AudioServer { if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) { return; } + this.#isFileTransmitted(client); // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); this.buffer.stream.resume(); }); @@ -279,12 +280,9 @@ class AudioServer { if (client.audiosocket.write(data) !== true) { // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); this.buffer.stream.pause(); + continue; } - if (client.audiosocket.bytesWritten >= this.buffer.size) { - logger.warn(client.getTag() + ' transmitted audio file after ' + (Date.now() - timestamp) + 'ms'); - client.audiosocket.end(); - client.audiosocket.destroy(); - } + this.#isFileTransmitted(client); } }); this.buffer.stream.on('close', () => { @@ -299,8 +297,22 @@ class AudioServer { }); } + #isFileTransmitted(client) { + if (client?.audiosocket === undefined) { + return; + } + if (client.audiosocket.bytesWritten < this.buffer.size) { + return false; + } + logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + 'bytes after ' + (Date.now() ) + 'ms'); + client.audiosocket.end(); + client.audiosocket.destroy(); + return true; + } + async destroy() { - eventparser.removeAllListeners('audio:ready'); + eventparser.removeAllListeners('audio:register'); + eventparser.removeAllListeners('audio:state'); for (let index = 0; index < this.clients.length; index++) { const audiosocket = this.clients[index].audiosocket; if (audiosocket.destroyed === true) { @@ -308,9 +320,9 @@ class AudioServer { } audiosocket.destroy(); } - this.buffer.fd.close(); - this.buffer.stream.close(); - this.buffer.stream.destroy(); + this.buffer?.fd?.close(); + this.buffer?.stream?.close(); + this.buffer?.stream?.destroy(); if (this.server?.listening !== true) { return; } diff --git a/classes/Client.js b/classes/Client.js index dcebeae..feb66ea 100644 --- a/classes/Client.js +++ b/classes/Client.js @@ -73,11 +73,11 @@ class Client { #handleEventHeartbeatTimeout() { logger.warn(this.getTag() + ' heartbeat timed out'); - // this.destroy(); + this.destroy(); } #handleEventNetworkStatistics(data) { - logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data)); + // logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data)); } destroy() { diff --git a/classes/Heartbeat.js b/classes/Heartbeat.js index df0203a..a69e586 100644 --- a/classes/Heartbeat.js +++ b/classes/Heartbeat.js @@ -7,38 +7,39 @@ class Heartbeat extends EventEmitter { constructor(client) { super(); - this.interval = config?.heartbeat || 10000; + this.interval = config?.server?.heartbeat || 10000; this.client = client; this.#listenForPingPong(); this.#sendPing(); } async #sendPing() { + if (this.destroyed === true) { + return; + } if (this.timeout !== undefined) { clearTimeout(this.timeout); } if (this.alive === false) { this.emit('timeout'); - return; - } else if (this.alive === undefined) { + } + if (this.alive === undefined) { await sleep(this.interval); } this.alive = false; - // this.ping = process.hrtime.bigint(); this.ping = Date.now(); await new Message('ping').send(this.client); - this.timeout = setTimeout(() => { - this.#sendPing(); - }, this.interval); + await sleep(this.interval); + this.#sendPing(); } async #listenForPingPong() { eventparser.on('ping', () => { - logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...'); + // logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...'); new Message('pong').send(this.client); }); eventparser.on('pong', () => { - logger.debug(this.client.getTag() + ' handling event \'pong\'...'); + // logger.debug(this.client.getTag() + ' handling event \'pong\'...'); this.alive = true; // this.pong = process.hrtime.bigint(); this.pong = Date.now(); @@ -52,9 +53,7 @@ class Heartbeat extends EventEmitter { } destroy() { - if (this.timeout !== undefined) { - clearTimeout(this.timeout); - } + this.destroyed = true; eventparser.removeAllListeners('ping'); eventparser.removeAllListeners('pong'); } diff --git a/classes/Message.js b/classes/Message.js index ecb2700..eee7160 100644 --- a/classes/Message.js +++ b/classes/Message.js @@ -25,7 +25,7 @@ class Message { this.data.clientId = client.id; } const data = this.toString(); - logger.debug(client.getTag() + ' sending data: ' + data); + // logger.debug(client.getTag() + ' sending data: ' + data); await new Promise((resolve, reject) => { client.socket.write(this.toString() + constants.EVENT_DELIMITER, resolve); }); diff --git a/classes/Server.js b/classes/Server.js index 39333b0..1f33716 100644 --- a/classes/Server.js +++ b/classes/Server.js @@ -1,5 +1,4 @@ const Client = require('./Client.js'); -const AudioServer = require('./AudioServer.js'); const net = require('net'); class Server { @@ -21,9 +20,9 @@ class Server { this.server.on('connection', (socket) => { this.#addClient(socket); }); - this.server.on('error', (err) => { - logger.error('ERROR IN SERVER ' + err); - reject('an unexpected error occured: ' + err); + this.server.on('error', (error) => { + logger.error('communication server encountered an error: ' + error); + // reject('an unexpected error occured: ' + error); }); }); }