initial commit

This commit is contained in:
Daniel Sommer 2022-04-14 14:25:48 +02:00
commit 416b404894
19 changed files with 886 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
config.json
node_modules/
npm-debug.log

1
.nvmrc Normal file
View file

@ -0,0 +1 @@
17

18
.vscode/launch.json vendored Normal file
View file

@ -0,0 +1,18 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "pwa-node",
"runtimeVersion": "17",
"request": "launch",
"name": "kannon-client",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}/kannon-client.js",
"args": [
"${workspaceFolder}/example_config.json"
]
}
]
}

20
LICENSE.md Normal file
View file

@ -0,0 +1,20 @@
# MIT License
**Copyright (c) 2022 Daniel Sommer \<daniel.sommer@velvettear.de\>**
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice (including the next
paragraph) shall be included in all copies or substantial portions of the
Software.
**THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF
OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.**

8
README.md Normal file
View file

@ -0,0 +1,8 @@
# kannon-client
client for the multi room audio player [kannon](https://git.velvettear.de/velvettear/kannon.git)
## requirements
- node.js
- [nvm](https://github.com/nvm-sh/nvm)

66
classes/Audiostream.js Normal file
View file

@ -0,0 +1,66 @@
const net = require('net');
const Message = require('./Message');
const Player = require('./Player.js');
class Audiostream {
constructor(data) {
this.host = config?.server?.host || "127.0.0.1";
this.port = data.port;
this.clientId = data.clientId;
this.size = data.size;
this.threshold = (this.size / 100) * 10;
this.#handleSocket(net.connect({
host: this.getHost(),
port: this.getPort()
}));
}
getHost() {
return this.host;
}
getPort() {
return this.port;
}
getTag() {
return this.getHost() + ':' + this.getPort();
}
#handleSocket(socket) {
socket.on('connect', () => {
logger.debug('connected to audio server \'' + this.getTag() + '\'...');
new Message('audiostream-ready', this.clientId).send(socket);
global.player.prepare(this.size, this.threshold);
});
socket.on('error', (error) => {
logger.error('error connecting to audio server \'' + this.getTag() + '\': ' + error);
});
socket.on('timeout', () => {
logger.warn('connection to audio server \'' + this.getTag() + '\' timed out');
});
socket.on('data', (data) => {
global.player.feed(data);
});
socket.on('end', () => {
logger.info('connection to audio server \'' + this.getTag() + '\' ended');
});
socket.on('close', () => {
logger.info('connection to audio server \'' + this.getTag() + '\' closed');
});
// global.player.on('spawned', () => {
// while (this.buffer.length > 0) {
// const tmp = this.buffer[0];
// this.buffer.shift();
// global.player.feed(tmp);
// }
// });
global.player.on('ready', () => {
global.player.play();
});
}
}
module.exports = Audiostream;

125
classes/Connection.js Normal file
View file

@ -0,0 +1,125 @@
const util = require('../libs/util.js');
const net = require('net');
const Heartbeat = require('./Heartbeat.js');
const EventParser = require('./EventParser.js');
const Audiostream = require('./Audiostream.js');
class Connection {
constructor() {
if (util.isUnset(config.server.host)) {
logger.warn('host is not defined - defaulting to \'127.0.0.1\'...');
}
if (util.isUnset(config.server.port)) {
logger.warn('port is not defined - defaulting to \'3000\'...');
}
this.host = config?.server?.host || "127.0.0.1";
this.port = config?.server?.port || "3000";
}
getHost() {
return this.host;
}
getPort() {
return this.port;
}
getTag() {
return this.getHost() + ':' + this.getPort();
}
initialize() {
return this.#handleSocket(net.connect({
host: this.host,
port: this.port
}));
}
#handleSocket(socket) {
return new Promise((resolve, reject) => {
socket.on('connect', () => {
this.#handleEventConnect(resolve, socket);
});
socket.on('error', (error) => {
this.#handleEventError(reject, error);
});
});
}
#handleEventConnect(resolve, socket) {
logger.info('connected to communication server \'' + this.getTag() + '\'...');
this.socket = socket;
this.eventParser = new EventParser();
this.heartbeat = new Heartbeat(this.eventParser);
this.#handleHeartbeat();
socket.on('timeout', () => {
this.#handleEventTimeout();
});
socket.on('close', () => {
this.#handleEventClose(resolve);
});
socket.on('end', () => {
this.#handleEventEnd();
});
socket.on('data', (data) => {
this.#handleEventData(data);
});
this.eventParser.on('audiostream-initialize', (data) => {
logger.debug('handling event \'audiostream-initialize\'...');
new Audiostream(data)
});
}
async #handleEventData(data) {
this.eventParser.parse(data);
}
#handleEventTimeout() {
logger.warn('connection to communication server \'' + this.getTag() + '\' timed out');
}
#handleEventError(reject, error) {
this.destroy();
return reject('error connecting to communication server \'' + this.getTag() + '\': ' + error);
}
#handleEventEnd() {
logger.info('connection to communication server \'' + this.getTag() + '\' ended');
}
#handleEventClose(resolve) {
logger.info('connection to communication server \'' + this.getTag() + '\' closed');
this.destroy();
return resolve();
}
#handleHeartbeat() {
this.heartbeat.on('timeout', () => {
logger.warn('heartbeat to communication server \'' + this.getTag() + '\' timed out');
});
}
destroy() {
if (this.heartbeat !== undefined) {
this.heartbeat.destroy();
this.heartbeat.removeAllListeners('timeout');
}
if (this.socket !== undefined) {
this.socket.removeAllListeners('connect');
this.socket.removeAllListeners('error');
this.socket.removeAllListeners('timeout');
this.socket.removeAllListeners('close');
this.socket.removeAllListeners('end');
this.socket.removeAllListeners('data');
this.socket.end();
this.socket.destroy();
this.socket = undefined;
}
}
}
module.exports = Connection;

30
classes/EventParser.js Normal file
View file

@ -0,0 +1,30 @@
const { EVENT_DELIMITER } = require('../libs/constants.js');
const EventEmitter = require('events');
class EventParser extends EventEmitter {
constructor() {
super();
this.buffer = '';
}
parse(data) {
if (data === undefined) {
return;
}
this.buffer += data;
const indexOfEnd = this.buffer.indexOf(EVENT_DELIMITER);
if (indexOfEnd === -1) {
return;
}
const event = JSON.parse(this.buffer.substring(0, indexOfEnd));
this.buffer = '';
if (event.id === undefined) {
return;
}
const eventId = event.id.toLowerCase();
this.emit(eventId, event.data);
}
}
module.exports = EventParser;

55
classes/Heartbeat.js Normal file
View file

@ -0,0 +1,55 @@
const EventEmitter = require('events');
const Message = require('./Message.js');
class Heartbeat extends EventEmitter {
constructor(eventParser) {
super();
this.interval = config?.server?.heartbeat || 10000;
this.eventParser = eventParser;
this.#listenForPingPong();
this.#sendPing();
}
async #sendPing() {
if (this.timeout !== undefined) {
clearTimeout(this.timeout);
}
if (this.alive === false) {
this.emit('timeout');
return;
} else if (this.alive === undefined) {
await new Promise((resolve, reject) => {
setTimeout(resolve, this.interval);
})
}
this.alive = false;
await new Message('ping').send();
this.timeout = setTimeout(() => {
this.#sendPing();
}, this.interval);
}
async #listenForPingPong() {
this.eventParser.on('ping', (data) => {
logger.debug('handling event \'ping\', responding with \'pong\'...');
data.client = Date.now();
new Message('pong', data).send();
});
this.eventParser.on('pong', () => {
logger.debug('handling event \'pong\'...');
this.alive = true;
});
}
destroy() {
if (this.timeout !== undefined) {
clearTimeout(this.timeout);
}
this.eventParser.removeAllListeners('ping');
this.eventParser.removeAllListeners('pong');
}
}
module.exports = Heartbeat;

145
classes/Logger.js Normal file
View file

@ -0,0 +1,145 @@
const moment = require('moment');
// constants
const LOG_PREFIX_DEBUG = 'debug';
const LOG_PREFIX_INFO = 'info';
const LOG_PREFIX_WARNING = 'warning';
const LOG_PREFIX_ERROR = 'error';
const LOGLEVEL_DEBUG = 0;
const LOGLEVEL_INFO = 1;
const LOGLEVEL_WARNING = 2;
const LOGLEVEL_ERROR = 3;
class Logger {
constructor(loglevel, timestamp) {
this.setLogLevel(loglevel);
this.setTimestamp(timestamp);
}
// set the loglevel
setLogLevel(value) {
switch (value) {
case LOG_PREFIX_DEBUG:
case LOGLEVEL_DEBUG:
this.loglevel = LOGLEVEL_DEBUG;
break;
case LOG_PREFIX_INFO:
case LOGLEVEL_INFO:
this.loglevel = LOGLEVEL_INFO;
break;
case LOG_PREFIX_WARNING:
case LOGLEVEL_WARNING:
this.loglevel = LOGLEVEL_WARNING;
break;
case LOG_PREFIX_ERROR:
case LOGLEVEL_ERROR:
this.loglevel = LOGLEVEL_ERROR;
break;
default:
this.loglevel = LOGLEVEL_INFO;
break;
}
}
// get the timestamp format
setTimestamp(value) {
this.timestamp = value || 'DD.MM.YYYY HH:mm:ss:SS';
}
// log a http request - response object
http(object) {
if (object === undefined) {
return;
}
let message = '[' + object.request.method + ':' + object.code + '] url: \'' + object.request.url + '\'';
let counter = 1;
for (let param in object.request.body) {
message += ', parameter ' + counter + ': \'' + param + '=' + object.request.body[param] + '\'';
counter++;
}
if (object.request.timestamp) {
message += ' > ' + (new Date().getTime() - object.request.timestamp) + 'ms';
}
if (object.data) {
message += ' > data: ' + object.data;
}
if (object.code != 200) {
error(message.trim());
return;
}
this.debug(message.trim());
}
// prefix log with 'info'
info(message) {
if (this.loglevel > LOGLEVEL_INFO) {
return;
}
this.trace(message);
}
// prefix log with 'info'
warn(message) {
if (this.loglevel > LOGLEVEL_WARNING) {
return;
}
this.trace(message, 'warning');
}
// prefix log with 'debug'
debug(message) {
if (this.loglevel > LOGLEVEL_DEBUG) {
return;
}
this.trace(message, 'debug');
}
// prefix log with 'error'
error(message) {
if (this.loglevel > LOGLEVEL_ERROR) {
return;
}
if (message.stack) {
this.trace(message.stack, 'error');
return;
}
if (message.errors !== undefined) {
for (let index = 0; index < message.errors.length; index++) {
this.trace(message.errors[index], 'error');
}
return;
}
this.trace(message.toString(), 'error');
}
// default logging function
trace(message, prefix) {
if (message === undefined || message === null || message.length === 0) {
return;
}
if (prefix === undefined || prefix === null || prefix.length === 0) {
prefix = 'info';
}
let print;
switch (prefix) {
case 'error':
print = console.error;
break;
case 'debug':
print = console.debug;
break;
case 'warning':
print = console.warn;
break;
default:
print = console.log;
}
message = moment().format(this.timestamp) + ' | ' + prefix + ' > ' + message;
print(message);
}
}
module.exports = Logger;

38
classes/Message.js Normal file
View file

@ -0,0 +1,38 @@
const { EVENT_DELIMITER } = require('../libs/constants.js');
class Message {
constructor(id, data) {
this.id = id;
this.data = data;
}
getId() {
return this.id;
}
getData() {
return this.data;
}
toString() {
return JSON.stringify(this);
}
async send(socket) {
if (socket === undefined) {
socket = connection.socket;
}
if (socket === undefined) {
return;
}
const data = this.toString();
logger.debug('sending data to \'' + socket.remoteAddress + ':' + socket.remotePort + '\': ' + data);
await new Promise((resolve, reject) => {
socket.write(data + EVENT_DELIMITER, resolve);
});
}
}
module.exports = Message;

191
classes/Player.js Normal file
View file

@ -0,0 +1,191 @@
const EventEmitter = require('events');
const { sleep } = require('../libs/util.js');
const { spawn } = require('child_process');
const fs = require('fs');
const STATE_SPAWNED = 'spawned';
const STATE_READY = 'ready';
const STATE_PLAYING = 'playing';
const STATE_PAUSED = 'paused';
const STATE_FINISHED = 'finished';
const STATE_ERROR = 'error';
class Player extends EventEmitter {
constructor() {
super();
this.position = 0;
this.events = [];
this.buffer = [];
this.buffersize = 0;
}
async prepare(size, threshold) {
logger.debug('preparing audio player...');
this.size = size;
this.threshold = threshold;
this.#reset();
await this.#spawnProcess();
}
feed(buffer) {
this.buffer.push(buffer);
this.buffersize += buffer.length;
if (this.isSpawned() && this.buffersize >= this.threshold) {
this.#setState(STATE_READY);
}
}
async play() {
if (this.buffer === undefined || this.buffer.length === 0) {
logger.warn('aborting playback of an empty buffer...');
return;
}
if (this.isPlaying()) {
this.stop();
}
await this.#spawnProcess();
this.process.stderr.on('data', (data) => {
this.#setState(STATE_PLAYING);
data = data.toString();
const position = data.toString().trim().split(' ')[0];
if (position.length === 0 || isNaN(position)) {
return;
}
this.position = position;
logger.debug('CURRENT POSITION: ' + this.position);
});
this.process.stdin.on('error', (error) => {
this.#setState(STATE_ERROR, error);
});
while (true) {
if (this.buffer.length === 0 && this.buffersize !== this.size) {
await sleep(1);
continue;
}
const tmp = this.buffer[0];
this.buffer.shift();
if (this.buffer.length === 0 && this.buffersize === this.size) {
this.process.stdin.end(tmp);
break;
}
this.process.stdin.write(tmp);
}
}
async pause() {
this.#reset(true);
}
async stop() {
this.#reset();
}
isSpawned() {
return this.state === STATE_SPAWNED;
}
isReady() {
return this.state === STATE_READY;
}
isPlaying() {
return this.state === STATE_PLAYING;
}
isPaused() {
return this.state === STATE_PAUSED;
}
isFinished() {
return this.state === STATE_FINISHED;
}
hasError() {
return this.state === STATE_ERROR;
}
async #spawnProcess() {
return new Promise((resolve, reject) => {
if (this.process !== undefined) {
resolve();
}
this.process = spawn("ffplay", ['-vn', '-nodisp', '-']);
this.process.on('error', (error) => {
this.#reset();
// TODO: try/catch error
reject('error spawning process \'ffplay\': ' + error);
});
this.process.on('exit', (code, signal) => {
let msg = 'process \'ffplay\' exited with';
if (code !== undefined) {
msg += ' code \'' + code + '\'';
} else {
msg += ' signal \'' + signal + '\'';
}
logger.debug(msg);
this.#closeStdIO();
});
this.process.on('close', (code, signal) => {
let msg = 'process \'ffplay\' closed with';
if (code !== undefined) {
msg += ' code \'' + code + '\'';
} else {
msg += ' signal \'' + signal + '\'';
}
logger.debug(msg);
this.process = undefined;
});
this.process.on('spawn', () => {
logger.info('spawned process \'ffplay\' (pid: ' + this.process.pid + ')...');
this.#setState(STATE_SPAWNED);
resolve();
});
});
}
#setState(state, data) {
if (this.state === state) {
return;
}
this.state = state;
logger.debug('setting state of audio player to \'' + state + '\'...');
if (this.events.includes(state)) {
return;
}
logger.debug('emitting state of audio player...');
this.emit(state, data);
this.events.push(state);
}
#killProcess() {
this.#closeStdIO();
if (this.process?.killed === false) {
this.process.kill('SIGTERM');
}
}
#closeStdIO() {
if (this.process?.stdio === undefined) {
return;
}
logger.debug('closing all stdio streams of process \'ffplay\' (pid: ' + this.process.pid + ')...');
for (let index = 0; index < this.process.stdio.length; index++) {
this.process.stdio[index].destroy();
}
}
#reset(keepBuffer) {
this.#killProcess();
this.position = 0;
this.events = [];
if (keepBuffer !== true) {
this.buffer = [];
this.buffersize = 0;
}
}
}
module.exports = Player;

15
example_config.json Normal file
View file

@ -0,0 +1,15 @@
{
"server": {
"host": "127.0.0.1",
"port": 3000,
"heartbeat": 10000
},
"log": {
"level": "debug",
"timestamp": "DD.MM.YYYY HH:mm:ss:SS"
},
"reconnect": {
"limit": 0,
"delay": 1000
}
}

69
kannon-client.js Normal file
View file

@ -0,0 +1,69 @@
const packageJSON = require('./package.json');
const path = require('path');
const Connection = require('./classes/Connection.js');
const Logger = require('./classes/Logger.js');
const Player = require('./classes/Player');
const INTERRUPTS = ['beforeExit', 'SIGINT', 'SIGTERM'];
main();
async function main() {
global.reconnects = 0;
global.logger = new Logger();
let configPath = path.resolve(process.argv[2] || __dirname + '/config.json');
try {
global.config = require(configPath);
global.logger.setLogLevel(global.config.log?.level);
global.logger.setTimestamp(global.config.log?.timestamp);
} catch (err) {
exit('could not read config file at \'' + configPath + '\'');
}
handleExit();
global.logger.info("launching " + packageJSON.name + " " + packageJSON.version + "...");
global.player = new Player();
global.connection = new Connection();
while (true) {
try {
await global.connection.initialize();
global.reconnects = 0;
} catch (err) {
const limit = global.config.reconnect?.limit;
if (isNaN(limit) || (global.reconnects >= limit && limit > 0)) {
return exit(err);
}
global.logger.error(err);
const delay = global.config.reconnect?.delay || 1000;
global.reconnects++;
global.logger.warn('retry ' + global.reconnects + '/' + limit + " in " + delay + 'ms...');
await new Promise((resolve, reject) => {
setTimeout(resolve, delay);
})
}
}
};
function handleExit() {
for (var index = 0; index < INTERRUPTS.length; index++) {
process.on(INTERRUPTS[index], (code) => {
exit(undefined, code);
});
}
}
function exit(err, code) {
if (code === undefined) {
code = 0;
if (err !== undefined) {
code = 1;
}
}
if (err) {
global.logger.error(err);
global.logger.error(packageJSON.name + ' ' + packageJSON.version + ' ended due to an error');
} else {
global.logger.info(packageJSON.name + ' ' + packageJSON.version + ' shutting down gracefully')
}
process.exit(code);
}

12
kannon-client.service Normal file
View file

@ -0,0 +1,12 @@
[Unit]
Description=kannon-client (a multi room audio player)
[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/opt/kannon-client
ExecStart=/opt/nvm/nvm-exec node kannon-client.js
[Install]
WantedBy=multi-user.target

6
libs/constants.js Normal file
View file

@ -0,0 +1,6 @@
module.exports = {
SOCKET_EVENT_PING: 'ping',
SOCKET_EVENT_PONG: 'pong',
EVENT_DELIMITER: '<<< kannon >>>'
}

32
libs/util.js Normal file
View file

@ -0,0 +1,32 @@
function isEnabled(parameter) {
return isSet(parameter?.enabled) && parameter.enabled === true;
}
function isDisabled(parameter) {
return isSet(parameter?.enabled) && parameter.enabled === false;
}
function isSet(parameter) {
return parameter !== undefined;
}
function isUnset(parameter) {
return !isSet(parameter);
}
async function sleep(ms) {
if (isNaN(ms)) {
return;
}
return new Promise((resolve, reject) => {
setTimeout(resolve, ms);
});
}
module.exports = {
isEnabled,
isDisabled,
isSet,
isUnset,
sleep
}

31
package-lock.json generated Normal file
View file

@ -0,0 +1,31 @@
{
"name": "kannon-client",
"version": "0.0.1",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "kannon-client",
"version": "0.0.1",
"license": "MIT",
"dependencies": {
"moment": "^2.29.1"
}
},
"node_modules/moment": {
"version": "2.29.2",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz",
"integrity": "sha512-UgzG4rvxYpN15jgCmVJwac49h9ly9NurikMWGPdVxm8GZD6XjkKPxDTjQQ43gtGgnV3X0cAyWDdP2Wexoquifg==",
"engines": {
"node": "*"
}
}
},
"dependencies": {
"moment": {
"version": "2.29.2",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz",
"integrity": "sha512-UgzG4rvxYpN15jgCmVJwac49h9ly9NurikMWGPdVxm8GZD6XjkKPxDTjQQ43gtGgnV3X0cAyWDdP2Wexoquifg=="
}
}
}

21
package.json Normal file
View file

@ -0,0 +1,21 @@
{
"name": "kannon-client",
"version": "0.0.1",
"description": "a multi room audio player",
"main": "kannon-client.js",
"scripts": {},
"keywords": [
"audio",
"player",
"multi room"
],
"author": "Daniel Sommer <daniel.sommer@velvettear.de>",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://git.velvettear.de/velvettear/kannon-client.git"
},
"dependencies": {
"moment": "^2.29.1"
}
}