From 1b4cd31c41274336acbbfcc6d7700854d602992a Mon Sep 17 00:00:00 2001 From: Maciej Pienczyn Date: Fri, 16 Jan 2026 20:15:30 +0100 Subject: [PATCH] feat(zopk): Knowledge Base + NordaGPT integration (FAZY 0-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FAZA 0 - Web Scraping: - Migracja 015: pola full_content, scrape_status w zopk_news - zopk_content_scraper.py: scraper z rate limiting i selektorami FAZA 1 - Knowledge Extraction: - zopk_knowledge_service.py: chunking, facts, entities extraction - Endpointy /admin/zopk/knowledge/extract FAZA 2 - Embeddings: - gemini_service.py: generate_embedding(), generate_embeddings_batch() - Model text-embedding-004 (768 dimensions) FAZA 3 - NordaGPT Integration: - nordabiz_chat.py: _is_zopk_query(), _get_zopk_knowledge_context() - System prompt z bazą wiedzy ZOPK - Semantic search w kontekście chatu Co-Authored-By: Claude Opus 4.5 --- app.py | 325 ++++++ database.py | 13 + database/migrations/015_zopk_full_content.sql | 58 + gemini_service.py | 120 ++ nordabiz_chat.py | 203 +++- zopk_content_scraper.py | 670 +++++++++++ zopk_knowledge_service.py | 1039 +++++++++++++++++ 7 files changed, 2427 insertions(+), 1 deletion(-) create mode 100644 database/migrations/015_zopk_full_content.sql create mode 100644 zopk_content_scraper.py create mode 100644 zopk_knowledge_service.py diff --git a/app.py b/app.py index ff16989..7a7ee95 100644 --- a/app.py +++ b/app.py @@ -10849,6 +10849,331 @@ def api_zopk_search_news(): db.close() +# ============================================================ +# ZOPK CONTENT SCRAPING (Knowledge Base Pipeline) +# ============================================================ + +@app.route('/admin/zopk/news/scrape-stats') +@login_required +def admin_zopk_scrape_stats(): + """ + Get content scraping statistics. 
+ + Returns JSON with: + - total_approved: Total approved/auto_approved articles + - scraped: Successfully scraped articles + - pending: Articles waiting to be scraped + - failed: Failed scraping attempts + - skipped: Skipped (social media, paywalls) + - ready_for_extraction: Scraped but not yet processed for knowledge + """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_content_scraper import get_scrape_stats + + db = SessionLocal() + try: + stats = get_scrape_stats(db) + return jsonify({ + 'success': True, + **stats + }) + except Exception as e: + logger.error(f"Error getting scrape stats: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/admin/zopk/news/scrape-content', methods=['POST']) +@login_required +def admin_zopk_scrape_content(): + """ + Batch scrape article content from source URLs. + + Request JSON: + - limit: int (default 50) - max articles to scrape + - force: bool (default false) - re-scrape already scraped + + Response: + - scraped: number of successfully scraped + - failed: number of failures + - skipped: number of skipped (social media, etc.) 
+ - errors: list of error details + - scraped_articles: list of scraped article info + """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_content_scraper import ZOPKContentScraper + + db = SessionLocal() + try: + data = request.get_json() or {} + limit = min(data.get('limit', 50), 100) # Max 100 at once + force = data.get('force', False) + + scraper = ZOPKContentScraper(db, user_id=current_user.id) + result = scraper.batch_scrape(limit=limit, force=force) + + return jsonify({ + 'success': True, + 'message': f"Scraping zakończony: {result['scraped']} pobrano, " + f"{result['failed']} błędów, {result['skipped']} pominięto", + **result + }) + + except Exception as e: + db.rollback() + logger.error(f"Error in batch scrape: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/admin/zopk/news/<int:news_id>/scrape', methods=['POST']) +@login_required +def admin_zopk_scrape_single(news_id): + """ + Scrape content for a single article. 
+ """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_content_scraper import ZOPKContentScraper + + db = SessionLocal() + try: + scraper = ZOPKContentScraper(db, user_id=current_user.id) + result = scraper.scrape_article(news_id) + + if result.success: + return jsonify({ + 'success': True, + 'message': f"Pobrano treść: {result.word_count} słów", + 'word_count': result.word_count, + 'status': result.status + }) + else: + return jsonify({ + 'success': False, + 'error': result.error, + 'status': result.status + }), 400 + + except Exception as e: + db.rollback() + logger.error(f"Error scraping article {news_id}: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +# ============================================================ +# ZOPK KNOWLEDGE EXTRACTION (AI-powered) +# ============================================================ + +@app.route('/admin/zopk/knowledge/stats') +@login_required +def admin_zopk_knowledge_stats(): + """ + Get knowledge extraction statistics. + + Returns: + - articles: stats about articles (approved, scraped, extracted) + - knowledge_base: stats about chunks, facts, entities, relations + - top_entities: most mentioned entities + """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_knowledge_service import get_knowledge_stats + + db = SessionLocal() + try: + stats = get_knowledge_stats(db) + return jsonify({ + 'success': True, + **stats + }) + except Exception as e: + logger.error(f"Error getting knowledge stats: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/admin/zopk/knowledge/extract', methods=['POST']) +@login_required +def admin_zopk_knowledge_extract(): + """ + Batch extract knowledge from scraped articles. 
+ + Request JSON: + - limit: int (default 50) - max articles to process + + Response: + - success/failed counts + - chunks/facts/entities/relations created + - errors list + """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_knowledge_service import ZOPKKnowledgeService + + db = SessionLocal() + try: + data = request.get_json() or {} + limit = min(data.get('limit', 50), 100) + + service = ZOPKKnowledgeService(db, user_id=current_user.id) + result = service.batch_extract(limit=limit) + + return jsonify({ + 'success': True, + 'message': f"Ekstrakcja zakończona: {result['success']}/{result['total']} artykułów. " + f"Utworzono: {result['chunks_created']} chunks, {result['facts_created']} faktów, " + f"{result['entities_created']} encji, {result['relations_created']} relacji.", + **result + }) + + except Exception as e: + db.rollback() + logger.error(f"Error in knowledge extraction: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/admin/zopk/knowledge/extract/<int:news_id>', methods=['POST']) +@login_required +def admin_zopk_knowledge_extract_single(news_id): + """ + Extract knowledge from a single article. 
+ """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_knowledge_service import ZOPKKnowledgeService + + db = SessionLocal() + try: + service = ZOPKKnowledgeService(db, user_id=current_user.id) + result = service.extract_from_news(news_id) + + if result.success: + return jsonify({ + 'success': True, + 'message': f"Wyekstrahowano: {result.chunks_created} chunks, " + f"{result.facts_created} faktów, {result.entities_created} encji", + 'chunks_created': result.chunks_created, + 'facts_created': result.facts_created, + 'entities_created': result.entities_created, + 'relations_created': result.relations_created, + 'processing_time': result.processing_time + }) + else: + return jsonify({ + 'success': False, + 'error': result.error + }), 400 + + except Exception as e: + db.rollback() + logger.error(f"Error extracting from news {news_id}: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/admin/zopk/knowledge/embeddings', methods=['POST']) +@login_required +def admin_zopk_generate_embeddings(): + """ + Generate embeddings for chunks that don't have them. 
+ + Request JSON: + - limit: int (default 100) - max chunks to process + """ + if not current_user.is_admin: + return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403 + + from zopk_knowledge_service import generate_chunk_embeddings + + db = SessionLocal() + try: + data = request.get_json() or {} + limit = min(data.get('limit', 100), 500) + + result = generate_chunk_embeddings(db, limit=limit, user_id=current_user.id) + + return jsonify({ + 'success': True, + 'message': f"Wygenerowano embeddings: {result['success']}/{result['total']}", + **result + }) + + except Exception as e: + db.rollback() + logger.error(f"Error generating embeddings: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + +@app.route('/api/zopk/knowledge/search', methods=['POST']) +@login_required +def api_zopk_knowledge_search(): + """ + Semantic search in ZOPK knowledge base. + + Request JSON: + - query: str (required) - search query + - limit: int (default 5) - max results + + Response: + - chunks: list of matching knowledge chunks with similarity scores + - facts: list of relevant facts + """ + from zopk_knowledge_service import search_knowledge, get_relevant_facts + + db = SessionLocal() + try: + data = request.get_json() or {} + query = data.get('query', '') + + if not query: + return jsonify({'success': False, 'error': 'Query wymagane'}), 400 + + limit = min(data.get('limit', 5), 20) + + # Search chunks + chunks = search_knowledge( + db, + query=query, + limit=limit, + user_id=current_user.id + ) + + # Get relevant facts + facts = get_relevant_facts(db, query=query, limit=limit) + + return jsonify({ + 'success': True, + 'query': query, + 'chunks': chunks, + 'facts': facts + }) + + except Exception as e: + logger.error(f"Error in knowledge search: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + finally: + db.close() + + # ============================================================ # KRS AUDIT (Krajowy Rejestr Sądowy) # 
============================================================ diff --git a/database.py b/database.py index bec38b1..499db70 100644 --- a/database.py +++ b/database.py @@ -1902,6 +1902,19 @@ class ZOPKNews(Base): is_featured = Column(Boolean, default=False) views_count = Column(Integer, default=0) + # Full content (scraped from source URL) - for knowledge extraction + full_content = Column(Text) # Full article text (without HTML, ads, navigation) + content_scraped_at = Column(DateTime) # When content was scraped + scrape_status = Column(String(20), default='pending', index=True) # pending, scraped, failed, skipped + scrape_error = Column(Text) # Error message if scraping failed + scrape_attempts = Column(Integer, default=0) # Number of scraping attempts + content_word_count = Column(Integer) # Word count of scraped content + content_language = Column(String(10), default='pl') # pl, en + + # Knowledge extraction status + knowledge_extracted = Column(Boolean, default=False, index=True) # True if chunks/facts/entities extracted + knowledge_extracted_at = Column(DateTime) # When knowledge was extracted + created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) diff --git a/database/migrations/015_zopk_full_content.sql b/database/migrations/015_zopk_full_content.sql new file mode 100644 index 0000000..545c867 --- /dev/null +++ b/database/migrations/015_zopk_full_content.sql @@ -0,0 +1,58 @@ +-- Migration 015: Add full_content fields to zopk_news for knowledge base extraction +-- Date: 2026-01-16 +-- Purpose: Store scraped article content for AI knowledge extraction + +-- ============================================================ +-- ADD NEW COLUMNS TO zopk_news +-- ============================================================ + +-- Full article content (scraped from source URL) +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS full_content TEXT; + +-- Content scraping metadata +ALTER TABLE zopk_news ADD 
COLUMN IF NOT EXISTS content_scraped_at TIMESTAMP; +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS scrape_status VARCHAR(20) DEFAULT 'pending'; +-- Status values: pending, scraped, failed, skipped + +-- Scraping error tracking +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS scrape_error TEXT; +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS scrape_attempts INTEGER DEFAULT 0; + +-- Content metadata (extracted during scraping) +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS content_word_count INTEGER; +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS content_language VARCHAR(10) DEFAULT 'pl'; + +-- Knowledge extraction status +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS knowledge_extracted BOOLEAN DEFAULT FALSE; +ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS knowledge_extracted_at TIMESTAMP; + +-- ============================================================ +-- INDEXES FOR EFFICIENT QUERYING +-- ============================================================ + +-- Index for finding articles to scrape +CREATE INDEX IF NOT EXISTS idx_zopk_news_scrape_status ON zopk_news(scrape_status); + +-- Index for finding articles ready for knowledge extraction +CREATE INDEX IF NOT EXISTS idx_zopk_news_knowledge_extracted ON zopk_news(knowledge_extracted); + +-- Composite index for scraping pipeline +CREATE INDEX IF NOT EXISTS idx_zopk_news_scrape_pipeline +ON zopk_news(status, scrape_status, knowledge_extracted); + +-- ============================================================ +-- COMMENTS +-- ============================================================ + +COMMENT ON COLUMN zopk_news.full_content IS 'Full article text scraped from source URL (without HTML, ads, navigation)'; +COMMENT ON COLUMN zopk_news.scrape_status IS 'pending=not scraped, scraped=success, failed=error, skipped=not scrapeable'; +COMMENT ON COLUMN zopk_news.scrape_error IS 'Error message if scraping failed'; +COMMENT ON COLUMN zopk_news.scrape_attempts IS 'Number of scraping attempts (for retry logic)'; +COMMENT 
ON COLUMN zopk_news.content_word_count IS 'Word count of scraped content'; +COMMENT ON COLUMN zopk_news.knowledge_extracted IS 'True if chunks/facts/entities extracted'; + +-- ============================================================ +-- GRANT PERMISSIONS +-- ============================================================ + +GRANT ALL ON TABLE zopk_news TO nordabiz_app; diff --git a/gemini_service.py b/gemini_service.py index 655e0ee..1234811 100644 --- a/gemini_service.py +++ b/gemini_service.py @@ -404,6 +404,126 @@ class GeminiService: except Exception as e: logger.error(f"Failed to log API cost: {e}") + def generate_embedding( + self, + text: str, + task_type: str = 'retrieval_document', + title: Optional[str] = None, + user_id: Optional[int] = None, + feature: str = 'embedding' + ) -> Optional[List[float]]: + """ + Generate embedding vector for text using Google's text-embedding model. + + Args: + text: Text to embed + task_type: One of: + - 'retrieval_document': For documents to be retrieved + - 'retrieval_query': For search queries + - 'semantic_similarity': For comparing texts + - 'classification': For text classification + - 'clustering': For text clustering + title: Optional title for document (improves quality) + user_id: User ID for cost tracking + feature: Feature name for cost tracking + + Returns: + 768-dimensional embedding vector or None on error + + Cost: ~$0.00001 per 1K tokens (very cheap) + """ + if not text or not text.strip(): + logger.warning("Empty text provided for embedding") + return None + + start_time = time.time() + + try: + # Use text-embedding-004 model (768 dimensions) + # This is Google's recommended model for embeddings + result = genai.embed_content( + model='models/text-embedding-004', + content=text, + task_type=task_type, + title=title + ) + + embedding = result.get('embedding') + + if not embedding: + logger.error("No embedding returned from API") + return None + + # Log cost (embedding API is very cheap) + latency_ms = 
int((time.time() - start_time) * 1000) + token_count = len(text) // 4 # Approximate + + # Embedding pricing: ~$0.00001 per 1K tokens + cost_usd = (token_count / 1000) * 0.00001 + + logger.debug( + f"Embedding generated: {len(embedding)} dims, " + f"{token_count} tokens, {latency_ms}ms, ${cost_usd:.8f}" + ) + + # Log to database (if cost tracking is important) + if DB_AVAILABLE and user_id: + try: + db = SessionLocal() + try: + usage_log = AIUsageLog( + request_type=feature, + model='text-embedding-004', + tokens_input=token_count, + tokens_output=0, + cost_cents=cost_usd * 100, + user_id=user_id, + prompt_length=len(text), + response_length=len(embedding) * 4, # 4 bytes per float + response_time_ms=latency_ms, + success=True + ) + db.add(usage_log) + db.commit() + finally: + db.close() + except Exception as e: + logger.error(f"Failed to log embedding cost: {e}") + + return embedding + + except Exception as e: + logger.error(f"Embedding generation error: {e}") + return None + + def generate_embeddings_batch( + self, + texts: List[str], + task_type: str = 'retrieval_document', + user_id: Optional[int] = None + ) -> List[Optional[List[float]]]: + """ + Generate embeddings for multiple texts. 
+ + Args: + texts: List of texts to embed + task_type: Task type for all embeddings + user_id: User ID for cost tracking + + Returns: + List of embedding vectors (None for failed items) + """ + results = [] + for text in texts: + embedding = self.generate_embedding( + text=text, + task_type=task_type, + user_id=user_id, + feature='embedding_batch' + ) + results.append(embedding) + return results + # Global service instance (initialized in app.py) _gemini_service: Optional[GeminiService] = None diff --git a/nordabiz_chat.py b/nordabiz_chat.py index 0dfb66a..586ab04 100644 --- a/nordabiz_chat.py +++ b/nordabiz_chat.py @@ -18,12 +18,16 @@ Created: 2025-11-23 import os import time +import logging from datetime import datetime from typing import Dict, List, Any, Optional import google.generativeai as genai import gemini_service from search_service import search_companies +# Module logger +logger = logging.getLogger(__name__) + from database import ( SessionLocal, Company, @@ -58,6 +62,13 @@ try: except ImportError: FEEDBACK_LEARNING_AVAILABLE = False +# Import ZOPK knowledge service for semantic search +try: + from zopk_knowledge_service import search_knowledge, get_relevant_facts + ZOPK_KNOWLEDGE_AVAILABLE = True +except ImportError: + ZOPK_KNOWLEDGE_AVAILABLE = False + class NordaBizChatEngine: """ @@ -347,7 +358,7 @@ class NordaBizChatEngine: from datetime import timedelta news_cutoff = datetime.now() - timedelta(days=30) recent_news = db.query(ZOPKNews).filter( - ZOPKNews.status == 'approved', + ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.published_at >= news_cutoff ).order_by(ZOPKNews.published_at.desc()).limit(10).all() @@ -361,6 +372,12 @@ class NordaBizChatEngine: for news in recent_news ] + # === ZOPK KNOWLEDGE BASE (semantic search) === + # Detect if question is about ZOPK topics + if self._is_zopk_query(current_message): + zopk_knowledge = self._get_zopk_knowledge_context(db, current_message) + context['zopk_knowledge'] = zopk_knowledge + # 
=== ETAP 2: Tablica B2B, Kalendarz, Forum === # Add upcoming events (next 60 days) @@ -600,6 +617,135 @@ class NordaBizChatEngine: # Extract Company objects from SearchResult return [result.company for result in results] + def _is_zopk_query(self, message: str) -> bool: + """ + Check if the message is related to ZOPK (Zielony Okręg Przemysłowy Kaszubia). + + ZOPK topics include: + - Offshore wind energy (Baltic Power, Baltica) + - Nuclear power plant (Lubiatowo-Kopalino) + - Kongsberg investment in Rumia + - Infrastructure (Via Pomerania, S6, Droga Czerwona) + - Hydrogen, data centers + """ + zopk_keywords = [ + # Main project + 'zopk', 'zielony okręg', 'okręg przemysłowy', + # Offshore + 'offshore', 'farmy wiatrowe', 'energetyka wiatrowa', 'bałtyk', 'baltic power', + 'baltica', 'orsted', 'morska energia', + # Nuclear + 'elektrownia jądrowa', 'atomowa', 'lubiatowo', 'kopalino', 'pej', + # Kongsberg + 'kongsberg', 'inwestycje norweskie', 'przemysł obronny', + # Infrastructure + 'via pomerania', 'droga czerwona', 's6', 'port gdynia', + # Other + 'wodór', 'centra danych', 'samsonowicz', 'transformacja energetyczna', + # Organizations + 'norda biznes', 'izba przedsiębiorców', 'rumia invest' + ] + + message_lower = message.lower() + return any(kw in message_lower for kw in zopk_keywords) + + def _get_zopk_knowledge_context(self, db, message: str) -> Dict[str, Any]: + """ + Get ZOPK knowledge base context for the current message. 
+ + Uses semantic search to find relevant: + - Knowledge chunks (text fragments with embeddings) + - Facts (structured information) + - Entities (companies, people, projects) + + Args: + db: Database session + message: User's question + + Returns: + Dict with chunks, facts, entities + """ + from database import ZOPKKnowledgeEntity, ZOPKKnowledgeChunk, ZOPKNews + + context = { + 'chunks': [], + 'facts': [], + 'entities': [] + } + + # Check if knowledge service is available + if not ZOPK_KNOWLEDGE_AVAILABLE: + logger.warning("ZOPK knowledge service not available") + return context + + try: + # Semantic search in knowledge chunks + chunks = search_knowledge( + db, + query=message, + limit=5, + min_similarity=0.3, + user_id=None # Don't track cost for context building + ) + + # Enrich chunks with source information + for c in chunks: + chunk_data = { + 'content': c['content'][:400], # Limit length + 'summary': c.get('summary', ''), + 'similarity': c.get('similarity', 0), + 'source': 'nieznane', + 'date': '' + } + + # Get source news info if available + if c.get('source_news_id'): + news = db.query(ZOPKNews).filter( + ZOPKNews.id == c['source_news_id'] + ).first() + if news: + chunk_data['source'] = news.source_name or news.source_domain or 'nieznane' + if news.published_at: + chunk_data['date'] = news.published_at.strftime('%Y-%m-%d') + + context['chunks'].append(chunk_data) + + # Get relevant facts + facts = get_relevant_facts(db, query=message, limit=5) + context['facts'] = [ + { + 'fact': f['full_text'], + 'type': f['fact_type'], + 'confidence': f.get('confidence_score', 0), + 'value': f.get('numeric_value'), + 'unit': f.get('numeric_unit') + } + for f in facts + ] + + # Get top mentioned entities (always include for context) + top_entities = db.query(ZOPKKnowledgeEntity).filter( + ZOPKKnowledgeEntity.mentions_count > 1 + ).order_by( + ZOPKKnowledgeEntity.mentions_count.desc() + ).limit(10).all() + + context['entities'] = [ + { + 'name': e.name, + 'type': 
e.entity_type, + 'description': e.short_description or '', + 'mentions': e.mentions_count + } + for e in top_entities + ] + + except Exception as e: + logger.error(f"Error getting ZOPK knowledge context: {e}") + # Return empty context on error, don't break chat + + return context + def _query_ai( self, context: Dict[str, Any], @@ -799,6 +945,61 @@ BŁĘDNIE (NIE RÓB - resetuje numerację): system_prompt += json.dumps(context['recent_news'], ensure_ascii=False, indent=None) system_prompt += "\n" + # Add ZOPK Knowledge Base context (semantic search results) + if context.get('zopk_knowledge'): + zopk = context['zopk_knowledge'] + system_prompt += "\n\n🌍 BAZA WIEDZY ZOPK (Zielony Okręg Przemysłowy Kaszubia):\n" + system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" + + # Add knowledge chunks (most relevant excerpts) + if zopk.get('chunks'): + system_prompt += "\n📄 FRAGMENTY WIEDZY (semantycznie dopasowane):\n" + for i, chunk in enumerate(zopk['chunks'][:5], 1): + system_prompt += f"\n[{i}] {chunk.get('summary', '')}\n" + system_prompt += f" Źródło: {chunk.get('source', 'nieznane')} ({chunk.get('date', '')})\n" + if chunk.get('content'): + # Skrócona treść (max 300 znaków) + content_preview = chunk['content'][:300] + if len(chunk['content']) > 300: + content_preview += "..." 
+ system_prompt += f" Treść: {content_preview}\n" + + # Add verified facts + if zopk.get('facts'): + system_prompt += "\n📌 ZWERYFIKOWANE FAKTY:\n" + for fact in zopk['facts'][:10]: + confidence_stars = "★" * int(fact.get('confidence', 0) * 5) + system_prompt += f"• {fact.get('fact', '')} [{confidence_stars}]\n" + if fact.get('value') and fact.get('unit'): + system_prompt += f" Wartość: {fact['value']} {fact['unit']}\n" + + # Add key entities + if zopk.get('entities'): + system_prompt += "\n🏢 KLUCZOWE PODMIOTY ZOPK:\n" + for entity in zopk['entities'][:8]: + entity_icon = { + 'organization': '🏛️', + 'company': '🏢', + 'person': '👤', + 'location': '📍', + 'project': '🎯', + 'technology': '⚡' + }.get(entity.get('type', ''), '•') + system_prompt += f"{entity_icon} {entity.get('name', '')} ({entity.get('type', '')})" + if entity.get('description'): + system_prompt += f" - {entity['description']}" + if entity.get('mentions'): + system_prompt += f" [{entity['mentions']} wzmianek]" + system_prompt += "\n" + + system_prompt += "\n🎯 ZASADY ODPOWIEDZI O ZOPK:\n" + system_prompt += "1. Odpowiadaj na podstawie bazy wiedzy (nie wymyślaj faktów)\n" + system_prompt += "2. Cytuj źródła: \"Według [portal] z [data]...\"\n" + system_prompt += "3. Podawaj konkretne daty i liczby gdy dostępne\n" + system_prompt += "4. Wymieniaj organizacje i osoby zaangażowane\n" + system_prompt += "5. Jeśli brak informacji w bazie - powiedz wprost\n" + system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" + # Add upcoming events (Etap 2) if context.get('upcoming_events'): system_prompt += "\n\n📅 KALENDARZ WYDARZEŃ:\n" diff --git a/zopk_content_scraper.py b/zopk_content_scraper.py new file mode 100644 index 0000000..e31b4fe --- /dev/null +++ b/zopk_content_scraper.py @@ -0,0 +1,670 @@ +""" +ZOPK Content Scraper - Pobieranie pełnej treści artykułów dla bazy wiedzy. + +Scraper respektuje robots.txt i stosuje rate limiting. +Obsługuje główne polskie portale newsowe. 
+ +Usage: + from zopk_content_scraper import ZOPKContentScraper + + scraper = ZOPKContentScraper(db_session) + result = scraper.scrape_article(news_id=123) + # lub batch: + result = scraper.batch_scrape(limit=50) +""" + +import re +import time +import logging +import hashlib +from datetime import datetime +from typing import Dict, List, Optional, Tuple +from urllib.parse import urlparse +from dataclasses import dataclass + +import requests +from bs4 import BeautifulSoup, Comment, NavigableString + +from database import ZOPKNews + +# Configure logging +logger = logging.getLogger(__name__) + +# ============================================================ +# CONFIGURATION +# ============================================================ + +# User-Agent identifying the bot +USER_AGENT = 'NordaBizBot/1.0 (+https://nordabiznes.pl/bot; kontakt@nordabiznes.pl)' + +# Request timeout in seconds +REQUEST_TIMEOUT = 15 + +# Maximum content length (chars) to avoid memory issues +MAX_CONTENT_LENGTH = 100000 # ~100KB of text + +# Rate limiting: seconds between requests per domain +RATE_LIMITS = { + 'trojmiasto.pl': 2.0, + 'dziennikbaltycki.pl': 2.0, + 'nordafm.pl': 1.5, + 'ttm24.pl': 1.5, + 'radiogdansk.pl': 1.5, + 'portalmorski.pl': 1.5, + 'biznes.pap.pl': 2.0, + 'default': 3.0 +} + +# Maximum retry attempts +MAX_RETRY_ATTEMPTS = 3 + +# ============================================================ +# CONTENT SELECTORS PER DOMAIN +# ============================================================ + +# CSS selectors for article content extraction +# Order matters - first match wins +CONTENT_SELECTORS = { + 'trojmiasto.pl': [ + 'article.article-content', + 'div.article-body', + 'div.article__content', + 'div[itemprop="articleBody"]', + ], + 'dziennikbaltycki.pl': [ + 'div.article-body', + 'article.article-main', + 'div[itemprop="articleBody"]', + 'div.art-content', + ], + 'nordafm.pl': [ + 'div.entry-content', + 'article.post-content', + 'div.post-body', + ], + 'ttm24.pl': [ + 
'div.post-content', + 'article.entry-content', + 'div.article-content', + ], + 'radiogdansk.pl': [ + 'div.article-content', + 'div.entry-content', + 'article.post', + ], + 'portalmorski.pl': [ + 'div.article-content', + 'div.entry-content', + 'article.post-content', + ], + 'biznes.pap.pl': [ + 'div.article-content', + 'div.news-content', + 'article.content', + ], + 'gov.pl': [ + 'div.article-content', + 'main.main-content', + 'div.content', + ], + 'default': [ + 'article', + 'div[itemprop="articleBody"]', + 'div.article-content', + 'div.article-body', + 'div.entry-content', + 'div.post-content', + 'main.content', + 'main', + ] +} + +# Elements to remove from content +ELEMENTS_TO_REMOVE = [ + 'script', 'style', 'nav', 'header', 'footer', 'aside', + 'form', 'iframe', 'noscript', 'svg', 'canvas', + '.advertisement', '.ad', '.ads', '.advert', '.banner', + '.social-share', '.share-buttons', '.sharing', + '.related-articles', '.related-posts', '.recommendations', + '.comments', '.comment-section', '#comments', + '.newsletter', '.subscription', '.subscribe', + '.cookie-notice', '.cookie-banner', '.gdpr', + '.popup', '.modal', '.overlay', + '.sidebar', '.widget', '.navigation', + '.breadcrumb', '.breadcrumbs', + '.author-bio', '.author-box', + '.tags', '.tag-list', '.categories', + '.pagination', '.pager', + '[data-ad]', '[data-advertisement]', +] + +# Domains that are not scrapeable (paywalls, dynamic content, etc.) 
# Domains that are never scraped: social networks (and similar sites)
# serve login walls / JS-only shells instead of article HTML.
SKIP_DOMAINS = [
    'facebook.com',
    'twitter.com',
    'x.com',
    'linkedin.com',
    'youtube.com',
    'instagram.com',
]


# ============================================================
# DATA CLASSES
# ============================================================

@dataclass
class ScrapeResult:
    """Result of scraping an article."""
    success: bool
    content: Optional[str] = None
    word_count: int = 0
    error: Optional[str] = None
    status: str = 'pending'  # scraped, failed, skipped


# ============================================================
# SCRAPER CLASS
# ============================================================

class ZOPKContentScraper:
    """
    Scraper for ZOPK news article content.

    Features:
    - Domain-specific content selectors
    - Rate limiting per domain
    - HTML cleaning (removes ads, navigation, etc.)
    - Retry logic with exponential backoff
    - robots.txt respect (via User-Agent)
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize scraper.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for audit logging
        """
        self.db = db_session
        self.user_id = user_id
        # Per-domain timestamp of the last request, used for rate limiting.
        self._last_request_time: Dict[str, float] = {}
        self._session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create a requests session with browser-like headers."""
        session = requests.Session()
        session.headers.update({
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })
        return session

    def _get_domain(self, url: str) -> str:
        """Extract the lowercase domain (without 'www.' prefix) from URL."""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Remove www. prefix
            if domain.startswith('www.'):
                domain = domain[4:]
            return domain
        except Exception:
            return 'unknown'

    def _get_rate_limit(self, domain: str) -> float:
        """Get the minimum delay (seconds) between requests for a domain."""
        # Check exact domain first.
        if domain in RATE_LIMITS:
            return RATE_LIMITS[domain]
        # BUGFIX: match only true subdomains ('news.example.com' inherits
        # 'example.com') -- a bare endswith() also matched 'notexample.com'.
        for known_domain, limit in RATE_LIMITS.items():
            if domain.endswith('.' + known_domain):
                return limit
        return RATE_LIMITS['default']

    def _wait_for_rate_limit(self, domain: str) -> None:
        """Sleep as needed so requests to a domain respect its rate limit."""
        limit = self._get_rate_limit(domain)
        last_time = self._last_request_time.get(domain, 0)
        elapsed = time.time() - last_time
        if elapsed < limit:
            wait_time = limit - elapsed
            logger.debug(f"Rate limiting: waiting {wait_time:.2f}s for {domain}")
            time.sleep(wait_time)
        self._last_request_time[domain] = time.time()

    def _should_skip_domain(self, domain: str) -> bool:
        """
        Check if a domain should be skipped (social media / paywall).

        BUGFIX: uses exact-or-subdomain matching. The original substring
        test (`skip in domain`) wrongly skipped e.g. 'netflix.com' and
        'dropbox.com' because both contain 'x.com'.
        """
        for skip in SKIP_DOMAINS:
            if domain == skip or domain.endswith('.' + skip):
                return True
        return False

    def _get_content_selectors(self, domain: str) -> List[str]:
        """Get CSS selectors to try for this domain, most specific first."""
        if domain in CONTENT_SELECTORS:
            return CONTENT_SELECTORS[domain]
        # Subdomain match, consistent with _get_rate_limit above.
        for known_domain, selectors in CONTENT_SELECTORS.items():
            if known_domain != 'default' and domain.endswith('.' + known_domain):
                return selectors
        return CONTENT_SELECTORS['default']

    def _fetch_html(self, url: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Fetch HTML content from URL.

        Returns:
            Tuple of (html_content, error_message); exactly one is None.
        """
        domain = self._get_domain(url)

        # Check if domain should be skipped.
        if self._should_skip_domain(domain):
            return None, f"Domain {domain} is not scrapeable (social media/paywall)"

        # Apply rate limiting.
        self._wait_for_rate_limit(domain)

        try:
            response = self._session.get(
                url,
                timeout=REQUEST_TIMEOUT,
                allow_redirects=True
            )
            response.raise_for_status()

            # Only parse HTML responses (skip PDFs, images, feeds, ...).
            content_type = response.headers.get('Content-Type', '')
            if 'text/html' not in content_type and 'application/xhtml' not in content_type:
                return None, f"Not HTML content: {content_type}"

            # Let requests sniff the real encoding (sites often mis-declare
            # it in headers); fall back to UTF-8.
            response.encoding = response.apparent_encoding or 'utf-8'

            return response.text, None

        except requests.exceptions.Timeout:
            return None, "Request timeout"
        except requests.exceptions.TooManyRedirects:
            return None, "Too many redirects"
        except requests.exceptions.HTTPError as e:
            return None, f"HTTP error: {e.response.status_code}"
        except requests.exceptions.ConnectionError:
            return None, "Connection error"
        except requests.exceptions.RequestException as e:
            return None, f"Request error: {str(e)}"

    def _clean_html(self, soup: BeautifulSoup) -> BeautifulSoup:
        """Remove unwanted elements (ads, navigation, comments) from HTML."""
        # Remove HTML comments.
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Remove unwanted elements.
        for selector in ELEMENTS_TO_REMOVE:
            if selector.startswith('.') or selector.startswith('#') or selector.startswith('['):
                # CSS selector
                for element in soup.select(selector):
                    element.decompose()
            else:
                # Tag name
                for element in soup.find_all(selector):
                    element.decompose()

        return soup

    def _extract_content(self, html: str, domain: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Extract article content from HTML.

        Returns:
            Tuple of (content_text, error_message); exactly one is None.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Clean HTML first.
            soup = self._clean_html(soup)

            # Try domain-specific selectors.
            selectors = self._get_content_selectors(domain)
            content_element = None

            for selector in selectors:
                content_element = soup.select_one(selector)
                if content_element:
                    logger.debug(f"Found content with selector: {selector}")
                    break

            if not content_element:
                # Fallback: try to find largest text block.
                content_element = self._find_largest_text_block(soup)

            if not content_element:
                return None, "Could not find article content"

            # Extract text.
            text = self._extract_text(content_element)

            if not text or len(text) < 100:
                return None, "Extracted content too short"

            # Truncate if too long.
            if len(text) > MAX_CONTENT_LENGTH:
                text = text[:MAX_CONTENT_LENGTH] + "..."
                logger.warning(f"Content truncated to {MAX_CONTENT_LENGTH} chars")

            return text, None

        except Exception as e:
            logger.error(f"Error extracting content: {e}")
            return None, f"Extraction error: {str(e)}"

    def _find_largest_text_block(self, soup: BeautifulSoup):
        """Find the largest text block in the page (fallback method)."""
        candidates = soup.find_all(['article', 'main', 'div', 'section'])

        best_element = None
        best_score = 0

        for element in candidates:
            # Skip small elements.
            text = element.get_text(strip=True)
            if len(text) < 200:
                continue

            # Score by text length, boosted by paragraph count -- articles
            # tend to have many <p> children, nav/boilerplate does not.
            paragraphs = len(element.find_all('p'))
            text_length = len(text)
            score = text_length + (paragraphs * 100)

            if score > best_score:
                best_score = score
                best_element = element

        return best_element

    def _extract_text(self, element) -> str:
        """
        Extract clean text from an element, preserving paragraph breaks.

        BUGFIX: the original collapsed ALL whitespace (including the '\\n'
        markers it inserted for block elements) into single spaces, which
        destroyed the paragraph structure that the downstream knowledge
        chunker splits on, and left a dead '\\n\\s*\\n' substitution.
        """
        lines = []

        for child in element.descendants:
            if isinstance(child, NavigableString):
                text = str(child).strip()
                if text:
                    lines.append(text)
            elif child.name in ['br', 'p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']:
                # Block-level boundary: mark a line break.
                lines.append('\n')

        text = ' '.join(lines)

        # Collapse runs of spaces/tabs but KEEP line breaks.
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r' ?\n ?', '\n', text)
        # At most one blank line between paragraphs.
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text.strip()

    def _count_words(self, text: str) -> int:
        """Count words in text."""
        if not text:
            return 0
        words = re.findall(r'\b\w+\b', text)
        return len(words)

    def scrape_article(self, news_id: int) -> ScrapeResult:
        """
        Scrape content for a single article.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ScrapeResult with content or error
        """
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()

        if not news:
            return ScrapeResult(
                success=False,
                error=f"News record {news_id} not found",
                status='failed'
            )

        # Already scraped: return cached content without hitting the network.
        if news.scrape_status == 'scraped' and news.full_content:
            return ScrapeResult(
                success=True,
                content=news.full_content,
                word_count=news.content_word_count or 0,
                status='scraped'
            )

        url = news.url
        domain = self._get_domain(url)

        logger.info(f"Scraping article {news_id}: {url}")

        # Non-scrapeable domain: record and skip.
        if self._should_skip_domain(domain):
            news.scrape_status = 'skipped'
            news.scrape_error = f"Domain {domain} not scrapeable"
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=f"Domain {domain} not scrapeable",
                status='skipped'
            )

        # Fetch HTML.
        html, fetch_error = self._fetch_html(url)

        if fetch_error:
            news.scrape_status = 'failed'
            news.scrape_error = fetch_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=fetch_error,
                status='failed'
            )

        # Extract content.
        content, extract_error = self._extract_content(html, domain)

        if extract_error:
            news.scrape_status = 'failed'
            news.scrape_error = extract_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=extract_error,
                status='failed'
            )

        # Success - update database.
        word_count = self._count_words(content)

        news.full_content = content
        news.content_word_count = word_count
        news.content_scraped_at = datetime.now()
        news.scrape_status = 'scraped'
        news.scrape_error = None
        news.scrape_attempts = (news.scrape_attempts or 0) + 1

        self.db.commit()

        logger.info(f"Successfully scraped article {news_id}: {word_count} words")

        return ScrapeResult(
            success=True,
            content=content,
            word_count=word_count,
            status='scraped'
        )

    def batch_scrape(
        self,
        limit: int = 50,
        status_filter: Optional[str] = None,
        force: bool = False
    ) -> Dict:
        """
        Batch scrape articles.

        Args:
            limit: Maximum number of articles to scrape
            status_filter: Filter by approval status (approved, auto_approved)
            force: If True, re-scrape even already scraped articles

        Returns:
            Dict with statistics
        """
        logger.info(f"Starting batch scrape: limit={limit}, force={force}")

        query = self.db.query(ZOPKNews)

        # Filter by approval status.
        if status_filter:
            query = query.filter(ZOPKNews.status == status_filter)
        else:
            # Default: only approved/auto_approved articles.
            query = query.filter(ZOPKNews.status.in_(['approved', 'auto_approved']))

        # Filter by scrape status. The single OR filter below subsumes the
        # original's redundant pre-filter on in_(['pending', 'failed']):
        # pending articles always qualify, failed ones only while they have
        # retry attempts left.
        if not force:
            query = query.filter(
                (ZOPKNews.scrape_status == 'pending') |
                ((ZOPKNews.scrape_status == 'failed') &
                 (ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS))
            )

        # Newest first.
        query = query.order_by(ZOPKNews.created_at.desc())

        articles = query.limit(limit).all()

        stats = {
            'total': len(articles),
            'scraped': 0,
            'failed': 0,
            'skipped': 0,
            'errors': [],
            'scraped_articles': [],
            'processing_time': 0
        }

        start_time = time.time()

        for article in articles:
            result = self.scrape_article(article.id)

            if result.status == 'scraped':
                stats['scraped'] += 1
                stats['scraped_articles'].append({
                    'id': article.id,
                    'title': article.title[:100],
                    'word_count': result.word_count,
                    'source': article.source_name
                })
            elif result.status == 'skipped':
                stats['skipped'] += 1
            else:
                stats['failed'] += 1
                stats['errors'].append({
                    'id': article.id,
                    'url': article.url,
                    'error': result.error
                })

        stats['processing_time'] = round(time.time() - start_time, 2)

        logger.info(
            f"Batch scrape complete: {stats['scraped']} scraped, "
            f"{stats['failed']} failed, {stats['skipped']} skipped "
            f"in {stats['processing_time']}s"
        )

        return stats

    def get_scrape_statistics(self) -> Dict:
        """Get scraping statistics for approved articles."""
        from sqlalchemy import func

        # Count by scrape_status.
        status_counts = self.db.query(
            ZOPKNews.scrape_status,
            func.count(ZOPKNews.id)
        ).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).group_by(ZOPKNews.scrape_status).all()

        status_dict = {status: count for status, count in status_counts}

        # Total approved articles.
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()

        # Articles ready for knowledge extraction.
        ready_for_extraction = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).scalar()

        # Average word count of scraped articles.
        avg_word_count = self.db.query(func.avg(ZOPKNews.content_word_count)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()

        return {
            'total_approved': total_approved or 0,
            'scraped': status_dict.get('scraped', 0),
            # Rows with NULL scrape_status count as pending.
            'pending': status_dict.get('pending', 0) + status_dict.get(None, 0),
            'failed': status_dict.get('failed', 0),
            'skipped': status_dict.get('skipped', 0),
            'ready_for_extraction': ready_for_extraction or 0,
            'avg_word_count': round(avg_word_count or 0, 0)
        }


# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================

def scrape_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience function for cron jobs.

    Usage:
        from zopk_content_scraper import scrape_pending_articles
        result = scrape_pending_articles(db_session, limit=50)
    """
    scraper = ZOPKContentScraper(db_session)
    return scraper.batch_scrape(limit=limit)


def get_scrape_stats(db_session) -> Dict:
    """
    Get scraping statistics for monitoring.
    """
    scraper = ZOPKContentScraper(db_session)
    return scraper.get_scrape_statistics()
"""
ZOPK Knowledge Service - knowledge extraction from articles for the knowledge base.

Pipeline:
1. Chunking - split text into 500-1000 token fragments
2. AI Extraction - fact and entity extraction via Gemini
3. Entity Linking - entity identification and deduplication
4. Relation Extraction - detecting relations between entities
5. Embedding Generation - vectors for semantic search (PHASE 2)

Usage:
    from zopk_knowledge_service import ZOPKKnowledgeService

    service = ZOPKKnowledgeService(db_session)
    result = service.extract_from_news(news_id=123)
    # or batch:
    result = service.batch_extract(limit=50)
"""

import re
import json
import math
import time
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field

from database import (
    ZOPKNews,
    ZOPKKnowledgeChunk,
    ZOPKKnowledgeEntity,
    ZOPKKnowledgeFact,
    ZOPKKnowledgeEntityMention,
    ZOPKKnowledgeRelation,
)

# Configure logging
logger = logging.getLogger(__name__)

# ============================================================
# CONFIGURATION
# ============================================================

# Chunk size settings
MIN_CHUNK_SIZE = 200      # tokens
MAX_CHUNK_SIZE = 1000     # tokens
CHUNK_OVERLAP = 100       # tokens of overlap between chunks (target value;
                          # the splitter currently carries one sentence over)
APPROX_CHARS_PER_TOKEN = 4  # Polish text approximation

# AI extraction settings
MAX_FACTS_PER_CHUNK = 10
MAX_ENTITIES_PER_CHUNK = 15

# Entity types
ENTITY_TYPES = [
    'company',       # Company, business organization
    'person',        # Person
    'place',         # Place, location
    'organization',  # Government institution, NGO
    'project',       # Project, initiative
    'technology',    # Technology, product
    'event',         # Event, conference
]

# Fact types
FACT_TYPES = [
    'statistic',     # Numbers, quantitative data
    'event',         # Dated occurrence
    'statement',     # Statement, declaration
    'decision',      # Decision, resolution
    'milestone',     # Project milestone
    'partnership',   # Partnership, cooperation
    'investment',    # Investment
]

# Relation types
RELATION_TYPES = [
    'investor_in',       # A invests in B
    'partner_of',        # A is a partner of B
    'located_in',        # A is located in B
    'manages',           # A manages B
    'works_for',         # A works for B
    'part_of',           # A is part of B
    'cooperates_with',   # A cooperates with B
    'produces',          # A produces B
    'supplies_to',       # A supplies to B
]

# ============================================================
# AI PROMPTS (Polish on purpose -- the corpus and the model
# responses are Polish; do not translate these runtime strings)
# ============================================================

EXTRACTION_SYSTEM_PROMPT = """Jesteś ekspertem ds. ekstrakcji wiedzy z artykułów o projekcie Zielony Okręg Przemysłowy Kaszubia (ZOPK).

ZOPK to strategiczny projekt transformacji energetycznej i przemysłowej Pomorza, obejmujący:
- Morską energetykę wiatrową (Baltic Power, Baltica, F.E.W. Baltic)
- Elektrownię jądrową (Lubiatowo-Kopalino, PEJ)
- Inwestycje norweskie Kongsberg w Rumi
- Infrastrukturę: Via Pomerania, S6, Droga Czerwona
- Centra danych, laboratoria wodorowe

Twoim zadaniem jest wyodrębnić z tekstu:
1. FAKTY - konkretne, weryfikowalne informacje (liczby, daty, decyzje)
2. ENCJE - nazwy własne (firmy, osoby, miejsca, projekty)
3. RELACJE - powiązania między encjami"""

EXTRACTION_USER_PROMPT = """Przeanalizuj poniższy fragment artykułu i wyodrębnij strukturalne informacje.

TEKST:
{chunk_text}

ŹRÓDŁO: {source_name} ({published_date})

Zwróć JSON w formacie:
{{
  "facts": [
    {{
      "type": "statistic|event|statement|decision|milestone|partnership|investment",
      "subject": "podmiot faktu",
      "predicate": "czasownik/relacja",
      "object": "dopełnienie/informacja",
      "full_text": "pełne zdanie opisujące fakt",
      "numeric_value": null lub liczba,
      "numeric_unit": null lub "PLN|EUR|MW|jobs|%",
      "date_value": null lub "YYYY-MM-DD",
      "confidence": 0.0-1.0
    }}
  ],
  "entities": [
    {{
      "type": "company|person|place|organization|project|technology|event",
      "name": "nazwa encji",
      "description": "krótki opis (1 zdanie)",
      "role": "rola w kontekście (np. inwestor, koordynator)"
    }}
  ],
  "relations": [
    {{
      "entity_a": "nazwa encji A",
      "entity_b": "nazwa encji B",
      "relation": "investor_in|partner_of|located_in|manages|works_for|part_of|cooperates_with|produces|supplies_to",
      "description": "opis relacji"
    }}
  ],
  "summary": "1-2 zdaniowe podsumowanie fragmentu",
  "keywords": ["słowo1", "słowo2", "słowo3"]
}}

ZASADY:
- Wyodrębniaj TYLKO informacje wprost zawarte w tekście
- Nie dodawaj informacji z własnej wiedzy
- Podawaj confidence score dla każdego faktu (1.0 = pewny, 0.5 = prawdopodobny)
- Dla liczb zawsze podawaj jednostkę (PLN, EUR, MW, jobs, %)
- Dla dat używaj formatu YYYY-MM-DD
- Nazwy encji pisz dokładnie jak w tekście"""


# ============================================================
# DATA CLASSES
# ============================================================

@dataclass
class ChunkData:
    """Data for a single chunk."""
    content: str
    index: int
    token_count: int


@dataclass
class ExtractionResult:
    """Result of knowledge extraction from a news article."""
    success: bool
    news_id: int
    chunks_created: int = 0
    facts_created: int = 0
    entities_created: int = 0
    relations_created: int = 0
    error: Optional[str] = None
    processing_time: float = 0.0


# ============================================================
# CHUNKING FUNCTIONS
# ============================================================

def estimate_tokens(text: str) -> int:
    """Estimate token count for text (Polish: ~4 chars per token)."""
    if not text:
        return 0
    return len(text) // APPROX_CHARS_PER_TOKEN


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences using Polish sentence boundaries."""
    sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    sentences = re.split(sentence_pattern, text)
    return [s.strip() for s in sentences if s.strip()]


def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.

    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split oversized paragraphs by sentences (with a one-sentence overlap)
    - Merge an undersized final chunk into the previous one

    BUGFIX vs original:
    - Whole paragraphs and split sentences are no longer mixed in a single
      accumulator (the original picked its join separator from the length
      of the first piece, gluing paragraph chunks with a single space).
    - A final chunk below MIN_CHUNK_SIZE is merged into the previous chunk
      (or kept if it is the only one) instead of being silently dropped,
      which previously lost short articles entirely.
    """
    if not text:
        return []

    chunks: List[ChunkData] = []

    def flush(pieces: List[str], tokens: int, sep: str) -> None:
        # Append the accumulated pieces as one chunk (no-op when empty).
        if pieces:
            chunks.append(ChunkData(
                content=sep.join(pieces),
                index=len(chunks),
                token_count=tokens
            ))

    current: List[str] = []
    current_tokens = 0

    for para in text.split('\n\n'):
        para = para.strip()
        if not para:
            continue

        para_tokens = estimate_tokens(para)

        if para_tokens > MAX_CHUNK_SIZE:
            # Oversized paragraph: flush pending paragraph chunk first so
            # sentence pieces never mix with whole paragraphs.
            flush(current, current_tokens, '\n\n')
            current, current_tokens = [], 0

            sent_acc: List[str] = []
            sent_tokens = 0
            for sentence in split_into_sentences(para):
                st = estimate_tokens(sentence)
                if sent_acc and sent_tokens + st > MAX_CHUNK_SIZE:
                    flush(sent_acc, sent_tokens, ' ')
                    # Overlap: carry the last sentence into the next chunk.
                    sent_acc = [sent_acc[-1]]
                    sent_tokens = estimate_tokens(sent_acc[0])
                sent_acc.append(sentence)
                sent_tokens += st
            flush(sent_acc, sent_tokens, ' ')
        else:
            if current and current_tokens + para_tokens > MAX_CHUNK_SIZE:
                flush(current, current_tokens, '\n\n')
                current, current_tokens = [], 0
            current.append(para)
            current_tokens += para_tokens

    # Final flush with undersized-tail handling.
    if current:
        tail = '\n\n'.join(current)
        if current_tokens >= MIN_CHUNK_SIZE or not chunks:
            chunks.append(ChunkData(
                content=tail,
                index=len(chunks),
                token_count=current_tokens
            ))
        else:
            last = chunks[-1]
            chunks[-1] = ChunkData(
                content=last.content + '\n\n' + tail,
                index=last.index,
                token_count=last.token_count + current_tokens
            )

    return chunks


# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================

class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        self._gemini_service = None

    @property
    def gemini(self):
        """Lazy-load Gemini service (avoids import cost when unused)."""
        if self._gemini_service is None:
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service

    def _normalize_entity_name(self, name: str) -> str:
        """Normalize entity name (lowercase, no diacritics) for deduplication."""
        if not name:
            return ''
        normalized = name.lower().strip()
        normalized = re.sub(r'\s+', ' ', normalized)
        # Strip Polish diacritics so 'Gdańsk' and 'Gdansk' match.
        trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
        return normalized.translate(trans)

    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.
        Uses normalized name for deduplication.
        """
        normalized = self._normalize_entity_name(name)

        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()

        if existing:
            # Bump mention stats; keep the longer (richer) description.
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing

        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Get ID

        return entity

    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations using Gemini AI.

        Returns parsed JSON or None on error.
        """
        try:
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk.content,
                source_name=source_name,
                published_date=published_date
            )

            response = self.gemini.generate_text(
                prompt=prompt,
                system_prompt=EXTRACTION_SYSTEM_PROMPT,
                temperature=0.1,  # Low temperature for consistency
                max_tokens=2000,
                user_id=self.user_id,
                feature='zopk_knowledge_extraction'
            )

            if not response:
                logger.warning("Empty response from Gemini")
                return None

            # The model may wrap JSON in a markdown code fence.
            json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
            json_str = json_match.group(1) if json_match else response

            return json.loads(json_str.strip())

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
            return None
        except Exception as e:
            logger.error(f"AI extraction error: {e}")
            return None

    def _save_chunk(
        self,
        news: ZOPKNews,
        chunk: ChunkData,
        extraction_data: Optional[Dict]
    ) -> ZOPKKnowledgeChunk:
        """Save a chunk to the database and return the persisted row."""
        db_chunk = ZOPKKnowledgeChunk(
            source_news_id=news.id,
            content=chunk.content,
            chunk_index=chunk.index,
            token_count=chunk.token_count,
            chunk_type='narrative',  # Default
            summary=extraction_data.get('summary') if extraction_data else None,
            keywords=extraction_data.get('keywords') if extraction_data else None,
            language='pl',
            extraction_model='gemini-2.0-flash',
            extracted_at=datetime.now()
        )
        self.db.add(db_chunk)
        self.db.flush()

        return db_chunk

    def _save_facts(
        self,
        chunk: ZOPKKnowledgeChunk,
        news: ZOPKNews,
        facts_data: List[Dict]
    ) -> int:
        """Save extracted facts to database; returns number saved."""
        count = 0

        for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
            try:
                # Coerce numeric value; model output may be malformed.
                numeric_value = fact.get('numeric_value')
                if numeric_value is not None:
                    try:
                        numeric_value = float(numeric_value)
                    except (ValueError, TypeError):
                        numeric_value = None

                # Parse ISO date (module-level datetime; the original
                # re-imported datetime on every loop iteration).
                date_value = fact.get('date_value')
                if date_value:
                    try:
                        date_value = datetime.strptime(date_value, '%Y-%m-%d').date()
                    except (ValueError, TypeError):
                        date_value = None

                db_fact = ZOPKKnowledgeFact(
                    source_chunk_id=chunk.id,
                    source_news_id=news.id,
                    fact_type=fact.get('type', 'statement'),
                    subject=fact.get('subject'),
                    predicate=fact.get('predicate'),
                    object=fact.get('object'),
                    full_text=fact.get('full_text', ''),
                    numeric_value=numeric_value,
                    numeric_unit=fact.get('numeric_unit'),
                    date_value=date_value,
                    confidence_score=fact.get('confidence', 0.5)
                )
                self.db.add(db_fact)
                count += 1

            except Exception as e:
                logger.error(f"Error saving fact: {e}")
                continue

        return count

    def _save_entities_and_relations(
        self,
        chunk: ZOPKKnowledgeChunk,
        entities_data: List[Dict],
        relations_data: List[Dict]
    ) -> Tuple[int, int]:
        """Save entities and relations; returns (entity_count, relation_count)."""
        entity_count = 0
        relation_count = 0

        # Map of lowercase name -> entity, used to resolve relations below.
        entity_map = {}

        for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
            try:
                entity = self._find_or_create_entity(
                    name=ent.get('name', ''),
                    entity_type=ent.get('type', 'organization'),
                    description=ent.get('description')
                )
                entity_map[ent.get('name', '').lower()] = entity

                mention = ZOPKKnowledgeEntityMention(
                    chunk_id=chunk.id,
                    entity_id=entity.id,
                    mention_text=ent.get('name'),
                    mention_type='direct',
                    role_in_context=ent.get('role'),
                    confidence=0.9
                )
                self.db.add(mention)
                entity_count += 1

            except Exception as e:
                logger.error(f"Error saving entity: {e}")
                continue

        # Flush to get entity IDs before wiring relations.
        self.db.flush()

        for rel in relations_data:
            try:
                entity_a = entity_map.get(rel.get('entity_a', '').lower())
                entity_b = entity_map.get(rel.get('entity_b', '').lower())

                # Skip relations whose endpoints were not extracted above.
                if not entity_a or not entity_b:
                    continue

                # De-duplicate: one row per (a, b, relation_type).
                existing = self.db.query(ZOPKKnowledgeRelation).filter(
                    ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
                    ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
                    ZOPKKnowledgeRelation.relation_type == rel.get('relation')
                ).first()

                if not existing:
                    db_relation = ZOPKKnowledgeRelation(
                        entity_a_id=entity_a.id,
                        entity_b_id=entity_b.id,
                        relation_type=rel.get('relation', 'cooperates_with'),
                        evidence_text=rel.get('description'),
                        source_chunk_id=chunk.id,
                        confidence=0.8,
                        strength=3
                    )
                    self.db.add(db_relation)
                    relation_count += 1

            except Exception as e:
                logger.error(f"Error saving relation: {e}")
                continue

        return entity_count, relation_count

    def extract_from_news(self, news_id: int) -> ExtractionResult:
        """
        Extract knowledge from a single news article.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ExtractionResult with statistics
        """
        start_time = time.time()

        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()

        if not news:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error=f"News record {news_id} not found"
            )

        # Idempotent: already-processed articles are not re-extracted.
        if news.knowledge_extracted:
            return ExtractionResult(
                success=True,
                news_id=news_id,
                error="Already extracted"
            )

        if not news.full_content:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No content scraped"
            )

        logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")

        chunks = create_chunks(news.full_content)

        if not chunks:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No chunks created"
            )

        chunks_created = 0
        facts_created = 0
        entities_created = 0
        relations_created = 0

        for chunk in chunks:
            try:
                extraction_data = self._extract_with_ai(
                    chunk=chunk,
                    source_name=news.source_name or 'unknown',
                    published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
                )

                # The chunk row is saved even when AI extraction failed,
                # so the raw text remains available for embeddings/search.
                db_chunk = self._save_chunk(news, chunk, extraction_data)
                chunks_created += 1

                if extraction_data:
                    facts_created += self._save_facts(
                        db_chunk,
                        news,
                        extraction_data.get('facts', [])
                    )

                    ent_count, rel_count = self._save_entities_and_relations(
                        db_chunk,
                        extraction_data.get('entities', []),
                        extraction_data.get('relations', [])
                    )
                    entities_created += ent_count
                    relations_created += rel_count

            except Exception as e:
                logger.error(f"Error processing chunk {chunk.index}: {e}")
                continue

        # Mark as extracted and persist everything in one commit.
        news.knowledge_extracted = True
        news.knowledge_extracted_at = datetime.now()
        self.db.commit()

        processing_time = time.time() - start_time

        logger.info(
            f"Extracted from news {news_id}: "
            f"{chunks_created} chunks, {facts_created} facts, "
            f"{entities_created} entities, {relations_created} relations "
            f"in {processing_time:.2f}s"
        )

        return ExtractionResult(
            success=True,
            news_id=news_id,
            chunks_created=chunks_created,
            facts_created=facts_created,
            entities_created=entities_created,
            relations_created=relations_created,
            processing_time=processing_time
        )

    def batch_extract(self, limit: int = 50) -> Dict:
        """
        Batch extract knowledge from scraped articles.

        Args:
            limit: Maximum number of articles to process

        Returns:
            Dict with statistics
        """
        logger.info(f"Starting batch extraction: limit={limit}")

        # Approved, scraped, not-yet-extracted articles, newest first.
        articles = self.db.query(ZOPKNews).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).order_by(
            ZOPKNews.created_at.desc()
        ).limit(limit).all()

        stats = {
            'total': len(articles),
            'success': 0,
            'failed': 0,
            'chunks_created': 0,
            'facts_created': 0,
            'entities_created': 0,
            'relations_created': 0,
            'errors': [],
            'processing_time': 0
        }

        start_time = time.time()

        for article in articles:
            result = self.extract_from_news(article.id)

            if result.success:
                stats['success'] += 1
                stats['chunks_created'] += result.chunks_created
                stats['facts_created'] += result.facts_created
                stats['entities_created'] += result.entities_created
                stats['relations_created'] += result.relations_created
            else:
                stats['failed'] += 1
                if result.error:
                    stats['errors'].append({
                        'id': article.id,
                        'title': article.title[:100],
                        'error': result.error
                    })

        stats['processing_time'] = round(time.time() - start_time, 2)

        logger.info(
            f"Batch extraction complete: {stats['success']}/{stats['total']} success "
            f"in {stats['processing_time']}s"
        )

        return stats

    def get_extraction_statistics(self) -> Dict:
        """Get knowledge extraction statistics."""
        from sqlalchemy import func

        # Article pipeline stats.
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()

        scraped = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()

        extracted = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.knowledge_extracted == True
        ).scalar()

        pending = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).scalar()

        # Knowledge base stats.
        chunks_count = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
        facts_count = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
        entities_count = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
        relations_count = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()

        # Top entities by mentions.
        top_entities = self.db.query(
            ZOPKKnowledgeEntity.name,
            ZOPKKnowledgeEntity.entity_type,
            ZOPKKnowledgeEntity.mentions_count
        ).order_by(
            ZOPKKnowledgeEntity.mentions_count.desc()
        ).limit(10).all()

        return {
            'articles': {
                'total_approved': total_approved or 0,
                'scraped': scraped or 0,
                'extracted': extracted or 0,
                'pending': pending or 0
            },
            'knowledge_base': {
                'chunks': chunks_count or 0,
                'facts': facts_count or 0,
                'entities': entities_count or 0,
                'relations': relations_count or 0
            },
            'top_entities': [
                {'name': e[0], 'type': e[1], 'mentions': e[2]}
                for e in top_entities
            ]
        }


# ============================================================
# SEMANTIC SEARCH (PHASE 2)
# ============================================================

def search_knowledge(
    db_session,
    query: str,
    limit: int = 5,
    min_similarity: float = 0.3,
    user_id: Optional[int] = None
) -> List[Dict]:
    """
    Semantic search in ZOPK knowledge base.

    Args:
        db_session: SQLAlchemy session
        query: Search query
        limit: Max results to return
        min_similarity: Minimum cosine similarity (0-1)
        user_id: User ID for cost tracking

    Returns:
        List of matching chunks with similarity scores
    """
    from gemini_service import GeminiService

    # Generate query embedding.
    gemini = GeminiService()
    query_embedding = gemini.generate_embedding(
        text=query,
        task_type='retrieval_query',
        user_id=user_id,
        feature='zopk_knowledge_search'
    )

    if not query_embedding:
        logger.warning("Failed to generate query embedding")
        return []

    # NOTE: brute-force scan over JSON-stored embeddings; migrate to
    # PostgreSQL pgvector for efficient similarity search at scale.
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.isnot(None)
    ).all()

    results = []

    for chunk in chunks:
        try:
            # Stored embedding may be a JSON string or an already-parsed list.
            if isinstance(chunk.embedding, str):
                chunk_embedding = json.loads(chunk.embedding)
            else:
                chunk_embedding = chunk.embedding

            if not chunk_embedding:
                continue

            similarity = _cosine_similarity(query_embedding, chunk_embedding)

            if similarity >= min_similarity:
                results.append({
                    'chunk_id': chunk.id,
                    'content': chunk.content[:500],
                    'summary': chunk.summary,
                    'keywords': chunk.keywords,
                    'similarity': round(similarity, 4),
                    'source_news_id': chunk.source_news_id,
                    'importance': chunk.importance_score
                })
        except Exception as e:
            logger.error(f"Error processing chunk {chunk.id}: {e}")
            continue

    results.sort(key=lambda x: x['similarity'], reverse=True)

    return results[:limit]


def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    """Calculate cosine similarity between two vectors (0.0 on mismatch/zero)."""
    if len(vec1) != len(vec2):
        return 0.0

    dot_product = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))

    if norm1 == 0 or norm2 == 0:
        return 0.0

    return dot_product / (norm1 * norm2)


def get_relevant_facts(
    db_session,
    query: str,
    limit: int = 10
) -> List[Dict]:
    """
    Get facts relevant to a query.

    Uses keyword matching for now; can be enhanced with embeddings.
    """
    # Simple keyword search: score = number of query words present.
    keywords = query.lower().split()

    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.is_verified == True
    ).all()

    results = []

    for fact in facts:
        fact_text = (fact.full_text or '').lower()
        score = sum(1 for keyword in keywords if keyword in fact_text)

        if score > 0:
            results.append({
                'fact_id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'subject': fact.subject,
                'numeric_value': float(fact.numeric_value) if fact.numeric_value else None,
                'numeric_unit': fact.numeric_unit,
                'confidence': float(fact.confidence_score) if fact.confidence_score else None,
                'relevance_score': score
            })

    results.sort(key=lambda x: x['relevance_score'], reverse=True)

    return results[:limit]


def generate_chunk_embeddings(db_session, limit: int = 100, user_id: Optional[int] = None) -> Dict:
    """
    Generate embeddings for chunks that don't have them.

    NOTE(review): the body of this function continues beyond the visible
    portion of the source and is intentionally not reproduced here.
    """
+ + Args: + db_session: SQLAlchemy session + limit: Max chunks to process + user_id: User ID for cost tracking + + Returns: + Dict with statistics + """ + import json + from gemini_service import GeminiService + + gemini = GeminiService() + + # Find chunks without embeddings + chunks = db_session.query(ZOPKKnowledgeChunk).filter( + ZOPKKnowledgeChunk.embedding.is_(None) + ).limit(limit).all() + + stats = { + 'total': len(chunks), + 'success': 0, + 'failed': 0 + } + + for chunk in chunks: + try: + embedding = gemini.generate_embedding( + text=chunk.content, + task_type='retrieval_document', + title=chunk.summary, + user_id=user_id, + feature='zopk_chunk_embedding' + ) + + if embedding: + # Store as JSON string + chunk.embedding = json.dumps(embedding) + stats['success'] += 1 + else: + stats['failed'] += 1 + + except Exception as e: + logger.error(f"Error generating embedding for chunk {chunk.id}: {e}") + stats['failed'] += 1 + + db_session.commit() + + logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success") + + return stats + + +# ============================================================ +# STANDALONE FUNCTIONS FOR CRON/CLI +# ============================================================ + +def extract_pending_articles(db_session, limit: int = 50) -> Dict: + """ + Convenience function for cron jobs. + + Usage: + from zopk_knowledge_service import extract_pending_articles + result = extract_pending_articles(db_session, limit=50) + """ + service = ZOPKKnowledgeService(db_session) + return service.batch_extract(limit=limit) + + +def get_knowledge_stats(db_session) -> Dict: + """ + Get knowledge extraction statistics for monitoring. + """ + service = ZOPKKnowledgeService(db_session) + return service.get_extraction_statistics()