475 lines
18 KiB
Python
Executable File
475 lines
18 KiB
Python
Executable File
from nicegui import ui
|
|
import os
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
import json
|
|
import re
|
|
import math # <--- ADICIONADO AQUI
|
|
from collections import deque
|
|
from datetime import datetime
|
|
|
|
# ==============================================================================
# --- SECTION 1: GLOBAL SETTINGS AND CONSTANTS ---
# ==============================================================================
|
|
|
|
# Folder the file browser starts in, and the base folder converted files are written to.
ROOT_DIR = "/downloads"
OUTPUT_BASE = "/downloads/finalizados"

# Paths of the problematic Intel driver files (DO NOT CHANGE).
BAD_DRIVERS = [
    "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so",
    "/usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so.1",
]

# Runtime state, kept in memory only. The worker thread writes it and the
# monitor view polls it.
CURRENT_STATUS = dict(
    running=False,
    stop_requested=False,
    file="",
    pct_file=0.0,
    pct_total=0.0,
    current_index=0,
    total_files=0,
    log="Aguardando...",
    speed="N/A",
)

# History of the last 50 processed files (newest entries are pushed on the left).
HISTORY_LOG = deque(maxlen=50)
|
|
|
|
|
|
# ==============================================================================
# --- SECTION 2: UTILITIES (Backend) ---
# ==============================================================================
|
|
|
|
def prepare_driver_environment():
    """Select the i965 VA-API driver and delete the known-bad driver files.

    Sets ``LIBVA_DRIVER_NAME`` in this process's environment and removes every
    path listed in ``BAD_DRIVERS`` that currently exists. A failed removal is
    printed and otherwise ignored.
    """
    os.environ["LIBVA_DRIVER_NAME"] = "i965"

    for bad_path in BAD_DRIVERS:
        # Nothing to do for driver files that are already gone.
        if not os.path.exists(bad_path):
            continue
        try:
            os.remove(bad_path)
        except Exception as exc:
            print(f"Erro ao remover driver: {exc}")
|
|
|
|
def get_video_duration(filepath):
    """Return the total duration of *filepath* in seconds, as reported by ffprobe.

    The result is always a finite, strictly positive float so callers can
    divide by it when computing a progress percentage. ``1.0`` is returned
    when ffprobe is missing, exits with an error, prints something that is
    not a number (for example ``N/A``), or reports a zero/negative/NaN
    duration.
    """
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", filepath
    ]
    try:
        raw = subprocess.check_output(cmd).decode().strip()
        duration = float(raw)
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ffprobe not installed / not executable.
        # SubprocessError: ffprobe exited non-zero.
        # ValueError: undecodable or non-numeric output.
        return 1.0

    # The chained comparison is False for 0, negatives, NaN and infinity.
    if not 0 < duration < float("inf"):
        return 1.0
    return duration
|
|
|
|
def parse_time_to_seconds(time_str):
    """Convert an FFmpeg timecode (``HH:MM:SS.ms``) to seconds as a float.

    Returns ``0.0`` when *time_str* is not a string-like value or does not
    contain three parseable ``:``-separated fields. Only parsing errors are
    absorbed; ``KeyboardInterrupt`` and ``SystemExit`` propagate.
    """
    try:
        # Unpacking fewer than three fields raises ValueError, handled below.
        hours, minutes, seconds = time_str.split(':')[:3]
        return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    except (AttributeError, TypeError, ValueError):
        return 0.0
|
|
|
|
def format_size(size_bytes):
    """Format a byte count for humans using 1024-based units (B, KB, MB, GB, TB).

    The number is rounded to two decimals and rendered as a float, e.g.
    ``"1.5 KB"`` or ``"500.0 B"``; exactly zero renders as ``"0B"``.

    Negative values (such as a size difference when the output is smaller
    than the input) keep their sign: ``-1536`` gives ``"-1.5 KB"``. Values of
    1024 TB and above stay in TB instead of running off the unit table.
    Non-numeric input falls back to ``f"{size_bytes} B"``.
    """
    if size_bytes == 0:
        return "0B"

    units = ("B", "KB", "MB", "GB", "TB")
    try:
        magnitude = float(abs(size_bytes))
    except (TypeError, ValueError, OverflowError):
        return f"{size_bytes} B"

    sign = "-" if size_bytes < 0 else ""
    unit_index = 0
    # Repeated division avoids the float imprecision of math.log at exact
    # powers of 1024 and cannot index past the last unit.
    while magnitude >= 1024 and unit_index < len(units) - 1:
        magnitude /= 1024
        unit_index += 1

    return f"{sign}{round(magnitude, 2)} {units[unit_index]}"
|
|
|
|
def clean_metadata_title(title):
    """Strip codec, resolution and release-group noise from a track title.

    Each noise pattern is removed case-insensitively, in order. Whitespace
    runs are then collapsed to one space, and leading/trailing ``-.|[]()``
    characters and spaces are trimmed. A falsy *title* yields ``""``.
    """
    if not title:
        return ""

    # Terms to remove (matched case-insensitively).
    noise_patterns = (
        r'\b5\.1\b', r'\b7\.1\b', r'\b2\.0\b',
        r'\baac\b', r'\bac3\b', r'\beac3\b', r'\batmos\b', r'\bdts\b', r'\btruehd\b',
        r'\bh264\b', r'\bx264\b', r'\bx265\b', r'\bhevc\b', r'\b1080p\b', r'\b720p\b', r'\b4k\b',
        r'\bbludv\b', r'\bcomandotorrents\b', r'\brarbg\b', r'\bwww\..+\.com\b',
        r'\bcópia\b', r'\boriginal\b',
    )

    cleaned = title
    for noise in noise_patterns:
        cleaned = re.compile(noise, re.IGNORECASE).sub('', cleaned)

    collapsed = re.compile(r'\s+').sub(' ', cleaned).strip()
    trimmed = collapsed.strip('-.|[]()')
    return trimmed.strip()
|
|
|
|
|
|
# ==============================================================================
# --- SECTION 3: FFMPEG LOGIC ---
# ==============================================================================
|
|
|
|
def build_ffmpeg_command(input_file, output_file):
    """Build the ffmpeg argument list that transcodes *input_file* to *output_file*.

    Video is always re-encoded with ``h264_vaapi``; audio and subtitle streams
    are copied. The input is probed with ffprobe to choose which streams to keep:

    * Audio: tracks tagged Portuguese, English, Japanese or undetermined are
      kept and given a cleaned title. The first Portuguese track becomes the
      default. If no track matches, all audio (if any) is mapped.
    * Subtitles: only Portuguese tracks are kept. A track is marked forced
      when ffprobe reports a non-zero ``forced`` disposition or its title
      contains "forç".

    If ffprobe cannot be run or its output is not valid JSON, a generic
    command that maps every stream (``-map 0``) is returned instead.

    Returns:
        list[str]: the argv to pass to ``subprocess``.
    """
    pt_langs = ('por', 'pt', 'pob', 'pt-br')
    en_langs = ('eng', 'en')
    ja_langs = ('jpn', 'ja')
    kept_audio_langs = pt_langs + en_langs + ja_langs + ('und',)

    # Hardware-decode prefix shared by the normal and the fallback command.
    hw_input = [
        "ffmpeg", "-y",
        "-hwaccel", "vaapi", "-hwaccel_device", "/dev/dri/renderD128",
        "-hwaccel_output_format", "vaapi",
        "-i", input_file
    ]

    cmd_probe = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", input_file]
    try:
        res = subprocess.run(cmd_probe, capture_output=True, text=True, env=os.environ)
        data = json.loads(res.stdout)
    except (OSError, ValueError):
        # OSError: ffprobe missing. ValueError: empty/invalid JSON or undecodable output.
        return hw_input + [
            "-map", "0",
            "-c:v", "h264_vaapi", "-qp", "25", "-c:a", "copy", "-c:s", "copy", output_file
        ]

    input_streams = data.get('streams', []) if isinstance(data, dict) else []
    map_args = ["-map", "0:v:0"]
    metadata_args = []

    # AUDIO
    found_pt_audio = False
    audio_idx = 0  # index among the *output* audio streams
    for stream in input_streams:
        if stream.get('codec_type') != 'audio':
            continue
        tags = stream.get('tags', {})
        lang = tags.get('language', 'und').lower()
        if lang not in kept_audio_langs:
            continue

        map_args.extend(["-map", f"0:{stream['index']}"])

        new_title = clean_metadata_title(tags.get('title', ''))
        if not new_title:
            # Nothing left after cleaning: fall back to the language name.
            if lang in pt_langs:
                new_title = "Português"
            elif lang in en_langs:
                new_title = "Inglês"
            elif lang in ja_langs:
                new_title = "Japonês"
        metadata_args.extend([f"-metadata:s:a:{audio_idx}", f"title={new_title}"])

        if lang in pt_langs and not found_pt_audio:
            metadata_args.extend([f"-disposition:a:{audio_idx}", "default"])
            found_pt_audio = True
        else:
            metadata_args.extend([f"-disposition:a:{audio_idx}", "0"])

        audio_idx += 1

    if audio_idx == 0:
        # No track in a wanted language: keep every audio track. The trailing
        # "?" makes the map optional so files without any audio do not abort.
        map_args.extend(["-map", "0:a?"])

    # SUBTITLES
    sub_idx = 0  # index among the *output* subtitle streams
    for stream in input_streams:
        if stream.get('codec_type') != 'subtitle':
            continue
        tags = stream.get('tags', {})
        lang = tags.get('language', 'und').lower()
        if lang not in pt_langs:
            continue

        title = tags.get('title', '')
        # ffprobe lists every disposition flag with a 0/1 value, so test the
        # value; key presence alone would mark every subtitle as forced.
        is_forced = bool(stream.get('disposition', {}).get('forced'))

        map_args.extend(["-map", f"0:{stream['index']}"])
        metadata_args.extend([f"-metadata:s:s:{sub_idx}", f"title={clean_metadata_title(title)}"])

        if is_forced or "forç" in (title or "").lower():
            metadata_args.extend([f"-disposition:s:{sub_idx}", "forced"])
        else:
            metadata_args.extend([f"-disposition:s:{sub_idx}", "0"])

        sub_idx += 1

    cmd = list(hw_input)
    cmd += map_args
    cmd += [
        "-c:v", "h264_vaapi", "-qp", "25", "-compression_level", "0",
        "-c:a", "copy", "-c:s", "copy"
    ]
    cmd += metadata_args
    cmd.append(output_file)

    return cmd
|
|
|
|
|
|
# ==============================================================================
# --- SECTION 4: WORKER THREAD ---
# ==============================================================================
|
|
|
|
class EncoderWorker(threading.Thread):
    """Daemon thread that converts every video found under a folder with ffmpeg.

    Progress is published through the module-level ``CURRENT_STATUS`` dict and
    each finished or failed file is recorded in ``HISTORY_LOG``. Setting
    ``CURRENT_STATUS["stop_requested"]`` makes the worker terminate the running
    ffmpeg process, discard its partial output and stop.

    Args:
        input_folder: folder that is scanned recursively for videos.
        delete_original: when true, a source file is removed after its
            conversion succeeded and produced a non-empty output file.
    """

    _VIDEO_EXTENSIONS = ('.mkv', '.mp4', '.avi')
    # Directory names that are never scanned (the output tree and scratch space).
    _SKIPPED_DIR_NAMES = frozenset(("finalizados", "temp"))
    _TIME_RE = re.compile(r"time=(\d{2}:\d{2}:\d{2}\.\d{2})")
    _SPEED_RE = re.compile(r"speed=\s*(\S+)")

    def __init__(self, input_folder, delete_original=False):
        super().__init__()
        self.input_folder = input_folder
        self.delete_original = delete_original
        self.daemon = True

    def run(self):
        """Scan the input folder and convert the files one by one."""
        CURRENT_STATUS["running"] = True
        CURRENT_STATUS["stop_requested"] = False
        final_log = None
        try:
            prepare_driver_environment()
            CURRENT_STATUS["log"] = "Escaneando arquivos..."

            files_to_process = self._collect_video_files()
            total = len(files_to_process)
            CURRENT_STATUS["total_files"] = total

            for i, fpath in enumerate(files_to_process):
                if CURRENT_STATUS["stop_requested"]:
                    break

                fname = os.path.basename(fpath)
                CURRENT_STATUS["file"] = fname
                CURRENT_STATUS["current_index"] = i + 1
                CURRENT_STATUS["pct_file"] = 0
                CURRENT_STATUS["pct_total"] = int((i / total) * 100)

                try:
                    self._convert_file(fpath)
                except (OSError, subprocess.SubprocessError) as exc:
                    # One unreadable file (or a missing ffmpeg binary) must
                    # not abort the queue: record it and carry on.
                    CURRENT_STATUS["log"] = f"Erro: {exc}"
                    HISTORY_LOG.appendleft(
                        self._history_entry(fname, "❌ Falha", "-", "-", "-")
                    )
        except Exception as exc:
            final_log = f"Erro: {exc}"
            raise
        finally:
            # Always release the "running" flag, even after an unexpected
            # error, otherwise the monitor would never leave the running state.
            if final_log is None:
                final_log = "Parado" if CURRENT_STATUS["stop_requested"] else "Finalizado"
            CURRENT_STATUS["running"] = False
            CURRENT_STATUS["log"] = final_log
            CURRENT_STATUS["pct_file"] = 100
            CURRENT_STATUS["pct_total"] = 100

    def _collect_video_files(self):
        """Return the paths of all video files under ``input_folder``.

        A directory is skipped when any component of its path is exactly
        ``finalizados`` or ``temp``. Names that merely contain those letters
        are still scanned.
        """
        skipped = self._SKIPPED_DIR_NAMES
        found = []
        for root, dirs, files in os.walk(self.input_folder):
            if skipped.intersection(os.path.normpath(root).split(os.sep)):
                dirs[:] = []  # do not descend any further
                continue
            dirs[:] = [d for d in dirs if d not in skipped]
            for name in files:
                if name.lower().endswith(self._VIDEO_EXTENSIONS):
                    found.append(os.path.join(root, name))
        return found

    @staticmethod
    def _history_entry(file_name, status, orig_size, final_size, diff):
        """Build one ``HISTORY_LOG`` row stamped with the current time."""
        return {
            "time": datetime.now().strftime("%H:%M:%S"),
            "file": file_name,
            "status": status,
            "orig_size": orig_size,
            "final_size": final_size,
            "diff": diff
        }

    def _update_progress(self, line, total_sec):
        """Parse one line of ffmpeg output and publish percentage and speed."""
        if "time=" not in line:
            return

        match = self._TIME_RE.search(line)
        if match:
            sec = parse_time_to_seconds(match.group(1))
            CURRENT_STATUS["pct_file"] = min(int((sec / total_sec) * 100), 100)

        speed_match = self._SPEED_RE.search(line)
        if speed_match:
            CURRENT_STATUS["speed"] = speed_match.group(1)
            CURRENT_STATUS["log"] = f"Vel: {CURRENT_STATUS['speed']}"

    def _convert_file(self, fpath):
        """Run ffmpeg on *fpath*, track its progress and record the outcome."""
        fname = os.path.basename(fpath)

        # Mirror the input tree below OUTPUT_BASE/<input folder name>/, always as .mkv.
        rel = os.path.relpath(fpath, self.input_folder)
        out_file = os.path.join(OUTPUT_BASE, os.path.basename(self.input_folder), rel)
        out_file = os.path.splitext(out_file)[0] + ".mkv"
        os.makedirs(os.path.dirname(out_file), exist_ok=True)

        size_before = os.path.getsize(fpath)
        cmd = build_ffmpeg_command(fpath, out_file)

        total_sec = get_video_duration(fpath)
        if not total_sec > 0:
            # Covers 0, negatives and NaN; avoids dividing by zero below.
            total_sec = 1.0

        # The context manager closes the pipe and waits for ffmpeg to exit.
        # errors="replace" keeps odd bytes in ffmpeg's log from raising.
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, errors="replace", env=os.environ
        ) as proc:
            for line in proc.stdout:
                if CURRENT_STATUS["stop_requested"]:
                    proc.terminate()
                    break
                self._update_progress(line, total_sec)

        if CURRENT_STATUS["stop_requested"]:
            # Cancelled: discard the partial output and keep the original.
            if os.path.exists(out_file):
                os.remove(out_file)
            return

        if proc.returncode != 0:
            HISTORY_LOG.appendleft(
                self._history_entry(fname, "❌ Falha", format_size(size_before), "-", "-")
            )
            return

        size_after = os.path.getsize(out_file) if os.path.exists(out_file) else 0
        diff = size_after - size_before
        HISTORY_LOG.appendleft(
            self._history_entry(
                fname,
                "✅ Sucesso",
                format_size(size_before),
                format_size(size_after),
                ("+" if diff > 0 else "") + format_size(diff),
            )
        )

        # Never delete the source unless a real output file exists.
        if self.delete_original and size_after > 0:
            try:
                os.remove(fpath)
                CURRENT_STATUS["log"] = "Original excluído com sucesso."
            except OSError as exc:
                CURRENT_STATUS["log"] = f"Falha ao excluir original: {exc}"
|
|
|
|
|
|
# ==============================================================================
# --- SECTION 5: FRONTEND ---
# ==============================================================================
|
|
|
|
class EncoderInterface:
    """NiceGUI front end with three views rendered into one container.

    * ``explorer``: folder browser below ``ROOT_DIR`` plus the start options.
    * ``monitor``: live progress of the running conversion.
    * ``history``: table of the entries in ``HISTORY_LOG``.
    """

    def __init__(self):
        self.path = ROOT_DIR
        self.timer = None
        self.delete_switch = None

        # Open the monitor directly when a conversion is already in progress.
        self.view_mode = 'monitor' if CURRENT_STATUS["running"] else 'explorer'

        self.main_container = ui.column().classes('w-full h-full gap-4')
        self.refresh_ui()

    def refresh_ui(self):
        """Clear the container and render the view selected by ``view_mode``."""
        if self.timer is not None:
            # Stop polling for a monitor view that is about to be discarded.
            self.timer.active = False
            self.timer = None

        self.main_container.clear()
        with self.main_container:
            if self.view_mode == 'explorer':
                self.render_breadcrumbs()
                self.render_options()
                self.render_folder_list()
                self.render_history_btn()
            elif self.view_mode == 'monitor':
                self.render_monitor()
            elif self.view_mode == 'history':
                self.render_history_table()

    def navigate(self, path):
        """Make *path* the current folder if it is an existing directory."""
        if os.path.exists(path) and os.path.isdir(path):
            self.path = path
            self.refresh_ui()
        else:
            ui.notify('Erro ao acessar pasta', type='negative')

    @staticmethod
    def _worker_is_active():
        """Return True while an ``EncoderWorker`` thread is alive."""
        return any(
            isinstance(thread, EncoderWorker) and thread.is_alive()
            for thread in threading.enumerate()
        )

    def start_encoding(self):
        """Start converting the current folder and switch to the monitor view."""
        if self._worker_is_active():
            # Two workers would fight over the shared status dict and the GPU.
            ui.notify('Já existe uma conversão em andamento.', type='warning')
            self.set_view('monitor')
            return

        should_delete = self.delete_switch.value if self.delete_switch else False

        CURRENT_STATUS["pct_file"] = 0
        CURRENT_STATUS["pct_total"] = 0

        t = EncoderWorker(self.path, delete_original=should_delete)
        t.start()

        ui.notify('Iniciando Conversão...', type='positive')
        self.set_view('monitor')

    def stop_encoding(self):
        """Ask the worker to stop; it reads the flag between ffmpeg output lines."""
        CURRENT_STATUS["stop_requested"] = True
        ui.notify('Solicitando parada...', type='warning')

    def render_breadcrumbs(self):
        """Render a home button and one button per folder level below ROOT_DIR."""
        with ui.row().classes('w-full items-center bg-gray-100 p-2 rounded gap-1'):
            ui.button('🏠', on_click=lambda: self.navigate(ROOT_DIR)).props('flat dense text-color=grey-8')
            if self.path != ROOT_DIR:
                rel = os.path.relpath(self.path, ROOT_DIR)
                acc = ROOT_DIR
                for part in rel.split(os.sep):
                    ui.icon('chevron_right', color='grey')
                    acc = os.path.join(acc, part)
                    # p=acc binds the value now; a plain closure would see the last one.
                    ui.button(part, on_click=lambda p=acc: self.navigate(p)).props('flat dense no-caps text-color=primary')

    def render_options(self):
        """Render the delete-original switch and the start button."""
        with ui.card().classes('w-full mt-2 p-2 bg-blue-50'):
            with ui.row().classes('items-center w-full justify-between'):
                self.delete_switch = ui.switch('Excluir original ao finalizar com sucesso?').props('color=red')
                ui.button("🚀 Iniciar Conversão", on_click=self.start_encoding).props('push color=primary')

    def render_folder_list(self):
        """List the visible sub-folders of the current path as clickable rows."""
        try:
            # The with-block closes the directory handle deterministically.
            with os.scandir(self.path) as scan:
                entries = sorted(
                    (e for e in scan if e.is_dir() and not e.name.startswith('.')),
                    key=lambda e: e.name.lower(),
                )
        except OSError as exc:
            ui.label(f"Erro ao listar pasta: {exc}").classes('text-negative italic p-4')
            return

        with ui.column().classes('w-full gap-1 mt-2'):
            if self.path != ROOT_DIR:
                with ui.item(on_click=lambda: self.navigate(os.path.dirname(self.path))).classes('bg-gray-200 hover:bg-gray-300 cursor-pointer rounded'):
                    with ui.item_section().props('avatar'):
                        ui.icon('arrow_upward', color='grey')
                    with ui.item_section():
                        ui.item_label('.. (Subir nível)')

            if not entries:
                ui.label("Nenhuma subpasta encontrada aqui.").classes('text-grey italic p-4')

            for entry in entries:
                with ui.item(on_click=lambda p=entry.path: self.navigate(p)).classes('hover:bg-blue-50 cursor-pointer rounded border-b border-gray-100'):
                    with ui.item_section().props('avatar'):
                        ui.icon('folder', color='amber')
                    with ui.item_section():
                        ui.item_label(entry.name).classes('font-medium')

    def render_history_btn(self):
        """Render the button that opens the history view."""
        ui.separator().classes('mt-4')
        # "w-full" is a CSS class, not a Quasar prop, so it goes through .classes().
        ui.button('Ver Histórico (Últimos 50)', on_click=lambda: self.set_view('history')).props('outline').classes('w-full')

    def set_view(self, mode):
        """Switch to the view named *mode* and re-render."""
        self.view_mode = mode
        self.refresh_ui()

    def render_history_table(self):
        """Render HISTORY_LOG as a table with a button back to the explorer."""
        ui.label('Histórico de Conversões').classes('text-xl font-bold mb-4')

        columns = [
            {'name': 'time', 'label': 'Hora', 'field': 'time', 'align': 'left'},
            {'name': 'file', 'label': 'Arquivo', 'field': 'file', 'align': 'left'},
            {'name': 'status', 'label': 'Status', 'field': 'status', 'align': 'center'},
            {'name': 'orig', 'label': 'Tam. Orig.', 'field': 'orig_size'},
            {'name': 'final', 'label': 'Tam. Final', 'field': 'final_size'},
            {'name': 'diff', 'label': 'Diferença', 'field': 'diff'},
        ]

        # Copy first so the worker thread can keep appending while we build rows.
        # File names can repeat in the history, so each row gets its own id
        # to serve as a unique row key.
        snapshot = list(HISTORY_LOG)
        rows = [dict(entry, id=position) for position, entry in enumerate(snapshot)]
        ui.table(columns=columns, rows=rows, row_key='id').classes('w-full')

        ui.button('Voltar', on_click=lambda: self.set_view('explorer')).props('outline').classes('mt-4')

    def render_monitor(self):
        """Render the progress view and start a 0.5 s timer that refreshes it."""
        ui.label('Monitor de Conversão').classes('text-xl font-bold mb-4')

        lbl_file = ui.label('Inicializando...')
        progress_file = ui.linear_progress(value=0).classes('w-full')
        lbl_log = ui.label('---').classes('text-caption text-grey')

        ui.separator().classes('my-4')

        lbl_total = ui.label('Total: 0/0')
        progress_total = ui.linear_progress(value=0).classes('w-full')

        with ui.row().classes('mt-6 gap-4'):
            btn_stop = ui.button('🛑 Parar Tudo', on_click=self.stop_encoding).props('color=red')
            btn_back = ui.button('Voltar / Novo', on_click=lambda: self.set_view('explorer')).props('outline')
            btn_back.set_visibility(False)

        def update_monitor():
            finished = not CURRENT_STATUS["running"] and CURRENT_STATUS["pct_total"] >= 100

            btn_stop.set_visibility(not finished)
            btn_back.set_visibility(finished)

            # Exactly one of the two texts is assigned per tick, so the
            # "finished" message is no longer overwritten by the file name.
            if finished:
                lbl_file.text = "Todos os processos finalizados."
            else:
                lbl_file.text = f"Arquivo: {CURRENT_STATUS['file']}"

            progress_file.value = CURRENT_STATUS['pct_file'] / 100
            lbl_log.text = f"{int(CURRENT_STATUS['pct_file'])}% | {CURRENT_STATUS['log']}"

            lbl_total.text = f"Fila: {CURRENT_STATUS['current_index']} de {CURRENT_STATUS['total_files']}"
            progress_total.value = CURRENT_STATUS['pct_total'] / 100

        self.timer = ui.timer(0.5, update_monitor)
|
|
|
|
# ==============================================================================
# --- SECTION 6: EXPORT FOR MAIN.PY ---
# ==============================================================================
|
|
|
|
def create_ui():
    """Build the encoder page and return its ``EncoderInterface`` instance.

    Per the section header this is the entry point meant to be imported by
    main.py.
    """
    interface = EncoderInterface()
    return interface