Files
clei-flow/app/core/watcher.py
2026-02-10 00:56:46 +00:00

312 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import shutil
import re
from pathlib import Path
from database import AppConfig, Category
from core.renamer import RenamerCore
from core.ffmpeg_engine import FFmpegEngine
from core.bot import TelegramManager
from core.state import state
# Lower-case file suffixes (dot included) that the watcher treats as videos.
VIDEO_EXTENSIONS = {
    '.avi',
    '.mkv',
    '.mov',
    '.mp4',
    '.wmv',
}
class DirectoryWatcher:
    """Poll the monitored folder and push each new video through the pipeline.

    For every ready file: identify it with ``RenamerCore`` (asking the user on
    Telegram when the match is ambiguous or semi-auto mode is on), fetch its
    metadata, pick a ``Category``, convert it with ffmpeg into ``temp_dir`` and
    move the result under the category's ``target_path``. Progress and outcomes
    are published through the shared ``state`` object.
    """

    def __init__(self, bot: "TelegramManager"):
        # The annotation is a string so that defining the class does not need
        # the TelegramManager name at runtime (only type checkers use it).
        self.bot = bot
        self.renamer = RenamerCore()
        self.is_running = False  # scans only happen while this is True
        self.temp_dir = Path('/app/temp')  # ffmpeg output lands here before deploy
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.current_watch_path = None  # last folder announced in the log
        self.current_process = None  # running ffmpeg process, if any
        self.pending_future = None  # pending Telegram question, if any
        self.abort_flag = False  # set by abort_current_task()
        state.watcher = self  # register on the shared state object

    async def start(self):
        """Run forever, scanning the configured folder every 5 seconds.

        Nothing is scanned while ``is_running`` is False. Errors raised by a
        scan are logged and the loop keeps going.
        """
        state.log("🟡 Watcher Service: Pronto. Aguardando ativação no Dashboard...")
        while True:
            if self.is_running:
                try:
                    # Re-read on every pass so a changed setting takes effect live.
                    config_path = AppConfig.get_val('monitor_path', '/downloads')
                    watch_dir = Path(config_path)
                    if str(watch_dir) != str(self.current_watch_path):
                        state.log(f"📁 Monitorando: {watch_dir}")
                        self.current_watch_path = watch_dir
                    if watch_dir.exists():
                        await self.scan_folder(watch_dir)
                except Exception as e:
                    state.log(f"❌ Erro Watcher Loop: {e}")
            await asyncio.sleep(5)

    def abort_current_task(self):
        """Request cancellation of the file currently being handled.

        Sets ``abort_flag`` (polled by the scan and pipeline loops), kills a
        running ffmpeg process and cancels a pending Telegram question.
        """
        state.log("🛑 Solicitando Cancelamento...")
        self.abort_flag = True
        if self.current_process:
            self._kill(self.current_process)
        if self.pending_future and not self.pending_future.done():
            self.pending_future.cancel()

    async def scan_folder(self, input_dir: Path):
        """Run the pipeline on every ready video file below *input_dir*.

        A file is skipped when it is hidden, has 'processing' in its name,
        is already recorded as 'done' in ``state.tasks`` or is still growing.
        Returns early when the watcher is stopped or an abort is requested.
        """
        # Snapshot the tree up front. The pipeline removes source files and
        # emptied folders, and a lazy glob would raise FileNotFoundError when
        # it later tries to list a directory that no longer exists.
        for file_path in sorted(input_dir.glob('**/*')):
            if self.abort_flag:
                # Abort requested between files: consume it and mark the file
                # that was in progress as cancelled.
                self.abort_flag = False
                if state.current_file:
                    state.update_task(state.current_file, 'error', label=f"{state.current_file} (Cancelado)")
                return
            if not self.is_running:
                return
            if not file_path.is_file() or file_path.suffix.lower() not in VIDEO_EXTENSIONS:
                continue
            name = file_path.name
            if name.startswith('.') or 'processing' in name:
                continue
            # NOTE(review): only 'done' is treated as final here, so errored,
            # skipped and cancelled files are picked up again on the next pass.
            if name in state.tasks and state.tasks[name]['status'] == 'done':
                continue
            try:
                # A size change within one second means the file is still being written.
                size_before = file_path.stat().st_size
                await asyncio.sleep(1)
                if file_path.stat().st_size != size_before:
                    continue
            except OSError:
                # The file vanished or became unreadable while we waited.
                continue
            await self.process_pipeline(file_path)
            if self.abort_flag:
                return

    async def process_pipeline(self, filepath: Path):
        """Identify, convert and deploy a single video file.

        Stages: (1) identification, possibly confirmed on Telegram, plus the
        metadata lookup; (2) category selection; (3) ffmpeg conversion into
        ``temp_dir``; (4) deploy into the category's ``target_path``. Every
        outcome is reported through ``state.update_task``. Unexpected
        exceptions mark the task as errored instead of escaping.
        """
        fname = filepath.name
        self.abort_flag = False
        state.current_file = fname
        temp_output = None
        try:
            # --- 1. IDENTIFICATION ---
            state.update_task(fname, 'running', progress=5, label=f"Identificando: {fname}...")
            state.log(f"🔄 Iniciando: {fname}")
            result, target_info = await self._resolve_target(fname, filepath)
            if target_info is None or self.abort_flag:
                return

            # --- METADATA ---
            details = self.renamer.get_details(target_info['tmdb_id'], target_info['type'])
            full_details, tmdb_genre_ids, origin_countries = self._extract_metadata(details, target_info['type'])
            state.log(f" Dados: Gêneros={tmdb_genre_ids} | Países={origin_countries}")

            # --- 2. SMART CATEGORY ---
            category = self.find_best_category(target_info['type'], tmdb_genre_ids, origin_countries)
            if not category:
                state.update_task(fname, 'error', 0, label="Sem Categoria")
                return
            state.log(f"📂 Categoria Vencedora: {category.name}")

            # --- 3. CONVERSION ---
            relative_path = self.renamer.build_path(category, full_details, result.get('guessed', {}))
            temp_output = self.temp_dir / os.path.basename(relative_path)
            title = full_details['title']
            state.update_task(fname, 'running', progress=15, label=f"Convertendo: {title}")
            if not await self._convert(fname, filepath, temp_output, title):
                return

            # --- 4. DEPLOY ---
            state.update_task(fname, 'running', progress=99, label="Organizando...")
            final_full_path = Path(category.target_path) / relative_path
            final_full_path.parent.mkdir(parents=True, exist_ok=True)
            # Replace any previous file already sitting at the destination.
            self._discard(final_full_path)
            shutil.move(str(temp_output), str(final_full_path))
            if AppConfig.get_val('deploy_mode', 'move') == 'move':
                self._discard(filepath)
            await self.bot.send_notification(f"🎬 Organizado: `{title}`\n📂 {category.name}")
            state.update_task(fname, 'done', 100, label=f"{title}", file_progress=100, speed="Finalizado")
            state.current_file = ""
            if AppConfig.get_val('cleanup_empty_folders', 'true') == 'true':
                self._remove_empty_parent(filepath)
        except Exception as e:
            state.log(f"Erro Pipeline: {e}")
            state.update_task(fname, 'error', 0, label=f"Erro: {e}")
            # Don't leave a half-written conversion behind in temp_dir.
            self._discard(temp_output)

    async def _resolve_target(self, fname, filepath):
        """Identify *filepath*; return ``(result, target_info)``.

        ``result`` is the renamer's dict (this code reads its 'status',
        'match', 'candidates' and 'msg' keys). ``target_info`` holds
        'tmdb_id'/'type' for the chosen title. It is None when the file must
        not be processed further, in which case the task state has already
        been updated.
        """
        result = self.renamer.identify_file(str(filepath))
        is_semi_auto = AppConfig.get_val('semi_auto', 'false') == 'true'
        if is_semi_auto and result['status'] == 'MATCH':
            # Semi-automatic mode: even a confident match is confirmed on Telegram.
            result['status'] = 'AMBIGUOUS'
            result['candidates'] = [result['match']]

        if result['status'] == 'MATCH':
            match = result['match']
            target_info = {'tmdb_id': match['tmdb_id'], 'type': match['type']}
            state.update_task(fname, 'running', progress=10, label=f"ID: {match['title']}")
            return result, target_info

        if result['status'] == 'AMBIGUOUS':
            candidates = result.get('candidates')
            if not candidates:
                state.update_task(fname, 'error', 0, label="Erro: Ambíguo sem candidatos")
                return result, None
            state.update_task(fname, 'warning', 10, label="Aguardando Telegram...")
            self.pending_future = asyncio.ensure_future(
                self.bot.ask_user_choice(fname, candidates)
            )
            try:
                user_choice = await self.pending_future
            except asyncio.CancelledError:
                # Cancelled via abort_current_task().
                # NOTE(review): this also swallows a cancellation of the
                # watcher task itself while it waits here.
                state.update_task(fname, 'error', 0, label="Cancelado Manualmente")
                return result, None
            finally:
                self.pending_future = None
            if not user_choice or self.abort_flag:
                state.update_task(fname, 'skipped', 0, label="Ignorado/Cancelado")
                return result, None
            return result, user_choice

        msg = result.get('msg', 'Desconhecido')
        state.update_task(fname, 'error', 0, label="Não Identificado")
        state.log(f"❌ Falha Identificação: {msg}")
        return result, None

    @staticmethod
    def _extract_metadata(details, media_type):
        """Return ``(full_details, genre_ids, countries)`` from a details object.

        Reads ``title``/``name``, ``genres`` (items indexed by 'id'),
        ``origin_country`` (str or list) and ``release_date``/``first_air_date``.
        Genre ids come back as strings and country codes upper-cased.
        Missing or None values fall back to 'Unknown', '0000' and empty lists.
        """
        title = getattr(details, 'title', getattr(details, 'name', 'Unknown')) or 'Unknown'
        genre_ids = [str(g['id']) for g in (getattr(details, 'genres', None) or [])]
        countries = getattr(details, 'origin_country', None) or []
        if isinstance(countries, str):
            countries = [countries]
        countries = [c.upper() for c in countries]
        full_details = {'title': title, 'year': '0000', 'type': media_type}
        d_date = getattr(details, 'release_date', getattr(details, 'first_air_date', '0000'))
        if d_date:
            full_details['year'] = str(d_date)[:4]
        return full_details, genre_ids, countries

    async def _convert(self, fname, filepath, temp_output, title):
        """Transcode *filepath* into *temp_output*, streaming progress to the task.

        Returns True when ffmpeg exits with status 0. On abort or ffmpeg
        failure, the task is marked as errored, the partial *temp_output* is
        deleted and False is returned. The ffmpeg process is never left
        running when this coroutine exits, even on exceptions or cancellation.
        """
        engine = FFmpegEngine()
        total_duration_sec = engine.get_duration(str(filepath))
        # Without a usable duration the encode still runs, just without percentages.
        total_duration_us = (total_duration_sec or 0) * 1000000
        cmd = engine.build_command(str(filepath), str(temp_output))
        # Ask ffmpeg for machine-readable "key=value" progress blocks on stdout.
        cmd.insert(1, '-progress')
        cmd.insert(2, 'pipe:1')
        # Remove a stale partial file from an interrupted run. Without -y,
        # ffmpeg would refuse to overwrite it (build_command's flags are not
        # visible here).
        self._discard(temp_output)
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        self.current_process = proc
        # stderr must be read concurrently: once its pipe buffer is full,
        # ffmpeg blocks writing to it and the stdout progress stream stalls.
        stderr_tail = bytearray()
        drain = asyncio.ensure_future(self._drain_tail(proc.stderr, stderr_tail))
        try:
            current_speed_str = ""
            while True:
                if self.abort_flag:
                    self._kill(proc)
                    break
                line_bytes = await proc.stdout.readline()
                if not line_bytes:
                    break  # EOF: ffmpeg exited or was killed
                line = line_bytes.decode('utf-8', errors='ignore').strip()
                key, sep, value = line.partition('=')
                if not sep:
                    continue
                key, value = key.strip(), value.strip()
                if key == 'out_time_us':
                    try:
                        current_us = int(value)
                    except ValueError:
                        continue  # non-numeric value (ffmpeg may report N/A)
                    if total_duration_us > 0:
                        global_pct, file_pct = self._progress_pcts(current_us, total_duration_us)
                        state.update_task(
                            fname, 'running', progress=global_pct, file_progress=file_pct,
                            speed=current_speed_str, label=f"Processando: {title}",
                        )
                elif key == 'speed':
                    current_speed_str = value
            await proc.wait()
            await drain
            if self.abort_flag:
                state.update_task(fname, 'error', 0, label="Abortado")
                self._discard(temp_output)
                return False
            if proc.returncode != 0:
                err_log = stderr_tail.decode('utf-8', errors='replace')
                state.log(f"❌ Erro FFmpeg: {err_log[-200:]}")
                state.update_task(fname, 'error', 0, label="Erro FFmpeg")
                self._discard(temp_output)
                return False
            return True
        finally:
            if proc.returncode is None:
                # Exception or cancellation mid-encode: don't orphan ffmpeg.
                self._kill(proc)
                await proc.wait()
            drain.cancel()
            self.current_process = None

    @staticmethod
    def _progress_pcts(current_us, total_us):
        """Return ``(overall %, file %)`` for an ffmpeg ``out_time_us`` value.

        The file percentage is clamped to 0-100 and mapped onto 15-99 of the
        overall bar, the range between the "converting" (15) and
        "organizing" (99) pipeline steps.
        """
        file_pct = (current_us / total_us) * 100
        file_pct = min(max(file_pct, 0), 100)
        return 15 + (file_pct * 0.84), file_pct

    @staticmethod
    async def _drain_tail(stream, sink, limit=8192):
        """Read *stream* until EOF, keeping only its last *limit* bytes in *sink*."""
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                return
            sink.extend(chunk)
            if len(sink) > limit:
                del sink[:-limit]

    def _remove_empty_parent(self, filepath):
        """Remove *filepath*'s folder if it is now empty and not the monitor root."""
        try:
            parent = filepath.parent
            monitor_root = Path(AppConfig.get_val('monitor_path', '/downloads'))
            if parent != monitor_root and not any(parent.iterdir()):
                parent.rmdir()
        except OSError:
            pass  # best effort: the folder may be gone already or not removable

    @staticmethod
    def _kill(proc):
        """Kill *proc*, ignoring the race where it has already exited."""
        try:
            proc.kill()
        except OSError:  # includes ProcessLookupError
            pass

    @staticmethod
    def _discard(path):
        """Best-effort delete of *path*; None, missing or undeletable files are ignored."""
        if path is None:
            return
        try:
            os.remove(str(path))
        except OSError:
            pass

    def find_best_category(self, media_type, genre_ids, countries):
        """Choose the destination ``Category`` for a title (scoring system 3.0).

        Strict rules that compare TMDB genre ids and country codes rather
        than free text; see ``_pick_category`` for the scoring. Returns None
        when no category exists.

        :param media_type: 'movie', or any other value for series-like media.
        :param genre_ids: the title's TMDB genre ids.
        :param countries: the title's origin country codes.
        """
        return self._pick_category(list(Category.select()), media_type, genre_ids, countries)

    @classmethod
    def _pick_category(cls, categories, media_type, genre_ids, countries):
        """Score *categories* for one title and return the winner (None if empty).

        Rules:
        * Type filter: movies never go to 'series' categories and other media
          never to 'movie' ones; any other content_type (e.g. 'mixed') accepts
          both. If nothing survives, the first category is returned.
        * Non-empty genre (or country) filters require at least one match:
          +50 when one matches, disqualified (-1000) otherwise. An empty
          filter accepts anything for +10.
        * Qualifying 'mixed' categories get a +5 bonus.
        Ties go to the earliest category in *categories*. If every candidate is
        disqualified, the highest (least negative) one is still returned.
        """
        if not categories:
            return None
        excluded_type = 'series' if media_type == 'movie' else 'movie'
        candidates = [cat for cat in categories if cat.content_type != excluded_type]
        if not candidates:
            return categories[0]
        wanted_genres = {str(g) for g in genre_ids}
        wanted_countries = {str(c).upper() for c in countries}

        def score(cat):
            total = 0
            genres = cls._split_filter(cat.genre_filters)
            if genres:
                # A genre filter is mandatory: any overlap scores, none disqualifies.
                total = total + 50 if wanted_genres.intersection(genres) else -1000
            else:
                total += 10  # genre-agnostic category: low base score
            codes = cls._split_filter(cat.country_filters, upper=True)
            if codes:
                total = total + 50 if wanted_countries.intersection(codes) else -1000
            else:
                total += 10
            # Small bonus for 'mixed' categories (usually used for anime/cartoons).
            if cat.content_type == 'mixed' and total > 0:
                total += 5
            return total

        # max() returns the first of equally scored candidates. A filter-free
        # candidate always scores at least 20, so a negative winner means every
        # candidate failed some filter.
        return max(candidates, key=score)

    @staticmethod
    def _split_filter(raw, upper=False):
        """Split a comma-separated filter column into trimmed, non-empty tokens."""
        tokens = [t.strip() for t in str(raw).split(',')] if raw else []
        return [t.upper() if upper else t for t in tokens if t]