from nicegui import ui import os import threading import time import subprocess import json import re import math import shutil from collections import deque from datetime import datetime # ============================================================================== # --- SEÇÃO 1: CONFIGURAÇÕES GLOBAIS E CONSTANTES --- # ============================================================================== ROOT_DIR = "/downloads" OUTPUT_BASE = "/downloads/finalizados" # Caminhos dos drivers Intel problemáticos (NÃO ALTERAR) BAD_DRIVERS = [ "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so", "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so.1" ] # Extensões de legenda que vamos procurar SUBTITLE_EXTS = ('.srt', '.sub', '.sbv', '.ass', '.vtt', '.ssa') # VARIÁVEIS DE ESTADO (MEMÓRIA RAM) CURRENT_STATUS = { "running": False, "stop_requested": False, "file": "", "pct_file": 0.0, "pct_total": 0.0, "current_index": 0, "total_files": 0, "log": "Aguardando...", "speed": "N/A" } # Histórico dos últimos 50 processamentos HISTORY_LOG = deque(maxlen=50) # ============================================================================== # --- SEÇÃO 2: UTILITÁRIOS (Backend) --- # ============================================================================== def prepare_driver_environment(): """Configura o ambiente para usar o driver i965 e remove os problemáticos.""" os.environ["LIBVA_DRIVER_NAME"] = "i965" for driver in BAD_DRIVERS: if os.path.exists(driver): try: os.remove(driver) except: pass def get_video_duration(filepath): """Usa ffprobe para descobrir a duração total do vídeo em segundos.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", filepath ] try: output = subprocess.check_output(cmd).decode().strip() return float(output) except: return 1.0 def parse_time_to_seconds(time_str): """Converte o timecode do FFmpeg (HH:MM:SS.ms) para segundos float.""" try: parts = time_str.split(':') h = int(parts[0]) m = int(parts[1]) s = float(parts[2]) return h * 3600 + m * 60 + s except: return 0.0 def format_size(size_bytes): """Formata bytes para leitura humana (MB, GB).""" if size_bytes == 0: return "0B" size_name = ("B", "KB", "MB", "GB", "TB") try: i = int(math.log(size_bytes, 1024) // 1) p = math.pow(1024, i) s = round(size_bytes / p, 2) return f"{s} {size_name[i]}" except: return f"{size_bytes} B" def clean_metadata_title(title): """Limpa o título das faixas de áudio/legenda usando Regex.""" if not title: return "" junk_terms = [ r'\b5\.1\b', r'\b7\.1\b', r'\b2\.0\b', r'\baac\b', r'\bac3\b', r'\beac3\b', r'\batmos\b', r'\bdts\b', r'\btruehd\b', r'\bh264\b', r'\bx264\b', r'\bx265\b', r'\bhevc\b', r'\b1080p\b', r'\b720p\b', r'\b4k\b', r'\bbludv\b', r'\bcomandotorrents\b', r'\brarbg\b', r'\bwww\..+\.com\b', r'\bcópia\b', r'\boriginal\b' ] clean_title = title for pattern in junk_terms: clean_title = re.sub(pattern, '', clean_title, flags=re.IGNORECASE) clean_title = re.sub(r'\s+', ' ', clean_title).strip() return clean_title.strip('-.|[]()').strip() def handle_external_subtitles(src_video_path, dest_folder, delete_original=False): """ Procura legendas externas com o mesmo nome base do vídeo e as copia/move. """ try: src_folder = os.path.dirname(src_video_path) video_name = os.path.basename(src_video_path) video_stem = os.path.splitext(video_name)[0] for file in os.listdir(src_folder): if file.lower().endswith(SUBTITLE_EXTS): if file.startswith(video_stem): remaining = file[len(video_stem):] if not remaining or remaining.startswith('.'): src_sub = os.path.join(src_folder, file) dest_sub = os.path.join(dest_folder, file) shutil.copy2(src_sub, dest_sub) if delete_original: os.remove(src_sub) except Exception as e: print(f"Erro legendas: {e}") # ============================================================================== # --- SEÇÃO 3: LÓGICA DO FFMPEG --- # ============================================================================== def build_ffmpeg_command(input_file, output_file): """Constrói o comando FFmpeg inteligente.""" cmd_probe = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", input_file] try: res = subprocess.run(cmd_probe, capture_output=True, text=True, env=os.environ) data = json.loads(res.stdout) except: return [ "ffmpeg", "-y", "-hwaccel", "vaapi", "-hwaccel_device", "/dev/dri/renderD128", "-hwaccel_output_format", "vaapi", "-i", input_file, "-map", "0", "-c:v", "h264_vaapi", "-qp", "25", "-c:a", "copy", "-c:s", "copy", output_file ] input_streams = data.get('streams', []) map_args = ["-map", "0:v:0"] metadata_args = [] found_pt_audio = False # ÁUDIO audio_idx = 0 for stream in input_streams: if stream['codec_type'] == 'audio': tags = stream.get('tags', {}) lang = tags.get('language', 'und').lower() title = tags.get('title', '') if lang in ['por', 'pt', 'pob', 'pt-br', 'eng', 'en', 'jpn', 'ja', 'und']: map_args.extend(["-map", f"0:{stream['index']}"]) new_title = clean_metadata_title(title) if not new_title: if lang in ['por', 'pt', 'pob', 'pt-br']: new_title = "Português" elif lang in ['eng', 'en']: new_title = "Inglês" elif lang in ['jpn', 'ja']: new_title = "Japonês" metadata_args.extend([f"-metadata:s:a:{audio_idx}", f"title={new_title}"]) if lang in ['por', 'pt', 'pob', 'pt-br'] and not found_pt_audio: metadata_args.extend([f"-disposition:a:{audio_idx}", "default"]) found_pt_audio = True else: metadata_args.extend([f"-disposition:a:{audio_idx}", "0"]) audio_idx += 1 if audio_idx == 0: map_args.extend(["-map", "0:a"]) # LEGENDAS INTERNAS sub_idx = 0 for stream in input_streams: if stream['codec_type'] == 'subtitle': tags = stream.get('tags', {}) lang = tags.get('language', 'und').lower() title = tags.get('title', '') is_forced = 'forced' in stream.get('disposition', {}) if lang in ['por', 'pt', 'pob', 'pt-br']: map_args.extend(["-map", f"0:{stream['index']}"]) new_title = clean_metadata_title(title) metadata_args.extend([f"-metadata:s:s:{sub_idx}", f"title={new_title}"]) if is_forced or "forç" in (title or "").lower(): metadata_args.extend([f"-disposition:s:{sub_idx}", "forced"]) else: metadata_args.extend([f"-disposition:s:{sub_idx}", "0"]) sub_idx += 1 cmd = [ "ffmpeg", "-y", "-hwaccel", "vaapi", "-hwaccel_device", "/dev/dri/renderD128", "-hwaccel_output_format", "vaapi", "-i", input_file ] cmd += map_args cmd += ["-c:v", "h264_vaapi", "-qp", "25", "-compression_level", "0", "-c:a", "copy", "-c:s", "copy"] cmd += metadata_args cmd.append(output_file) return cmd # ============================================================================== # --- SEÇÃO 4: WORKER THREAD --- # ============================================================================== class EncoderWorker(threading.Thread): def __init__(self, input_folder, delete_original=False): super().__init__() self.input_folder = input_folder self.delete_original = delete_original self.daemon = True def run(self): global CURRENT_STATUS, HISTORY_LOG prepare_driver_environment() CURRENT_STATUS["running"] = True CURRENT_STATUS["stop_requested"] = False CURRENT_STATUS["log"] = "Escaneando arquivos..." files_to_process = [] for r, d, f in os.walk(self.input_folder): if "finalizados" in r or "temp" in r: continue for file in f: if file.lower().endswith(('.mkv', '.mp4', '.avi')): files_to_process.append(os.path.join(r, file)) CURRENT_STATUS["total_files"] = len(files_to_process) for i, fpath in enumerate(files_to_process): if CURRENT_STATUS["stop_requested"]: break fname = os.path.basename(fpath) CURRENT_STATUS["file"] = fname CURRENT_STATUS["current_index"] = i + 1 CURRENT_STATUS["pct_file"] = 0 CURRENT_STATUS["pct_total"] = int((i / len(files_to_process)) * 100) rel = os.path.relpath(fpath, self.input_folder) out_file = os.path.join(OUTPUT_BASE, os.path.basename(self.input_folder), rel) out_file = os.path.splitext(out_file)[0] + ".mkv" dest_folder = os.path.dirname(out_file) os.makedirs(dest_folder, exist_ok=True) size_before = os.path.getsize(fpath) cmd = build_ffmpeg_command(fpath, out_file) total_sec = get_video_duration(fpath) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=os.environ) for line in proc.stdout: if CURRENT_STATUS["stop_requested"]: proc.terminate() break if "time=" in line: match = re.search(r"time=(\d{2}:\d{2}:\d{2}\.\d{2})", line) if match: sec = parse_time_to_seconds(match.group(1)) pct = min(int((sec/total_sec)*100), 100) CURRENT_STATUS["pct_file"] = pct speed = re.search(r"speed=\s*(\S+)", line) if speed: CURRENT_STATUS["log"] = f"Vel: {speed.group(1)}" proc.wait() final_status = "Erro" if CURRENT_STATUS["stop_requested"]: if os.path.exists(out_file): os.remove(out_file) final_status = "Cancelado" elif proc.returncode == 0: final_status = "✅ Sucesso" size_after = os.path.getsize(out_file) if os.path.exists(out_file) else 0 diff = size_after - size_before HISTORY_LOG.appendleft({ "time": datetime.now().strftime("%H:%M:%S"), "file": fname, "status": final_status, "orig_size": format_size(size_before), "final_size": format_size(size_after), "diff": ("+" if diff > 0 else "") + format_size(diff) }) handle_external_subtitles(fpath, dest_folder, self.delete_original) if self.delete_original: try: os.remove(fpath) CURRENT_STATUS["log"] = "Original excluído." except: pass else: HISTORY_LOG.appendleft({ "time": datetime.now().strftime("%H:%M:%S"), "file": fname, "status": "❌ Falha", "orig_size": format_size(size_before), "final_size": "-", "diff": "-" }) CURRENT_STATUS["running"] = False CURRENT_STATUS["log"] = "Parado" if CURRENT_STATUS["stop_requested"] else "Finalizado" CURRENT_STATUS["pct_file"] = 100 CURRENT_STATUS["pct_total"] = 100 # ============================================================================== # --- SEÇÃO 5: FRONTEND --- # ============================================================================== class EncoderInterface: def __init__(self): self.path = ROOT_DIR self.timer = None self.delete_switch = None self.main_container = None if CURRENT_STATUS["running"]: self.view_mode = 'monitor' else: self.view_mode = 'explorer' self.main_container = ui.column().classes('w-full h-full gap-4') self.refresh_ui() def refresh_ui(self): self.main_container.clear() with self.main_container: if self.view_mode == 'explorer': self.render_breadcrumbs() self.render_options() self.render_folder_list() self.render_history_btn() elif self.view_mode == 'monitor': self.render_monitor() elif self.view_mode == 'history': self.render_history_table() def navigate(self, path): if os.path.exists(path) and os.path.isdir(path): self.path = path self.refresh_ui() else: ui.notify('Erro ao acessar pasta', type='negative') def start_encoding(self): should_delete = self.delete_switch.value if self.delete_switch else False CURRENT_STATUS["pct_file"] = 0 CURRENT_STATUS["pct_total"] = 0 t = EncoderWorker(self.path, delete_original=should_delete) t.start() ui.notify('Iniciando Conversão...', type='positive') self.view_mode = 'monitor' self.refresh_ui() def stop_encoding(self): CURRENT_STATUS["stop_requested"] = True ui.notify('Solicitando parada...', type='warning') def render_breadcrumbs(self): with ui.row().classes('w-full items-center bg-gray-100 p-2 rounded gap-1'): ui.button('🏠', on_click=lambda: self.navigate(ROOT_DIR)).props('flat dense text-color=grey-8') if self.path != ROOT_DIR: rel = os.path.relpath(self.path, ROOT_DIR) parts = rel.split(os.sep) acc = ROOT_DIR for part in parts: ui.icon('chevron_right', color='grey') acc = os.path.join(acc, part) ui.button(part, on_click=lambda p=acc: self.navigate(p)).props('flat dense no-caps text-color=primary') def render_options(self): with ui.card().classes('w-full mt-2 p-2 bg-blue-50'): with ui.row().classes('items-center w-full justify-between'): self.delete_switch = ui.switch('Excluir original ao finalizar com sucesso?').props('color=red') ui.button("🚀 Iniciar Conversão", on_click=self.start_encoding).props('push color=primary') def render_folder_list(self): try: entries = sorted([e for e in os.scandir(self.path) if e.is_dir() and not e.name.startswith('.')], key=lambda e: e.name.lower()) except: return with ui.column().classes('w-full gap-1 mt-2'): if self.path != ROOT_DIR: with ui.item(on_click=lambda: self.navigate(os.path.dirname(self.path))).classes('bg-gray-200 hover:bg-gray-300 cursor-pointer rounded'): # Sintaxe correta: usando context manager (with) with ui.item_section().props('avatar'): ui.icon('arrow_upward', color='grey') with ui.item_section(): ui.item_label('.. (Subir nível)') for entry in entries: with ui.item(on_click=lambda p=entry.path: self.navigate(p)).classes('hover:bg-blue-50 cursor-pointer rounded border-b border-gray-100'): with ui.item_section().props('avatar'): ui.icon('folder', color='amber') with ui.item_section(): ui.item_label(entry.name).classes('font-medium') def render_history_btn(self): ui.separator().classes('mt-4') ui.button('Ver Histórico (Últimos 50)', on_click=lambda: self.set_view('history')).props('outline w-full') def set_view(self, mode): self.view_mode = mode self.refresh_ui() def render_history_table(self): ui.label('Histórico de Conversões').classes('text-xl font-bold mb-4') columns = [ {'name': 'time', 'label': 'Hora', 'field': 'time', 'align': 'left'}, {'name': 'file', 'label': 'Arquivo', 'field': 'file', 'align': 'left'}, {'name': 'status', 'label': 'Status', 'field': 'status', 'align': 'center'}, {'name': 'orig', 'label': 'Tam. Orig.', 'field': 'orig_size'}, {'name': 'final', 'label': 'Tam. Final', 'field': 'final_size'}, {'name': 'diff', 'label': 'Diferença', 'field': 'diff'}, ] ui.table(columns=columns, rows=list(HISTORY_LOG), row_key='file').classes('w-full') ui.button('Voltar', on_click=lambda: self.set_view('explorer')).props('outline mt-4') def render_monitor(self): ui.label('Monitor de Conversão').classes('text-xl font-bold mb-4') lbl_file = ui.label('Inicializando...') progress_file = ui.linear_progress(value=0).classes('w-full') lbl_log = ui.label('---').classes('text-caption text-grey') ui.separator().classes('my-4') lbl_total = ui.label('Total: 0/0') progress_total = ui.linear_progress(value=0).classes('w-full') row_btns = ui.row().classes('mt-6 gap-4') with row_btns: btn_stop = ui.button('🛑 Parar Tudo', on_click=self.stop_encoding).props('color=red') btn_back = ui.button('Voltar / Novo', on_click=lambda: self.set_view('explorer')).props('outline') btn_back.set_visibility(False) def update_monitor(): if not CURRENT_STATUS["running"] and CURRENT_STATUS["pct_total"] >= 100: btn_stop.set_visibility(False) btn_back.set_visibility(True) lbl_file.text = "Todos os processos finalizados." lbl_file.text = f"Arquivo: {CURRENT_STATUS['file']}" progress_file.value = CURRENT_STATUS['pct_file'] / 100 lbl_log.text = f"{int(CURRENT_STATUS['pct_file'])}% | {CURRENT_STATUS['log']}" lbl_total.text = f"Fila: {CURRENT_STATUS['current_index']} de {CURRENT_STATUS['total_files']}" progress_total.value = CURRENT_STATUS['pct_total'] / 100 self.timer = ui.timer(0.5, update_monitor) def create_ui(): return EncoderInterface()