296 lines
8.8 KiB
JavaScript
296 lines
8.8 KiB
JavaScript
|
import { effect, signal, batch } from '@preact/signals-core'
|
||
|
import { runCommand } from './runner.mjs'
|
||
|
import serial from '../serial/serial.mjs'
|
||
|
|
||
|
/**
 * Supervises a single encoder child process: starts it from the command
 * stored in `core.db.data.encoder_settings`, parses its textual output into a
 * status object, mirrors that status to the serial display and to socket
 * clients, and stops the process when too many errors accumulate.
 */
class EncoderManager {
  constructor() {
    // Running encoder process handle, or null when nothing is running.
    this.encoder = signal(null)
    // Captures frame, fps, bitrate and speed (in that order) from a progress line.
    this.frameMatcher = new RegExp('frame= *(\\d+\\.?\\d*).*fps= *(\\d+\\.?\\d*).*bitrate= *(\\d+\\.?\\d*).*speed= *(\\d+\\.?\\d*)')
    // Captures N from "... repeated N time(s)" lines.
    this.repeatedMatcher = new RegExp('repeated (\\d+) time')

    // Error describing why the stream was stopped by the monitor, if it was.
    this.stopError = null
    // false when idle; true, and later the spawned process, while starting.
    this.attempting = signal(false)
    // Latest parsed status object, or null before the first progress line.
    this.encoderStatus = signal(null)
    this.data = ''
    // NOTE(review): spelling (sic) and not read anywhere in this class; the
    // name is left untouched in case other modules access it.
    this.failues = 0
    // Most recent log lines, newest first (trimmed in log()).
    this.logHistory = []
    this.setup()
  }

  /**
   * Applies the monitoring rules to a status object, mutating it in place.
   *
   * - Clears the error counters when the last error is more than an hour old.
   * - Sets `showSlowSpeed` while speed is below 1 and clears it once speed
   *   exceeds 1 or the flag is more than two hours old.
   * - Stops the encoder when more than 100 errors have accumulated.
   *
   * @param {object} status - Status object as built by fromEncoder().
   * @returns {boolean} true when the encoder was asked to stop.
   */
  updateSmartMonitor(status) {
    const hour = 1000 * 60 * 60

    if (status.errors && (new Date() - status.lastError) > hour) {
      status.errors = status.repeated = 0
      status.lastError = null
    }

    if (status.speed < 1) {
      status.showSlowSpeed = new Date()
    } else if (status.speed > 1 || (status.showSlowSpeed && new Date() - status.showSlowSpeed > hour * 2)) {
      status.showSlowSpeed = null
    }

    if (status.errors && status.errors > 100) {
      this.stopError = new Error('Too many errors')
      this.safeStop()
      return true
    }
    return false
  }

  /**
   * Registers the reactive effect that keeps the serial display and the
   * socket clients in sync with the `attempting`, `encoderStatus` and
   * `encoder` signals.
   */
  setup() {
    effect(() => {
      // Read all three signals up front so the effect subscribes to each of
      // them regardless of which branch returns below.
      const attempting = this.attempting.value
      const status = this.encoderStatus.value
      const encoder = this.encoder.value

      if (this.stopError) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Stream was stopped:',
          this.stopError.message,
        )
      }

      if (attempting || (encoder && !status)) {
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Starting [         ]',
        )
      }

      if (!encoder && status) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Error, no encoder',
          'but there was status',
        )
      }

      if (!encoder) {
        this.io?.io.emit('encoder.stats', null)
        this.io?.io.emit('encoder.status', this.status())
        return serial.updateDisplay(
          'Filadelfia streamer',
          'Status: Offline.',
        )
      }

      if (this.updateSmartMonitor(status)) {
        return
      }

      // Throttle display/stat updates to at most one per second.
      if (status.lastPrint && (new Date() - status.lastPrint) < 1000) {
        return
      }

      this.io?.io.emit('encoder.stats', status)
      status.lastPrint = new Date()

      let prefix = 'LIVE'
      if (status.showSlowSpeed) {
        prefix = 'SLOW SPEED'
      }
      if (status.errors) {
        prefix = `${status.errors} ERRORS`
      }

      const speed = status.speed % 1 === 0 ? status.speed : status.speed.toFixed(3)
      const fps = `fps=${status.fps.toFixed(1)}`

      // Each row is padded so the right-hand value ends at column 20.
      return serial.updateDisplay(
        prefix + (` spd=${speed}x`).padStart(20 - prefix.length),
        fps + (` kbps=${Math.round(status.bitrate)}`.padStart(20 - fps.length)),
      )
    })
  }

  /**
   * Stores the core and socket handles, seeds default encoder settings in the
   * database when none exist, and starts the encoder.
   *
   * @param {object} core - Provides `db.data`, `db.write()` and `log[level]()`.
   * @param {object} io - Wrapper whose `io.emit(event, payload)` is used to broadcast.
   */
  init(core, io) {
    this.core = core
    this.io = io

    if (!this.core.db.data.encoder_settings) {
      this.core.db.data.encoder_settings = {
        device: '',
        format_code: '',
        command: 'ffmpeg.exe -f decklink -pix_fmt yuv420p -vcodec libx264 -vf yadif=1 -b:v 8000k -maxrate 8000k -minrate 8000k -bufsize 8000k -x264-params "nal-hrd=cbr" -preset faster -tune zerolatency -g 100 -keyint_min 100 -force_key_frames "expr:gte(t,n_forced*2)" -profile:v main -r 50 -ac 2 -c:a libfdk_aac -b:a 164k -ar 44100 -f mp4 test.mp4'
      }
      this.core.db.write()
    }

    this.safeStart()
  }

  /**
   * Prepends a timestamped line to the in-memory history (capped at 40
   * entries), optionally forwards it to the core logger, and broadcasts the
   * refreshed status.
   *
   * @param {string} level - Name of the method to call on `core.log`.
   * @param {string} message - Text to record.
   * @param {boolean} [sendToLogger=true] - When false, only the history is updated.
   */
  log(level, message, sendToLogger = true) {
    const stamp = (new Date()).toISOString().replace('T', ' ').split('.')[0]
    this.logHistory.unshift(`[${stamp}] ${message}`)
    this.logHistory = this.logHistory.slice(0, 40)

    if (sendToLogger) {
      this.core.log[level]('ENCODER: ' + message)
    }
    // Guarded like every other emit in this class so that logging never
    // throws merely because no socket wrapper has been attached.
    this.io?.io.emit('encoder.status', this.status())
  }

  /**
   * Clears any previous stop error and starts the encoder unless a start is
   * already in progress. Start-up failures are logged rather than thrown.
   */
  safeStart() {
    this.stopError = null
    if (this.attempting.value) return

    batch(() => {
      this.encoderStatus.value = null
      this.attempting.value = true
    })

    this.start()
      .then(
        encoder => {
          this.encoder.value = encoder
        },
        err => {
          this.log('error', err.message)
        }
      )
      // finally() so the flag is cleared even if the handlers above throw.
      .finally(() => {
        this.attempting.value = false
      })
  }

  /**
   * Asks an encoder process to quit by writing 'q' to its stdin, falling back
   * to kill() if the write throws, and kills it after one second if it has
   * still not exited.
   *
   * @param {object} [process] - Process to stop; defaults to the running
   *   encoder, or to the process currently being started.
   */
  safeStop(process) {
    const encoder = process || this.encoder.value || this.attempting.value
    // `attempting` is literally `true` before the process has been spawned;
    // there is nothing to stop in that case.
    if (!encoder || encoder === true) return

    try {
      encoder.stdin.write('q')
    } catch {
      encoder.kill()
    }

    setTimeout(() => {
      if (encoder.exitCode === null) {
        encoder.kill()
      }
    }, 1000)
  }

  /**
   * Spawns the encoder and waits (polling every 100 ms, at most 100 times)
   * for the first parsed status, animating a progress bar on the display.
   *
   * @returns {Promise<object>} The spawned process once it reports progress.
   * @throws {Error} When decklink settings are incomplete, the process exits
   *   during start-up, or no status arrives before polling gives up.
   */
  async start() {
    const settings = this.core.db.data.encoder_settings
    let command = settings.command

    // includes() rather than a bare indexOf(): indexOf returns -1 (truthy)
    // when the text is absent, which made every non-decklink command go
    // through decklink validation.
    if (settings.command.includes('-f decklink')) {
      const missingDevice = !settings.command.includes('-i ') && !settings.device
      const missingFormat = !settings.command.includes('-format_code') && !settings.format_code
      if (missingDevice || missingFormat) {
        throw new Error('Missing deckling device or format_code')
      }
      command = command.replace('-f decklink ', `${settings.format_code} -f decklink ${settings.device} `)
    }

    const process = await runCommand('', command, this.fromEncoder.bind(this), true)
    // Expose the process through `attempting` so safeStop() can reach it
    // while start-up is still in progress.
    this.attempting.value = process

    let index = 0
    while (index < 100 && process.exitCode === null && !process.killed && !this.encoderStatus.value) {
      if (index > 0 && index % 10 === 0) {
        // One byte of value 255 per elapsed second, capped at nine.
        const bars = Buffer.from(new Array(Math.min(index / 10, 9)).fill(255))
        serial.updateDisplay(
          'Filadelfia streamer',
          Buffer.concat([
            Buffer.from('Starting ['),
            bars,
            Buffer.from(']'.padStart(10 - bars.length))
          ]),
        )
      }
      await new Promise(res => setTimeout(res, 100))
      index++
    }

    if (process.exitCode !== null) {
      throw new Error('Failed to start encoder, exit code: ' + process.exitCode)
    }
    if (!this.encoderStatus.value) {
      this.safeStop(process)
      throw new Error('Encoder did not start successfully, shutting down')
    }

    this.listen(process)
    return process
  }

  /**
   * Handles one line of encoder output: counts errors, rate-limits what is
   * forwarded to the core logger, and publishes parsed progress values to
   * `encoderStatus`.
   *
   * @param {string} message - Raw output line.
   * @param {*} [source] - Unused here; accepted because runCommand passes it.
   */
  fromEncoder(message, source) {
    const lowered = message.toLocaleLowerCase()
    let status = this.encoderStatus.value

    // Limit normal logging to log file a little bit from the encoder, just to not cause a massive spam in our log file
    let allowNormalLogging = !status || !status.lastLog
    let allowWarnLogging = allowNormalLogging
    if (status?.lastLog && new Date() - status.lastLog > 1000 * 57) {
      allowWarnLogging = true
      if (new Date() - status.lastLog > 1000 * 60) {
        allowNormalLogging = true
        status.lastLog = new Date()
      }
    }

    // Handle buffered overrun detection
    if (lowered.includes('decklink') && lowered.includes('buffer overrun') && status) {
      status.errors += 1
      status.lastError = new Date()
      this.encoderStatus.value = status
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle repeated error detection
    const repeated = this.repeatedMatcher.exec(lowered)
    if (repeated && status) {
      const newRepeated = Number(repeated[1])
      if (status.repeated > newRepeated) {
        // The counter went backwards, so a new repeat sequence has started.
        status.errors += newRepeated
      } else {
        status.errors += (newRepeated - status.repeated)
      }
      status.repeated = newRepeated
      // Stamp the error time; without it updateSmartMonitor() sees a null
      // lastError, treats the errors as over an hour old and resets them.
      status.lastError = new Date()
      return this.log('warn', message, allowWarnLogging)
    }

    // Handle anything that is not a frame progress
    if (message.indexOf('frame=') < 0) {
      if (status) {
        status.errors += 1
        status.lastError = new Date()
      }
      return this.log('info', message, allowNormalLogging)
    }

    const groups = this.frameMatcher.exec(lowered)
    if (!groups) {
      return this.log('warn', message)
    }

    status = status || {
      errors: 0,
      repeated: 0,
      lastError: null,
      lastLog: new Date()
    }
    // A fresh object is assigned so the signal sees a new value.
    this.encoderStatus.value = {
      ...status,
      frame: Number(groups[1]),
      fps: Number(groups[2]),
      bitrate: Number(groups[3]),
      speed: Number(groups[4]),
    }
  }

  /**
   * Attaches error and exit handlers to a started encoder process. On exit
   * the `encoder` and `encoderStatus` signals are both reset to null.
   *
   * @param {object} process - Object exposing `on` and `once` event registration.
   */
  listen(process) {
    // Arrow function so `this` is the manager rather than the emitter.
    process.on('error', (err) => {
      this.log('error', 'Encode process encountered an error: ' + err.message)
    })

    process.once('exit', (code) => {
      if (code !== 0) {
        this.log('error', 'Encoder returned error code: ' + code)
      } else {
        this.log('warn', 'Encoder closed normally')
      }
      batch(() => {
        this.encoder.value = this.encoderStatus.value = null
      })
    })
  }

  /**
   * Builds the snapshot broadcast as `encoder.status`.
   *
   * @returns {{status: ?object, stopError: (string|undefined), starting: boolean,
   *   running: boolean, log: string, settings: object}}
   */
  status() {
    return {
      status: this.encoderStatus.value,
      stopError: this.stopError?.message,
      starting: Boolean(this.attempting.value),
      running: Boolean(this.encoder.value),
      log: this.logHistory.join('\n'),
      settings: this.core.db.data.encoder_settings,
    }
  }
}
|
||
|
|
||
|
// Single EncoderManager instance created at module load and shared through
// this module's default export.
const manager = new EncoderManager()

export default manager
|