"""NiceGUI front end plus a background worker that batch-transcodes videos.

The UI lets the user browse folders below ``ROOT_DIR`` and start a conversion
of the current folder. The conversion runs in an ``EncoderWorker`` thread that
drives FFmpeg (VAAPI hardware encoding) and publishes its progress through a
small JSON status file that the monitor view polls once per second.
"""

import json
import logging
import os
import re
import subprocess
import threading
import time
from typing import Optional

from nicegui import ui, app

ROOT_DIR = "/downloads"
OUTPUT_BASE = "/downloads/finalizados"
STATUS_FILE = "/app/data/status.json"

logger = logging.getLogger(__name__)

# File extensions (lower-case) that are picked up for conversion.
_VIDEO_EXTENSIONS = ('.mkv', '.mp4', '.avi')
# Language tags of the audio / subtitle streams that are kept.
_AUDIO_LANGS = frozenset(['por', 'pt', 'eng', 'en', 'jpn', 'ja', 'und'])
_SUBTITLE_LANGS = frozenset(['por', 'pt', 'pob', 'pt-br'])
# Patterns used to parse FFmpeg's progress lines.
_TIME_RE = re.compile(r"time=(\d{2}:\d{2}:\d{2}\.\d{2})")
_SPEED_RE = re.compile(r"speed=\s*(\S+)")

# Serialises every read-modify-write of STATUS_FILE between the UI and the
# worker thread. Re-entrant so helpers holding it can call each other.
_STATUS_LOCK = threading.RLock()


# --- BACKEND: STATUS FILE ---
def _read_status() -> dict:
    """Return the parsed status file, or ``{}`` if it is missing or unreadable."""
    try:
        with open(STATUS_FILE, 'r') as f:
            data = json.load(f)
    except (OSError, ValueError):
        return {}
    return data if isinstance(data, dict) else {}


def _write_status(data: dict) -> None:
    """Atomically replace the status file with ``data``.

    The JSON is written to a temporary sibling file and moved into place with
    ``os.replace`` so a concurrent reader never sees a half-written file.
    """
    with _STATUS_LOCK:
        directory = os.path.dirname(STATUS_FILE)
        if directory:
            os.makedirs(directory, exist_ok=True)
        tmp_path = STATUS_FILE + ".tmp"
        with open(tmp_path, 'w') as f:
            json.dump(data, f)
        os.replace(tmp_path, STATUS_FILE)


def _publish_unless_stopped(status: dict, write: bool = True) -> bool:
    """Check for a stop request and, if there is none, publish ``status``.

    The check and the write happen under one lock so that a stop request
    written by the UI can never be overwritten by a progress update.

    Returns:
        ``False`` if a stop was requested (nothing is written), else ``True``.
    """
    with _STATUS_LOCK:
        if _read_status().get('stop_requested'):
            return False
        if write:
            _write_status(status)
        return True


# --- BACKEND: DRIVER PREPARATION ---
def prepare_driver_environment() -> None:
    """Select the i965 VAAPI driver and delete the iHD driver files.

    Sets ``LIBVA_DRIVER_NAME`` in this process's environment (inherited by the
    FFmpeg child processes) and removes the two iHD driver libraries if they
    exist. Removal is best-effort: failures are logged, never raised.
    """
    os.environ["LIBVA_DRIVER_NAME"] = "i965"
    drivers_ruins = [
        "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so",
        "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so.1",
    ]
    for driver in drivers_ruins:
        try:
            os.remove(driver)
        except FileNotFoundError:
            pass
        except OSError as exc:
            logger.warning("Could not remove driver %s: %s", driver, exc)


# --- BACKEND: FFMPEG UTILS ---
def get_video_duration(filepath) -> Optional[float]:
    """Return the container duration in seconds via ffprobe, or ``None`` on failure."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filepath,
    ]
    try:
        return float(subprocess.check_output(cmd).decode().strip())
    except (subprocess.CalledProcessError, OSError, ValueError):
        # ffprobe failed, is not installed, or printed a non-number (e.g. "N/A").
        return None


def parse_time_to_seconds(time_str) -> float:
    """Convert an ``HH:MM:SS.ss`` timestamp into seconds.

    Raises:
        ValueError: if ``time_str`` does not have exactly three ``:``-separated
            numeric fields.
    """
    h, m, s = time_str.split(':')
    return int(h) * 3600 + int(m) * 60 + float(s)


def get_streams_map(filepath) -> list:
    """Build the FFmpeg ``-map`` arguments for ``filepath``.

    Keeps all video streams, the audio streams whose language tag is in
    ``_AUDIO_LANGS`` (all audio streams if none match) and the subtitle streams
    whose language tag is in ``_SUBTITLE_LANGS``. If ffprobe cannot be run or
    its output cannot be parsed, falls back to mapping everything.
    """
    cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", filepath]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, env=os.environ)
        data = json.loads(res.stdout)
    except (OSError, ValueError):
        return ["-map", "0"]

    audio_maps = []
    subtitle_maps = []
    for stream in data.get('streams', []):
        codec_type = stream.get('codec_type')
        if codec_type not in ('audio', 'subtitle'):
            continue
        lang = stream.get('tags', {}).get('language', 'und').lower()
        wanted = _AUDIO_LANGS if codec_type == 'audio' else _SUBTITLE_LANGS
        if lang in wanted:
            target = audio_maps if codec_type == 'audio' else subtitle_maps
            target.extend(["-map", f"0:{stream['index']}"])

    if not audio_maps:
        # Trailing "?" makes the mapping optional, so a file that has no
        # audio stream at all does not make FFmpeg abort.
        audio_maps = ["-map", "0:a?"]
    return ["-map", "0:v"] + audio_maps + subtitle_maps


# --- BACKEND: WORKER THREAD ---
class EncoderWorker(threading.Thread):
    """Daemon thread that transcodes every video found below ``input_folder``.

    Progress is published to ``STATUS_FILE``; a ``stop_requested`` flag written
    to that file by the UI cancels the run.
    """

    # Most recently started worker; used to detect a conversion in progress.
    _current = None

    def __init__(self, input_folder):
        super().__init__()
        self.input_folder = input_folder
        self.daemon = True

    @classmethod
    def is_busy(cls) -> bool:
        """Return ``True`` while a worker started in this process is still running."""
        worker = cls._current
        return worker is not None and worker.is_alive()

    def start(self):
        """Register this worker as the active one, then start the thread."""
        EncoderWorker._current = self
        super().start()

    def run(self):
        """Encode all files and always leave a final, non-running status behind."""
        completed = False
        final_msg = "Finalizado ✅"
        try:
            prepare_driver_environment()
            files = self._collect_files()
            completed = True
            for index, fpath in enumerate(files):
                if not self._encode_file(index, len(files), fpath):
                    completed = False
                    final_msg = "Cancelado pelo usuário 🛑"
                    break
        except Exception as exc:  # thread boundary: report instead of dying silently
            logger.exception("Encoder worker aborted")
            completed = False
            final_msg = f"Erro: {exc}"
        finally:
            # Written even after a failure so the UI never stays stuck on
            # "running".
            _write_status({
                "running": False,
                "file": final_msg,
                "pct_file": 100 if completed else 0,
                "pct_total": 100,
                "log": final_msg,
            })

    def _collect_files(self) -> list:
        """Return the paths of all video files below ``self.input_folder``."""
        found = []
        for root, _dirs, names in os.walk(self.input_folder):
            # NOTE(review): this is a substring test on the whole path, so a
            # folder such as "Contemporary" is skipped too - confirm intent.
            if "finalizados" in root or "temp" in root:
                continue
            found.extend(
                os.path.join(root, name)
                for name in names
                if name.lower().endswith(_VIDEO_EXTENSIONS)
            )
        return found

    def _encode_file(self, index, total_files, fpath) -> bool:
        """Transcode one file; return ``False`` if the user requested a stop."""
        status = {
            "running": True,
            "stop_requested": False,
            "file": os.path.basename(fpath),
            "pct_file": 0,
            "pct_total": int((index / total_files) * 100),
            "current_index": index + 1,
            "total_files": total_files,
            "log": "Iniciando...",
        }
        # Check for a stop request before starting the next file.
        if not _publish_unless_stopped(status):
            return False

        rel = os.path.relpath(fpath, self.input_folder)
        out = os.path.join(OUTPUT_BASE, os.path.basename(self.input_folder), rel)
        os.makedirs(os.path.dirname(out), exist_ok=True)

        cmd = [
            "ffmpeg", "-y",
            "-hwaccel", "vaapi",
            "-hwaccel_device", "/dev/dri/renderD128",
            "-hwaccel_output_format", "vaapi",
            "-i", fpath,
        ]
        cmd += get_streams_map(fpath)
        cmd += [
            "-c:v", "h264_vaapi", "-qp", "25", "-compression_level", "0",
            "-c:a", "copy", "-c:s", "copy",
            out,
        ]

        # Fall back to 1 so the percentage maths never divides by zero.
        total_sec = get_video_duration(fpath) or 1
        stop_signal = False
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              text=True, env=os.environ) as proc:
            for line in proc.stdout:
                if "time=" not in line:
                    continue
                match = _TIME_RE.search(line)
                if match:
                    sec = parse_time_to_seconds(match.group(1))
                    status["pct_file"] = min(int((sec / total_sec) * 100), 100)
                    speed = _SPEED_RE.search(line)
                    if speed:
                        status["log"] = f"Velocidade: {speed.group(1)}"
                # Check for a stop request on every progress line.
                if not _publish_unless_stopped(status, write=match is not None):
                    proc.terminate()  # kill FFmpeg
                    stop_signal = True
                    break
            returncode = proc.wait()

        if stop_signal:
            # Remove the incomplete output of a cancelled conversion.
            if os.path.exists(out):
                os.remove(out)
            return False
        if returncode != 0:
            logger.warning("ffmpeg exited with code %s for %s", returncode, fpath)
        return True


# --- FRONTEND: UI ---
class EncoderInterface:
    """Folder browser and conversion monitor rendered into ``self.container``."""

    def __init__(self):
        self.path = ROOT_DIR
        self.container = None
        self.view_mode = 'explorer'
        self.timer = None

    def navigate(self, path):
        """Switch the explorer to ``path`` if it is an existing directory."""
        if os.path.exists(path) and os.path.isdir(path):
            self.path = path
            self.refresh()
        else:
            ui.notify('Erro ao acessar pasta', type='negative')

    def refresh(self):
        """Re-render the container for the current view mode."""
        if not self.container:
            return
        self.container.clear()
        with self.container:
            if self.view_mode == 'explorer':
                self.render_breadcrumbs()
                self.render_folder_list()
            else:
                self.render_monitor()

    def start_encoding(self):
        """Start converting the current folder and switch to the monitor view."""
        if EncoderWorker.is_busy():
            # A second worker would fight over the status file and the GPU.
            ui.notify('Já existe uma conversão em andamento.', type='warning')
        else:
            with _STATUS_LOCK:
                try:
                    os.remove(STATUS_FILE)
                except FileNotFoundError:
                    pass
            EncoderWorker(self.path).start()
            ui.notify('Iniciado!', type='positive')
        self.view_mode = 'monitor'
        self.refresh()

    def stop_encoding(self):
        """Ask the worker to stop by setting ``stop_requested`` in the status file."""
        with _STATUS_LOCK:
            data = _read_status()
            if not data:
                return
            data['stop_requested'] = True
            try:
                _write_status(data)
            except OSError:
                logger.exception("Could not write the stop request")
                return
        ui.notify('Parando processo... aguarde.', type='warning')

    def back_to_explorer(self):
        """Return from the monitor to the folder browser."""
        self.view_mode = 'explorer'
        self.refresh()

    def render_breadcrumbs(self):
        """Render the clickable path bar and the "convert this folder" button."""
        with ui.row().classes('w-full items-center bg-gray-100 p-2 rounded gap-1'):
            ui.button('🏠', on_click=lambda: self.navigate(ROOT_DIR)).props(
                'flat dense text-color=grey-8')
            if self.path != ROOT_DIR:
                acc = ROOT_DIR
                for part in os.path.relpath(self.path, ROOT_DIR).split(os.sep):
                    ui.icon('chevron_right', color='grey')
                    acc = os.path.join(acc, part)
                    # Bind the path as a default so each button keeps its own.
                    ui.button(part, on_click=lambda p=acc: self.navigate(p)).props(
                        'flat dense no-caps text-color=primary')
            ui.space()
            ui.button("🚀 Converter Esta Pasta", on_click=self.start_encoding).props(
                'push color=primary')

    def render_folder_list(self):
        """Render the visible sub-folders of the current path, sorted by name."""
        try:
            entries = sorted(
                (e for e in os.scandir(self.path)
                 if e.is_dir() and not e.name.startswith('.')),
                key=lambda e: e.name.lower(),
            )
        except OSError:
            return
        with ui.column().classes('w-full gap-1 mt-2'):
            if self.path != ROOT_DIR:
                with ui.item(on_click=lambda: self.navigate(os.path.dirname(self.path))).classes(
                        'bg-blue-50 hover:bg-blue-100 cursor-pointer rounded'):
                    with ui.item_section().props('avatar'):
                        ui.icon('arrow_upward', color='grey')
                    with ui.item_section():
                        ui.item_label('Voltar / Subir Nível')
            for entry in entries:
                with ui.item(on_click=lambda p=entry.path: self.navigate(p)).classes(
                        'hover:bg-gray-100 cursor-pointer rounded'):
                    with ui.item_section().props('avatar'):
                        ui.icon('folder', color='amber')
                    with ui.item_section():
                        ui.item_label(entry.name).classes('font-medium')

    def render_monitor(self):
        """Render the progress view and start the timer that refreshes it."""
        ui.label('Monitor de Conversão').classes('text-xl font-bold mb-4')
        lbl_file = ui.label('Inicializando...')
        progress_file = ui.linear_progress(value=0).classes('w-full')
        lbl_status = ui.label('---')
        ui.separator().classes('my-4')
        lbl_total = ui.label('Total: 0/0')
        progress_total = ui.linear_progress(value=0).classes('w-full')

        # Control buttons.
        with ui.row().classes('mt-4 gap-2'):
            # Stop button (only shown while a conversion is running).
            btn_stop = ui.button('🛑 Parar Processo', on_click=self.stop_encoding).props('color=red')
            # Back button (only shown once the conversion has ended).
            btn_back = ui.button('Voltar para Pastas', on_click=self.back_to_explorer).props('outline')
            btn_back.set_visibility(False)

        def update_loop():
            data = _read_status()
            if not data:
                return
            try:
                val_file = data.get('pct_file', 0) / 100
                val_total = data.get('pct_total', 0) / 100
            except TypeError:
                return  # malformed status; try again on the next tick
            lbl_file.text = f"Arquivo: {data.get('file', '?')}"
            progress_file.value = val_file
            lbl_status.text = f"Status: {int(val_file*100)}% | {data.get('log', '')}"
            if 'total_files' in data:
                curr = data.get('current_index', 0)
                tot = data.get('total_files', 0)
                lbl_total.text = f"Fila: {curr} de {tot} arquivos"
            progress_total.value = val_total

            # A "running" flag left behind by a dead worker (e.g. after a
            # restart) must not hide the back button forever.
            is_running = bool(data.get('running', False)) and EncoderWorker.is_busy()
            btn_stop.set_visibility(is_running)
            btn_back.set_visibility(not is_running)

        self.timer = ui.timer(1.0, update_loop)


def create_ui():
    """Build the interface, opening the monitor if a conversion is in progress."""
    enc = EncoderInterface()
    # Trust the status file only while a live worker backs it; otherwise a
    # stale "running" flag would trap the user in the monitor view.
    if _read_status().get('running') and EncoderWorker.is_busy():
        enc.view_mode = 'monitor'
    enc.container = ui.column().classes('w-full h-full p-4 gap-4')
    enc.refresh()