Files
clei-flow/app/core/bot.py

163 lines
5.8 KiB
Python
Executable File

import asyncio
import httpx
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, CallbackQueryHandler, CommandHandler
from database import AppConfig
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("telegram").setLevel(logging.INFO)
class TelegramManager:
def __init__(self):
self.app = None
self.active_requests = {}
self.is_connected = False
async def check_internet(self):
"""Testa se o container tem internet"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
await client.get("https://www.google.com")
return True
except:
return False
async def start(self):
"""Inicia o Bot"""
token = AppConfig.get_val('telegram_token')
chat_id = AppConfig.get_val('telegram_chat_id')
if not token:
print("🟡 Bot: Token não configurado.")
return
if self.is_connected: return
print("🤖 Bot: Iniciando conexão...")
if not await self.check_internet():
print("❌ Bot: Sem internet.")
return
try:
self.app = ApplicationBuilder().token(token).build()
self.app.add_handler(CommandHandler("start", self.cmd_start))
self.app.add_handler(CommandHandler("id", self.cmd_id))
self.app.add_handler(CallbackQueryHandler(self.handle_selection))
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
self.is_connected = True
print("✅ Bot Online!")
except Exception as e:
print(f"❌ Falha no Bot: {e}")
self.is_connected = False
async def stop(self):
"""Para o Bot"""
if self.app:
try:
if self.app.updater.running:
await self.app.updater.stop()
if self.app.running:
await self.app.stop()
await self.app.shutdown()
except Exception as e:
print(f"Erro ao parar bot: {e}")
finally:
self.is_connected = False
self.app = None
async def restart(self):
"""Reinicia a conexão (Útil após mudar token)"""
print("🔄 Bot: Reiniciando serviço...")
await self.stop()
await asyncio.sleep(1)
await self.start()
# --- COMANDOS ---
async def cmd_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
await update.message.reply_text(f"Olá! Configurado.\nSeu Chat ID é: `{chat_id}`", parse_mode='Markdown')
async def cmd_id(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(f"`{update.effective_chat.id}`", parse_mode='Markdown')
async def send_test_msg(self):
"""Envia mensagem de teste"""
if not self.is_connected: return False
chat_id = AppConfig.get_val('telegram_chat_id')
if not chat_id: return False
try:
await self.app.bot.send_message(chat_id=chat_id, text="🚀 Clei-Flow: Teste de conexão bem sucedido!")
return True
except:
return False
# --- INTERAÇÃO ---
async def ask_user_choice(self, filename, candidates):
chat_id = AppConfig.get_val('telegram_chat_id')
if not chat_id or not self.is_connected: return None
request_id = f"req_{filename}"
keyboard = []
for cand in candidates:
text = f"{cand['title']} ({cand['year']})"
callback_data = f"{request_id}|{cand['tmdb_id']}|{cand['type']}"
keyboard.append([InlineKeyboardButton(text, callback_data=callback_data)])
keyboard.append([InlineKeyboardButton("🚫 Ignorar", callback_data=f"{request_id}|IGNORE|NONE")])
reply_markup = InlineKeyboardMarkup(keyboard)
try:
await self.app.bot.send_message(
chat_id=chat_id,
text=f"🤔 <b>Clei-Flow:</b>\nArquivo: <code>{filename}</code>",
reply_markup=reply_markup,
parse_mode='HTML'
)
except: return None
loop = asyncio.get_running_loop()
future = loop.create_future()
self.active_requests[request_id] = future
try:
result = await asyncio.wait_for(future, timeout=43200)
return result
except asyncio.TimeoutError:
if request_id in self.active_requests: del self.active_requests[request_id]
return None
async def handle_selection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
data = query.data.split('|')
if len(data) < 3: return
req_id = data[0]
tmdb_id = data[1]
media_type = data[2]
if req_id in self.active_requests:
future = self.active_requests[req_id]
if tmdb_id == 'IGNORE':
await query.edit_message_text(text=f"🚫 Ignorado.")
future.set_result(None)
else:
await query.edit_message_text(text=f"✅ Processando...")
future.set_result({'tmdb_id': int(tmdb_id), 'type': media_type})
del self.active_requests[req_id]
else:
await query.edit_message_text(text="⚠️ Expirado.")
async def send_notification(self, message):
chat_id = AppConfig.get_val('telegram_chat_id')
if self.app and chat_id and self.is_connected:
try:
await self.app.bot.send_message(chat_id=chat_id, text=message)
except: pass