possible fixes for the blocking heartbeat problem

This commit is contained in:
Daniel Sommer 2022-05-03 12:25:54 +02:00
parent 1ce772d855
commit a826a38e37
5 changed files with 38 additions and 28 deletions

View file

@ -114,6 +114,7 @@ class AudioServer {
if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) { if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) {
return; return;
} }
this.#isFileTransmitted(client);
// logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...');
this.buffer.stream.resume(); this.buffer.stream.resume();
}); });
@ -279,12 +280,9 @@ class AudioServer {
if (client.audiosocket.write(data) !== true) { if (client.audiosocket.write(data) !== true) {
// logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...');
this.buffer.stream.pause(); this.buffer.stream.pause();
continue;
} }
if (client.audiosocket.bytesWritten >= this.buffer.size) { this.#isFileTransmitted(client);
logger.warn(client.getTag() + ' transmitted audio file after ' + (Date.now() - timestamp) + 'ms');
client.audiosocket.end();
client.audiosocket.destroy();
}
} }
}); });
this.buffer.stream.on('close', () => { this.buffer.stream.on('close', () => {
@ -299,8 +297,22 @@ class AudioServer {
}); });
} }
#isFileTransmitted(client) {
if (client?.audiosocket === undefined) {
return;
}
if (client.audiosocket.bytesWritten < this.buffer.size) {
return false;
}
logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + 'bytes after ' + (Date.now() ) + 'ms');
client.audiosocket.end();
client.audiosocket.destroy();
return true;
}
async destroy() { async destroy() {
eventparser.removeAllListeners('audio:ready'); eventparser.removeAllListeners('audio:register');
eventparser.removeAllListeners('audio:state');
for (let index = 0; index < this.clients.length; index++) { for (let index = 0; index < this.clients.length; index++) {
const audiosocket = this.clients[index].audiosocket; const audiosocket = this.clients[index].audiosocket;
if (audiosocket.destroyed === true) { if (audiosocket.destroyed === true) {
@ -308,9 +320,9 @@ class AudioServer {
} }
audiosocket.destroy(); audiosocket.destroy();
} }
this.buffer.fd.close(); this.buffer?.fd?.close();
this.buffer.stream.close(); this.buffer?.stream?.close();
this.buffer.stream.destroy(); this.buffer?.stream?.destroy();
if (this.server?.listening !== true) { if (this.server?.listening !== true) {
return; return;
} }

View file

@ -73,11 +73,11 @@ class Client {
#handleEventHeartbeatTimeout() { #handleEventHeartbeatTimeout() {
logger.warn(this.getTag() + ' heartbeat timed out'); logger.warn(this.getTag() + ' heartbeat timed out');
// this.destroy(); this.destroy();
} }
#handleEventNetworkStatistics(data) { #handleEventNetworkStatistics(data) {
logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data)); // logger.debug(this.getTag() + ' network statistics: ' + JSON.stringify(data));
} }
destroy() { destroy() {

View file

@ -7,38 +7,39 @@ class Heartbeat extends EventEmitter {
constructor(client) { constructor(client) {
super(); super();
this.interval = config?.heartbeat || 10000; this.interval = config?.server?.heartbeat || 10000;
this.client = client; this.client = client;
this.#listenForPingPong(); this.#listenForPingPong();
this.#sendPing(); this.#sendPing();
} }
async #sendPing() { async #sendPing() {
if (this.destroyed === true) {
return;
}
if (this.timeout !== undefined) { if (this.timeout !== undefined) {
clearTimeout(this.timeout); clearTimeout(this.timeout);
} }
if (this.alive === false) { if (this.alive === false) {
this.emit('timeout'); this.emit('timeout');
return; }
} else if (this.alive === undefined) { if (this.alive === undefined) {
await sleep(this.interval); await sleep(this.interval);
} }
this.alive = false; this.alive = false;
// this.ping = process.hrtime.bigint();
this.ping = Date.now(); this.ping = Date.now();
await new Message('ping').send(this.client); await new Message('ping').send(this.client);
this.timeout = setTimeout(() => { await sleep(this.interval);
this.#sendPing(); this.#sendPing();
}, this.interval);
} }
async #listenForPingPong() { async #listenForPingPong() {
eventparser.on('ping', () => { eventparser.on('ping', () => {
logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...'); // logger.debug(this.client.getTag() + ' handling event \'ping\', responding with \'pong\'...');
new Message('pong').send(this.client); new Message('pong').send(this.client);
}); });
eventparser.on('pong', () => { eventparser.on('pong', () => {
logger.debug(this.client.getTag() + ' handling event \'pong\'...'); // logger.debug(this.client.getTag() + ' handling event \'pong\'...');
this.alive = true; this.alive = true;
// this.pong = process.hrtime.bigint(); // this.pong = process.hrtime.bigint();
this.pong = Date.now(); this.pong = Date.now();
@ -52,9 +53,7 @@ class Heartbeat extends EventEmitter {
} }
destroy() { destroy() {
if (this.timeout !== undefined) { this.destroyed = true;
clearTimeout(this.timeout);
}
eventparser.removeAllListeners('ping'); eventparser.removeAllListeners('ping');
eventparser.removeAllListeners('pong'); eventparser.removeAllListeners('pong');
} }

View file

@ -25,7 +25,7 @@ class Message {
this.data.clientId = client.id; this.data.clientId = client.id;
} }
const data = this.toString(); const data = this.toString();
logger.debug(client.getTag() + ' sending data: ' + data); // logger.debug(client.getTag() + ' sending data: ' + data);
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
client.socket.write(this.toString() + constants.EVENT_DELIMITER, resolve); client.socket.write(this.toString() + constants.EVENT_DELIMITER, resolve);
}); });

View file

@ -1,5 +1,4 @@
const Client = require('./Client.js'); const Client = require('./Client.js');
const AudioServer = require('./AudioServer.js');
const net = require('net'); const net = require('net');
class Server { class Server {
@ -21,9 +20,9 @@ class Server {
this.server.on('connection', (socket) => { this.server.on('connection', (socket) => {
this.#addClient(socket); this.#addClient(socket);
}); });
this.server.on('error', (err) => { this.server.on('error', (error) => {
logger.error('ERROR IN SERVER ' + err); logger.error('communication server encountered an error: ' + error);
reject('an unexpected error occured: ' + err); // reject('an unexpected error occured: ' + error);
}); });
}); });
} }