From 0a1b64800796a0fb53f6c872fae288d6a53c834e Mon Sep 17 00:00:00 2001 From: velvettear Date: Wed, 20 Apr 2022 16:15:51 +0200 Subject: [PATCH] further 'random' development --- classes/Audiostream.js | 68 +++++++++++------- classes/Connection.js | 11 +-- classes/Player.js | 160 +++++++++++++++++++---------------------- example_config.json | 3 +- 4 files changed, 126 insertions(+), 116 deletions(-) diff --git a/classes/Audiostream.js b/classes/Audiostream.js index 870ad9f..0f44820 100644 --- a/classes/Audiostream.js +++ b/classes/Audiostream.js @@ -1,18 +1,12 @@ const net = require('net'); +const { sleep } = require('../libs/util'); const Message = require('./Message'); class Audiostream { - constructor(data) { - this.host = config?.server?.host || "127.0.0.1"; - this.port = data.port; - this.clientId = data.clientId; - this.size = data.size; - this.threshold = (this.size / 100) * 10; - this.#handleSocket(net.connect({ - host: this.getHost(), - port: this.getPort() - })); + constructor(eventParser) { + this.eventParser = eventParser; + this.#handleEvents(); } getHost() { @@ -27,10 +21,48 @@ class Audiostream { return this.getHost() + ':' + this.getPort(); } + #initialize(data) { + this.host = config?.server?.host || "127.0.0.1"; + this.port = data.port; + this.clientId = data.clientId; + this.size = data.size; + this.threshold = data.threshold; + this.#handleSocket(net.connect({ + host: this.getHost(), + port: this.getPort() + })); + } + + #handleEvents() { + this.eventParser.on('audio:initialize', (data) => { + logger.debug('handling event \'audio:initialize\'...'); + this.#initialize(data); + }); + this.eventParser.on('audio:play', (data) => { + logger.debug('handling event \'audio:play\'...'); + global.player.play(data?.position); + }); + this.eventParser.on('audio:pause', (data) => { + logger.debug('handling event \'audio:pause\'...'); + global.player.pause(); + }); + this.eventParser.on('audio:stop', () => { + logger.debug('handling event \'audio:stop\'...'); + global.player.stop(); + }); + global.player.on('statechange', (data) => { + new Message('audio:state', { + clientId: this.clientId, + state: data.state, + position: global.player.getPosition() + }).send(); + }); + } + #handleSocket(socket) { socket.on('connect', () => { logger.debug('connected to audio server \'' + this.getTag() + '\'...'); - new Message('audiostream-ready', this.clientId).send(socket); + new Message('audio:register', { clientId: this.clientId, socket: socket.localPort }).send(); global.player.prepare(this.size, this.threshold); }); socket.on('error', (error) => { @@ -48,21 +80,7 @@ class Audiostream { socket.on('close', () => { logger.info('connection to audio server \'' + this.getTag() + '\' closed'); }); - global.player.on('ready', () => { - global.player.play(); - }); - global.player.on('playing', async () => { - await new Promise((resolve, reject) => { - setTimeout(resolve, 10000); - }); - global.player.pause(); - await new Promise((resolve, reject) => { - setTimeout(resolve, 2000); - }); - global.player.play(); - }); } - } module.exports = Audiostream; \ No newline at end of file diff --git a/classes/Connection.js b/classes/Connection.js index 091cf54..dbb6c64 100644 --- a/classes/Connection.js +++ b/classes/Connection.js @@ -52,6 +52,7 @@ class Connection { logger.info('connected to communication server \'' + this.getTag() + '\'...'); this.socket = socket; this.eventParser = new EventParser(); + this.audiostream = new Audiostream(this.eventParser); this.heartbeat = new Heartbeat(this.eventParser); this.#handleHeartbeat(); socket.on('timeout', () => { @@ -66,10 +67,6 @@ class Connection { socket.on('data', (data) => { this.#handleEventData(data); }); - this.eventParser.on('audiostream-initialize', (data) => { - logger.debug('handling event \'audiostream-initialize\'...'); - new Audiostream(data) - }); } async #handleEventData(data) { @@ -101,10 +98,16 @@ class Connection { }); } + #handleAudioEvents() { + + + } + destroy() { if (this.heartbeat !== undefined) { this.heartbeat.destroy(); this.heartbeat.removeAllListeners('timeout'); + this.heartbeat = undefined; } if (this.socket !== undefined) { this.socket.removeAllListeners('connect'); diff --git a/classes/Player.js b/classes/Player.js index 31030db..29964a3 100644 --- a/classes/Player.js +++ b/classes/Player.js @@ -5,7 +5,6 @@ const createWriteStream = require('fs').createWriteStream; const unlink = require('fs/promises').unlink; const resolve = require('path').resolve; -const STATE_SPAWNED = 'spawned'; const STATE_READY = 'ready'; const STATE_PLAYING = 'playing'; const STATE_PAUSED = 'paused'; @@ -20,35 +19,50 @@ class Player extends EventEmitter { this.events = []; this.buffer = []; this.tmp = { - file: resolve('/tmp/kannon.tmp') - } + file: resolve(global.config?.tmp || '/tmp/kannon.tmp') + }; this.buffersize = 0; } async prepare(size, threshold) { logger.debug('preparing audio player...'); + await this.#reset(); + await this.#removeTemporaryFile(); this.size = size; this.threshold = threshold; this.tmp.stream = createWriteStream(this.tmp.file); - this.#reset(); - this.#spawnProcess(); + this.tmp.stream.on('ready', async () => { + const timestamp = Date.now(); + while (this.tmp.stream.bytesWritten !== this.size) { + if (this.buffer.length === 0) { + await sleep(1); + continue; + } + const tmp = this.buffer[0]; + this.buffer.shift(); + this.tmp.stream.write(tmp); + if (this.tmp.announced !== true && this.tmp.stream.bytesWritten >= this.threshold) { + this.#setState(STATE_READY); + logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - timestamp) + 'ms'); + this.tmp.announced = true; + } + } + this.tmp.stream.end(); + this.tmp.stream.close(); + logger.debug('finished writing of ' + this.size + ' bytes after ' + (Date.now() - timestamp) + 'ms'); + }); } feed(buffer) { this.buffer.push(buffer); - this.buffersize += buffer.length; - if (this.isSpawned() && this.buffersize >= this.threshold) { - this.#setState(STATE_READY); - } } - async play() { + async play(position) { if (this.isPlaying()) { this.stop(); } - await this.#spawnProcess(); + await this.#spawnProcess(position); this.process.stderr.on('data', (data) => { - this.#setState(STATE_PLAYING); data = data.toString(); const position = data.toString().trim().split(' ')[0]; if (position.length === 0 || isNaN(position)) { @@ -59,45 +73,16 @@ class Player extends EventEmitter { this.process.stdin.on('error', (error) => { this.#setState(STATE_ERROR, error); }); - if (this.process.spawnargs[this.process.spawnargs.length] === this.tmp.file) { - return; - } - if (this.buffer === undefined || this.buffer.length === 0) { - logger.warn('aborting playback of an empty buffer...'); - return; - } - const timestamp = Date.now(); - while (true) { - if (this.buffer.length === 0 && this.buffersize !== this.size) { - await sleep(1); - continue; - } - const tmp = this.buffer[0]; - this.buffer.shift(); - if (this.buffer.length === 0 && this.buffersize === this.size) { - logger.warn('BUFFER EMPTIED AFTER ' + ( Date.now() - timestamp) + 'ms') - this.process.stdin.end(tmp); - this.tmp.stream.end(tmp); - break; - } - this.tmp.stream.write(tmp); - this.process.stdin.write(tmp); - } } async pause() { + await this.#reset(true); this.#setState(STATE_PAUSED); - this.#reset(); - this.#spawnProcess(); } async stop() { + await this.#reset(); this.#setState(STATE_STOPPED); - this.#reset(); - } - - isSpawned() { - return this.state === STATE_SPAWNED; } isReady() { @@ -120,50 +105,29 @@ class Player extends EventEmitter { return this.state === STATE_ERROR; } - async #spawnProcess() { - if (this.process !== undefined) { - return; - } + getPosition() { + return this.position; + } + + async #spawnProcess(position) { return new Promise((resolve, reject) => { const args = [ '-vn', '-nodisp' ]; - if (this.isPaused()) { - args.unshift('-ss', this.position); - args.push(this.tmp.file); - } else { - args.push('-'); + if (this.isPaused() && !isNaN(position)) { + args.unshift('-ss', position); } + args.push(this.tmp.file); this.process = spawn("ffplay", args); this.process.on('error', (error) => { this.#reset(); // TODO: try/catch error reject('error spawning process \'ffplay\': ' + error); }); - this.process.on('exit', (code, signal) => { - let msg = 'process \'ffplay\' exited with'; - if (code !== undefined) { - msg += ' code \'' + code + '\''; - } else { - msg += ' signal \'' + signal + '\''; - } - logger.debug(msg); - this.#closeStdIO(); - }); - this.process.on('close', (code, signal) => { - let msg = 'process \'ffplay\' closed with'; - if (code !== undefined) { - msg += ' code \'' + code + '\''; - } else { - msg += ' signal \'' + signal + '\''; - } - logger.debug(msg); - this.process = undefined; - }); this.process.on('spawn', () => { logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...'); - this.#setState(STATE_SPAWNED); + this.#setState(STATE_PLAYING); resolve(); }); }); @@ -180,15 +144,33 @@ class Player extends EventEmitter { return; } logger.debug('emitting state \'' + state + '\' of audio player...'); - this.emit(state, data); + this.emit(this.state, { data: data }); + this.emit('statechange', { state: this.state }); this.events.push(state); } - #killProcess() { + async #killProcess() { + if (this.process === undefined) { + return; + } + const pid = this.process.pid; this.#closeStdIO(); if (this.process?.killed === false) { this.process.kill('SIGTERM'); } + await new Promise((resolve, reject) => { + this.process.on('close', (code, signal) => { + let msg = 'process \'ffplay\' (pid: ' + pid + ') closed with'; + if (code !== undefined) { + msg += ' code \'' + code + '\''; + } else { + msg += ' signal \'' + signal + '\''; + } + logger.debug(msg); + this.process = undefined; + resolve(); + }); + }); } #closeStdIO() { @@ -201,20 +183,26 @@ class Player extends EventEmitter { } } - async #reset() { - this.#killProcess(); + async #removeTemporaryFile() { + try { + await unlink(this.tmp.file); + } catch (error) { + if (error?.code === 'ENOENT') { + return; + } + logger.error('error removing temporary file \'' + this.tmp.file + '\': ' + error); + } + } + + async #reset(paused) { + await this.#killProcess(); + this.timestamp = Date.now(); this.events = []; this.buffer = []; - if (!this.isPaused()) { - this.position = 0; - this.backupbuffer = []; - this.buffersize = 0; - try { - await unlink(this.tmp.file); - } catch (error) { - logger.error('no file to unlink ' + this.tmp.file); - } + if (paused === true) { + return; } + this.position = 0; } } diff --git a/example_config.json b/example_config.json index 807fda2..1c59319 100644 --- a/example_config.json +++ b/example_config.json @@ -11,5 +11,6 @@ "reconnect": { "limit": 0, "delay": 1000 - } + }, + "tmp": "/tmp/kannon.tmp" } \ No newline at end of file