diff --git a/classes/AudioServer.js b/classes/AudioServer.js index d8326df..59e7c8c 100644 --- a/classes/AudioServer.js +++ b/classes/AudioServer.js @@ -14,12 +14,14 @@ class AudioServer { this.listen = config?.server?.listen || '0.0.0.0'; this.port = 0; this.buffer = { - file: file + file: file, + stream: fs.createReadStream(file), + limit: (config?.audio.bufferlimit || 256) * 1048576 }; this.clients = []; this.sockets = []; this.playback = { - position: 0 + position: undefined }; this.server = net.createServer(); this.#prepare(); @@ -49,10 +51,9 @@ class AudioServer { this.buffer.size = stats.size; this.buffer.threshold = (this.buffer.size / 100) / (!isNaN(config.audio?.threshold) || 30); this.#announceAudioServer(); - this.#bufferFile(); } - #handleEvents(socket) { + #handleEvents() { eventparser.on('audio:register', (data) => { if (data?.clientId === undefined || data?.socket === undefined) { return; @@ -150,11 +151,32 @@ class AudioServer { async #sendData(client) { const timestamp = Date.now(); - const buffer = await this.#waitForBuffer(); - return new Promise((resolve, reject) => { - client.audiosocket.end(buffer, () => { - logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...'); - }); + const buffer = await this.#bufferFile(); + return new Promise(async (resolve, reject) => { + this.buffer.written = 0; + while (true) { + if (this.buffer.stream.bytesRead === this.buffer.size && buffer.length === 0) { + client.audiosocket.end(() => { + logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...'); + }); + break; + } + if (buffer.length === 0) { + this.buffer.stream.resume(); + await sleep(1); + continue; + } + if (!this.buffer.stream.isPaused()) { + this.buffer.stream.pause(); + } + const tmp = buffer[0]; + buffer.shift(); + client.audiosocket.write(tmp); + } + + // client.audiosocket.end(() => { + // logger.debug(client.getTag() + ' sent audio file \'' + this.buffer.file + '\' after ' + (Date.now() - timestamp) + 'ms...'); + // }); client.audiosocket.on('error', (error) => { logger.error(client.getTag() + ' encountered an error: ' + error); }); @@ -182,23 +204,31 @@ class AudioServer { } async #bufferFile() { - return new Promise((resolve, reject) => { + // const stream = fs.createReadStream(this.buffer.file); + const buffer = await new Promise((resolve, reject) => { const timestamp = Date.now(); const buffer = []; - const stream = fs.createReadStream(this.buffer.file); - stream.on('data', (data) => { + let resolved = false; + this.buffer.stream.on('data', (data) => { buffer.push(data); + if (resolved !== true && this.buffer.stream.bytesRead >= this.buffer.threshold) { + resolved = true; + logger.debug('buffering threshold of ' + this.buffer.threshold + ' bytes for file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms'); + resolve(buffer); + } }); - stream.on('close', () => { - this.buffer.data = Buffer.concat(buffer); - logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.data.length + ' bytes)'); - resolve(); + this.buffer.stream.on('close', () => { + // this.buffer.data = Buffer.concat(buffer); + logger.debug('buffering file \'' + this.buffer.file + '\' took ' + (Date.now() - timestamp) + 'ms (size: ' + this.buffer.stream.bytesRead + ' bytes)'); + // resolve(); }); - stream.on('error', (error) => { + this.buffer.stream.on('error', (error) => { // TODO: handle with try/catch reject(error); }); }); + + return buffer; } #getClientById(clientId) { diff --git a/example_config_server.json b/example_config_server.json index 40a8941..8bbe359 100644 --- a/example_config_server.json +++ b/example_config_server.json @@ -22,6 +22,7 @@ "password": "kannon" }, "audio": { - "threshold": 10 + "threshold": 10, + "bufferlimit": 128 } } \ No newline at end of file