implemented on-the-fly conversion to pcm data via ffmpeg for audio files

This commit is contained in:
Daniel Sommer 2022-05-04 15:05:57 +02:00
parent 73824f93d9
commit a535aafbfe
4 changed files with 238 additions and 60 deletions

View file

@ -38,14 +38,14 @@ class Api {
if (global.audioserver !== undefined) { if (global.audioserver !== undefined) {
await global.audioserver.destroy(); await global.audioserver.destroy();
} }
global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm'); global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/YOU_SUFFER.mp3');
}); });
this.#registerEndpoint(constants.API_RESUME, constants.REQUEST_METHOD_POST, async () => { this.#registerEndpoint(constants.API_RESUME, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver === undefined) { if (global.audioserver === undefined) {
return; return;
} }
await global.audioserver.destroy(); await global.audioserver.destroy();
global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/test.pcm', global.audioserver.progress); global.audioserver = new AudioServer('/home/velvettear/mounts/kingston/public/YOU_SUFFER.mp3', global.audioserver.progress);
}); });
this.#registerEndpoint(constants.API_PAUSE, constants.REQUEST_METHOD_POST, async () => { this.#registerEndpoint(constants.API_PAUSE, constants.REQUEST_METHOD_POST, async () => {
if (global.audioserver === undefined) { if (global.audioserver === undefined) {

View file

@ -1,17 +1,14 @@
const net = require('net'); const net = require('net');
const { stat, open } = require('fs/promises'); const { stat, open } = require('fs/promises');
const Message = require('./Message.js'); const Message = require('./Message.js');
const constants = require('../libs/constants.js'); const PCMStream = require('./PCMStream.js');
class AudioServer { class AudioServer {
constructor(file, progress) { constructor(file, progress) {
this.listen = config?.server?.listen || '0.0.0.0'; this.listen = config?.server?.listen || '0.0.0.0';
this.port = 0; this.port = 0;
this.buffer = { this.file = file;
file: file
};
this.clients = []; this.clients = [];
this.sockets = []; this.sockets = [];
this.broadcasts = {}; this.broadcasts = {};
@ -37,36 +34,32 @@ class AudioServer {
this.sockets.push(socket); this.sockets.push(socket);
}); });
this.server.on('error', (err) => { this.server.on('error', (err) => {
logger.error('ERROR IN AUDIOSERVER ' + err);
reject('audio server encountered an error: ' + err); reject('audio server encountered an error: ' + err);
}); });
this.server.on('close', () => { this.server.on('close', () => {
logger.info('audio server closed'); logger.info('audio server closed');
}); });
}); });
await this.#prepareBuffer(); // TODO: GET AUDIO INFO FROM FILE AND PASS TO PCMSTREAM FOR CONVERSION
this.pcm = new PCMStream(this.file, this.progress);
try {
await this.pcm.prepare();
} catch (error) {
logger.error('encountered an error creating pcm stream: ' + error);
this.pcm = undefined;
this.aborted = true;
return;
}
this.#calculateThreshold();
this.#announceAudioServer(); this.#announceAudioServer();
} }
async #prepareBuffer() { #calculateThreshold() {
if (this.buffer.fd !== undefined) { this.threshold = config.audio?.threshold;
this.buffer.fd.close(); if (isNaN(this.threshold)) {
this.threshold = 16;
} }
if (this.buffer.stream?.destroyed === false) { this.threshold = this.threshold * 1024;
this.buffer.stream.close();
this.buffer.stream.destroy();
}
this.buffer.fd = await open(this.buffer.file);
this.buffer.stream = this.buffer.fd.createReadStream({
start: this.progress
});
const stats = await stat(this.buffer.file);
this.buffer.size = stats.size - this.progress;
let percentage = 30;
if (!(isNaN(config.audio?.threshold))) {
percentage = config.audio.threshold;
}
this.buffer.threshold = (this.buffer.size / 100) * percentage;
} }
#handleEvents() { #handleEvents() {
@ -111,12 +104,12 @@ class AudioServer {
logger.debug(msg); logger.debug(msg);
}); });
client.audiosocket.on('drain', () => { client.audiosocket.on('drain', () => {
if (this.buffer.stream === undefined || this.buffer.stream.isPaused() === false) { if (this.pcm === undefined || this.pcm.isPaused() === false) {
return; return;
} }
this.#isFileTransmitted(client); this.#checkFileTransmission(client);
// logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...'); // logger.debug(client.getTag() + ' backpressure is relieved, resuming read stream...');
this.buffer.stream.resume(); this.pcm.resume();
}); });
} }
@ -195,23 +188,17 @@ class AudioServer {
async #handleStatePlaying(client) { async #handleStatePlaying(client) {
logger.debug(client.getTag() + ' has started playback...'); logger.debug(client.getTag() + ' has started playback...');
// TODO: remove - test only
// await sleep(10000);
// this.#pausePlayback();
} }
async #handleStatePaused(client, data) { async #handleStatePaused(client, data) {
if (client === undefined || data === undefined) { if (client === undefined || data === undefined) {
return; return;
} }
logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '/' + this.buffer.size + '...'); logger.debug(client.getTag() + ' paused playback, progress: \'' + data.progress + '...');
// TODO: remove - test only
// await sleep(100);
// this.#startPlayback();
} }
async #handleStateStopped(client, data) { async #handleStateStopped(client, data) {
logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '/' + this.buffer.size + '...'); logger.debug(client.getTag() + ' stopped playback, progress: ' + data.progress + '...');
if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) { if (!this.#allClientsInState(constants.CLIENT_STATE_STOPPED)) {
return; return;
} }
@ -219,7 +206,7 @@ class AudioServer {
} }
async #handleStateError(client, data) { async #handleStateError(client, data) {
logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + '/' + this.buffer.size + ': ' + error); logger.error(client.getTag() + ' experienced an error during playback, progress: \'' + data.progress + ': ' + error);
} }
#getClientById(clientId) { #getClientById(clientId) {
@ -239,9 +226,8 @@ class AudioServer {
this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', { this.broadcasts[constants.CLIENT_STATE_REGISTERED] = await new Message('audio:initialize', {
port: this.server.address().port, port: this.server.address().port,
settings: { settings: {
size: this.buffer.size, threshold: this.threshold,
threshold: this.buffer.threshold, // TODO: GET AUDIO INFO FROM DATABASE AND PASS TO CLIENT(S) FOR PLAYBACK
// TODO: GET AUDIO INFO FROM DATABASE
audio: { audio: {
channels: 2, channels: 2,
bitDepth: 16, bitDepth: 16,
@ -270,7 +256,8 @@ class AudioServer {
async #transmitFile() { async #transmitFile() {
const timestamp = Date.now(); const timestamp = Date.now();
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.buffer.stream.on('data', (data) => { this.pcm.resume();
this.pcm.on('data', (data) => {
for (let index = 0; index < this.clients.length; index++) { for (let index = 0; index < this.clients.length; index++) {
const client = this.clients[index]; const client = this.clients[index];
if (client.audiosocket.destroyed) { if (client.audiosocket.destroyed) {
@ -279,29 +266,24 @@ class AudioServer {
} }
if (client.audiosocket.write(data) !== true) { if (client.audiosocket.write(data) !== true) {
// logger.debug(client.getTag() + ' detected backpressure, pausing read stream...'); // logger.debug(client.getTag() + ' detected backpressure, pausing read stream...');
this.buffer.stream.pause(); this.pcm.pause();
continue; continue;
} }
this.#isFileTransmitted(client); this.#checkFileTransmission(client);
} }
}); });
this.buffer.stream.on('close', () => { this.pcm.on('close', () => {
logger.debug('reading file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); logger.debug('transmitting to pcm data converted file \'' + this.file + '\' took ' + (Date.now() - timestamp) + 'ms');
resolve(); resolve();
}); });
this.buffer.stream.on('error', (error) => {
// TODO: handle with try / catch
logger.debug('STREAM ERROR!');
reject(error);
});
}); });
} }
#isFileTransmitted(client) { #checkFileTransmission(client) {
if (client?.audiosocket === undefined) { if (client?.audiosocket === undefined || this.pcm === undefined) {
return; return;
} }
if (client.audiosocket.bytesWritten < this.buffer.size) { if (this.pcm.destroyed !== true) {
return false; return false;
} }
logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now()) + 'ms'); logger.debug(client.getTag() + ' transmitted ' + client.audiosocket.bytesWritten + ' bytes after ' + (Date.now()) + 'ms');
@ -320,9 +302,6 @@ class AudioServer {
} }
audiosocket.destroy(); audiosocket.destroy();
} }
this.buffer?.fd?.close();
this.buffer?.stream?.close();
this.buffer?.stream?.destroy();
if (this.server?.listening !== true) { if (this.server?.listening !== true) {
return; return;
} }
@ -336,6 +315,10 @@ class AudioServer {
resolve(); resolve();
}); });
}); });
if (this.pcm.destroyed === false) {
await this.pcm.destroy();
}
this.pcm = undefined;
} }
} }

195
classes/PCMStream.js Normal file
View file

@ -0,0 +1,195 @@
const path = require('path');
const { spawn } = require('child_process');
const { tmpdir } = require('os');
const { unlink, open } = require('fs/promises');
const EventEmitter = require('events');
class PCMStream extends EventEmitter {
constructor(file, start, format, channels, sampleRate) {
super();
this.file = file;
this.start = start || 0;
this.discarded = 0;
this.ffmpeg = {
format: format || 'pcm_s16le',
channels: channels || 2,
sampleRate: sampleRate || 44100
};
this.fifo = {};
}
async prepare() {
if (this.file === undefined) {
throw new Error('cannot prepare pcm stream from an undefined file');
}
this.file = path.resolve(this.file);
await this.#createFifo();
await this.#spawnFFmpeg();
await this.#readFifo();
}
resume() {
this.fifo?.stream.on('data', async (data) => {
if (this.start === 0 || this.discarded >= this.start) {
this.emit('data', data);
return;
}
let tmp = data.length + this.discarded;
if (tmp < this.start) {
this.discarded = tmp;
return;
}
tmp = this.start - this.discarded;
this.discarded += tmp;
data = data.slice(tmp);
this.emit('data', data);
});
this.fifo?.stream?.resume();
}
pause() {
this.fifo?.stream?.pause();
this.fifo?.stream?.removeAllListeners('data');
}
isPaused() {
return this.fifo?.stream?.isPaused();
}
async #spawnFFmpeg() {
if (this.file === undefined) {
throw new Error('can not convert an undefined file to pcm');
}
const args = [
'-y',
'-i',
this.file,
'-acodec',
this.ffmpeg.format,
'-ac',
this.ffmpeg.channels,
'-ar',
this.ffmpeg.sampleRate,
'-f',
's16le',
this.fifo.file
];
return new Promise((resolve, reject) => {
this.ffmpeg.process = spawn('ffmpeg', args);
this.ffmpeg.process.on('spawn', () => {
logger.debug('successfully spawned process \'ffmpeg\' (args: ' + args + ') for pcm conversion...');
this.ffmpeg.timestamp = Date.now();
resolve();
});
this.ffmpeg.process.on('error', async (error) => {
logger.error('encountered an error spawning process \'ffmpeg\' (args: ' + args + ') for pcm conversion: ' + error);
await this.destroy();
reject(error);
});
this.ffmpeg.process.on('close', async (code, signal) => {
let msg = 'process \'ffmpeg\' (args: ' + args + ') closed';
if (code !== undefined) {
msg += ' with code \'' + code + '\'';
} else {
msg += ' with signal \'' + signal + '\'';
}
msg += ' after ' + (Date.now() - this.ffmpeg.timestamp) + 'ms';
logger.debug(msg);
await this.destroy();
this.emit('close');
});
});
}
async #createFifo() {
let fifo = path.join(tmpdir(), 'kannon.fifo');
try {
await unlink(fifo);
} catch (error) {
logger.debug('theres no fifo file to delete...');
}
this.fifo.process = spawn('mkfifo', [fifo]);
return new Promise((resolve, reject) => {
this.fifo.process.on('spawn', () => {
logger.debug('successfully spawned process \'mkfifo\' (args: ' + fifo + ')...');
this.fifo.file = fifo;
});
this.fifo.process.on('error', async (error) => {
logger.error('encountered an error spawning process \'mkfifo\' (args: \'' + fifo + '\'): ' + error);
await this.destroy();
reject(error);
});
this.fifo.process.on('close', (code, signal) => {
let msg = 'process \'mkfifo\' (args: \'' + fifo + '\') closed';
if (code !== undefined) {
msg += ' with code \'' + code + '\'';
} else {
msg += ' with signal ' + signal + '\'';
}
logger.debug(msg);
resolve();
});
});
}
async #readFifo() {
if (this.fifo.file === undefined) {
throw new Error('can not read from undefined fifo file');
}
const timestamp = Date.now();
this.fifo.fd = await open(this.fifo.file);
this.fifo.stream = this.fifo.fd.createReadStream();
return new Promise((resolve, reject) => {
this.fifo.stream.on('error', async (error) => {
logger.error('encountered an error reading from fifo file \'' + this.fifo + '\': ' + error);
await this.destroy();
reject(error);
});
this.fifo.stream.on('close', () => {
logger.debug('read stream for fifo file \'' + this.fifo.file + '\' closed after ' + (Date.now() - timestamp) + 'ms (read ' + this.fifo.stream.bytesRead + ' bytes)');
});
this.fifo.stream.on('readable', () => {
this.fifo.stream.removeAllListeners('readable');
resolve();
});
this.fifo.stream.on('drain', () => {
logger.warn('FIFO STREAM DRAINED');
});
});
}
async #deleteFifo() {
if (this.fifo.file === undefined) {
return;
}
try {
await unlink(this.fifo.file);
} catch (error) {
logger.error('encountered an error deleting the fifo file \'' + this.fifo.file + '\': ' + error);
}
}
async destroy() {
if (this.ffmpeg.process.killed != true) {
this.ffmpeg.process.kill();
this.ffmpeg.process = undefined;
}
if (this.fifo.process.killed !== true) {
this.fifo.process.kill();
this.fifo.process = undefined;
}
if (this.fifo.stream.destroyed !== true) {
this.fifo.stream.destroy();
this.fifo.stream = undefined;
}
if (this.fifo.fd.closed !== true) {
this.fifo.fd.close();
this.fifo.fd = undefined;
}
await this.#deleteFifo();
this.destroyed = true;
}
}
module.exports = PCMStream;

View file

@ -28,6 +28,6 @@
}, },
"audio": { "audio": {
"nodelay": false, "nodelay": false,
"threshold": 1 "threshold": 8
} }
} }