Files
clei-flow/app/ui/manual_tools.py

550 lines
26 KiB
Python
Executable File

from nicegui import ui, run, app
import os
import shutil
import asyncio
import datetime
import subprocess
import json
import hashlib
from pathlib import Path
from core.state import state # Para integrar com o Watcher
# --- CONFIGURAÇÕES ---
ROOT_DIR = "/downloads"
THUMB_DIR = "/app/data/thumbs"
os.makedirs(THUMB_DIR, exist_ok=True)
# Registra rota para servir as miniaturas
app.add_static_files('/thumbs', THUMB_DIR)
# Mapeamento de Extensões para Ícones e Cores
ICON_MAP = {
'video': {'icon': 'movie', 'color': 'purple-6'},
'subtitle': {'icon': 'subtitles', 'color': 'orange-5', 'exts': {'.srt', '.sub', '.ass', '.vtt'}},
'image': {'icon': 'image', 'color': 'pink-5', 'exts': {'.jpg', '.jpeg', '.png', '.gif', '.webp'}},
'text': {'icon': 'description', 'color': 'grey-7', 'exts': {'.txt', '.nfo', '.log', '.md'}},
'folder': {'icon': 'folder', 'color': 'amber-8'},
'default': {'icon': 'insert_drive_file', 'color': 'blue-grey'}
}
# --- GERENCIADOR DE MINIATURAS (FILA) ---
class ThumbnailManager:
def __init__(self):
self.queue = asyncio.Queue()
self.processing = False
async def add(self, filepath, img_element):
"""Adiciona um vídeo na fila para gerar thumbnail"""
vid_hash = hashlib.md5(filepath.encode()).hexdigest()
thumb_path = os.path.join(THUMB_DIR, f"{vid_hash}.jpg")
# Se já existe, mostra direto
if os.path.exists(thumb_path):
img_element.set_source(f'/thumbs/{vid_hash}.jpg')
img_element.classes(remove='opacity-0')
return
# Se não, põe na fila
await self.queue.put((filepath, thumb_path, img_element))
if not self.processing:
asyncio.create_task(self.process_queue())
async def process_queue(self):
self.processing = True
while not self.queue.empty():
try:
filepath, thumb_path, img_element = await self.queue.get()
# Verifica se o elemento de imagem ainda existe na tela (se o usuário não mudou de pasta)
if img_element.is_deleted:
continue
if not os.path.exists(thumb_path):
# Gera thumbnail (frame em 10s)
cmd = [
'ffmpeg', '-y', '-ss', '00:00:10', '-i', filepath,
'-frames:v', '1', '-vf', 'scale=320:-1', '-q:v', '5', thumb_path
]
await run.io_bound(subprocess.run, cmd, capture_output=True)
if os.path.exists(thumb_path):
# Força atualização da imagem (timestamp evita cache do navegador)
img_element.set_source(f'/thumbs/{os.path.basename(thumb_path)}?t={datetime.datetime.now().timestamp()}')
img_element.classes(remove='opacity-0')
# Pequena pausa para não travar a CPU
await asyncio.sleep(0.05)
except Exception as e:
print(f"Erro Thumb: {e}")
self.processing = False
thumb_manager = ThumbnailManager()
# --- UTILITÁRIOS ---
async def get_human_size(size):
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024: return f"{size:.2f} {unit}"
size /= 1024
return f"{size:.2f} TB"
def get_file_type(filename):
ext = os.path.splitext(filename)[1].lower()
if ext in ['.mkv', '.mp4', '.avi', '.mov', '.wmv']: return 'video'
for type_name, data in ICON_MAP.items():
if 'exts' in data and ext in data['exts']: return type_name
return 'default'
async def get_media_info_async(filepath):
def _probe():
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", filepath]
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return json.loads(res.stdout)
except: return None
data = await run.io_bound(_probe)
if not data: return None
fmt = data.get('format', {})
info = {
"filename": os.path.basename(filepath),
"size": int(fmt.get('size', 0)),
"duration": float(fmt.get('duration', 0)),
"bitrate": int(fmt.get('bit_rate', 0)),
"video": [], "audio": [], "subtitle": []
}
for s in data.get('streams', []):
stype = s.get('codec_type')
lang = s.get('tags', {}).get('language', 'und').upper()
codec = s.get('codec_name', 'unknown').upper()
if stype == 'video':
info['video'].append({"codec": codec, "res": f"{s.get('width','?')}x{s.get('height','?')}", "fps": s.get('r_frame_rate', '')})
elif stype == 'audio':
info['audio'].append({"lang": lang, "codec": codec, "ch": s.get('channels', 0), "title": s.get('tags', {}).get('title', '')})
elif stype == 'subtitle':
info['subtitle'].append({"lang": lang, "codec": codec})
return info
# --- CLASSE GERENCIADORA ---
class FileManager:
def __init__(self):
self.path = ROOT_DIR
self.view_mode = 'grid'
self.is_selecting = False
self.selected_items = set()
self.cached_entries = []
self.refreshing = False
# Elementos UI
self.container_content = None
self.footer = None
self.lbl_selection_count = None
self.btn_select_mode = None
self.header_row = None
# Bind Teclado Global
self.keyboard = ui.keyboard(on_key=self.handle_key)
async def handle_key(self, e):
if not e.action.keydown: return
if e.key == 'F2':
if len(self.selected_items) == 1:
self.open_rename_dialog(list(self.selected_items)[0])
elif e.key == 'Delete':
await self.confirm_delete_selected()
elif e.key == 'Escape':
if self.is_selecting: self.toggle_select_mode()
elif e.key == 'a' and e.modifiers.ctrl:
if not self.is_selecting: self.toggle_select_mode()
self.select_all(self.cached_entries)
# --- WRAPPERS SEGUROS PARA LAMBDA ASYNC ---
# Isso resolve o erro "coroutine never awaited"
def safe_nav(self, path):
asyncio.create_task(self.navigate(path))
def safe_nav_up(self):
asyncio.create_task(self.navigate_up())
# --- NAVEGAÇÃO ---
async def navigate(self, path):
if os.path.exists(path) and os.path.isdir(path):
self.path = path
self.selected_items.clear()
self.is_selecting = False
self.update_footer_state()
if self.header_row:
self.header_row.clear()
with self.header_row: self.build_header_content()
await self.refresh()
else:
ui.notify('Caminho inválido.', type='negative')
async def navigate_up(self):
parent = os.path.dirname(self.path)
if parent and os.path.exists(parent): await self.navigate(parent)
# --- UPLOAD ---
def open_upload_dialog(self):
with ui.dialog() as dialog, ui.card():
ui.label(f'Upload para: {os.path.basename(self.path)}').classes('font-bold')
async def handle(e):
try:
name = None
if hasattr(e, 'file'):
name = getattr(e.file, 'filename', None) or getattr(e.file, 'name', None)
if not name: name = getattr(e, 'name', 'arquivo_sem_nome')
content = b''
if hasattr(e, 'content'): content = e.content.read()
elif hasattr(e, 'file'):
if hasattr(e.file, 'seek'): await e.file.seek(0)
if hasattr(e.file, 'read'): content = await e.file.read()
if not content: ui.notify('Erro: Arquivo vazio', type='warning'); return
target = os.path.join(self.path, name)
await run.io_bound(self._save_file, target, content)
ui.notify(f'Sucesso: {name}', type='positive')
except Exception as ex: ui.notify(f'Erro: {ex}', type='negative')
ui.upload(on_upload=handle, auto_upload=True, multiple=True).props('accept=*').classes('w-full')
ui.button('Fechar', on_click=lambda: (dialog.close(), self.safe_nav(self.path))).props('flat w-full')
dialog.open()
def _save_file(self, target, content):
with open(target, 'wb') as f: f.write(content)
# --- EDITOR DE TEXTO ---
async def open_text_editor(self, filepath):
if os.path.getsize(filepath) > 1024 * 1024:
ui.notify('Arquivo muito grande.', type='warning'); return
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read()
except: ui.notify('Erro ao ler arquivo.', type='negative'); return
with ui.dialog() as dialog, ui.card().classes('w-full max-w-4xl h-[80vh] flex flex-col'):
ui.label(f'Editando: {os.path.basename(filepath)}').classes('font-bold')
editor = ui.textarea(value=content).classes('w-full flex-grow font-mono text-sm').props('outlined')
async def save():
try:
with open(filepath, 'w', encoding='utf-8') as f: f.write(editor.value)
ui.notify('Salvo!', type='positive'); dialog.close()
except Exception as e: ui.notify(f'Erro: {e}', type='negative')
with ui.row().classes('w-full justify-end'):
ui.button('Cancelar', on_click=dialog.close).props('flat')
ui.button('Salvar', on_click=save).props('icon=save color=green')
dialog.open()
# --- BOT INTEGRATION ---
async def send_to_cleiflow(self, filepath):
if not state.watcher: ui.notify('Watcher offline.', type='warning'); return
ui.notify(f'Processando {os.path.basename(filepath)}...', type='info')
asyncio.create_task(state.watcher.process_pipeline(Path(filepath)))
# --- EXCLUSÃO SEGURA ---
async def confirm_delete(self, items_to_delete):
if not items_to_delete: return
is_single = len(items_to_delete) == 1
title = "Confirmar Exclusão"
msg = f"Excluir permanentemente:\n{os.path.basename(items_to_delete[0])}?" if is_single else f"Excluir {len(items_to_delete)} itens selecionados?"
with ui.dialog() as dialog, ui.card():
ui.label(title).classes('text-xl font-bold text-red-600')
ui.label(msg).classes('text-gray-700 whitespace-pre-wrap')
async def execute():
dialog.close()
count = 0
for item in items_to_delete:
try:
if os.path.isdir(item): await run.io_bound(shutil.rmtree, item)
else: await run.io_bound(os.remove, item)
count += 1
except: pass
ui.notify(f'{count} excluídos.', type='positive')
self.selected_items.clear()
self.update_footer_state()
await self.refresh()
with ui.row().classes('w-full justify-end mt-4'):
ui.button('Cancelar', on_click=dialog.close).props('flat')
ui.button('EXCLUIR', on_click=execute).props('color=red icon=delete')
dialog.open()
async def confirm_delete_selected(self):
await self.confirm_delete(list(self.selected_items))
# --- MOVIMENTAÇÃO ---
async def open_move_dialog(self, target_items=None):
items = target_items if target_items else list(self.selected_items)
if not items: return
browser_path = ROOT_DIR
with ui.dialog() as dialog, ui.card().classes('w-96 h-[500px] flex flex-col p-4'):
ui.label(f'Mover {len(items)} item(s)').classes('font-bold')
lbl_path = ui.label(ROOT_DIR).classes('text-xs bg-gray-100 p-2 w-full truncate border rounded')
scroll = ui.scroll_area().classes('flex-grow border rounded p-1 bg-white')
async def load_folders(p):
nonlocal browser_path; browser_path = p
lbl_path.text = p; scroll.clear()
try:
entries = await run.io_bound(os.scandir, p)
sorted_e = sorted([e for e in entries if e.is_dir() and not e.name.startswith('.')], key=lambda e: e.name.lower())
with scroll:
parent = os.path.dirname(p)
if parent and os.path.exists(parent):
ui.button('..', on_click=lambda: load_folders(parent)).props('flat dense icon=arrow_upward w-full align=left')
for e in sorted_e:
ui.button(e.name, on_click=lambda path=e.path: load_folders(path)).props('flat dense w-full align=left icon=folder color=amber')
except: pass
ui.timer(0, lambda: load_folders(ROOT_DIR), once=True)
async def execute():
dialog.close(); ui.notify('Movendo...', type='info')
count = 0
for item in items:
try:
tgt = os.path.join(browser_path, os.path.basename(item))
if item != tgt:
await run.io_bound(shutil.move, item, tgt)
count += 1
except Exception as e: ui.notify(f"Erro: {e}", type='negative')
ui.notify(f'{count} movidos!', type='positive')
if not target_items: self.selected_items.clear()
self.update_footer_state()
await self.refresh()
with ui.row().classes('w-full justify-between mt-auto'):
ui.button('Cancelar', on_click=dialog.close).props('flat')
ui.button('Mover Aqui', on_click=execute).props('color=green icon=drive_file_move')
dialog.open()
# --- UI HELPERS ---
def open_rename_dialog(self, path):
with ui.dialog() as dialog, ui.card():
ui.label('Renomear'); name = ui.input('Novo Nome', value=os.path.basename(path)).classes('w-full')
async def save():
try:
new_path = os.path.join(os.path.dirname(path), name.value)
await run.io_bound(os.rename, path, new_path)
dialog.close(); await self.refresh()
except Exception as e: ui.notify(str(e), type='negative')
ui.button('Salvar', on_click=save).props('color=blue')
dialog.open()
def open_create_folder(self):
with ui.dialog() as dialog, ui.card():
ui.label('Nova Pasta'); name = ui.input('Nome')
async def create():
try:
await run.io_bound(os.makedirs, os.path.join(self.path, name.value))
dialog.close(); await self.refresh()
except Exception as e: ui.notify(str(e), type='negative')
ui.button('Criar', on_click=create).props('color=green')
dialog.open()
def toggle_select_mode(self):
self.is_selecting = not self.is_selecting
if not self.is_selecting: self.selected_items.clear()
self.update_footer_state()
self.refresh_ui_only()
def toggle_selection(self, item_path):
if item_path in self.selected_items: self.selected_items.remove(item_path)
else: self.selected_items.add(item_path)
self.update_footer_state()
self.refresh_ui_only()
def select_all(self, entries):
current = {e.path for e in entries}
if self.selected_items.issuperset(current): self.selected_items.difference_update(current)
else: self.selected_items.update(current)
self.update_footer_state()
self.refresh_ui_only()
def update_footer_state(self):
if self.footer:
self.footer.set_visibility(self.is_selecting)
if self.lbl_selection_count: self.lbl_selection_count.text = f'{len(self.selected_items)} item(s)'
# --- LAYOUT PRINCIPAL ---
def create_layout(self):
self.header_row = ui.row().classes('w-full items-center bg-gray-100 p-2 rounded-lg gap-2 sticky top-0 z-20 shadow-sm')
with self.header_row: self.build_header_content()
self.container_content = ui.column().classes('w-full gap-4 pb-24')
with ui.row().classes('fixed bottom-0 left-0 w-full z-50 bg-white border-t p-2 justify-center gap-4 shadow-[0_-4px_10px_rgba(0,0,0,0.1)]') as f:
self.footer = f
self.lbl_selection_count = ui.label('0 selecionados').classes('font-bold self-center')
ui.button('Mover', on_click=lambda: self.open_move_dialog(None)).props('color=amber icon=drive_file_move dense')
ui.button('Excluir', on_click=self.confirm_delete_selected).props('color=red icon=delete dense')
ui.button(icon='close', on_click=self.toggle_select_mode).props('flat round dense')
self.footer.visible = False
def build_header_content(self):
if self.path != ROOT_DIR:
ui.button(icon='arrow_upward', on_click=self.safe_nav_up).props('flat round dense')
else:
ui.button(icon='home').props('flat round dense disabled text-color=grey')
rel = os.path.relpath(self.path, ROOT_DIR) if self.path.startswith(ROOT_DIR) else self.path
parts = rel.split(os.sep) if rel != '.' else []
with ui.row().classes('items-center gap-0 overflow-hidden flex-grow'):
# CORREÇÃO: Usando safe_nav para evitar erro de coroutine
ui.button('root', on_click=lambda: self.safe_nav(ROOT_DIR)).props('flat dense no-caps min-w-0 px-1 text-xs')
acc = ROOT_DIR
for part in parts:
acc = os.path.join(acc, part)
ui.label('/').classes('text-gray-400')
ui.button(part, on_click=lambda p=acc: self.safe_nav(p)).props('flat dense no-caps min-w-0 px-1 text-xs')
self.btn_select_mode = ui.button(icon='check_box', on_click=self.toggle_select_mode).props('flat round dense')
ui.button(icon='create_new_folder', on_click=self.open_create_folder).props('flat round dense')
ui.button(icon='cloud_upload', on_click=self.open_upload_dialog).props('flat round dense')
ui.button(icon='view_list', on_click=self.toggle_view).props('flat round dense')
ui.button(icon='refresh', on_click=lambda: self.safe_nav(self.path)).props('flat round dense')
def toggle_view(self):
self.view_mode = 'list' if self.view_mode == 'grid' else 'grid'
self.refresh_ui_only()
# --- REFRESH ---
async def refresh(self):
if self.refreshing: return
self.refreshing = True
try:
entries = await run.io_bound(os.scandir, self.path)
self.cached_entries = sorted(entries, key=lambda e: (not e.is_dir(), e.name.lower()))
except Exception as e:
self.cached_entries = []
ui.notify(f"Erro: {e}", type='negative')
self.refresh_ui_only()
self.refreshing = False
def refresh_ui_only(self):
if self.container_content:
self.container_content.clear()
color = 'green' if self.is_selecting else 'grey'
if self.btn_select_mode: self.btn_select_mode.props(f'text-color={color}')
if self.is_selecting:
with self.container_content:
with ui.row().classes('w-full px-2 py-1 bg-green-50 items-center justify-between text-xs text-green-800 rounded border border-green-200'):
ui.checkbox(f'Todos ({len(self.cached_entries)})', on_change=lambda: self.select_all(self.cached_entries)).props('dense size=xs')
with self.container_content:
if not self.cached_entries:
ui.label('Pasta Vazia').classes('w-full text-center text-gray-400 mt-10')
elif self.view_mode == 'grid':
with ui.grid().classes('w-full grid-cols-2 sm:grid-cols-3 md:grid-cols-4 lg:grid-cols-6 gap-3'):
for entry in self.cached_entries: self.render_card(entry)
else:
with ui.column().classes('w-full gap-0'):
for entry in self.cached_entries: self.render_list_item(entry)
def render_card(self, entry):
is_sel = entry.path in self.selected_items
is_dir = entry.is_dir()
ftype = 'folder' if is_dir else get_file_type(entry.name)
icon = ICON_MAP[ftype]['icon']
color = ICON_MAP[ftype]['color']
bg = 'bg-green-100 ring-2 ring-green-500' if is_sel else 'bg-white hover:shadow-md'
with ui.card().classes(f'w-full aspect-square p-2 items-center justify-center relative group cursor-pointer select-none {bg}') as card:
self.bind_context_menu(card, entry)
if ftype == 'video' and not is_dir:
img = ui.image().classes('w-full h-full object-cover rounded opacity-0 transition-opacity duration-500 absolute top-0 left-0')
asyncio.create_task(thumb_manager.add(entry.path, img))
with ui.column().classes('items-center justify-center z-0'):
ui.icon(icon, size='3rem', color=color)
else:
ui.icon(icon, size='3rem', color=color)
if self.is_selecting:
card.on('click', lambda: self.toggle_selection(entry.path))
ui.checkbox(value=is_sel, on_change=lambda: self.toggle_selection(entry.path)).props('dense').classes('absolute top-1 left-1 z-10').on('click', lambda e: e.stop_propagation())
else:
# CORREÇÃO: Usando safe_nav aqui também
if is_dir: card.on('click', lambda e, p=entry.path: self.safe_nav(p))
elif ftype == 'video': card.on('click', lambda e, p=entry.path: self.open_inspector(p))
elif ftype in ['subtitle', 'text']: card.on('click', lambda e, p=entry.path: self.open_text_editor(p))
ui.label(entry.name).classes('text-xs text-center leading-tight line-clamp-2 w-full break-all mt-auto z-10 bg-white/80 rounded px-1')
def render_list_item(self, entry):
is_sel = entry.path in self.selected_items
is_dir = entry.is_dir()
ftype = 'folder' if is_dir else get_file_type(entry.name)
icon = ICON_MAP[ftype]['icon']
color = ICON_MAP[ftype]['color']
bg = 'bg-green-100' if is_sel else 'hover:bg-gray-50'
with ui.row().classes(f'w-full items-center px-2 py-2 border-b cursor-pointer group {bg}') as row:
if self.is_selecting:
ui.checkbox(value=is_sel, on_change=lambda: self.toggle_selection(entry.path)).props('dense')
row.on('click', lambda: self.toggle_selection(entry.path))
else:
if is_dir: row.on('click', lambda p=entry.path: self.safe_nav(p))
elif ftype in ['subtitle', 'text']: row.on('click', lambda p=entry.path: self.open_text_editor(p))
self.bind_context_menu(row, entry)
ui.icon(icon, color=color.split('-')[0])
ui.label(entry.name).classes('text-sm font-medium flex-grow truncate')
def bind_context_menu(self, element, entry):
ftype = 'folder' if entry.is_dir() else get_file_type(entry.name)
with ui.menu() as m:
if ftype == 'video':
ui.menu_item('Media Info', on_click=lambda: self.open_inspector(entry.path))
ui.menu_item('Processar no Clei-Flow', on_click=lambda: self.send_to_cleiflow(entry.path))
if ftype in ['subtitle', 'text']:
ui.menu_item('Editar Texto', on_click=lambda: self.open_text_editor(entry.path))
ui.menu_item('Renomear', on_click=lambda: self.open_rename_dialog(entry.path))
ui.menu_item('Mover Para...', on_click=lambda: self.open_move_dialog([entry.path]))
ui.menu_item('Excluir', on_click=lambda: self.confirm_delete([entry.path])).props('text-color=red')
element.on('contextmenu.prevent', lambda: m.open())
# --- INSPECTOR ---
async def open_inspector(self, path):
dialog = ui.dialog()
with dialog, ui.card().classes('w-full max-w-3xl'):
with ui.row().classes('w-full justify-between items-start'):
ui.label(os.path.basename(path)).classes('text-lg font-bold break-all w-10/12 text-blue-800')
ui.button(icon='close', on_click=dialog.close).props('flat dense round')
content = ui.column().classes('w-full')
with content: ui.spinner('dots').classes('self-center')
dialog.open()
info = await get_media_info_async(path)
content.clear()
if not info:
with content: ui.label('Erro ao ler metadados.').classes('text-red'); return
with content:
with ui.row().classes('w-full bg-blue-50 p-2 rounded mb-4 gap-4'):
ui.label(f"⏱️ {datetime.timedelta(seconds=int(info['duration']))}")
ui.label(f"📦 {await get_human_size(info['size'])}")
ui.label(f"🚀 {int(info['bitrate']/1000)} kbps")
ui.label('Vídeo').classes('text-xs font-bold text-gray-500 uppercase mt-2')
for v in info['video']:
with ui.card().classes('w-full p-2 bg-gray-50 border-l-4 border-blue-500'):
ui.label(f"{v['codec']}{v['res']}{v['fps']} fps").classes('font-bold')
ui.label('Áudio').classes('text-xs font-bold text-gray-500 uppercase mt-4')
if info['audio']:
with ui.grid().classes('grid-cols-[auto_1fr_auto] w-full gap-2 text-sm'):
for a in info['audio']:
ui.label(a['lang']).classes('font-bold bg-gray-200 px-2 rounded text-center')
ui.label(f"{a['codec']} - {a['title']}")
ui.label(str(a['ch'])).classes('text-gray-500')
ui.label('Legendas').classes('text-xs font-bold text-gray-500 uppercase mt-4')
if info['subtitle']:
with ui.row().classes('w-full gap-2'):
for s in info['subtitle']:
color = 'green' if s['codec'] in ['subrip', 'ass'] else 'grey'
ui.chip(f"{s['lang']} ({s['codec']})", color=color).props('dense icon=subtitles')
def show():
fm = FileManager()
fm.create_layout()
# Usando safe_nav para evitar o erro inicial também
ui.timer(0, lambda: fm.safe_nav(ROOT_DIR), once=True)