church_streamer/api/encoder/encoder.mjs

296 lines
8.8 KiB
JavaScript
Raw Normal View History

import { effect, signal, batch } from '@preact/signals-core'
import { runCommand } from './runner.mjs'
import serial from '../serial/serial.mjs'
/**
 * Supervises the encoder child process: starts and stops it, parses the text
 * it prints into a status object, and mirrors that state to the serial
 * display and to connected socket clients.
 */
class EncoderManager {
  constructor() {
    // The running encoder process, or null when nothing is running.
    this.encoder = signal(null)
    // Captures frame, fps, bitrate and speed (in that order) from a progress line.
    this.frameMatcher = new RegExp('frame= *(\\d+\\.?\\d*).*fps= *(\\d+\\.?\\d*).*bitrate= *(\\d+\\.?\\d*).*speed= *(\\d+\\.?\\d*)')
    // Captures N from "repeated N time(s)".
    this.repeatedMatcher = new RegExp('repeated (\\d+) time')
    // Set when the manager itself stopped the stream; shown on the display.
    this.stopError = null
    // false, true (start requested) or the child process while it is starting.
    this.attempting = signal(false)
    // Latest parsed progress plus error bookkeeping, or null before the first frame line.
    this.encoderStatus = signal(null)
    this.data = ''
    // NOTE(review): spelling kept as-is because this is an instance property
    // other modules could read; it is not used anywhere inside this class.
    this.failues = 0
    // Most recent log lines, newest first (capped in log()).
    this.logHistory = []
    this.setup()
  }

  /**
   * Ages out old errors, maintains the slow-speed flag and stops the encoder
   * when too many errors have piled up.
   *
   * @param {object} status - Current encoder status; mutated in place.
   * @returns {boolean} true when the encoder was asked to stop.
   */
  updateSmartMonitor(status) {
    if (status.errors) {
      // Forget accumulated errors once the most recent one is over an hour old.
      if ((new Date() - status.lastError) > 1000 * 60 * 60) {
        status.errors = status.repeated = 0
        status.lastError = null
      }
    }

    if (status.speed < 1) {
      status.showSlowSpeed = new Date()
    } else if (status.speed > 1 || (status.showSlowSpeed && new Date() - status.showSlowSpeed > 1000 * 60 * 60 * 2)) {
      // Cleared as soon as speed exceeds 1, or two hours after the last slow reading.
      status.showSlowSpeed = null
    }

    if (status.errors && status.errors > 100) {
      this.stopError = new Error('Too many errors')
      this.safeStop()
      return true
    }
    return false
  }

  /**
   * Registers the reactive effect that redraws the serial display and emits
   * socket events whenever one of the signals changes.
   */
  setup() {
    effect(() => {
      // Read every signal up front so the effect depends on all of them no
      // matter which branch returns below.
      let attempting = this.attempting.value
      let status = this.encoderStatus.value
      let encoder = this.encoder.value

      if (this.stopError) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Stream was stopped:',
          this.stopError.message,
        )
      }

      if (attempting || (encoder && !status)) {
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Starting [         ]',
        )
      }

      if (!encoder && status) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Error, no encoder',
          'but there was status',
        )
      }

      if (!encoder) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Status: Offline.',
        )
      }

      if (this.updateSmartMonitor(status)) {
        return
      }

      // Throttle stats and display refreshes to at most one per second.
      if (status.lastPrint && (new Date() - status.lastPrint) < 1000) {
        return
      }

      this.io?.io.emit('encoder.stats', status)
      status.lastPrint = new Date()

      let prefix = 'LIVE'
      if (status.showSlowSpeed) {
        prefix = 'SLOW SPEED'
      }
      if (status.errors) {
        prefix = `${status.errors} ERRORS`
      }
      let fps = `fps=${status.fps.toFixed(1)}`

      // Both rows are padded so the right-hand value ends at column 20.
      return serial.updateDisplay(
        prefix + (` spd=${status.speed % 1 === 0 ? status.speed : status.speed.toFixed(3)}x`).padStart(20 - prefix.length),
        fps + (` kbps=${Math.round(status.bitrate)}`.padStart(20 - fps.length)),
      )
    })
  }

  /**
   * Stores the application handles, seeds default encoder settings when the
   * database has none, and starts the encoder.
   *
   * @param {object} core - Application core; `core.db` and `core.log` are used.
   * @param {object} io - Socket wrapper; events are emitted through `io.io`.
   */
  init(core, io) {
    this.core = core
    this.io = io

    if (!this.core.db.data.encoder_settings) {
      this.core.db.data.encoder_settings = {
        device: '',
        format_code: '',
        command: 'ffmpeg.exe -f decklink -pix_fmt yuv420p -vcodec libx264 -vf yadif=1 -b:v 8000k -maxrate 8000k -minrate 8000k -bufsize 8000k -x264-params "nal-hrd=cbr" -preset faster -tune zerolatency -g 100 -keyint_min 100 -force_key_frames "expr:gte(t,n_forced*2)" -profile:v main -r 50 -ac 2 -c:a libfdk_aac -b:a 164k -ar 44100 -f mp4 test.mp4'
      }
      this.core.db.write()
    }

    this.safeStart()
  }

  /**
   * Records a message in the in-memory history (newest first, 40 entries
   * max), optionally forwards it to the core logger, and pushes a fresh
   * status to socket clients.
   *
   * @param {string} level - Method name on `core.log` (e.g. 'info', 'warn', 'error').
   * @param {string} message - Text to record.
   * @param {boolean} [sendToLogger=true] - When false only the history is updated.
   */
  log(level, message, sendToLogger = true) {
    this.logHistory.unshift(`[${(new Date()).toISOString().replace('T', ' ').split('.')[0]}] ${message}`)
    this.logHistory = this.logHistory.slice(0, 40)
    if (sendToLogger) {
      this.core.log[level]('ENCODER: ' + message)
    }
    this.io.io.emit('encoder.status', this.status())
  }

  /**
   * Starts the encoder unless a start is already in progress. Start failures
   * are logged rather than thrown.
   */
  safeStart() {
    this.stopError = null
    if (this.attempting.value) return

    batch(() => {
      this.encoderStatus.value = null
      this.attempting.value = true
    })

    this.start()
      .then(
        encoder => {
          this.encoder.value = encoder
        },
        err => {
          this.log('error', err.message)
        }
      )
      .then(() => {
        this.attempting.value = false
      })
  }

  /**
   * Asks an encoder process to quit and kills it if it is still alive one
   * second later.
   *
   * @param {object} [process] - Process to stop; defaults to the running
   *   encoder, or the one currently being started.
   */
  safeStop(process) {
    let encoder = process || this.encoder.value || this.attempting.value
    // `attempting` can be the literal `true` before a process has been spawned.
    if (!encoder || encoder === true) return

    try {
      // Request a graceful shutdown through the encoder's stdin.
      encoder.stdin.write('q')
    } catch {
      encoder.kill()
    }

    setTimeout(() => {
      if (encoder.exitCode === null) {
        encoder.kill()
      }
    }, 1000)
  }

  /**
   * Builds the command line to run from the stored settings. For decklink
   * commands the configured format_code and device are injected around the
   * `-f decklink` flag.
   *
   * @param {object} settings - Encoder settings (`command`, `device`, `format_code`).
   * @returns {string} Command line to execute.
   * @throws {Error} When a decklink command has neither an inline nor a
   *   configured device / format_code.
   */
  buildCommand(settings) {
    const command = settings.command

    // `includes` rather than a bare indexOf(): the raw index is truthy for
    // "not found" (-1) and falsy for a match at position 0.
    if (!command.includes('-f decklink')) {
      return command
    }

    const missingDevice = !command.includes('-i ') && !settings.device
    const missingFormat = !command.includes('-format_code') && !settings.format_code
    if (missingDevice || missingFormat) {
      throw new Error('Missing decklink device or format_code')
    }

    // A replacer function keeps any `$` in the configured values literal.
    return command.replace(
      '-f decklink ',
      () => `${settings.format_code} -f decklink ${settings.device} `,
    )
  }

  /**
   * Spawns the encoder and waits (up to 100 polls, 100 ms apart) for its
   * first progress line, animating a progress bar on the display meanwhile.
   *
   * @returns {Promise<object>} The running encoder process.
   * @throws {Error} When the command cannot be built, the process exits
   *   during startup, or no progress is reported in time.
   */
  async start() {
    let command = this.buildCommand(this.core.db.data.encoder_settings)

    let child = await runCommand('', command, this.fromEncoder.bind(this), true)

    // Expose the process so safeStop() can reach it while it is still starting.
    this.attempting.value = child

    let index = 0
    while (index < 100 && child.exitCode === null && !child.killed && !this.encoderStatus.value) {
      if (index > 0 && index % 10 === 0) {
        // One more bar per second, at most nine. Byte 255 is presumably a
        // solid-block glyph on the display - TODO confirm.
        let bars = Buffer.from(new Array(Math.min(index / 10, 9)).fill(255))
        serial.updateDisplay(
          'Filadelfia streamer',
          Buffer.concat([
            Buffer.from('Starting ['),
            bars,
            Buffer.from(']'.padStart(10 - bars.length))
          ]),
        )
      }
      await new Promise(res => setTimeout(res, 100))
      index++
    }

    if (child.exitCode !== null) {
      throw new Error('Failed to start encoder, exit code: ' + child.exitCode)
    }

    if (!this.encoderStatus.value) {
      this.safeStop(child)
      throw new Error('Encoder did not start successfully, shutting down')
    }

    this.listen(child)
    return child
  }

  /**
   * Handles one line of encoder output: counts errors, rate-limits what is
   * forwarded to the log file, and publishes parsed progress numbers.
   *
   * @param {string} message - Line printed by the encoder.
   * @param {*} source - Unused here; supplied by the command runner.
   */
  fromEncoder(message, source) {
    let lowered = message.toLocaleLowerCase()
    let status = this.encoderStatus.value

    // Limit normal logging to log file a little bit from the encoder, just to not cause a massive spam in our log file
    let allowNormalLogging = !status || !status.lastLog
    let allowWarnLogging = allowNormalLogging
    if (status?.lastLog && new Date() - status.lastLog > 1000 * 57) {
      allowWarnLogging = true
      if (new Date() - status.lastLog > 1000 * 60) {
        allowNormalLogging = true
        status.lastLog = new Date()
      }
    }

    // Handle buffered overrun detection
    // (`includes` so that a message starting with either phrase still counts)
    if (lowered.includes('decklink') && lowered.includes('buffer overrun') && status) {
      status.errors += 1
      status.lastError = new Date()
      this.encoderStatus.value = status
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle repeated error detection
    let repeated = this.repeatedMatcher.exec(lowered)
    if (repeated && status) {
      let newRepeated = Number(repeated[1])
      if (status.repeated > newRepeated) {
        // Counter went backwards, so this is a new run of repeats.
        status.errors += newRepeated
      } else {
        status.errors += (newRepeated - status.repeated)
      }
      status.repeated = newRepeated
      // Stamp the time so updateSmartMonitor() does not treat these errors
      // as stale (a null lastError reads as "over an hour ago").
      status.lastError = new Date()
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle anything that is not a frame progress
    if (message.indexOf('frame=') < 0) {
      if (status) {
        status.errors += 1
        status.lastError = new Date()
      }
      return this.log('info', message, allowNormalLogging)
    }

    let groups = this.frameMatcher.exec(lowered)
    if (!groups) {
      return this.log('warn', message)
    }

    status = status || {
      errors: 0,
      repeated: 0,
      lastError: null,
      lastLog: new Date()
    }

    // Assign a fresh object so the signal sees a new value.
    this.encoderStatus.value = {
      ...status,
      frame: Number(groups[1]),
      fps: Number(groups[2]),
      bitrate: Number(groups[3]),
      speed: Number(groups[4]),
    }
  }

  /**
   * Attaches error and exit handlers to a running encoder process.
   *
   * @param {object} process - Event-emitting child process.
   */
  listen(process) {
    // Arrow function: `this` must be the manager, not the emitting process.
    process.on('error', (err) => {
      this.log('error', 'Encode process encountered an error: ' + err.message)
    })

    process.once('exit', (code) => {
      if (code !== 0) {
        this.log('error', 'Encoder returned error code: ' + code)
      } else {
        this.log('warn', 'Encoder closed normally')
      }
      batch(() => {
        this.encoder.value = this.encoderStatus.value = null
      })
    })
  }

  /**
   * Snapshot of the manager state as sent to socket clients.
   *
   * @returns {object} status, stopError message, starting/running flags,
   *   joined log history and the stored encoder settings.
   */
  status() {
    return {
      status: this.encoderStatus.value,
      stopError: this.stopError?.message,
      starting: Boolean(this.attempting.value),
      running: Boolean(this.encoder.value),
      log: this.logHistory.join('\n'),
      settings: this.core.db.data.encoder_settings,
    }
  }
}
// Single shared manager instance; every importer of this module gets the same object.
const encoderManager = new EncoderManager()
export default encoderManager