import { effect, signal, batch } from '@preact/signals-core'
import { runCommand } from './runner.mjs'
import serial from '../serial/serial.mjs'

/**
 * Owns the lifecycle of the external encoder process: starting it, parsing
 * its output into a status object, mirroring that status to the serial
 * display and to socket clients, and stopping it when too many errors occur.
 *
 * State is held in signals (`encoder`, `attempting`, `encoderStatus`) so the
 * effect registered in `setup()` re-runs whenever any of them changes.
 */
class EncoderManager {
  constructor() {
    // The running encoder process, or null when nothing is running.
    this.encoder = signal(null)
    // Extracts frame, fps, bitrate and speed from an encoder progress line.
    this.frameMatcher = new RegExp('frame= *(\\d+\\.?\\d*).*fps= *(\\d+\\.?\\d*).*bitrate= *(\\d+\\.?\\d*).*speed= *(\\d+\\.?\\d*)')
    // Extracts N from "repeated N time(s)" lines.
    this.repeatedMatcher = new RegExp('repeated (\\d+) time')
    // Set when the manager itself stopped the stream; cleared by safeStart().
    this.stopError = null
    // false when idle, true while a start is pending, or the process object
    // once it has been spawned but has not produced a status yet.
    this.attempting = signal(false)
    // Latest parsed status from the encoder output, or null.
    this.encoderStatus = signal(null)
    this.data = ''
    // NOTE(review): misspelled ("failures") and not used in this file; the
    // name is kept because it is a public property other code may read.
    this.failues = 0
    // Most recent log lines, newest first (capped in log()).
    this.logHistory = []

    this.setup()
  }

  /**
   * Applies time-based housekeeping to a status object and enforces the
   * error limit. Mutates `status` in place.
   *
   * @param {object} status - Current encoder status.
   * @returns {boolean} true when the encoder was asked to stop because the
   *   error count exceeded 100, false otherwise.
   */
  updateSmartMonitor(status) {
    const HOUR = 1000 * 60 * 60

    // Forget old errors once an hour has passed since the last one.
    if (status.errors) {
      if ((new Date() - status.lastError) > HOUR) {
        status.errors = status.repeated = 0
        status.lastError = null
      }
    }

    // Remember when the speed was last below realtime. The marker is cleared
    // when the speed goes above 1x, or after two hours otherwise.
    if (status.speed < 1) {
      status.showSlowSpeed = new Date()
    } else if (status.speed > 1 || (status.showSlowSpeed && new Date() - status.showSlowSpeed > HOUR * 2)) {
      status.showSlowSpeed = null
    }

    if (status.errors && status.errors > 100) {
      this.stopError = new Error('Too many errors')
      this.safeStop()
      return true
    }
    return false
  }

  /**
   * Registers the effect that keeps the serial display and socket clients in
   * sync with the encoder state. Runs once immediately and again whenever
   * `attempting`, `encoderStatus` or `encoder` changes.
   */
  setup() {
    effect(() => {
      // Read all three signals up front so the effect subscribes to each of
      // them regardless of which branch returns below.
      const attempting = this.attempting.value
      const status = this.encoderStatus.value
      const encoder = this.encoder.value

      if (this.stopError) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Stream was stopped:',
          this.stopError.message,
        )
      }

      if (attempting || (encoder && !status)) {
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Starting [         ]',
        )
      }

      if (!encoder && status) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Error, no encoder',
          'but there was status',
        )
      }

      if (!encoder) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Status: Offline.',
        )
      }

      if (this.updateSmartMonitor(status)) {
        return
      }

      // Throttle stats/display output to at most once per second.
      if (status.lastPrint && (new Date() - status.lastPrint) < 1000) {
        return
      }

      this.io?.io.emit('encoder.stats', status)
      status.lastPrint = new Date()

      let prefix = 'LIVE'
      if (status.showSlowSpeed) {
        prefix = 'SLOW SPEED'
      }
      if (status.errors) {
        prefix = `${status.errors} ERRORS`
      }

      const fps = `fps=${status.fps.toFixed(1)}`
      const speed = status.speed % 1 === 0 ? status.speed : status.speed.toFixed(3)

      // Each display row is padded to 20 characters: label on the left,
      // value right-aligned.
      return serial.updateDisplay(
        prefix + (` spd=${speed}x`).padStart(20 - prefix.length),
        fps + (` kbps=${Math.round(status.bitrate)}`.padStart(20 - fps.length)),
      )
    })
  }

  /**
   * Wires the manager to the application, seeds default encoder settings in
   * the database when none exist, and starts the encoder.
   *
   * @param {object} core - Application core; `core.db` and `core.log` are used.
   * @param {object} io - Socket wrapper; `io.io.emit` is used for broadcasts.
   */
  init(core, io) {
    this.core = core
    this.io = io

    if (!this.core.db.data.encoder_settings) {
      this.core.db.data.encoder_settings = {
        device: '',
        format_code: '',
        command: 'ffmpeg.exe -f decklink -pix_fmt yuv420p -vcodec libx264 -vf yadif=1 -b:v 8000k -maxrate 8000k -minrate 8000k -bufsize 8000k -x264-params "nal-hrd=cbr" -preset faster -tune zerolatency -g 100 -keyint_min 100 -force_key_frames "expr:gte(t,n_forced*2)" -profile:v main -r 50 -ac 2 -c:a libfdk_aac -b:a 164k -ar 44100 -f mp4 test.mp4'
      }
      this.core.db.write()
    }

    this.safeStart()
  }

  /**
   * Records a message in the in-memory log history (newest first, capped at
   * 40 entries), optionally forwards it to the core logger, and broadcasts
   * the refreshed status.
   *
   * @param {string} level - Method name on `core.log` to call.
   * @param {string} message - Text to record.
   * @param {boolean} [sendToLogger=true] - When false, only the in-memory
   *   history and the status broadcast are updated.
   */
  log(level, message, sendToLogger = true) {
    const timestamp = (new Date()).toISOString().replace('T', ' ').split('.')[0]
    this.logHistory.unshift(`[${timestamp}] ${message}`)
    this.logHistory = this.logHistory.slice(0, 40)

    if (sendToLogger) {
      this.core.log[level]('ENCODER: ' + message)
    }
    // Guarded like every other emit in this class.
    this.io?.io.emit('encoder.status', this.status())
  }

  /**
   * Starts the encoder unless a start is already in progress. Clears any
   * previous stop error. Failures are logged rather than thrown.
   */
  safeStart() {
    this.stopError = null
    if (this.attempting.value) return

    batch(() => {
      this.encoderStatus.value = null
      this.attempting.value = true
    })

    this.start()
      .then((encoder) => {
        this.encoder.value = encoder
      })
      .catch((err) => {
        this.log('error', err.message)
      })
      // Always leave the "starting" state, even when a handler above throws;
      // otherwise no later safeStart() call could ever proceed.
      .finally(() => {
        this.attempting.value = false
      })
  }

  /**
   * Asks an encoder process to quit by writing `q` to its stdin, and kills
   * it if it has not exited one second later.
   *
   * @param {object} [process] - Process to stop. Defaults to the running
   *   encoder, or the process currently being started.
   */
  safeStop(process) {
    const encoder = process || this.encoder.value || this.attempting.value
    // `attempting` is `true` while a start is pending but nothing has been
    // spawned yet; there is no process to stop in that case.
    if (!encoder || encoder === true) return

    try {
      encoder.stdin.write('q')
    } catch {
      encoder.kill()
    }

    setTimeout(() => {
      if (encoder.exitCode === null) {
        encoder.kill()
      }
    }, 1000)
  }

  /**
   * Spawns the encoder using the stored settings and waits up to roughly
   * ten seconds (100 polls of 100 ms) for the first parsed status.
   *
   * @returns {Promise<object>} The spawned process once it reported status.
   * @throws {Error} When decklink settings are missing, the process exits
   *   during startup, or no status arrives in time.
   */
  async start() {
    const settings = this.core.db.data.encoder_settings
    let command = settings.command

    // Only decklink commands need the device/format_code injected. `includes`
    // is used because `indexOf` returns -1 (truthy) when the text is absent.
    if (command.includes('-f decklink')) {
      const missingDevice = !command.includes('-i ') && !settings.device
      const missingFormat = !command.includes('-format_code') && !settings.format_code
      if (missingDevice || missingFormat) {
        throw new Error('Missing deckling device or format_code')
      }
      command = command.replace('-f decklink ', `${settings.format_code} -f decklink ${settings.device} `)
    }

    // Named `child` to avoid shadowing Node's global `process`.
    const child = await runCommand('', command, this.fromEncoder.bind(this), true)

    this.attempting.value = child

    let index = 0
    while (index < 100 && child.exitCode === null && !child.killed && !this.encoderStatus.value) {
      // Once per second, add one block (byte 255) to the progress bar, up to 9.
      if (index > 0 && index % 10 === 0) {
        const bars = Buffer.from(new Array(Math.min(index / 10, 9)).fill(255))
        serial.updateDisplay(
          'Filadelfia streamer',
          Buffer.concat([
            Buffer.from('Starting ['),
            bars,
            Buffer.from(']'.padStart(10 - bars.length))
          ]),
        )
      }
      await new Promise(res => setTimeout(res, 100))
      index++
    }

    if (child.exitCode !== null) {
      throw new Error('Failed to start encoder, exit code: ' + child.exitCode)
    }

    if (!this.encoderStatus.value) {
      this.safeStop(child)
      throw new Error('Encoder did not start successfully, shutting down')
    }

    this.listen(child)

    return child
  }

  /**
   * Handles one line of encoder output: counts errors, rate-limits what is
   * sent to the log file, and turns progress lines into a new status value.
   *
   * @param {string} message - Raw output line from the encoder.
   * @param {*} source - Passed by the runner; not used here.
   */
  fromEncoder(message, source) {
    const lowered = message.toLocaleLowerCase()
    let status = this.encoderStatus.value

    // Limit normal logging to log file a little bit from the encoder, just to
    // not cause a massive spam in our log file
    let allowNormalLogging = !status || !status.lastLog
    let allowWarnLogging = allowNormalLogging
    if (status?.lastLog && new Date() - status.lastLog > 1000 * 57) {
      allowWarnLogging = true
      if (new Date() - status.lastLog > 1000 * 60) {
        allowNormalLogging = true
        status.lastLog = new Date()
      }
    }

    // Handle buffered overrun detection
    if (lowered.includes('decklink') && lowered.includes('buffer overrun') && status) {
      status.errors += 1
      status.lastError = new Date()
      this.encoderStatus.value = status
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle repeated error detection
    const repeated = this.repeatedMatcher.exec(lowered)
    if (repeated && status) {
      const newRepeated = Number(repeated[1])
      // A lower count than last time means a new repeat sequence started, so
      // the whole count is new; otherwise only the increase is.
      if (status.repeated > newRepeated) {
        status.errors += newRepeated
      } else {
        status.errors += (newRepeated - status.repeated)
      }
      status.repeated = newRepeated
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle anything that is not a frame progress
    if (message.indexOf('frame=') < 0) {
      if (status) {
        status.errors += 1
        status.lastError = new Date()
      }
      return this.log('info', message, allowNormalLogging)
    }

    const groups = this.frameMatcher.exec(lowered)
    if (!groups) {
      return this.log('warn', message)
    }

    status = status || {
      errors: 0,
      repeated: 0,
      lastError: null,
      lastLog: new Date()
    }

    // Assign a fresh object so the signal notifies its subscribers.
    this.encoderStatus.value = {
      ...status,
      frame: Number(groups[1]),
      fps: Number(groups[2]),
      bitrate: Number(groups[3]),
      speed: Number(groups[4]),
    }
  }

  /**
   * Attaches error and exit handlers to a started encoder process. On exit,
   * the encoder and its status are cleared.
   *
   * @param {object} process - The encoder process to observe.
   */
  listen(process) {
    // Arrow function so `this` is the manager, not the emitter.
    process.on('error', (err) => {
      this.log('error', 'Encode process encountered an error: ' + err.message)
    })

    process.once('exit', (code) => {
      if (code !== 0) {
        this.log('error', 'Encoder returned error code: ' + code)
      } else {
        this.log('warn', 'Encoder closed normally')
      }

      batch(() => {
        this.encoder.value = this.encoderStatus.value = null
      })
    })
  }

  /**
   * Builds the snapshot that is broadcast as `encoder.status`.
   *
   * @returns {object} Current status, stop error message, starting/running
   *   flags, joined log history and the stored encoder settings.
   */
  status() {
    return {
      status: this.encoderStatus.value,
      stopError: this.stopError?.message,
      starting: Boolean(this.attempting.value),
      running: Boolean(this.encoder.value),
      log: this.logHistory.join('\n'),
      settings: this.core.db.data.encoder_settings,
    }
  }
}

const encoder = new EncoderManager()

export default encoder;