# ============================================================
# ZOPK SSE ENDPOINTS (Server-Sent Events for Progress Tracking)
# ============================================================

def sse_progress_generator(operation_func, db, **kwargs):
    """
    Generic SSE generator for progress tracking.

    Runs *operation_func* in a worker thread and streams every progress
    update it reports as an SSE ``data:`` event, then emits one final
    ``{'type': 'result', ...}`` event with the operation's return value.

    Args:
        operation_func: Callable that accepts a ``progress_callback`` kwarg;
            the callback receives dataclass instances describing progress.
        db: Database session (not used here directly; kept for interface
            compatibility with callers that pass it through).
        **kwargs: Additional arguments forwarded to operation_func.

    Yields:
        str: SSE-formatted events (``data: <json>\\n\\n``).
    """
    import json
    import queue
    import threading
    from dataclasses import asdict

    # Thread-safe queue instead of a plain list + sleep() busy-wait:
    # list.append/pop across threads is not a synchronization primitive.
    updates: "queue.Queue" = queue.Queue()

    def progress_callback(update):
        updates.put(update)

    result_container = [None]

    def thread_target():
        try:
            result_container[0] = operation_func(
                progress_callback=progress_callback, **kwargs
            )
        except Exception as e:
            logger.error(f"SSE operation error: {e}")
            result_container[0] = {'error': str(e)}

    thread = threading.Thread(target=thread_target)
    thread.start()

    # Drain updates while the worker runs; the short timeout keeps the
    # loop responsive without spinning.
    while thread.is_alive() or not updates.empty():
        try:
            update = updates.get(timeout=0.1)
        except queue.Empty:
            continue
        yield f"data: {json.dumps(asdict(update), ensure_ascii=False)}\n\n"

    thread.join()

    # Explicit None check: `result or {}` would discard falsy-but-valid
    # results such as an empty dict-like payload or 0.
    final_result = result_container[0] if result_container[0] is not None else {}
    yield f"data: {json.dumps({'type': 'result', **final_result}, ensure_ascii=False)}\n\n"


@app.route('/admin/zopk/news/scrape-content/stream', methods=['GET'])
@login_required
def admin_zopk_scrape_content_stream():
    """
    SSE endpoint for streaming scrape progress.

    Query params:
    - limit: int (default 30) - max articles to scrape (capped at 100)
    - force: bool (default false) - re-scrape already scraped
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    from zopk_content_scraper import ZOPKContentScraper

    # Guard the int() conversion: a malformed ?limit= must not turn into a 500.
    try:
        limit = min(int(request.args.get('limit', 30)), 100)
    except (TypeError, ValueError):
        limit = 30
    force = request.args.get('force', 'false').lower() == 'true'
    # Capture the user id now: current_user is request-bound and not safe to
    # touch from inside the streaming generator.
    user_id = current_user.id

    def generate():
        import json
        import time

        db = SessionLocal()
        try:
            scraper = ZOPKContentScraper(db, user_id=user_id)

            from database import ZOPKNews

            # Articles eligible for scraping: approved content only.
            query = db.query(ZOPKNews).filter(
                ZOPKNews.status.in_(['approved', 'auto_approved'])
            )

            if not force:
                from zopk_content_scraper import MAX_RETRY_ATTEMPTS
                # Pending articles, or failed ones still under the retry cap.
                # (This condition already implies scrape_status in
                # ('pending', 'failed'); no separate pre-filter needed.)
                query = query.filter(
                    (ZOPKNews.scrape_status == 'pending') |
                    ((ZOPKNews.scrape_status == 'failed') &
                     (ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS))
                )

            query = query.order_by(ZOPKNews.created_at.desc())
            articles = query.limit(limit).all()
            total = len(articles)

            if total == 0:
                yield f"data: {json.dumps({'status': 'complete', 'message': 'Brak artykułów do scrapowania', 'total': 0}, ensure_ascii=False)}\n\n"
                return

            # Initial event so the client can render the progress bar at 0%.
            yield f"data: {json.dumps({'current': 0, 'total': total, 'percent': 0, 'stage': 'scraping', 'status': 'processing', 'message': f'Rozpoczynam scraping {total} artykułów...'}, ensure_ascii=False)}\n\n"

            stats = {'scraped': 0, 'failed': 0, 'skipped': 0}
            start_time = time.time()

            for idx, article in enumerate(articles, 1):
                # Per-article "processing" event before the (slow) fetch.
                yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round((idx-1)/total*100, 1), 'stage': 'scraping', 'status': 'processing', 'message': f'Pobieram: {article.title[:50]}...', 'article_id': article.id, 'article_title': article.title[:80], 'details': {'source': article.source_name or 'nieznane', **stats}}, ensure_ascii=False)}\n\n"

                result = scraper.scrape_article(article.id)

                if result.status == 'scraped':
                    stats['scraped'] += 1
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'scraping', 'status': 'success', 'message': f'✓ {result.word_count} słów: {article.title[:40]}...', 'article_id': article.id, 'details': {'word_count': result.word_count, **stats}}, ensure_ascii=False)}\n\n"
                elif result.status == 'skipped':
                    stats['skipped'] += 1
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'scraping', 'status': 'skipped', 'message': f'⊘ Pominięto: {article.title[:40]}...', 'article_id': article.id, 'details': stats}, ensure_ascii=False)}\n\n"
                else:
                    stats['failed'] += 1
                    error_msg = result.error[:50] if result.error else 'Nieznany błąd'
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'scraping', 'status': 'failed', 'message': f'✗ {error_msg}', 'article_id': article.id, 'details': {'error': result.error, **stats}}, ensure_ascii=False)}\n\n"

            processing_time = round(time.time() - start_time, 2)

            # Completion event with the aggregated statistics.
            scraped_count = stats['scraped']
            failed_count = stats['failed']
            skipped_count = stats['skipped']
            complete_msg = f'Zakończono: {scraped_count} pobrano, {failed_count} błędów, {skipped_count} pominięto'
            complete_data = {'current': total, 'total': total, 'percent': 100, 'stage': 'scraping', 'status': 'complete', 'message': complete_msg, 'details': {'processing_time': processing_time, **stats}}
            yield f"data: {json.dumps(complete_data, ensure_ascii=False)}\n\n"

        except Exception as e:
            logger.error(f"SSE scraping error: {e}")
            yield f"data: {json.dumps({'status': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
        finally:
            db.close()

    # X-Accel-Buffering: no — tells nginx not to buffer the event stream.
    return Response(generate(), mimetype='text/event-stream', headers={
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    })


@app.route('/admin/zopk/knowledge/extract/stream', methods=['GET'])
@login_required
def admin_zopk_knowledge_extract_stream():
    """
    SSE endpoint for streaming knowledge extraction progress.

    Query params:
    - limit: int (default 10) - max articles to process (capped at 50)
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    # Guard the int() conversion: a malformed ?limit= must not turn into a 500.
    try:
        limit = min(int(request.args.get('limit', 10)), 50)
    except (TypeError, ValueError):
        limit = 10
    user_id = current_user.id

    def generate():
        import json
        import time

        db = SessionLocal()
        try:
            from zopk_knowledge_service import ZOPKKnowledgeService
            from database import ZOPKNews

            service = ZOPKKnowledgeService(db, user_id=user_id)

            # Articles ready for extraction: approved, scraped, not yet extracted.
            articles = db.query(ZOPKNews).filter(
                ZOPKNews.status.in_(['approved', 'auto_approved']),
                ZOPKNews.scrape_status == 'scraped',
                ZOPKNews.knowledge_extracted == False
            ).order_by(
                ZOPKNews.created_at.desc()
            ).limit(limit).all()

            total = len(articles)

            if total == 0:
                yield f"data: {json.dumps({'status': 'complete', 'message': 'Brak artykułów do ekstrakcji', 'total': 0}, ensure_ascii=False)}\n\n"
                return

            # Initial event so the client can render the progress bar at 0%.
            yield f"data: {json.dumps({'current': 0, 'total': total, 'percent': 0, 'stage': 'extracting', 'status': 'processing', 'message': f'Rozpoczynam ekstrakcję z {total} artykułów...'}, ensure_ascii=False)}\n\n"

            stats = {'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0}
            start_time = time.time()

            for idx, article in enumerate(articles, 1):
                # Per-article "processing" event before the (slow) AI call.
                yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round((idx-1)/total*100, 1), 'stage': 'extracting', 'status': 'processing', 'message': f'Analizuję AI: {article.title[:50]}...', 'article_id': article.id, 'article_title': article.title[:80], 'details': stats}, ensure_ascii=False)}\n\n"

                result = service.extract_from_news(article.id)

                if result.success:
                    stats['success'] += 1
                    stats['chunks'] += result.chunks_created
                    stats['facts'] += result.facts_created
                    stats['entities'] += result.entities_created
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'extracting', 'status': 'success', 'message': f'✓ {result.chunks_created}ch, {result.facts_created}f, {result.entities_created}e', 'article_id': article.id, 'details': {'new_chunks': result.chunks_created, 'new_facts': result.facts_created, 'new_entities': result.entities_created, **stats}}, ensure_ascii=False)}\n\n"
                else:
                    stats['failed'] += 1
                    error_msg = result.error[:50] if result.error else 'Nieznany błąd'
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'extracting', 'status': 'failed', 'message': f'✗ {error_msg}', 'article_id': article.id, 'details': {'error': result.error, **stats}}, ensure_ascii=False)}\n\n"

            processing_time = round(time.time() - start_time, 2)

            # Completion event with the aggregated statistics.
            success_count = stats['success']
            chunks_count = stats['chunks']
            facts_count = stats['facts']
            entities_count = stats['entities']
            complete_msg = f'Zakończono: {success_count}/{total}. Utworzono: {chunks_count}ch, {facts_count}f, {entities_count}e'
            complete_data = {'current': total, 'total': total, 'percent': 100, 'stage': 'extracting', 'status': 'complete', 'message': complete_msg, 'details': {'processing_time': processing_time, **stats}}
            yield f"data: {json.dumps(complete_data, ensure_ascii=False)}\n\n"

        except Exception as e:
            logger.error(f"SSE extraction error: {e}")
            yield f"data: {json.dumps({'status': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
        finally:
            db.close()

    # X-Accel-Buffering: no — tells nginx not to buffer the event stream.
    return Response(generate(), mimetype='text/event-stream', headers={
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    })


@app.route('/admin/zopk/knowledge/embeddings/stream', methods=['GET'])
@login_required
def admin_zopk_embeddings_stream():
    """
    SSE endpoint for streaming embeddings generation progress.

    Query params:
    - limit: int (default 50) - max chunks to process (capped at 200)
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    # Guard the int() conversion: a malformed ?limit= must not turn into a 500.
    try:
        limit = min(int(request.args.get('limit', 50)), 200)
    except (TypeError, ValueError):
        limit = 50
    user_id = current_user.id

    def generate():
        import json
        import time
        from gemini_service import GeminiService

        db = SessionLocal()
        try:
            from database import ZOPKKnowledgeChunk

            gemini = GeminiService()

            # Chunks that still lack an embedding vector.
            chunks = db.query(ZOPKKnowledgeChunk).filter(
                ZOPKKnowledgeChunk.embedding.is_(None)
            ).limit(limit).all()

            total = len(chunks)

            if total == 0:
                yield f"data: {json.dumps({'status': 'complete', 'message': 'Brak chunks bez embeddingów', 'total': 0}, ensure_ascii=False)}\n\n"
                return

            # Initial event so the client can render the progress bar at 0%.
            yield f"data: {json.dumps({'current': 0, 'total': total, 'percent': 0, 'stage': 'embedding', 'status': 'processing', 'message': f'Generuję embeddingi dla {total} chunks...'}, ensure_ascii=False)}\n\n"

            stats = {'success': 0, 'failed': 0}
            start_time = time.time()

            for idx, chunk in enumerate(chunks, 1):
                summary_short = chunk.summary[:40] if chunk.summary else f'chunk_{chunk.id}'

                # Per-chunk "processing" event before the (slow) API call.
                yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round((idx-1)/total*100, 1), 'stage': 'embedding', 'status': 'processing', 'message': f'Embedding {idx}/{total}: {summary_short}...', 'details': stats}, ensure_ascii=False)}\n\n"

                try:
                    embedding = gemini.generate_embedding(
                        text=chunk.content,
                        task_type='retrieval_document',
                        title=chunk.summary,
                        user_id=user_id,
                        feature='zopk_chunk_embedding'
                    )

                    if embedding:
                        chunk.embedding = json.dumps(embedding)
                        # Commit per chunk: if the client disconnects mid-stream,
                        # GeneratorExit would otherwise reach the finally-close
                        # with everything uncommitted and all work would be lost.
                        db.commit()
                        stats['success'] += 1
                        yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'embedding', 'status': 'success', 'message': f'✓ 768 dim: {summary_short}', 'details': stats}, ensure_ascii=False)}\n\n"
                    else:
                        stats['failed'] += 1
                        yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'embedding', 'status': 'failed', 'message': f'✗ Brak odpowiedzi API', 'details': stats}, ensure_ascii=False)}\n\n"

                except Exception as e:
                    stats['failed'] += 1
                    yield f"data: {json.dumps({'current': idx, 'total': total, 'percent': round(idx/total*100, 1), 'stage': 'embedding', 'status': 'failed', 'message': f'✗ {str(e)[:40]}', 'details': {'error': str(e), **stats}}, ensure_ascii=False)}\n\n"

            db.commit()
            processing_time = round(time.time() - start_time, 2)

            # Completion event with the aggregated statistics.
            success_count = stats['success']
            complete_msg = f'Zakończono: {success_count}/{total} embeddingów'
            complete_data = {'current': total, 'total': total, 'percent': 100, 'stage': 'embedding', 'status': 'complete', 'message': complete_msg, 'details': {'processing_time': processing_time, **stats}}
            yield f"data: {json.dumps(complete_data, ensure_ascii=False)}\n\n"

        except Exception as e:
            logger.error(f"SSE embedding error: {e}")
            yield f"data: {json.dumps({'status': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
        finally:
            db.close()

    # X-Accel-Buffering: no — tells nginx not to buffer the event stream.
    return Response(generate(), mimetype='text/event-stream', headers={
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    })