const NodeSpeaker = require('speaker'); const EventEmitter = require('events'); const { spawn } = require('child_process'); const createWriteStream = require('fs').createWriteStream; const unlink = require('fs/promises').unlink; const resolve = require('path').resolve; class Player extends EventEmitter { constructor() { super(); this.timestamp = Date.now(); this.position = 0; this.events = []; this.tmp = { file: resolve(global.config?.tmp || '/tmp/kannon.tmp') }; this.buffer = { size: 0, elements: [] }; } async prepare(threshold, stream) { logger.debug('preparing audio player...'); await this.#reset(); // await this.#removeTemporaryFile(); this.buffer.threshold = threshold; this.stream = stream; // this.tmp.stream = createWriteStream(this.tmp.file); // this.speaker = new Speaker(speakeroptions.channel, speakeroptions.bitDepth, speakeroptions.sampleRate); this.speaker = new NodeSpeaker({ channels: 2, bitDepth: 16, sampleRate: 44100 }); this.speaker.on('open', () => { logger.debug('speaker opened...'); }); this.speaker.on('flush', () => { logger.debug('speaker flushed...'); }); this.speaker.on('close', () => { logger.debug('speaker closed...'); }); this.speaker.on('drain', () => { if (this.isPlaying() === false) { return; } this.playFromBuffer(); }); this.buffer.limit = config?.buffer?.limit; if (isNaN(this.buffer.limit) || this.buffer.limit < this.buffer.threshold) { this.buffer.limit = this.buffer.threshold; } this.#fillBuffer(); } #fillBuffer() { this.stream.on('data', (data) => { this.buffer.size += data.length; this.buffer.elements.push(data); if (this.buffer.announced === undefined && this.buffer.size >= this.buffer.threshold) { this.buffer.announced = true; this.#setState(constants.STATE_READY); logger.debug('buffer threshold of ' + this.buffer.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms'); } if (this.buffer.size >= this.buffer.limit) { logger.debug('buffer limit of ' + this.buffer.limit + ' bytes reached, pausing read stream...'); this.stream.pause(); } }); } speak() { this.#setState(constants.STATE_PLAYING); this.playFromBuffer(); } playFromBuffer() { const tmp = this.buffer.elements[0]; this.buffer.elements.shift(); this.buffer.size -= tmp.length; this.speaker.write(tmp); if (this.buffer.size < this.buffer.limit) { logger.debug('fell below buffer limit of ' + this.buffer.limit + ' bytes, resuming read stream...'); this.stream.resume(); } } // async feed(buffer) { // this.tmp.stream.write(buffer); // if (this.tmp.announced === undefined && this.tmp.stream.bytesWritten >= this.threshold) { // this.tmp.announced = true; // this.#setState(constants.STATE_READY); // logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - this.timestamp) + 'ms'); // } // } stopFeed() { // logger.debug('finished writing of ' + this.tmp.stream.bytesWritten + ' bytes after ' + (Date.now() - this.timestamp) + 'ms'); // this.tmp.stream.end(); // this.tmp.stream.close(); } async play(position) { if (this.isPlaying()) { await this.stop(); } await this.#spawnProcess(position); this.process.stderr.on('data', (data) => { data = data.toString(); const position = data.toString().trim().split(' ')[0]; if (position.length === 0 || isNaN(position)) { return; } this.position = position; }); this.process.stdin.on('error', (error) => { this.#setState(constants.STATE_ERROR, error); }); } async pause() { await this.#reset(true); this.#setState(constants.STATE_PAUSED); } async stop() { await this.#reset(); this.#setState(constants.STATE_STOPPED); } isReady() { return this.state === constants.STATE_READY; } isPlaying() { return this.state === constants.STATE_PLAYING; } isPaused() { return this.state === constants.STATE_PAUSED; } isFinished() { return this.state === constants.STATE_STOPPED; } hasError() { return this.state === constants.STATE_ERROR; } getPosition() { return this.position; } async #spawnProcess(position) { return new Promise((resolve, reject) => { const args = [ '-vn', '-nodisp' ]; if (this.isPaused() && !isNaN(position)) { args.unshift('-ss', position); } args.push(this.tmp.file); this.process = spawn("ffplay", args); this.process.on('error', (error) => { this.#reset(); // TODO: try/catch error reject('error spawning process \'ffplay\': ' + error); }); this.process.on('spawn', () => { logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...'); this.#setState(constants.STATE_PLAYING); resolve(); }); }); } #setState(state, data) { if (this.state === state) { return; } this.state = state; logger.debug('setting state of audio player to \'' + state + '\'...'); if (this.events.includes(state)) { return; } logger.debug('emitting state \'' + state + '\' of audio player...'); this.emit(this.state, { data: data }); this.emit('statechange', { state: this.state }); this.events.push(state); } async #killProcess() { if (this.process === undefined) { return; } const pid = this.process.pid; this.#closeStdIO(); if (this.process?.killed === false) { this.process.kill('SIGTERM'); } await new Promise((resolve, reject) => { this.process.on('close', (code, signal) => { let msg = 'process \'ffplay\' (pid: ' + pid + ') closed with'; if (code !== undefined) { msg += ' code \'' + code + '\''; } else { msg += ' signal \'' + signal + '\''; } logger.debug(msg); this.process = undefined; resolve(); }); }); } #closeStdIO() { if (this.process?.stdio === undefined) { return; } logger.debug('closing all stdio streams of process \'ffplay\' (pid: ' + this.process.pid + ')...'); for (let index = 0; index < this.process.stdio.length; index++) { this.process.stdio[index].destroy(); } } async #removeTemporaryFile() { try { await unlink(this.tmp.file); } catch (error) { if (error?.code === 'ENOENT') { return; } logger.error('error removing temporary file \'' + this.tmp.file + '\': ' + error); } } async #reset(paused) { await this.#killProcess(); this.timestamp = Date.now(); this.events = []; if (paused === true) { return; } this.position = 0; } } module.exports = Player;