"""Directory watcher: identifies new video files, converts them and files them."""

import asyncio
import os
import shutil
import re
from pathlib import Path

from database import AppConfig, Category
from core.renamer import RenamerCore
from core.ffmpeg_engine import FFmpegEngine
from core.bot import TelegramManager
from core.state import state

VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.mov', '.wmv'}


class DirectoryWatcher:
    """Poll the monitored folder and push each stable video file through the pipeline.

    The pipeline is: identify -> fetch metadata -> pick a category -> convert
    with FFmpeg -> move to the category target folder.
    """

    def __init__(self, bot: TelegramManager):
        """Store collaborators, create the temp folder and register on ``state``."""
        self.bot = bot
        self.renamer = RenamerCore()
        self.is_running = False
        self.temp_dir = Path('/app/temp')
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.current_watch_path = None
        # Handle of the FFmpeg subprocess currently running, if any.
        self.current_process = None
        # Future awaiting the user's answer for an ambiguous match, if any.
        self.pending_future = None
        self.abort_flag = False
        state.watcher = self

    async def start(self):
        """Run forever, scanning the configured folder every 5 seconds while enabled.

        The monitored path is re-read from ``AppConfig`` on every iteration, so
        a configuration change is picked up without a restart. Any exception
        raised by an iteration is logged and the loop keeps going.
        """
        state.log("🟡 Watcher Service: Pronto. Aguardando ativação no Dashboard...")
        while True:
            if self.is_running:
                try:
                    config_path = AppConfig.get_val('monitor_path', '/downloads')
                    watch_dir = Path(config_path)
                    if str(watch_dir) != str(self.current_watch_path):
                        state.log(f"📁 Monitorando: {watch_dir}")
                        self.current_watch_path = watch_dir
                    if watch_dir.exists():
                        await self.scan_folder(watch_dir)
                except Exception as e:
                    state.log(f"❌ Erro Watcher Loop: {e}")
            await asyncio.sleep(5)

    def abort_current_task(self):
        """Request cancellation: raise the flag, kill FFmpeg, cancel the pending prompt."""
        state.log("🛑 Solicitando Cancelamento...")
        self.abort_flag = True
        if self.current_process:
            self._kill_process(self.current_process)
        if self.pending_future and not self.pending_future.done():
            self.pending_future.cancel()

    async def scan_folder(self, input_dir: Path):
        """Walk ``input_dir`` recursively and process every eligible video file.

        A file is eligible when it has a known video extension, is not hidden,
        has no ``processing`` marker in its name, is not already recorded as
        ``done`` and its size did not change during a one-second probe.
        The scan stops early when an abort was requested or the watcher was
        switched off.
        """
        for file_path in input_dir.glob('**/*'):
            if self.abort_flag:
                self.abort_flag = False
                if state.current_file:
                    state.update_task(
                        state.current_file, 'error',
                        label=f"{state.current_file} (Cancelado)",
                    )
                return
            if not self.is_running:
                return

            if not (file_path.is_file() and file_path.suffix.lower() in VIDEO_EXTENSIONS):
                continue
            name = file_path.name
            if name.startswith('.') or 'processing' in name:
                continue
            if name in state.tasks and state.tasks[name]['status'] == 'done':
                continue

            # A file whose size is still changing is presumably being written.
            try:
                size_before = file_path.stat().st_size
                await asyncio.sleep(1)
                size_after = file_path.stat().st_size
            except OSError:
                # File vanished or is unreadable; try again on a later scan.
                continue
            if size_before != size_after:
                continue

            await self.process_pipeline(file_path)
            if self.abort_flag:
                return

    async def process_pipeline(self, filepath: Path):
        """Identify, convert and deploy a single video file.

        Progress and outcome are reported through ``state.update_task``; the
        method returns ``None`` on every path. Unexpected exceptions after the
        identification step are logged and recorded as a task error.
        """
        fname = filepath.name
        self.abort_flag = False
        state.current_file = fname

        # --- 1. IDENTIFICATION ---
        state.update_task(fname, 'running', progress=5, label=f"Identificando: {fname}...")
        state.log(f"🔄 Iniciando: {fname}")
        result = self.renamer.identify_file(str(filepath))

        target_info = await self._resolve_target(fname, result)
        if target_info is None or self.abort_flag:
            return

        try:
            # --- METADATA ---
            details = self.renamer.get_details(target_info['tmdb_id'], target_info['type'])
            r_title = getattr(details, 'title', getattr(details, 'name', 'Unknown')) or 'Unknown'
            tmdb_genre_ids = [str(g['id']) for g in getattr(details, 'genres', [])]

            origin_countries = getattr(details, 'origin_country', []) or []
            if isinstance(origin_countries, str):
                origin_countries = [origin_countries]
            origin_countries = [c.upper() for c in origin_countries]

            full_details = {'title': r_title, 'year': '0000', 'type': target_info['type']}
            d_date = getattr(details, 'release_date', getattr(details, 'first_air_date', '0000'))
            if d_date:
                full_details['year'] = str(d_date)[:4]

            # --- 2. CATEGORY SELECTION ---
            category = self.find_best_category(target_info['type'], tmdb_genre_ids, origin_countries)
            if not category:
                state.update_task(fname, 'error', 0, label="Sem Categoria")
                return
            state.log(f"📂 Categoria: {category.name}")

            # --- 3. CONVERSION ---
            guessed_data = result.get('guessed', {})
            relative_path = self.renamer.build_path(category, full_details, guessed_data)
            temp_output = self.temp_dir / os.path.basename(relative_path)

            state.update_task(fname, 'running', progress=15,
                              label=f"Convertendo: {full_details['title']}")
            engine = FFmpegEngine()

            # Duration in seconds; a missing value disables the percentage only.
            total_duration_sec = engine.get_duration(str(filepath)) or 0
            total_duration_us = total_duration_sec * 1000000
            if not total_duration_sec:
                state.log("⚠️ Aviso: Não foi possível ler a duração do vídeo. Progresso pode falhar.")

            cmd = engine.build_command(str(filepath), str(temp_output))
            # Ask FFmpeg to emit machine-readable progress on stdout.
            cmd[1:1] = ['-progress', 'pipe:1']

            printable_cmd = " ".join([f'"{c}"' if " " in c else c for c in cmd])
            state.log(f"🛠️ Executando FFmpeg (Debug no Log): ...{printable_cmd[-50:]}")

            returncode, stderr_tail = await self._run_ffmpeg(
                cmd, fname, full_details['title'], total_duration_us
            )

            if self.abort_flag:
                state.update_task(fname, 'error', 0, label="Abortado")
                self._discard_file(temp_output)
                return

            if returncode != 0:
                msg = stderr_tail.decode('utf-8', errors='ignore')[-300:]
                state.log(f"❌ Erro FFmpeg Fatal: {msg}")
                state.update_task(fname, 'error', 0, label="Erro FFmpeg")
                # Do not leave a truncated output behind in the temp folder.
                self._discard_file(temp_output)
                return

            # --- 4. DEPLOY ---
            state.update_task(fname, 'running', progress=99, label="Organizando...")
            final_full_path = Path(category.target_path) / relative_path
            final_full_path.parent.mkdir(parents=True, exist_ok=True)

            if final_full_path.exists():
                self._discard_file(final_full_path)
            shutil.move(str(temp_output), str(final_full_path))

            if AppConfig.get_val('deploy_mode', 'move') == 'move':
                self._discard_file(filepath)

            await self.bot.send_notification(
                f"🎬 Organizado: `{full_details['title']}`\n📂 {category.name}"
            )
            state.update_task(fname, 'done', 100, label=f"{full_details['title']}",
                              file_progress=100, speed="Finalizado")
            state.current_file = ""

            if AppConfig.get_val('cleanup_empty_folders', 'true') == 'true':
                # Best effort: a failed cleanup must not flip a finished task to error.
                try:
                    parent = filepath.parent
                    monitor_root = Path(AppConfig.get_val('monitor_path', '/downloads'))
                    if parent != monitor_root and not any(parent.iterdir()):
                        parent.rmdir()
                except Exception as e:
                    state.log(f"⚠️ Falha ao limpar pasta vazia: {e}")

        except Exception as e:
            state.log(f"Erro Pipeline: {e}")
            state.update_task(fname, 'error', 0, label=f"Erro: {e}")

    async def _resolve_target(self, fname, result):
        """Turn an identification result into ``target_info``, asking the user if needed.

        Returns the chosen target, or ``None`` when the file must not be
        processed; in that case the task state has already been updated.
        ``result`` is mutated when semi-automatic mode downgrades a match.
        """
        is_semi_auto = AppConfig.get_val('semi_auto', 'false') == 'true'
        if is_semi_auto and result['status'] == 'MATCH':
            # Semi-auto mode: even a confident match must be confirmed by the user.
            result['status'] = 'AMBIGUOUS'
            result['candidates'] = [result['match']]

        status = result['status']
        if status == 'MATCH':
            match = result['match']
            state.update_task(fname, 'running', progress=10, label=f"ID: {match['title']}")
            return {'tmdb_id': match['tmdb_id'], 'type': match['type']}

        if status != 'AMBIGUOUS':
            msg = result.get('msg', 'Desconhecido')
            state.update_task(fname, 'error', 0, label="Não Identificado")
            state.log(f"❌ Falha Identificação: {msg}")
            return None

        if not result.get('candidates'):
            state.update_task(fname, 'error', 0, label="Erro: Ambíguo sem candidatos")
            return None

        state.update_task(fname, 'warning', 10, label="Aguardando Telegram...")
        self.pending_future = asyncio.ensure_future(
            self.bot.ask_user_choice(fname, result['candidates'])
        )
        try:
            user_choice = await self.pending_future
        except asyncio.CancelledError:
            state.update_task(fname, 'error', 0, label="Cancelado Manualmente")
            return None
        finally:
            # Never keep a finished or cancelled future around.
            self.pending_future = None

        if not user_choice or self.abort_flag:
            state.update_task(fname, 'skipped', 0, label="Ignorado/Cancelado")
            return None
        return user_choice

    async def _run_ffmpeg(self, cmd, fname, title, total_duration_us):
        """Run ``cmd``, publish progress for ``fname`` and return ``(returncode, stderr_tail)``.

        stderr is drained concurrently with stdout: reading only one of two
        piped streams can block the child once the other pipe's buffer fills.
        """
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self.current_process = process
        stderr_task = asyncio.ensure_future(self._drain_stderr(process.stderr))
        current_speed_str = ""
        try:
            while not self.abort_flag:
                line_bytes = await process.stdout.readline()
                if not line_bytes:
                    break  # stdout closed: FFmpeg finished or was killed
                line = line_bytes.decode('utf-8', errors='ignore').strip()
                key, sep, value = line.partition('=')
                if not sep:
                    continue
                key, value = key.strip(), value.strip()

                if key == 'out_time_us':
                    if total_duration_us <= 0:
                        continue
                    try:
                        current_us = int(value)
                    except ValueError:
                        continue  # non-numeric value, nothing to report
                    file_pct = max(0, min(100, (current_us / total_duration_us) * 100))
                    # Conversion owns the 15%..99% band of the overall task.
                    global_pct = 15 + (file_pct * 0.84)
                    state.update_task(
                        fname, 'running',
                        progress=global_pct,
                        file_progress=file_pct,
                        speed=current_speed_str,
                        label=f"Processando: {title}"
                    )
                elif key == 'speed':
                    current_speed_str = value

            if self.abort_flag:
                self._kill_process(process)
            await process.wait()
            stderr_tail = await stderr_task
            return process.returncode, stderr_tail
        finally:
            # Covers exceptions too: never leave FFmpeg or the reader task running.
            if process.returncode is None:
                self._kill_process(process)
            if not stderr_task.done():
                stderr_task.cancel()
            self.current_process = None

    @staticmethod
    async def _drain_stderr(stream, keep_bytes=4096):
        """Read ``stream`` to EOF, log error-looking chunks, return the last ``keep_bytes``."""
        tail = b""
        while True:
            chunk = await stream.read(1024)
            if not chunk:
                return tail
            tail = (tail + chunk)[-keep_bytes:]
            err_msg = chunk.decode('utf-8', errors='ignore')
            # Log only chunks that look like real errors; skip routine output.
            if "Error" in err_msg or "Invalid" in err_msg:
                state.log(f"FFmpeg Stderr: {err_msg}")

    @staticmethod
    def _kill_process(process):
        """Kill ``process``, ignoring the case where it has already exited."""
        try:
            process.kill()
        except OSError:
            pass

    @staticmethod
    def _discard_file(path: Path):
        """Delete ``path`` if present; log (instead of raising) when deletion fails."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            state.log(f"⚠️ Não foi possível remover {path}: {e}")

    def find_best_category(self, media_type, genre_ids, countries):
        """Pick the category whose filters best match the title.

        Scoring per category: a matching genre or country filter adds 50, an
        absent filter adds 10, a filter that is present but does not match
        resets the score to -1000. A ``mixed`` category with a positive score
        gets a 5 point bonus. Categories of the opposite content type are
        excluded up front. When even the best score is negative, the first
        candidate without any filters is preferred.

        Filter entries are whitespace-stripped and countries are compared
        case-insensitively.

        Returns a ``Category`` or ``None`` when no category exists at all.
        """
        all_cats = list(Category.select())
        if not all_cats:
            return None

        excluded_type = 'series' if media_type == 'movie' else 'movie'
        candidates = [cat for cat in all_cats if cat.content_type != excluded_type]
        if not candidates:
            return all_cats[0]

        wanted_genres = {str(g).strip() for g in (genre_ids or ())}
        wanted_countries = {str(c).strip().upper() for c in (countries or ())}

        scored_cats = [
            (self._score_category(cat, wanted_genres, wanted_countries), cat)
            for cat in candidates
        ]
        # max() keeps the first of equal scores, like a stable descending sort.
        best_score, best_cat = max(scored_cats, key=lambda pair: pair[0])

        if best_score < 0:
            for cat in candidates:
                if not cat.genre_filters and not cat.country_filters:
                    return cat
        return best_cat

    @staticmethod
    def _score_category(cat, wanted_genres, wanted_countries):
        """Return the match score of ``cat`` against normalised genre and country sets."""
        cat_genres = {g.strip() for g in (cat.genre_filters or '').split(',') if g.strip()}
        cat_countries = {
            c.strip().upper() for c in (cat.country_filters or '').split(',') if c.strip()
        }

        score = 0
        if cat_genres:
            if cat_genres & wanted_genres:
                score += 50
            else:
                score = -1000
        else:
            score += 10

        if cat_countries:
            if cat_countries & wanted_countries:
                score += 50
            else:
                score = -1000
        else:
            score += 10

        if cat.content_type == 'mixed' and score > 0:
            score += 5
        return score