further 'random' development
This commit is contained in:
parent
4fa68a3059
commit
0a1b648007
4 changed files with 126 additions and 116 deletions
|
@ -1,18 +1,12 @@
|
||||||
const net = require('net');
|
const net = require('net');
|
||||||
|
const { sleep } = require('../libs/util');
|
||||||
const Message = require('./Message');
|
const Message = require('./Message');
|
||||||
|
|
||||||
class Audiostream {
|
class Audiostream {
|
||||||
|
|
||||||
constructor(data) {
|
constructor(eventParser) {
|
||||||
this.host = config?.server?.host || "127.0.0.1";
|
this.eventParser = eventParser;
|
||||||
this.port = data.port;
|
this.#handleEvents();
|
||||||
this.clientId = data.clientId;
|
|
||||||
this.size = data.size;
|
|
||||||
this.threshold = (this.size / 100) * 10;
|
|
||||||
this.#handleSocket(net.connect({
|
|
||||||
host: this.getHost(),
|
|
||||||
port: this.getPort()
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
getHost() {
|
getHost() {
|
||||||
|
@ -27,10 +21,48 @@ class Audiostream {
|
||||||
return this.getHost() + ':' + this.getPort();
|
return this.getHost() + ':' + this.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#initialize(data) {
|
||||||
|
this.host = config?.server?.host || "127.0.0.1";
|
||||||
|
this.port = data.port;
|
||||||
|
this.clientId = data.clientId;
|
||||||
|
this.size = data.size;
|
||||||
|
this.threshold = data.threshold;
|
||||||
|
this.#handleSocket(net.connect({
|
||||||
|
host: this.getHost(),
|
||||||
|
port: this.getPort()
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#handleEvents() {
|
||||||
|
this.eventParser.on('audio:initialize', (data) => {
|
||||||
|
logger.debug('handling event \'audio:initialize\'...');
|
||||||
|
this.#initialize(data);
|
||||||
|
});
|
||||||
|
this.eventParser.on('audio:play', (data) => {
|
||||||
|
logger.debug('handling event \'audio:play\'...');
|
||||||
|
global.player.play(data?.position);
|
||||||
|
});
|
||||||
|
this.eventParser.on('audio:pause', (data) => {
|
||||||
|
logger.debug('handling event \'audio:pause\'...');
|
||||||
|
global.player.pause();
|
||||||
|
});
|
||||||
|
this.eventParser.on('audio:stop', () => {
|
||||||
|
logger.debug('handling event \'audio:stop\'...');
|
||||||
|
global.player.stop();
|
||||||
|
});
|
||||||
|
global.player.on('statechange', (data) => {
|
||||||
|
new Message('audio:state', {
|
||||||
|
clientId: this.clientId,
|
||||||
|
state: data.state,
|
||||||
|
position: global.player.getPosition()
|
||||||
|
}).send();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#handleSocket(socket) {
|
#handleSocket(socket) {
|
||||||
socket.on('connect', () => {
|
socket.on('connect', () => {
|
||||||
logger.debug('connected to audio server \'' + this.getTag() + '\'...');
|
logger.debug('connected to audio server \'' + this.getTag() + '\'...');
|
||||||
new Message('audiostream-ready', this.clientId).send(socket);
|
new Message('audio:register', { clientId: this.clientId, socket: socket.localPort }).send();
|
||||||
global.player.prepare(this.size, this.threshold);
|
global.player.prepare(this.size, this.threshold);
|
||||||
});
|
});
|
||||||
socket.on('error', (error) => {
|
socket.on('error', (error) => {
|
||||||
|
@ -48,21 +80,7 @@ class Audiostream {
|
||||||
socket.on('close', () => {
|
socket.on('close', () => {
|
||||||
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
|
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
|
||||||
});
|
});
|
||||||
global.player.on('ready', () => {
|
|
||||||
global.player.play();
|
|
||||||
});
|
|
||||||
global.player.on('playing', async () => {
|
|
||||||
await new Promise((resolve, reject) => {
|
|
||||||
setTimeout(resolve, 10000);
|
|
||||||
});
|
|
||||||
global.player.pause();
|
|
||||||
await new Promise((resolve, reject) => {
|
|
||||||
setTimeout(resolve, 2000);
|
|
||||||
});
|
|
||||||
global.player.play();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Audiostream;
|
module.exports = Audiostream;
|
|
@ -52,6 +52,7 @@ class Connection {
|
||||||
logger.info('connected to communication server \'' + this.getTag() + '\'...');
|
logger.info('connected to communication server \'' + this.getTag() + '\'...');
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.eventParser = new EventParser();
|
this.eventParser = new EventParser();
|
||||||
|
this.audiostream = new Audiostream(this.eventParser);
|
||||||
this.heartbeat = new Heartbeat(this.eventParser);
|
this.heartbeat = new Heartbeat(this.eventParser);
|
||||||
this.#handleHeartbeat();
|
this.#handleHeartbeat();
|
||||||
socket.on('timeout', () => {
|
socket.on('timeout', () => {
|
||||||
|
@ -66,10 +67,6 @@ class Connection {
|
||||||
socket.on('data', (data) => {
|
socket.on('data', (data) => {
|
||||||
this.#handleEventData(data);
|
this.#handleEventData(data);
|
||||||
});
|
});
|
||||||
this.eventParser.on('audiostream-initialize', (data) => {
|
|
||||||
logger.debug('handling event \'audiostream-initialize\'...');
|
|
||||||
new Audiostream(data)
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async #handleEventData(data) {
|
async #handleEventData(data) {
|
||||||
|
@ -101,10 +98,16 @@ class Connection {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#handleAudioEvents() {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
destroy() {
|
destroy() {
|
||||||
if (this.heartbeat !== undefined) {
|
if (this.heartbeat !== undefined) {
|
||||||
this.heartbeat.destroy();
|
this.heartbeat.destroy();
|
||||||
this.heartbeat.removeAllListeners('timeout');
|
this.heartbeat.removeAllListeners('timeout');
|
||||||
|
this.heartbeat = undefined;
|
||||||
}
|
}
|
||||||
if (this.socket !== undefined) {
|
if (this.socket !== undefined) {
|
||||||
this.socket.removeAllListeners('connect');
|
this.socket.removeAllListeners('connect');
|
||||||
|
|
|
@ -5,7 +5,6 @@ const createWriteStream = require('fs').createWriteStream;
|
||||||
const unlink = require('fs/promises').unlink;
|
const unlink = require('fs/promises').unlink;
|
||||||
const resolve = require('path').resolve;
|
const resolve = require('path').resolve;
|
||||||
|
|
||||||
const STATE_SPAWNED = 'spawned';
|
|
||||||
const STATE_READY = 'ready';
|
const STATE_READY = 'ready';
|
||||||
const STATE_PLAYING = 'playing';
|
const STATE_PLAYING = 'playing';
|
||||||
const STATE_PAUSED = 'paused';
|
const STATE_PAUSED = 'paused';
|
||||||
|
@ -20,35 +19,50 @@ class Player extends EventEmitter {
|
||||||
this.events = [];
|
this.events = [];
|
||||||
this.buffer = [];
|
this.buffer = [];
|
||||||
this.tmp = {
|
this.tmp = {
|
||||||
file: resolve('/tmp/kannon.tmp')
|
file: resolve(global.config?.tmp || '/tmp/kannon.tmp')
|
||||||
}
|
};
|
||||||
this.buffersize = 0;
|
this.buffersize = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
async prepare(size, threshold) {
|
async prepare(size, threshold) {
|
||||||
logger.debug('preparing audio player...');
|
logger.debug('preparing audio player...');
|
||||||
|
await this.#reset();
|
||||||
|
await this.#removeTemporaryFile();
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.threshold = threshold;
|
this.threshold = threshold;
|
||||||
this.tmp.stream = createWriteStream(this.tmp.file);
|
this.tmp.stream = createWriteStream(this.tmp.file);
|
||||||
this.#reset();
|
this.tmp.stream.on('ready', async () => {
|
||||||
this.#spawnProcess();
|
const timestamp = Date.now();
|
||||||
|
while (this.tmp.stream.bytesWritten !== this.size) {
|
||||||
|
if (this.buffer.length === 0) {
|
||||||
|
await sleep(1);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const tmp = this.buffer[0];
|
||||||
|
this.buffer.shift();
|
||||||
|
this.tmp.stream.write(tmp);
|
||||||
|
if (this.tmp.announced !== true && this.tmp.stream.bytesWritten >= this.threshold) {
|
||||||
|
this.#setState(STATE_READY);
|
||||||
|
logger.debug('threshold of ' + this.threshold + ' bytes reached after ' + (Date.now() - timestamp) + 'ms');
|
||||||
|
this.tmp.announced = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.tmp.stream.end();
|
||||||
|
this.tmp.stream.close();
|
||||||
|
logger.debug('finished writing of ' + this.size + ' bytes after ' + (Date.now() - timestamp) + 'ms');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
feed(buffer) {
|
feed(buffer) {
|
||||||
this.buffer.push(buffer);
|
this.buffer.push(buffer);
|
||||||
this.buffersize += buffer.length;
|
|
||||||
if (this.isSpawned() && this.buffersize >= this.threshold) {
|
|
||||||
this.#setState(STATE_READY);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async play() {
|
async play(position) {
|
||||||
if (this.isPlaying()) {
|
if (this.isPlaying()) {
|
||||||
this.stop();
|
this.stop();
|
||||||
}
|
}
|
||||||
await this.#spawnProcess();
|
await this.#spawnProcess(position);
|
||||||
this.process.stderr.on('data', (data) => {
|
this.process.stderr.on('data', (data) => {
|
||||||
this.#setState(STATE_PLAYING);
|
|
||||||
data = data.toString();
|
data = data.toString();
|
||||||
const position = data.toString().trim().split(' ')[0];
|
const position = data.toString().trim().split(' ')[0];
|
||||||
if (position.length === 0 || isNaN(position)) {
|
if (position.length === 0 || isNaN(position)) {
|
||||||
|
@ -59,45 +73,16 @@ class Player extends EventEmitter {
|
||||||
this.process.stdin.on('error', (error) => {
|
this.process.stdin.on('error', (error) => {
|
||||||
this.#setState(STATE_ERROR, error);
|
this.#setState(STATE_ERROR, error);
|
||||||
});
|
});
|
||||||
if (this.process.spawnargs[this.process.spawnargs.length] === this.tmp.file) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.buffer === undefined || this.buffer.length === 0) {
|
|
||||||
logger.warn('aborting playback of an empty buffer...');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const timestamp = Date.now();
|
|
||||||
while (true) {
|
|
||||||
if (this.buffer.length === 0 && this.buffersize !== this.size) {
|
|
||||||
await sleep(1);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const tmp = this.buffer[0];
|
|
||||||
this.buffer.shift();
|
|
||||||
if (this.buffer.length === 0 && this.buffersize === this.size) {
|
|
||||||
logger.warn('BUFFER EMPTIED AFTER ' + ( Date.now() - timestamp) + 'ms')
|
|
||||||
this.process.stdin.end(tmp);
|
|
||||||
this.tmp.stream.end(tmp);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
this.tmp.stream.write(tmp);
|
|
||||||
this.process.stdin.write(tmp);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async pause() {
|
async pause() {
|
||||||
|
await this.#reset(true);
|
||||||
this.#setState(STATE_PAUSED);
|
this.#setState(STATE_PAUSED);
|
||||||
this.#reset();
|
|
||||||
this.#spawnProcess();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop() {
|
async stop() {
|
||||||
|
await this.#reset();
|
||||||
this.#setState(STATE_STOPPED);
|
this.#setState(STATE_STOPPED);
|
||||||
this.#reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
isSpawned() {
|
|
||||||
return this.state === STATE_SPAWNED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isReady() {
|
isReady() {
|
||||||
|
@ -120,50 +105,29 @@ class Player extends EventEmitter {
|
||||||
return this.state === STATE_ERROR;
|
return this.state === STATE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
async #spawnProcess() {
|
getPosition() {
|
||||||
if (this.process !== undefined) {
|
return this.position;
|
||||||
return;
|
}
|
||||||
}
|
|
||||||
|
async #spawnProcess(position) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const args = [
|
const args = [
|
||||||
'-vn',
|
'-vn',
|
||||||
'-nodisp'
|
'-nodisp'
|
||||||
];
|
];
|
||||||
if (this.isPaused()) {
|
if (this.isPaused() && !isNaN(position)) {
|
||||||
args.unshift('-ss', this.position);
|
args.unshift('-ss', position);
|
||||||
args.push(this.tmp.file);
|
|
||||||
} else {
|
|
||||||
args.push('-');
|
|
||||||
}
|
}
|
||||||
|
args.push(this.tmp.file);
|
||||||
this.process = spawn("ffplay", args);
|
this.process = spawn("ffplay", args);
|
||||||
this.process.on('error', (error) => {
|
this.process.on('error', (error) => {
|
||||||
this.#reset();
|
this.#reset();
|
||||||
// TODO: try/catch error
|
// TODO: try/catch error
|
||||||
reject('error spawning process \'ffplay\': ' + error);
|
reject('error spawning process \'ffplay\': ' + error);
|
||||||
});
|
});
|
||||||
this.process.on('exit', (code, signal) => {
|
|
||||||
let msg = 'process \'ffplay\' exited with';
|
|
||||||
if (code !== undefined) {
|
|
||||||
msg += ' code \'' + code + '\'';
|
|
||||||
} else {
|
|
||||||
msg += ' signal \'' + signal + '\'';
|
|
||||||
}
|
|
||||||
logger.debug(msg);
|
|
||||||
this.#closeStdIO();
|
|
||||||
});
|
|
||||||
this.process.on('close', (code, signal) => {
|
|
||||||
let msg = 'process \'ffplay\' closed with';
|
|
||||||
if (code !== undefined) {
|
|
||||||
msg += ' code \'' + code + '\'';
|
|
||||||
} else {
|
|
||||||
msg += ' signal \'' + signal + '\'';
|
|
||||||
}
|
|
||||||
logger.debug(msg);
|
|
||||||
this.process = undefined;
|
|
||||||
});
|
|
||||||
this.process.on('spawn', () => {
|
this.process.on('spawn', () => {
|
||||||
logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...');
|
logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...');
|
||||||
this.#setState(STATE_SPAWNED);
|
this.#setState(STATE_PLAYING);
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -180,15 +144,33 @@ class Player extends EventEmitter {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
logger.debug('emitting state \'' + state + '\' of audio player...');
|
logger.debug('emitting state \'' + state + '\' of audio player...');
|
||||||
this.emit(state, data);
|
this.emit(this.state, { data: data });
|
||||||
|
this.emit('statechange', { state: this.state });
|
||||||
this.events.push(state);
|
this.events.push(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
#killProcess() {
|
async #killProcess() {
|
||||||
|
if (this.process === undefined) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const pid = this.process.pid;
|
||||||
this.#closeStdIO();
|
this.#closeStdIO();
|
||||||
if (this.process?.killed === false) {
|
if (this.process?.killed === false) {
|
||||||
this.process.kill('SIGTERM');
|
this.process.kill('SIGTERM');
|
||||||
}
|
}
|
||||||
|
await new Promise((resolve, reject) => {
|
||||||
|
this.process.on('close', (code, signal) => {
|
||||||
|
let msg = 'process \'ffplay\' (pid: ' + pid + ') closed with';
|
||||||
|
if (code !== undefined) {
|
||||||
|
msg += ' code \'' + code + '\'';
|
||||||
|
} else {
|
||||||
|
msg += ' signal \'' + signal + '\'';
|
||||||
|
}
|
||||||
|
logger.debug(msg);
|
||||||
|
this.process = undefined;
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#closeStdIO() {
|
#closeStdIO() {
|
||||||
|
@ -201,20 +183,26 @@ class Player extends EventEmitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async #reset() {
|
async #removeTemporaryFile() {
|
||||||
this.#killProcess();
|
try {
|
||||||
|
await unlink(this.tmp.file);
|
||||||
|
} catch (error) {
|
||||||
|
if (error?.code === 'ENOENT') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.error('error removing temporary file \'' + this.tmp.file + '\': ' + error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async #reset(paused) {
|
||||||
|
await this.#killProcess();
|
||||||
|
this.timestamp = Date.now();
|
||||||
this.events = [];
|
this.events = [];
|
||||||
this.buffer = [];
|
this.buffer = [];
|
||||||
if (!this.isPaused()) {
|
if (paused === true) {
|
||||||
this.position = 0;
|
return;
|
||||||
this.backupbuffer = [];
|
|
||||||
this.buffersize = 0;
|
|
||||||
try {
|
|
||||||
await unlink(this.tmp.file);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('no file to unlink ' + this.tmp.file);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
this.position = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,5 +11,6 @@
|
||||||
"reconnect": {
|
"reconnect": {
|
||||||
"limit": 0,
|
"limit": 0,
|
||||||
"delay": 1000
|
"delay": 1000
|
||||||
}
|
},
|
||||||
|
"tmp": "/tmp/kannon.tmp"
|
||||||
}
|
}
|
Loading…
Reference in a new issue