diff --git a/classes/Audiostream.js b/classes/Audiostream.js index 0f44820..ba696b6 100644 --- a/classes/Audiostream.js +++ b/classes/Audiostream.js @@ -60,10 +60,10 @@ class Audiostream { } #handleSocket(socket) { - socket.on('connect', () => { + socket.on('connect', async () => { logger.debug('connected to audio server \'' + this.getTag() + '\'...'); - new Message('audio:register', { clientId: this.clientId, socket: socket.localPort }).send(); - global.player.prepare(this.size, this.threshold); + await global.player.prepare(this.size, this.threshold); + new Message('audio:register', { clientId: this.clientId, port: socket.localPort }).send(); }); socket.on('error', (error) => { logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error); @@ -79,6 +79,7 @@ class Audiostream { }); socket.on('close', () => { logger.info('connection to audio server \'' + this.getTag() + '\' closed'); + global.player.stopFeed(); }); } } diff --git a/classes/EventParser.js b/classes/EventParser.js index aad8e62..fb65dde 100644 --- a/classes/EventParser.js +++ b/classes/EventParser.js @@ -1,4 +1,3 @@ -const { EVENT_DELIMITER } = require('../libs/constants.js'); const EventEmitter = require('events'); class EventParser extends EventEmitter { @@ -13,7 +12,7 @@ class EventParser extends EventEmitter { return; } this.buffer += data; - const indexOfEnd = this.buffer.indexOf(EVENT_DELIMITER); + const indexOfEnd = this.buffer.indexOf(constants.EVENT_DELIMITER); if (indexOfEnd === -1) { return; } diff --git a/classes/Heartbeat.js b/classes/Heartbeat.js index ad1bdfe..901816f 100644 --- a/classes/Heartbeat.js +++ b/classes/Heartbeat.js @@ -31,10 +31,9 @@ class Heartbeat extends EventEmitter { } async #listenForPingPong() { - this.eventParser.on('ping', (data) => { + this.eventParser.on('ping', () => { logger.debug('handling event \'ping\', responding with \'pong\'...'); - data.client = Date.now(); - new Message('pong', data).send(); + new Message('pong').send(); }); this.eventParser.on('pong', () => { logger.debug('handling event \'pong\'...'); diff --git a/classes/Message.js b/classes/Message.js index f2f2454..5331b3d 100644 --- a/classes/Message.js +++ b/classes/Message.js @@ -1,5 +1,3 @@ -const { EVENT_DELIMITER } = require('../libs/constants.js'); - class Message { constructor(id, data) { @@ -29,7 +27,7 @@ class Message { const data = this.toString(); logger.debug('sending data to \'' + socket.remoteAddress + ':' + socket.remotePort + '\': ' + data); await new Promise((resolve, reject) => { - socket.write(data + EVENT_DELIMITER, resolve); + socket.write(data + constants.EVENT_DELIMITER, resolve); }); } diff --git a/classes/Player.js b/classes/Player.js index 29964a3..42d0ed8 100644 --- a/classes/Player.js +++ b/classes/Player.js @@ -1,27 +1,19 @@ const EventEmitter = require('events'); -const { sleep } = require('../libs/util.js'); const { spawn } = require('child_process'); const createWriteStream = require('fs').createWriteStream; const unlink = require('fs/promises').unlink; const resolve = require('path').resolve; -const STATE_READY = 'ready'; -const STATE_PLAYING = 'playing'; -const STATE_PAUSED = 'paused'; -const STATE_STOPPED = 'stopped'; -const STATE_ERROR = 'error'; - class Player extends EventEmitter { constructor() { super(); + this.timestamp = Date.now(); this.position = 0; this.events = []; - this.buffer = []; this.tmp = { file: resolve(global.config?.tmp || '/tmp/kannon.tmp') }; - this.buffersize = 0; } async prepare(size, threshold) { @@ -31,35 +23,26 @@ class Player extends EventEmitter { this.size = size; this.threshold = threshold; this.tmp.stream = createWriteStream(this.tmp.file); - this.tmp.stream.on('ready', async () => { - const timestamp = Date.now(); - while (this.tmp.stream.bytesWritten !== this.size) { - if (this.buffer.length === 0) { - await sleep(1); - continue; - } - const tmp = this.buffer[0]; - this.buffer.shift(); - this.tmp.stream.write(tmp); - if (this.tmp.announced !== true && this.tmp.stream.bytesWritten >= this.threshold) { - this.#setState(STATE_READY); - logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - timestamp) + 'ms'); - this.tmp.announced = true; - } - } - this.tmp.stream.end(); - this.tmp.stream.close(); - logger.debug('finished writing of ' + this.size + ' bytes after ' + (Date.now() - timestamp) + 'ms'); - }); } - feed(buffer) { - this.buffer.push(buffer); + async feed(buffer) { + this.tmp.stream.write(buffer); + if (this.tmp.announced === undefined && this.tmp.stream.bytesWritten >= this.threshold) { + this.tmp.announced = true; + this.#setState(constants.STATE_READY); + logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms'); + } + } + + stopFeed() { + logger.debug('finished writing of ' + this.tmp.stream.bytesWritten + ' bytes after ' + (Date.now() - this.timestamp) + 'ms'); + this.tmp.stream.end(); + this.tmp.stream.close(); } async play(position) { if (this.isPlaying()) { - this.stop(); + await this.stop(); } await this.#spawnProcess(position); this.process.stderr.on('data', (data) => { @@ -71,38 +54,38 @@ class Player extends EventEmitter { this.position = position; }); this.process.stdin.on('error', (error) => { - this.#setState(STATE_ERROR, error); + this.#setState(constants.STATE_ERROR, error); }); } async pause() { await this.#reset(true); - this.#setState(STATE_PAUSED); + this.#setState(constants.STATE_PAUSED); } async stop() { await this.#reset(); - this.#setState(STATE_STOPPED); + this.#setState(constants.STATE_STOPPED); } isReady() { - return this.state === STATE_READY; + return this.state === constants.STATE_READY; } isPlaying() { - return this.state === STATE_PLAYING; + return this.state === constants.STATE_PLAYING; } isPaused() { - return this.state === STATE_PAUSED; + return this.state === constants.STATE_PAUSED; } isFinished() { - return this.state === STATE_STOPPED; + return this.state === constants.STATE_STOPPED; } hasError() { - return this.state === STATE_ERROR; + return this.state === constants.STATE_ERROR; } getPosition() { @@ -127,7 +110,7 @@ class Player extends EventEmitter { }); this.process.on('spawn', () => { logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...'); - this.#setState(STATE_PLAYING); + this.#setState(constants.STATE_PLAYING); resolve(); }); }); @@ -198,7 +181,6 @@ class Player extends EventEmitter { await this.#killProcess(); this.timestamp = Date.now(); this.events = []; - this.buffer = []; if (paused === true) { return; } diff --git a/kannon-client.js b/kannon-client.js index b23235e..9a1f992 100644 --- a/kannon-client.js +++ b/kannon-client.js @@ -21,6 +21,7 @@ async function main() { exit('could not read config file at \'' + configPath + '\''); } handleExit(); + global.constants = require('./libs/constants.js'); global.logger.info("launching " + packageJSON.name + " " + packageJSON.version + "..."); global.player = new Player(); global.connection = new Connection(); diff --git a/libs/constants.js b/libs/constants.js index 720c3a1..b7d6973 100644 --- a/libs/constants.js +++ b/libs/constants.js @@ -2,5 +2,11 @@ module.exports = { SOCKET_EVENT_PING: 'ping', SOCKET_EVENT_PONG: 'pong', + STATE_READY: 'ready', + STATE_PLAYING: 'playing', + STATE_PAUSED: 'paused', + STATE_STOPPED: 'stopped', + STATE_ERROR: 'error', + EVENT_DELIMITER: '<<< kannon >>>' } \ No newline at end of file