major optimizations and fixes

This commit is contained in:
Daniel Sommer 2022-04-21 13:40:00 +02:00
parent 0a1b648007
commit 1330caa44a
7 changed files with 39 additions and 53 deletions

View file

@ -60,10 +60,10 @@ class Audiostream {
} }
#handleSocket(socket) { #handleSocket(socket) {
socket.on('connect', () => { socket.on('connect', async () => {
logger.debug('connected to audio server \'' + this.getTag() + '\'...'); logger.debug('connected to audio server \'' + this.getTag() + '\'...');
new Message('audio:register', { clientId: this.clientId, socket: socket.localPort }).send(); await global.player.prepare(this.size, this.threshold);
global.player.prepare(this.size, this.threshold); new Message('audio:register', { clientId: this.clientId, port: socket.localPort }).send();
}); });
socket.on('error', (error) => { socket.on('error', (error) => {
logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error); logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error);
@ -79,6 +79,7 @@ class Audiostream {
}); });
socket.on('close', () => { socket.on('close', () => {
logger.info('connection to audio server \'' + this.getTag() + '\' closed'); logger.info('connection to audio server \'' + this.getTag() + '\' closed');
global.player.stopFeed();
}); });
} }
} }

View file

@ -1,4 +1,3 @@
const { EVENT_DELIMITER } = require('../libs/constants.js');
const EventEmitter = require('events'); const EventEmitter = require('events');
class EventParser extends EventEmitter { class EventParser extends EventEmitter {
@ -13,7 +12,7 @@ class EventParser extends EventEmitter {
return; return;
} }
this.buffer += data; this.buffer += data;
const indexOfEnd = this.buffer.indexOf(EVENT_DELIMITER); const indexOfEnd = this.buffer.indexOf(constants.EVENT_DELIMITER);
if (indexOfEnd === -1) { if (indexOfEnd === -1) {
return; return;
} }

View file

@ -31,10 +31,9 @@ class Heartbeat extends EventEmitter {
} }
async #listenForPingPong() { async #listenForPingPong() {
this.eventParser.on('ping', (data) => { this.eventParser.on('ping', () => {
logger.debug('handling event \'ping\', responding with \'pong\'...'); logger.debug('handling event \'ping\', responding with \'pong\'...');
data.client = Date.now(); new Message('pong').send();
new Message('pong', data).send();
}); });
this.eventParser.on('pong', () => { this.eventParser.on('pong', () => {
logger.debug('handling event \'pong\'...'); logger.debug('handling event \'pong\'...');

View file

@ -1,5 +1,3 @@
const { EVENT_DELIMITER } = require('../libs/constants.js');
class Message { class Message {
constructor(id, data) { constructor(id, data) {
@ -29,7 +27,7 @@ class Message {
const data = this.toString(); const data = this.toString();
logger.debug('sending data to \'' + socket.remoteAddress + ':' + socket.remotePort + '\': ' + data); logger.debug('sending data to \'' + socket.remoteAddress + ':' + socket.remotePort + '\': ' + data);
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
socket.write(data + EVENT_DELIMITER, resolve); socket.write(data + constants.EVENT_DELIMITER, resolve);
}); });
} }

View file

@ -1,27 +1,19 @@
const EventEmitter = require('events'); const EventEmitter = require('events');
const { sleep } = require('../libs/util.js');
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const createWriteStream = require('fs').createWriteStream; const createWriteStream = require('fs').createWriteStream;
const unlink = require('fs/promises').unlink; const unlink = require('fs/promises').unlink;
const resolve = require('path').resolve; const resolve = require('path').resolve;
const STATE_READY = 'ready';
const STATE_PLAYING = 'playing';
const STATE_PAUSED = 'paused';
const STATE_STOPPED = 'stopped';
const STATE_ERROR = 'error';
class Player extends EventEmitter { class Player extends EventEmitter {
constructor() { constructor() {
super(); super();
this.timestamp = Date.now();
this.position = 0; this.position = 0;
this.events = []; this.events = [];
this.buffer = [];
this.tmp = { this.tmp = {
file: resolve(global.config?.tmp || '/tmp/kannon.tmp') file: resolve(global.config?.tmp || '/tmp/kannon.tmp')
}; };
this.buffersize = 0;
} }
async prepare(size, threshold) { async prepare(size, threshold) {
@ -31,35 +23,26 @@ class Player extends EventEmitter {
this.size = size; this.size = size;
this.threshold = threshold; this.threshold = threshold;
this.tmp.stream = createWriteStream(this.tmp.file); this.tmp.stream = createWriteStream(this.tmp.file);
this.tmp.stream.on('ready', async () => {
const timestamp = Date.now();
while (this.tmp.stream.bytesWritten !== this.size) {
if (this.buffer.length === 0) {
await sleep(1);
continue;
}
const tmp = this.buffer[0];
this.buffer.shift();
this.tmp.stream.write(tmp);
if (this.tmp.announced !== true && this.tmp.stream.bytesWritten >= this.threshold) {
this.#setState(STATE_READY);
logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - timestamp) + 'ms');
this.tmp.announced = true;
}
}
this.tmp.stream.end();
this.tmp.stream.close();
logger.debug('finished writing of ' + this.size + ' bytes after ' + (Date.now() - timestamp) + 'ms');
});
} }
feed(buffer) { async feed(buffer) {
this.buffer.push(buffer); this.tmp.stream.write(buffer);
if (this.tmp.announced === undefined && this.tmp.stream.bytesWritten >= this.threshold) {
this.tmp.announced = true;
this.#setState(constants.STATE_READY);
logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms');
}
}
stopFeed() {
logger.debug('finished writing of ' + this.tmp.stream.bytesWritten + ' bytes after ' + (Date.now() - this.timestamp) + 'ms');
this.tmp.stream.end();
this.tmp.stream.close();
} }
async play(position) { async play(position) {
if (this.isPlaying()) { if (this.isPlaying()) {
this.stop(); await this.stop();
} }
await this.#spawnProcess(position); await this.#spawnProcess(position);
this.process.stderr.on('data', (data) => { this.process.stderr.on('data', (data) => {
@ -71,38 +54,38 @@ class Player extends EventEmitter {
this.position = position; this.position = position;
}); });
this.process.stdin.on('error', (error) => { this.process.stdin.on('error', (error) => {
this.#setState(STATE_ERROR, error); this.#setState(constants.STATE_ERROR, error);
}); });
} }
async pause() { async pause() {
await this.#reset(true); await this.#reset(true);
this.#setState(STATE_PAUSED); this.#setState(constants.STATE_PAUSED);
} }
async stop() { async stop() {
await this.#reset(); await this.#reset();
this.#setState(STATE_STOPPED); this.#setState(constants.STATE_STOPPED);
} }
isReady() { isReady() {
return this.state === STATE_READY; return this.state === constants.STATE_READY;
} }
isPlaying() { isPlaying() {
return this.state === STATE_PLAYING; return this.state === constants.STATE_PLAYING;
} }
isPaused() { isPaused() {
return this.state === STATE_PAUSED; return this.state === constants.STATE_PAUSED;
} }
isFinished() { isFinished() {
return this.state === STATE_STOPPED; return this.state === constants.STATE_STOPPED;
} }
hasError() { hasError() {
return this.state === STATE_ERROR; return this.state === constants.STATE_ERROR;
} }
getPosition() { getPosition() {
@ -127,7 +110,7 @@ class Player extends EventEmitter {
}); });
this.process.on('spawn', () => { this.process.on('spawn', () => {
logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...'); logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...');
this.#setState(STATE_PLAYING); this.#setState(constants.STATE_PLAYING);
resolve(); resolve();
}); });
}); });
@ -198,7 +181,6 @@ class Player extends EventEmitter {
await this.#killProcess(); await this.#killProcess();
this.timestamp = Date.now(); this.timestamp = Date.now();
this.events = []; this.events = [];
this.buffer = [];
if (paused === true) { if (paused === true) {
return; return;
} }

View file

@ -21,6 +21,7 @@ async function main() {
exit('could not read config file at \'' + configPath + '\''); exit('could not read config file at \'' + configPath + '\'');
} }
handleExit(); handleExit();
global.constants = require('./libs/constants.js');
global.logger.info("launching " + packageJSON.name + " " + packageJSON.version + "..."); global.logger.info("launching " + packageJSON.name + " " + packageJSON.version + "...");
global.player = new Player(); global.player = new Player();
global.connection = new Connection(); global.connection = new Connection();

View file

@ -2,5 +2,11 @@ module.exports = {
SOCKET_EVENT_PING: 'ping', SOCKET_EVENT_PING: 'ping',
SOCKET_EVENT_PONG: 'pong', SOCKET_EVENT_PONG: 'pong',
STATE_READY: 'ready',
STATE_PLAYING: 'playing',
STATE_PAUSED: 'paused',
STATE_STOPPED: 'stopped',
STATE_ERROR: 'error',
EVENT_DELIMITER: '<<< kannon >>>' EVENT_DELIMITER: '<<< kannon >>>'
} }