Files
pymediamanager/app/modules/encoder.py
2026-02-03 00:51:41 +00:00

462 lines
19 KiB
Python
Executable File

from nicegui import ui
import os
import threading
import time
import subprocess
import json
import re
import math
import shutil
from collections import deque
from datetime import datetime
# ==============================================================================
# --- SEÇÃO 1: CONFIGURAÇÕES GLOBAIS E CONSTANTES ---
# ==============================================================================
ROOT_DIR = "/downloads"
OUTPUT_BASE = "/downloads/finalizados"
# Caminhos dos drivers Intel problemáticos (NÃO ALTERAR)
BAD_DRIVERS = [
"/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so",
"/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so.1"
]
# Extensões de legenda que vamos procurar
SUBTITLE_EXTS = ('.srt', '.sub', '.sbv', '.ass', '.vtt', '.ssa')
# VARIÁVEIS DE ESTADO (MEMÓRIA RAM)
CURRENT_STATUS = {
"running": False,
"stop_requested": False,
"file": "",
"pct_file": 0.0,
"pct_total": 0.0,
"current_index": 0,
"total_files": 0,
"log": "Aguardando...",
"speed": "N/A"
}
# Histórico dos últimos 50 processamentos
HISTORY_LOG = deque(maxlen=50)
# ==============================================================================
# --- SEÇÃO 2: UTILITÁRIOS (Backend) ---
# ==============================================================================
def prepare_driver_environment():
"""Configura o ambiente para usar o driver i965 e remove os problemáticos."""
os.environ["LIBVA_DRIVER_NAME"] = "i965"
for driver in BAD_DRIVERS:
if os.path.exists(driver):
try:
os.remove(driver)
except: pass
def get_video_duration(filepath):
"""Usa ffprobe para descobrir a duração total do vídeo em segundos."""
cmd = [
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", filepath
]
try:
output = subprocess.check_output(cmd).decode().strip()
return float(output)
except:
return 1.0
def parse_time_to_seconds(time_str):
"""Converte o timecode do FFmpeg (HH:MM:SS.ms) para segundos float."""
try:
parts = time_str.split(':')
h = int(parts[0])
m = int(parts[1])
s = float(parts[2])
return h * 3600 + m * 60 + s
except:
return 0.0
def format_size(size_bytes):
"""Formata bytes para leitura humana (MB, GB)."""
if size_bytes == 0: return "0B"
size_name = ("B", "KB", "MB", "GB", "TB")
try:
i = int(math.log(size_bytes, 1024) // 1)
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_name[i]}"
except:
return f"{size_bytes} B"
def clean_metadata_title(title):
"""Limpa o título das faixas de áudio/legenda usando Regex."""
if not title: return ""
junk_terms = [
r'\b5\.1\b', r'\b7\.1\b', r'\b2\.0\b',
r'\baac\b', r'\bac3\b', r'\beac3\b', r'\batmos\b', r'\bdts\b', r'\btruehd\b',
r'\bh264\b', r'\bx264\b', r'\bx265\b', r'\bhevc\b', r'\b1080p\b', r'\b720p\b', r'\b4k\b',
r'\bbludv\b', r'\bcomandotorrents\b', r'\brarbg\b', r'\bwww\..+\.com\b',
r'\bcópia\b', r'\boriginal\b'
]
clean_title = title
for pattern in junk_terms:
clean_title = re.sub(pattern, '', clean_title, flags=re.IGNORECASE)
clean_title = re.sub(r'\s+', ' ', clean_title).strip()
return clean_title.strip('-.|[]()').strip()
def handle_external_subtitles(src_video_path, dest_folder, delete_original=False):
"""
Procura legendas externas com o mesmo nome base do vídeo e as copia/move.
"""
try:
src_folder = os.path.dirname(src_video_path)
video_name = os.path.basename(src_video_path)
video_stem = os.path.splitext(video_name)[0]
for file in os.listdir(src_folder):
if file.lower().endswith(SUBTITLE_EXTS):
if file.startswith(video_stem):
remaining = file[len(video_stem):]
if not remaining or remaining.startswith('.'):
src_sub = os.path.join(src_folder, file)
dest_sub = os.path.join(dest_folder, file)
shutil.copy2(src_sub, dest_sub)
if delete_original:
os.remove(src_sub)
except Exception as e:
print(f"Erro legendas: {e}")
# ==============================================================================
# --- SEÇÃO 3: LÓGICA DO FFMPEG ---
# ==============================================================================
def build_ffmpeg_command(input_file, output_file):
"""Constrói o comando FFmpeg inteligente."""
cmd_probe = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", input_file]
try:
res = subprocess.run(cmd_probe, capture_output=True, text=True, env=os.environ)
data = json.loads(res.stdout)
except:
return [
"ffmpeg", "-y", "-hwaccel", "vaapi", "-hwaccel_device", "/dev/dri/renderD128",
"-hwaccel_output_format", "vaapi", "-i", input_file, "-map", "0",
"-c:v", "h264_vaapi", "-qp", "25", "-c:a", "copy", "-c:s", "copy", output_file
]
input_streams = data.get('streams', [])
map_args = ["-map", "0:v:0"]
metadata_args = []
found_pt_audio = False
# ÁUDIO
audio_idx = 0
for stream in input_streams:
if stream['codec_type'] == 'audio':
tags = stream.get('tags', {})
lang = tags.get('language', 'und').lower()
title = tags.get('title', '')
if lang in ['por', 'pt', 'pob', 'pt-br', 'eng', 'en', 'jpn', 'ja', 'und']:
map_args.extend(["-map", f"0:{stream['index']}"])
new_title = clean_metadata_title(title)
if not new_title:
if lang in ['por', 'pt', 'pob', 'pt-br']: new_title = "Português"
elif lang in ['eng', 'en']: new_title = "Inglês"
elif lang in ['jpn', 'ja']: new_title = "Japonês"
metadata_args.extend([f"-metadata:s:a:{audio_idx}", f"title={new_title}"])
if lang in ['por', 'pt', 'pob', 'pt-br'] and not found_pt_audio:
metadata_args.extend([f"-disposition:a:{audio_idx}", "default"])
found_pt_audio = True
else:
metadata_args.extend([f"-disposition:a:{audio_idx}", "0"])
audio_idx += 1
if audio_idx == 0: map_args.extend(["-map", "0:a"])
# LEGENDAS INTERNAS
sub_idx = 0
for stream in input_streams:
if stream['codec_type'] == 'subtitle':
tags = stream.get('tags', {})
lang = tags.get('language', 'und').lower()
title = tags.get('title', '')
is_forced = 'forced' in stream.get('disposition', {})
if lang in ['por', 'pt', 'pob', 'pt-br']:
map_args.extend(["-map", f"0:{stream['index']}"])
new_title = clean_metadata_title(title)
metadata_args.extend([f"-metadata:s:s:{sub_idx}", f"title={new_title}"])
if is_forced or "forç" in (title or "").lower():
metadata_args.extend([f"-disposition:s:{sub_idx}", "forced"])
else:
metadata_args.extend([f"-disposition:s:{sub_idx}", "0"])
sub_idx += 1
cmd = [
"ffmpeg", "-y", "-hwaccel", "vaapi", "-hwaccel_device", "/dev/dri/renderD128",
"-hwaccel_output_format", "vaapi", "-i", input_file
]
cmd += map_args
cmd += ["-c:v", "h264_vaapi", "-qp", "25", "-compression_level", "0", "-c:a", "copy", "-c:s", "copy"]
cmd += metadata_args
cmd.append(output_file)
return cmd
# ==============================================================================
# --- SEÇÃO 4: WORKER THREAD ---
# ==============================================================================
class EncoderWorker(threading.Thread):
def __init__(self, input_folder, delete_original=False):
super().__init__()
self.input_folder = input_folder
self.delete_original = delete_original
self.daemon = True
def run(self):
global CURRENT_STATUS, HISTORY_LOG
prepare_driver_environment()
CURRENT_STATUS["running"] = True
CURRENT_STATUS["stop_requested"] = False
CURRENT_STATUS["log"] = "Escaneando arquivos..."
files_to_process = []
for r, d, f in os.walk(self.input_folder):
if "finalizados" in r or "temp" in r: continue
for file in f:
if file.lower().endswith(('.mkv', '.mp4', '.avi')):
files_to_process.append(os.path.join(r, file))
CURRENT_STATUS["total_files"] = len(files_to_process)
for i, fpath in enumerate(files_to_process):
if CURRENT_STATUS["stop_requested"]: break
fname = os.path.basename(fpath)
CURRENT_STATUS["file"] = fname
CURRENT_STATUS["current_index"] = i + 1
CURRENT_STATUS["pct_file"] = 0
CURRENT_STATUS["pct_total"] = int((i / len(files_to_process)) * 100)
rel = os.path.relpath(fpath, self.input_folder)
out_file = os.path.join(OUTPUT_BASE, os.path.basename(self.input_folder), rel)
out_file = os.path.splitext(out_file)[0] + ".mkv"
dest_folder = os.path.dirname(out_file)
os.makedirs(dest_folder, exist_ok=True)
size_before = os.path.getsize(fpath)
cmd = build_ffmpeg_command(fpath, out_file)
total_sec = get_video_duration(fpath)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=os.environ)
for line in proc.stdout:
if CURRENT_STATUS["stop_requested"]:
proc.terminate()
break
if "time=" in line:
match = re.search(r"time=(\d{2}:\d{2}:\d{2}\.\d{2})", line)
if match:
sec = parse_time_to_seconds(match.group(1))
pct = min(int((sec/total_sec)*100), 100)
CURRENT_STATUS["pct_file"] = pct
speed = re.search(r"speed=\s*(\S+)", line)
if speed: CURRENT_STATUS["log"] = f"Vel: {speed.group(1)}"
proc.wait()
final_status = "Erro"
if CURRENT_STATUS["stop_requested"]:
if os.path.exists(out_file): os.remove(out_file)
final_status = "Cancelado"
elif proc.returncode == 0:
final_status = "✅ Sucesso"
size_after = os.path.getsize(out_file) if os.path.exists(out_file) else 0
diff = size_after - size_before
HISTORY_LOG.appendleft({
"time": datetime.now().strftime("%H:%M:%S"),
"file": fname,
"status": final_status,
"orig_size": format_size(size_before),
"final_size": format_size(size_after),
"diff": ("+" if diff > 0 else "") + format_size(diff)
})
handle_external_subtitles(fpath, dest_folder, self.delete_original)
if self.delete_original:
try:
os.remove(fpath)
CURRENT_STATUS["log"] = "Original excluído."
except: pass
else:
HISTORY_LOG.appendleft({
"time": datetime.now().strftime("%H:%M:%S"),
"file": fname,
"status": "❌ Falha",
"orig_size": format_size(size_before),
"final_size": "-",
"diff": "-"
})
CURRENT_STATUS["running"] = False
CURRENT_STATUS["log"] = "Parado" if CURRENT_STATUS["stop_requested"] else "Finalizado"
CURRENT_STATUS["pct_file"] = 100
CURRENT_STATUS["pct_total"] = 100
# ==============================================================================
# --- SEÇÃO 5: FRONTEND ---
# ==============================================================================
class EncoderInterface:
def __init__(self):
self.path = ROOT_DIR
self.timer = None
self.delete_switch = None
self.main_container = None
if CURRENT_STATUS["running"]:
self.view_mode = 'monitor'
else:
self.view_mode = 'explorer'
self.main_container = ui.column().classes('w-full h-full gap-4')
self.refresh_ui()
def refresh_ui(self):
self.main_container.clear()
with self.main_container:
if self.view_mode == 'explorer':
self.render_breadcrumbs()
self.render_options()
self.render_folder_list()
self.render_history_btn()
elif self.view_mode == 'monitor':
self.render_monitor()
elif self.view_mode == 'history':
self.render_history_table()
def navigate(self, path):
if os.path.exists(path) and os.path.isdir(path):
self.path = path
self.refresh_ui()
else:
ui.notify('Erro ao acessar pasta', type='negative')
def start_encoding(self):
should_delete = self.delete_switch.value if self.delete_switch else False
CURRENT_STATUS["pct_file"] = 0
CURRENT_STATUS["pct_total"] = 0
t = EncoderWorker(self.path, delete_original=should_delete)
t.start()
ui.notify('Iniciando Conversão...', type='positive')
self.view_mode = 'monitor'
self.refresh_ui()
def stop_encoding(self):
CURRENT_STATUS["stop_requested"] = True
ui.notify('Solicitando parada...', type='warning')
def render_breadcrumbs(self):
with ui.row().classes('w-full items-center bg-gray-100 p-2 rounded gap-1'):
ui.button('🏠', on_click=lambda: self.navigate(ROOT_DIR)).props('flat dense text-color=grey-8')
if self.path != ROOT_DIR:
rel = os.path.relpath(self.path, ROOT_DIR)
parts = rel.split(os.sep)
acc = ROOT_DIR
for part in parts:
ui.icon('chevron_right', color='grey')
acc = os.path.join(acc, part)
ui.button(part, on_click=lambda p=acc: self.navigate(p)).props('flat dense no-caps text-color=primary')
def render_options(self):
with ui.card().classes('w-full mt-2 p-2 bg-blue-50'):
with ui.row().classes('items-center w-full justify-between'):
self.delete_switch = ui.switch('Excluir original ao finalizar com sucesso?').props('color=red')
ui.button("🚀 Iniciar Conversão", on_click=self.start_encoding).props('push color=primary')
def render_folder_list(self):
try:
entries = sorted([e for e in os.scandir(self.path) if e.is_dir() and not e.name.startswith('.')], key=lambda e: e.name.lower())
except: return
with ui.column().classes('w-full gap-1 mt-2'):
if self.path != ROOT_DIR:
with ui.item(on_click=lambda: self.navigate(os.path.dirname(self.path))).classes('bg-gray-200 hover:bg-gray-300 cursor-pointer rounded'):
# Sintaxe correta: usando context manager (with)
with ui.item_section().props('avatar'):
ui.icon('arrow_upward', color='grey')
with ui.item_section():
ui.item_label('.. (Subir nível)')
for entry in entries:
with ui.item(on_click=lambda p=entry.path: self.navigate(p)).classes('hover:bg-blue-50 cursor-pointer rounded border-b border-gray-100'):
with ui.item_section().props('avatar'):
ui.icon('folder', color='amber')
with ui.item_section():
ui.item_label(entry.name).classes('font-medium')
def render_history_btn(self):
ui.separator().classes('mt-4')
ui.button('Ver Histórico (Últimos 50)', on_click=lambda: self.set_view('history')).props('outline w-full')
def set_view(self, mode):
self.view_mode = mode
self.refresh_ui()
def render_history_table(self):
ui.label('Histórico de Conversões').classes('text-xl font-bold mb-4')
columns = [
{'name': 'time', 'label': 'Hora', 'field': 'time', 'align': 'left'},
{'name': 'file', 'label': 'Arquivo', 'field': 'file', 'align': 'left'},
{'name': 'status', 'label': 'Status', 'field': 'status', 'align': 'center'},
{'name': 'orig', 'label': 'Tam. Orig.', 'field': 'orig_size'},
{'name': 'final', 'label': 'Tam. Final', 'field': 'final_size'},
{'name': 'diff', 'label': 'Diferença', 'field': 'diff'},
]
ui.table(columns=columns, rows=list(HISTORY_LOG), row_key='file').classes('w-full')
ui.button('Voltar', on_click=lambda: self.set_view('explorer')).props('outline mt-4')
def render_monitor(self):
ui.label('Monitor de Conversão').classes('text-xl font-bold mb-4')
lbl_file = ui.label('Inicializando...')
progress_file = ui.linear_progress(value=0).classes('w-full')
lbl_log = ui.label('---').classes('text-caption text-grey')
ui.separator().classes('my-4')
lbl_total = ui.label('Total: 0/0')
progress_total = ui.linear_progress(value=0).classes('w-full')
row_btns = ui.row().classes('mt-6 gap-4')
with row_btns:
btn_stop = ui.button('🛑 Parar Tudo', on_click=self.stop_encoding).props('color=red')
btn_back = ui.button('Voltar / Novo', on_click=lambda: self.set_view('explorer')).props('outline')
btn_back.set_visibility(False)
def update_monitor():
if not CURRENT_STATUS["running"] and CURRENT_STATUS["pct_total"] >= 100:
btn_stop.set_visibility(False)
btn_back.set_visibility(True)
lbl_file.text = "Todos os processos finalizados."
lbl_file.text = f"Arquivo: {CURRENT_STATUS['file']}"
progress_file.value = CURRENT_STATUS['pct_file'] / 100
lbl_log.text = f"{int(CURRENT_STATUS['pct_file'])}% | {CURRENT_STATUS['log']}"
lbl_total.text = f"Fila: {CURRENT_STATUS['current_index']} de {CURRENT_STATUS['total_files']}"
progress_total.value = CURRENT_STATUS['pct_total'] / 100
self.timer = ui.timer(0.5, update_monitor)
def create_ui():
return EncoderInterface()