kannon/classes/PCMStream.js

197 lines
No EOL
6.7 KiB
JavaScript

const path = require('path');
const { spawn } = require('child_process');
const { tmpdir } = require('os');
const { unlink, open } = require('fs/promises');
const EventEmitter = require('events');
class PCMStream extends EventEmitter {
constructor(file, start, format, channels, sampleRate) {
super();
this.file = file;
this.start = start || 0;
this.discarded = 0;
this.ffmpeg = {
format: format || 'pcm_s16le',
channels: channels || 2,
sampleRate: sampleRate || 44100
};
this.fifo = {};
}
async prepare() {
if (this.file === undefined) {
throw new Error('cannot prepare pcm stream from an undefined file');
}
this.file = path.resolve(this.file);
await this.#createFifo();
await this.#spawnFFmpeg();
await this.#readFifo();
}
resume() {
this.fifo?.stream.on('data', async (data) => {
if (this.start === 0 || this.discarded >= this.start) {
this.emit('data', data);
return;
}
let tmp = data.length + this.discarded;
if (tmp < this.start) {
this.discarded = tmp;
return;
}
tmp = this.start - this.discarded;
this.discarded += tmp;
data = data.slice(tmp);
this.emit('data', data);
});
this.fifo?.stream?.resume();
}
pause() {
this.fifo?.stream?.pause();
this.fifo?.stream?.removeAllListeners('data');
}
isPaused() {
return this.fifo?.stream?.isPaused();
}
async #spawnFFmpeg() {
if (this.file === undefined) {
throw new Error('can not convert an undefined file to pcm');
}
const args = [
'-y',
'-i',
this.file,
'-acodec',
this.ffmpeg.format,
'-ac',
this.ffmpeg.channels,
'-ar',
this.ffmpeg.sampleRate,
'-f',
's16le',
this.fifo.file
];
return new Promise((resolve, reject) => {
this.ffmpeg.process = spawn('ffmpeg', args);
this.ffmpeg.process.on('spawn', () => {
logger.debug('successfully spawned process \'ffmpeg\' (args: ' + args + ') for pcm conversion...');
this.ffmpeg.timestamp = Date.now();
resolve();
});
this.ffmpeg.process.on('error', async (error) => {
logger.error('encountered an error spawning process \'ffmpeg\' (args: ' + args + ') for pcm conversion: ' + error);
await this.destroy();
reject(error);
});
this.ffmpeg.process.on('close', async (code, signal) => {
let msg = 'process \'ffmpeg\' (args: ' + args + ') closed';
if (code !== undefined) {
msg += ' with code \'' + code + '\'';
} else {
msg += ' with signal \'' + signal + '\'';
}
msg += ' after ' + (Date.now() - this.ffmpeg.timestamp) + 'ms';
logger.debug(msg);
await this.destroy();
this.emit('close');
});
});
}
async #createFifo() {
let fifo = path.join(tmpdir(), 'kannon.fifo');
try {
await unlink(fifo);
} catch (error) {
if (error.code !== 'ENOENT') {
logger.error('encountered an error deleting the fifo file \'' + fifo + '\': ' + error);
}
}
this.fifo.process = spawn('mkfifo', [fifo]);
return new Promise((resolve, reject) => {
this.fifo.process.on('spawn', () => {
logger.debug('successfully spawned process \'mkfifo\' (args: ' + fifo + ')...');
this.fifo.file = fifo;
});
this.fifo.process.on('error', async (error) => {
logger.error('encountered an error spawning process \'mkfifo\' (args: \'' + fifo + '\'): ' + error);
await this.destroy();
reject(error);
});
this.fifo.process.on('close', (code, signal) => {
let msg = 'process \'mkfifo\' (args: \'' + fifo + '\') closed';
if (code !== undefined) {
msg += ' with code \'' + code + '\'';
} else {
msg += ' with signal ' + signal + '\'';
}
logger.debug(msg);
resolve();
});
});
}
async #readFifo() {
if (this.fifo.file === undefined) {
throw new Error('can not read from undefined fifo file');
}
const timestamp = Date.now();
this.fifo.fd = await open(this.fifo.file);
this.fifo.stream = this.fifo.fd.createReadStream();
return new Promise((resolve, reject) => {
this.fifo.stream.on('error', async (error) => {
logger.error('encountered an error reading from fifo file \'' + this.fifo + '\': ' + error);
await this.destroy();
reject(error);
});
this.fifo.stream.on('close', () => {
logger.debug('read stream for fifo file \'' + this.fifo.file + '\' closed after ' + (Date.now() - timestamp) + 'ms (read ' + this.fifo.stream.bytesRead + ' bytes)');
});
this.fifo.stream.on('readable', () => {
this.fifo.stream.removeAllListeners('readable');
resolve();
});
this.fifo.stream.on('drain', () => {
logger.warn('FIFO STREAM DRAINED');
});
});
}
async #deleteFifo() {
if (this.fifo.file === undefined) {
return;
}
try {
await unlink(this.fifo.file);
} catch (error) {
logger.error('encountered an error deleting the fifo file \'' + this.fifo.file + '\': ' + error);
}
}
async destroy() {
if (this.ffmpeg.process.killed != true) {
this.ffmpeg.process.kill();
this.ffmpeg.process = undefined;
}
if (this.fifo.process.killed !== true) {
this.fifo.process.kill();
this.fifo.process = undefined;
}
if (this.fifo.stream.destroyed !== true) {
this.fifo.stream.destroy();
this.fifo.stream = undefined;
}
if (this.fifo.fd.closed !== true) {
this.fifo.fd.close();
this.fifo.fd = undefined;
}
await this.#deleteFifo();
this.destroyed = true;
}
}
module.exports = PCMStream;