312 lines
13 KiB
Python
312 lines
13 KiB
Python
import asyncio
|
||
import os
|
||
import shutil
|
||
import re
|
||
from pathlib import Path
|
||
from database import AppConfig, Category
|
||
from core.renamer import RenamerCore
|
||
from core.ffmpeg_engine import FFmpegEngine
|
||
from core.bot import TelegramManager
|
||
from core.state import state
|
||
|
||
# Lower-case file suffixes that the watcher treats as video files.
VIDEO_EXTENSIONS = {
    '.avi',
    '.mkv',
    '.mov',
    '.mp4',
    '.wmv',
}
|
||
|
||
class DirectoryWatcher:
    """Poll the monitored folder and run each stable video file through the
    identify -> categorise -> convert (FFmpeg) -> deploy pipeline.

    The instance registers itself as ``state.watcher`` on construction.
    Progress and errors are reported through ``state.update_task`` and
    ``state.log``; user questions and notifications go through ``bot``.
    """

    def __init__(self, bot: "TelegramManager"):
        """Store collaborators and make sure the temporary output dir exists.

        Args:
            bot: Telegram manager used to ask the user to disambiguate a
                match and to send the final notification.
        """
        self.bot = bot
        self.renamer = RenamerCore()
        self.is_running = False
        self.temp_dir = Path('/app/temp')
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.current_watch_path = None
        # Running FFmpeg process (if any), so it can be killed on abort.
        self.current_process = None
        # Future of the pending Telegram question (if any).
        self.pending_future = None
        self.abort_flag = False
        state.watcher = self

    async def start(self):
        """Run forever: every 5 seconds, scan the configured folder while
        ``is_running`` is set. Errors of one cycle are logged, not raised."""
        state.log("🟡 Watcher Service: Pronto. Aguardando ativação no Dashboard...")
        while True:
            if self.is_running:
                try:
                    watch_dir = Path(AppConfig.get_val('monitor_path', '/downloads'))
                    if str(watch_dir) != str(self.current_watch_path):
                        state.log(f"📁 Monitorando: {watch_dir}")
                        self.current_watch_path = watch_dir
                    if watch_dir.exists():
                        await self.scan_folder(watch_dir)
                except Exception as e:
                    state.log(f"❌ Erro Watcher Loop: {e}")
            await asyncio.sleep(5)

    def abort_current_task(self):
        """Request cancellation of whatever the pipeline is doing right now:
        set the abort flag, kill FFmpeg and cancel the pending question."""
        state.log("🛑 Solicitando Cancelamento...")
        self.abort_flag = True
        self._kill_process(self.current_process)
        if self.pending_future and not self.pending_future.done():
            self.pending_future.cancel()

    async def scan_folder(self, input_dir: Path):
        """Walk ``input_dir`` recursively and process every video file whose
        size stayed unchanged for one second.

        Returns early when an abort was requested or the watcher was
        switched off.
        """
        for file_path in input_dir.glob('**/*'):
            if self.abort_flag:
                # Consume the abort request and flag the interrupted file.
                self.abort_flag = False
                if state.current_file:
                    state.update_task(state.current_file, 'error', label=f"{state.current_file} (Cancelado)")
                return
            if not self.is_running:
                return

            if not (file_path.is_file() and file_path.suffix.lower() in VIDEO_EXTENSIONS):
                continue
            name = file_path.name
            if name.startswith('.') or 'processing' in name:
                continue
            if name in state.tasks and state.tasks[name]['status'] == 'done':
                continue

            # A file that is still growing is still being downloaded/copied.
            try:
                size_before = file_path.stat().st_size
                await asyncio.sleep(1)
                size_after = file_path.stat().st_size
            except OSError:
                # Only filesystem errors are skipped here; a bare except
                # would also swallow asyncio.CancelledError from the sleep.
                continue
            if size_before != size_after:
                continue

            await self.process_pipeline(file_path)
            if self.abort_flag:
                return

    async def process_pipeline(self, filepath: Path):
        """Identify, categorise, convert and deploy a single file.

        Args:
            filepath: Path of the source video inside the monitored folder.

        Failures after identification are caught, logged and reported on the
        task; exceptions raised by the identification step propagate to the
        caller.
        """
        fname = filepath.name
        self.abort_flag = False
        state.current_file = fname

        # --- 1. IDENTIFICATION ---
        state.update_task(fname, 'running', progress=5, label=f"Identificando: {fname}...")
        state.log(f"🔄 Iniciando: {fname}")

        result = self.renamer.identify_file(str(filepath))
        target_info = await self._resolve_target(fname, result)
        if target_info is None or self.abort_flag:
            return

        temp_output = None
        try:
            # --- METADATA AND CATEGORY ---
            details = self.renamer.get_details(target_info['tmdb_id'], target_info['type'])
            full_details, genre_ids, countries = self._extract_metadata(details, target_info['type'])
            state.log(f"ℹ️ Dados: Gêneros={genre_ids} | Países={countries}")

            category = self.find_best_category(target_info['type'], genre_ids, countries)
            if not category:
                state.update_task(fname, 'error', 0, label="Sem Categoria")
                return
            state.log(f"📂 Categoria Vencedora: {category.name}")

            # --- 3. CONVERSION ---
            title = full_details['title']
            relative_path = self.renamer.build_path(category, full_details, result.get('guessed', {}))
            temp_output = self.temp_dir / os.path.basename(relative_path)
            state.update_task(fname, 'running', progress=15, label=f"Convertendo: {title}")

            returncode, stderr_tail = await self._run_ffmpeg(filepath, temp_output, fname, title)

            if self.abort_flag:
                state.update_task(fname, 'error', 0, label="Abortado")
                self._remove_quietly(temp_output)
                return
            if returncode != 0:
                state.log(f"❌ Erro FFmpeg: {stderr_tail[-200:]}")
                state.update_task(fname, 'error', 0, label="Erro FFmpeg")
                # Do not leave a partial encode behind.
                self._remove_quietly(temp_output)
                return

            # --- 4. DEPLOY ---
            await self._deploy(filepath, temp_output, relative_path, category, fname, title)
        except Exception as e:
            state.log(f"Erro Pipeline: {e}")
            state.update_task(fname, 'error', 0, label=f"Erro: {e}")
            if temp_output is not None:
                self._remove_quietly(temp_output)
        finally:
            # Never keep a handle to a finished process around.
            self.current_process = None

    async def _resolve_target(self, fname, result):
        """Turn the identification result into ``{'tmdb_id', 'type'}``, asking
        the user through the bot when needed. Returns None when the file
        cannot or should not be processed (the task is updated accordingly)."""
        # Semi-automatic mode: even a certain match must be confirmed.
        if AppConfig.get_val('semi_auto', 'false') == 'true' and result['status'] == 'MATCH':
            result['status'] = 'AMBIGUOUS'
            result['candidates'] = [result['match']]

        status = result['status']
        if status == 'MATCH':
            match = result['match']
            target_info = {'tmdb_id': match['tmdb_id'], 'type': match['type']}
            state.update_task(fname, 'running', progress=10, label=f"ID: {match['title']}")
            return target_info

        if status == 'AMBIGUOUS':
            candidates = result.get('candidates')
            if not candidates:
                state.update_task(fname, 'error', 0, label="Erro: Ambíguo sem candidatos")
                return None

            state.update_task(fname, 'warning', 10, label="Aguardando Telegram...")
            self.pending_future = asyncio.ensure_future(
                self.bot.ask_user_choice(fname, candidates)
            )
            try:
                user_choice = await self.pending_future
            except asyncio.CancelledError:
                state.update_task(fname, 'error', 0, label="Cancelado Manualmente")
                return None
            finally:
                # Cleared on every path so abort never touches a stale future.
                self.pending_future = None

            if not user_choice or self.abort_flag:
                state.update_task(fname, 'skipped', 0, label="Ignorado/Cancelado")
                return None
            return user_choice

        msg = result.get('msg', 'Desconhecido')
        state.update_task(fname, 'error', 0, label="Não Identificado")
        state.log(f"❌ Falha Identificação: {msg}")
        return None

    @staticmethod
    def _extract_metadata(details, media_type):
        """Return ``(full_details, genre_ids, countries)`` read from the
        metadata object; missing or empty attributes fall back to defaults."""
        title = getattr(details, 'title', None) or getattr(details, 'name', None) or 'Unknown'
        # Genre ids as strings (e.g. ['16', '28']).
        genre_ids = [str(g['id']) for g in (getattr(details, 'genres', None) or [])]
        # Origin countries as upper-case codes (e.g. ['JP', 'US']).
        countries = getattr(details, 'origin_country', None) or []
        if isinstance(countries, str):
            countries = [countries]
        countries = [c.upper() for c in countries]

        date = getattr(details, 'release_date', None) or getattr(details, 'first_air_date', None)
        year = str(date)[:4] if date else '0000'
        return {'title': title, 'year': year, 'type': media_type}, genre_ids, countries

    async def _run_ffmpeg(self, filepath, temp_output, fname, title):
        """Run FFmpeg with ``-progress pipe:1``, publishing progress to the
        task. Returns ``(returncode, stderr_tail_text)``."""
        engine = FFmpegEngine()
        # Unknown duration (None/0) just disables the percentage computation.
        total_duration_us = (engine.get_duration(str(filepath)) or 0) * 1000000

        cmd = engine.build_command(str(filepath), str(temp_output))
        cmd.insert(1, '-progress')
        cmd.insert(2, 'pipe:1')

        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        self.current_process = proc
        # Drain stderr while stdout is parsed: an unread, full stderr pipe
        # would block FFmpeg and hang the loop below.
        stderr_task = asyncio.ensure_future(self._drain_tail(proc.stderr))

        current_speed_str = ""
        try:
            while True:
                if self.abort_flag:
                    self._kill_process(proc)
                    break
                line_bytes = await proc.stdout.readline()
                if not line_bytes:
                    break
                key, sep, value = line_bytes.decode('utf-8', errors='ignore').strip().partition('=')
                if not sep:
                    continue
                key = key.strip()
                value = value.strip()
                if key == 'speed':
                    current_speed_str = value
                elif key == 'out_time_us' and total_duration_us > 0:
                    try:
                        current_us = int(value)
                    except ValueError:
                        # Non-numeric value reported by FFmpeg; skip the tick.
                        continue
                    file_pct = min(100, max(0, (current_us / total_duration_us) * 100))
                    # Conversion covers the 15%..99% slice of the whole task.
                    global_pct = 15 + (file_pct * 0.84)
                    state.update_task(fname, 'running', progress=global_pct, file_progress=file_pct, speed=current_speed_str, label=f"Processando: {title}")
            await proc.wait()
        except BaseException:
            # Includes task cancellation: never leave an orphaned encoder.
            self._kill_process(proc)
            stderr_task.cancel()
            raise

        stderr_tail = await stderr_task
        return proc.returncode, stderr_tail

    @staticmethod
    async def _drain_tail(stream, keep=4096):
        """Read ``stream`` to EOF and return the last ``keep`` bytes as text."""
        tail = b""
        while True:
            chunk = await stream.read(65536)
            if not chunk:
                break
            tail = (tail + chunk)[-keep:]
        return tail.decode('utf-8', errors='replace')

    @staticmethod
    def _kill_process(proc):
        """Kill ``proc`` if it is still running; an already-dead process is fine."""
        if proc is None or proc.returncode is not None:
            return
        try:
            proc.kill()
        except OSError:
            # ProcessLookupError & co.: the process exited in the meantime.
            pass

    @staticmethod
    def _remove_quietly(path):
        """Best-effort file removal: a missing file is ignored, other
        filesystem errors are logged instead of raised."""
        try:
            os.remove(str(path))
        except FileNotFoundError:
            pass
        except OSError as e:
            state.log(f"⚠️ Não foi possível remover {path}: {e}")

    async def _deploy(self, filepath, temp_output, relative_path, category, fname, title):
        """Move the converted file to its library location, optionally delete
        the source, notify the user and tidy up an empty source folder."""
        state.update_task(fname, 'running', progress=99, label="Organizando...")
        final_full_path = Path(category.target_path) / relative_path
        final_full_path.parent.mkdir(parents=True, exist_ok=True)

        # An existing target is replaced by the new encode.
        if final_full_path.exists():
            self._remove_quietly(final_full_path)
        shutil.move(str(temp_output), str(final_full_path))

        if AppConfig.get_val('deploy_mode', 'move') == 'move':
            self._remove_quietly(filepath)

        # The file is already in place: a notification failure must not turn
        # a successful deploy into an error.
        try:
            await self.bot.send_notification(f"🎬 Organizado: `{title}`\n📂 {category.name}")
        except Exception as e:
            state.log(f"⚠️ Falha ao notificar: {e}")

        state.update_task(fname, 'done', 100, label=f"{title}", file_progress=100, speed="Finalizado")
        state.current_file = ""

        if AppConfig.get_val('cleanup_empty_folders', 'true') == 'true':
            try:
                parent = filepath.parent
                monitor_root = Path(AppConfig.get_val('monitor_path', '/downloads'))
                if parent != monitor_root and not any(parent.iterdir()):
                    parent.rmdir()
            except Exception as e:
                # Cleanup is cosmetic; report and move on.
                state.log(f"⚠️ Limpeza de pasta falhou: {e}")

    def find_best_category(self, media_type, genre_ids, countries, categories=None):
        """Pick the category that best fits a title (strict-rule scoring).

        Comparison is done on genre ids and country codes, not on text.

        Args:
            media_type: ``'movie'`` or anything else (treated as series).
            genre_ids: Genre ids of the title (compared as strings).
            countries: Origin country codes of the title (case-insensitive).
            categories: Optional iterable of category objects to choose from;
                when None, all rows from ``Category.select()`` are used.

        Returns:
            The winning category, or None when there are no categories.
            If no category is compatible with the media type, the first
            category is returned. If every candidate fails its filters, the
            first candidate with the highest (negative) score is returned.
            NOTE(review): that last fallback hands back a category whose
            filters did not match — confirm this is intended.
        """
        all_cats = list(Category.select()) if categories is None else list(categories)
        if not all_cats:
            return None

        # 1. Hard filter on TYPE (movie vs. series); 'mixed' fits both.
        excluded_type = 'series' if media_type == 'movie' else 'movie'
        candidates = [cat for cat in all_cats if cat.content_type != excluded_type]
        if not candidates:
            return all_cats[0]

        title_genres = {str(g) for g in (genre_ids or ())}
        title_countries = {str(c).upper() for c in (countries or ())}

        # 2. Score every candidate; max() keeps the first one on ties.
        scored = [
            (self._score_category(cat, title_genres, title_countries), cat)
            for cat in candidates
        ]
        return max(scored, key=lambda pair: pair[0])[1]

    @staticmethod
    def _split_filter(raw, upper=False):
        """Parse a comma-separated filter string into a set of clean tokens."""
        tokens = (part.strip() for part in (raw or '').split(','))
        return {tok.upper() if upper else tok for tok in tokens if tok}

    @staticmethod
    def _score_category(cat, title_genres, title_countries):
        """Score one category: +50 per matching filter, +10 per absent
        filter, -1000 when a declared filter does not match."""
        cat_genres = DirectoryWatcher._split_filter(cat.genre_filters)
        cat_countries = DirectoryWatcher._split_filter(cat.country_filters, upper=True)

        score = 0
        if not cat_genres:
            score += 10  # no genre filter: accepts anything, low base score
        elif cat_genres & title_genres:
            score += 50  # at least one required genre is present
        else:
            score = -1000  # genre required but missing: discarded

        if not cat_countries:
            score += 10
        elif cat_countries & title_countries:
            score += 50
        else:
            score = -1000  # country required but missing: discarded

        # Small bonus for mixed categories (typically anime/cartoons).
        if cat.content_type == 'mixed' and score > 0:
            score += 5
        return score