2022-04-29 16:50:31 +02:00
const NodeSpeaker = require ( '../libs/speaker/index.js' ) ;
2022-04-27 15:52:14 +02:00
const EventEmitter = require ( 'events' ) ;
2022-05-03 14:58:02 +02:00
const { Readable , Writable } = require ( 'stream' ) ;
const { restart } = require ( '../libs/util.js' ) ;
const { Socket } = require ( 'net' ) ;
2022-04-27 15:52:14 +02:00
class StreamBuffer extends EventEmitter {
2022-04-29 16:50:31 +02:00
constructor ( stream , settings ) {
2022-04-27 15:52:14 +02:00
super ( ) ;
2022-05-02 14:20:33 +02:00
this . size = settings . size ;
2022-04-29 16:50:31 +02:00
this . times = {
start : Date . now ( )
} ;
this . playback = {
paused : false ,
hiccups : 0
} ;
2022-05-02 15:56:51 +02:00
this . audiosettings = settings ? . audio ;
2022-04-29 16:50:31 +02:00
this . # setupBuffer ( settings ? . threshold ) ;
2022-05-02 15:56:51 +02:00
this . # setupStreams ( stream ) ;
2022-04-27 15:52:14 +02:00
}
2022-05-02 14:20:33 +02:00
play ( ) {
2022-04-29 16:50:31 +02:00
this . # writeBufferedChunk ( 4096 , true ) ;
2022-04-27 15:52:14 +02:00
}
pause ( ) {
2022-04-29 16:50:31 +02:00
this . playback . paused = true ;
2022-05-02 14:20:33 +02:00
this . threshold . announced = false ;
this . streams . buffer . _readableState . buffer . clear ( ) ;
this . streams . buffer . _readableState . length = 0 ;
2022-04-29 16:50:31 +02:00
}
stop ( ) {
this . # destroy ( ) ;
}
getProgress ( ) {
2022-05-03 14:58:02 +02:00
return this . streams ? . output ? . bytesWritten || 0 ;
2022-04-29 16:50:31 +02:00
}
getHiccups ( ) {
return this . playback . hiccups ;
}
getTimes ( ) {
return this . times ;
2022-04-27 15:52:14 +02:00
}
# setupBuffer ( threshold ) {
if ( threshold === undefined || isNaN ( threshold ) ) {
2022-04-29 16:50:31 +02:00
threshold = 4096 ;
2022-04-27 15:52:14 +02:00
}
this . threshold = {
value : threshold ,
announced : false
} ;
this . limit = {
2022-05-02 15:03:07 +02:00
value : config ? . buffer ? . limit * 1048576 ,
2022-04-27 15:52:14 +02:00
announced : false
} ;
if ( isNaN ( this . limit . value ) || this . limit . value < this . threshold . value ) {
this . limit . value = this . threshold . value ;
}
}
2022-05-02 15:56:51 +02:00
# setupStreams ( stream ) {
2022-04-29 16:50:31 +02:00
if ( stream === undefined ) {
2022-04-27 15:52:14 +02:00
return ;
}
this . streams = {
2022-04-29 16:50:31 +02:00
input : stream ,
2022-04-27 15:52:14 +02:00
buffer : new Readable ( )
} ;
this . streams . buffer . _read = ( ) => { } ;
2022-05-02 15:56:51 +02:00
this . # createOutputStream ( ) ;
2022-04-27 15:52:14 +02:00
this . # handleBufferStream ( ) ;
this . # handleInputStream ( ) ;
this . # handleOutputStream ( ) ;
}
2022-05-02 15:56:51 +02:00
# createOutputStream ( ) {
if ( this . streams . output !== undefined ) {
this . streams . output . destroy ( ) ;
2022-05-03 14:58:02 +02:00
}
2022-05-02 15:56:51 +02:00
this . streams . output = new NodeSpeaker ( {
2022-05-02 15:58:10 +02:00
channels : this . audiosettings ? . channels || 2 ,
bitDepth : this . audiosettings ? . bitDepth || 16 ,
sampleRate : this . audiosettings ? . sampleRate || 44100 ,
2022-05-02 15:56:51 +02:00
} ) ;
}
2022-04-27 15:52:14 +02:00
# isThresholdReached ( ) {
2022-04-29 16:50:31 +02:00
return this . streams . buffer . readableLength >= this . threshold . value ;
2022-04-27 15:52:14 +02:00
}
# isThresholdAnnounced ( ) {
return this . threshold . announced === true ;
}
# isLimitReached ( ) {
2022-04-29 16:50:31 +02:00
return this . streams . buffer . readableLength >= this . limit . value ;
}
# writeBufferedChunk ( chunkSize , resume ) {
if ( resume === true ) {
this . playback . paused = false ;
this . emit ( 'play' ) ;
} else if ( this . playback . paused ) {
this . emit ( 'pause' ) ;
return ;
}
if ( chunkSize === undefined || isNaN ( chunkSize ) ) {
chunkSize = 4096 ;
}
if ( this . # isThresholdAnnounced === false ) {
return ;
}
let data ;
if ( this . streams . buffer . readableLength >= chunkSize ) {
data = this . streams . buffer . read ( chunkSize ) ;
} else {
data = this . streams . buffer . read ( ) ;
}
if ( data === undefined || data == null ) {
return ;
}
this . streams . output . write ( data ) ;
}
# fillBuffer ( ) {
const rebuffered = this . # rebufferChunk ( this . threshold . value ) ;
if ( rebuffered === undefined || this . # isThresholdAnnounced ( ) === true ) {
return ;
}
if ( this . # isThresholdReached ( ) ) {
this . times . threshold = Date . now ( ) ;
this . threshold . announced = true ;
this . emit ( constants . THRESHOLD ) ;
logger . debug ( 'buffer reached threshold of ' + this . threshold . value + ' bytes after ' + ( this . times . threshold - this . times . start ) + 'ms...' ) ;
return ;
}
this . # fillBuffer ( ) ;
}
# rebufferChunk ( chunkSize ) {
if ( chunkSize === undefined || isNaN ( chunkSize ) ) {
chunkSize = 4096 ;
}
if ( this . # isLimitReached ( ) ) {
return ;
}
let data ;
if ( this . streams . input . readableLength >= chunkSize ) {
data = this . streams . input . read ( chunkSize ) ;
} else {
data = this . streams . input . read ( ) ;
}
if ( data === undefined || data === null ) {
return ;
}
this . streams . buffer . push ( data ) ;
return data . length ;
2022-04-27 15:52:14 +02:00
}
# handleBufferStream ( ) {
if ( this . streams . buffer === undefined ) {
return ;
}
2022-04-29 16:50:31 +02:00
this . streams . buffer . on ( 'error' , ( error ) => {
logger . error ( 'buffer stream encountered an error: ' + error ) ;
2022-04-27 15:52:14 +02:00
} ) ;
2022-05-03 14:58:02 +02:00
this . streams . buffer . on ( 'close' , ( ) => {
if ( this . streams . buffer . destroyed !== true ) {
return this . streams . buffer . destroy ( ) ;
}
logger . debug ( 'buffer stream closed' ) ;
this . streams . buffer . removeAllListeners ( ) ;
} ) ;
2022-04-27 15:52:14 +02:00
}
# handleInputStream ( ) {
if ( this . streams . input === undefined ) {
return ;
}
2022-04-29 16:50:31 +02:00
this . streams . input . on ( 'error' , ( error ) => {
logger . error ( 'input stream encountered an error: ' + error ) ;
} ) ;
this . streams . input . on ( 'readable' , ( ) => {
this . # fillBuffer ( ) ;
} ) ;
this . streams . input . on ( 'close' , ( ) => {
2022-05-03 14:58:02 +02:00
if ( this . streams . input . destroyed !== true ) {
return this . streams . input . destroy ( ) ;
}
2022-04-29 16:50:31 +02:00
this . times . transmitted = Date . now ( ) ;
logger . debug ( 'input stream closed, transmitting file took ' + ( this . times . transmitted - this . times . start ) + 'ms' ) ;
2022-05-03 14:58:02 +02:00
this . streams . input . removeAllListeners ( ) ;
2022-04-27 15:52:14 +02:00
} ) ;
}
# handleOutputStream ( ) {
if ( this . streams . output === undefined ) {
return ;
}
2022-05-02 15:34:50 +02:00
this . streams . output . on ( 'error' , ( error ) => {
2022-04-29 16:50:31 +02:00
logger . error ( 'output stream encountered an error: ' + error ) ;
2022-05-03 14:58:02 +02:00
restart ( ) ;
2022-04-27 15:52:14 +02:00
} ) ;
2022-04-29 16:50:31 +02:00
this . streams . output . on ( 'written' , ( bytes ) => {
this . # writeBufferedChunk ( bytes ) ;
this . # rebufferChunk ( bytes ) ;
2022-04-27 15:52:14 +02:00
} ) ;
2022-04-29 16:50:31 +02:00
this . streams . output . on ( 'drain' , ( ) => {
this . times . drained = Date . now ( ) ;
2022-05-02 14:20:33 +02:00
logger . debug ( 'output stream is drained, wrote ' + this . streams . output . bytesWritten + '/' + this . size + ' bytes to the speaker after ' + ( this . times . drained - this . times . start ) + 'ms' ) ;
if ( this . streams . output . bytesWritten < this . size ) {
return ;
}
2022-04-29 16:50:31 +02:00
this . # destroy ( ) ;
2022-04-27 15:52:14 +02:00
} ) ;
2022-05-03 14:58:02 +02:00
this . streams . output . on ( 'close' , ( ) => {
if ( this . streams . output . destroyed !== true ) {
return this . streams . output . destroy ( ) ;
}
logger . debug ( 'output stream closed' ) ;
this . streams . output . removeAllListeners ( ) ;
} ) ;
2022-04-27 15:52:14 +02:00
this . streams . output . on ( 'hiccup' , ( ) => {
2022-04-29 16:50:31 +02:00
this . playback . hiccups ++ ;
logger . warn ( 'hiccup ' + this . playback . hiccups + ' detected...' ) ;
2022-04-27 15:52:14 +02:00
} ) ;
}
2022-04-29 16:50:31 +02:00
# destroy ( ) {
2022-05-03 14:58:02 +02:00
this . streams . input . destroy ( ) ;
this . streams . buffer . destroy ( ) ;
this . streams . output . close ( ) ;
2022-04-29 16:50:31 +02:00
this . times . stop = Date . now ( ) ;
this . emit ( 'close' ) ;
}
2022-04-27 15:52:14 +02:00
}
module . exports = StreamBuffer ;