""" ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy. Pipeline: 1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów 2. AI Extraction - ekstrakcja faktów i encji przez Gemini 3. Entity Linking - identyfikacja i deduplikacja encji 4. Relation Extraction - wykrywanie relacji między encjami 5. Embedding Generation - wektory dla semantic search (FAZA 2) Usage: from zopk_knowledge_service import ZOPKKnowledgeService service = ZOPKKnowledgeService(db_session) result = service.extract_from_news(news_id=123) # lub batch: result = service.batch_extract(limit=50) """ import re import json import logging import hashlib from datetime import datetime from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field # Import progress tracking from scraper from zopk_content_scraper import ProgressUpdate, ProgressCallback from database import ( ZOPKNews, ZOPKKnowledgeChunk, ZOPKKnowledgeEntity, ZOPKKnowledgeFact, ZOPKKnowledgeEntityMention, ZOPKKnowledgeRelation, ) # Configure logging logger = logging.getLogger(__name__) # ============================================================ # CONFIGURATION # ============================================================ # Chunk size settings # NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues # Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably MIN_CHUNK_SIZE = 200 # tokens MAX_CHUNK_SIZE = 500 # tokens (~2000 chars for Polish text) CHUNK_OVERLAP = 50 # tokens overlap between chunks (reduced proportionally) APPROX_CHARS_PER_TOKEN = 4 # Polish text approximation # AI extraction settings MAX_FACTS_PER_CHUNK = 10 MAX_ENTITIES_PER_CHUNK = 15 # Entity types ENTITY_TYPES = [ 'company', # Firma, organizacja biznesowa 'person', # Osoba 'place', # Miejsce, lokalizacja 'organization', # Instytucja rządowa, NGO 'project', # Projekt, inicjatywa 'technology', # Technologia, produkt 'event', # Wydarzenie, konferencja ] # Fact types FACT_TYPES 
= [ 'statistic', # Liczby, dane ilościowe 'event', # Zdarzenie z datą 'statement', # Wypowiedź, deklaracja 'decision', # Decyzja, postanowienie 'milestone', # Kamień milowy projektu 'partnership', # Partnerstwo, współpraca 'investment', # Inwestycja ] # Relation types RELATION_TYPES = [ 'investor_in', # A inwestuje w B 'partner_of', # A jest partnerem B 'located_in', # A znajduje się w B 'manages', # A zarządza B 'works_for', # A pracuje dla B 'part_of', # A jest częścią B 'cooperates_with', # A współpracuje z B 'produces', # A produkuje B 'supplies_to', # A dostarcza do B ] # ============================================================ # AI PROMPTS # ============================================================ # Ultra-simplified prompt to avoid Gemini safety filter issues # Note: Complex JSON schemas with pipe characters were triggering filters # Note: max_tokens parameter also triggers filters - don't use it! EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON. 
ARTYKUŁ: {chunk_text} Zwróć JSON z następującą strukturą: {{ "facts": [ {{"text": "pełny fakt", "type": "investment"}} ], "entities": [ {{"name": "Nazwa", "type": "company"}} ], "summary": "krótkie podsumowanie" }} Typy faktów: investment, decision, event, statistic, partnership, milestone Typy encji: company, person, place, organization, project""" # System prompt is now empty - the user prompt contains all necessary instructions EXTRACTION_SYSTEM_PROMPT = "" # ============================================================ # DATA CLASSES # ============================================================ @dataclass class ChunkData: """Data for a single chunk.""" content: str index: int token_count: int @dataclass class ExtractionResult: """Result of knowledge extraction from a news article.""" success: bool news_id: int chunks_created: int = 0 facts_created: int = 0 entities_created: int = 0 relations_created: int = 0 error: Optional[str] = None processing_time: float = 0.0 # ============================================================ # CHUNKING FUNCTIONS # ============================================================ def estimate_tokens(text: str) -> int: """Estimate token count for text.""" if not text: return 0 # Polish text: ~4 chars per token on average return len(text) // APPROX_CHARS_PER_TOKEN def split_into_sentences(text: str) -> List[str]: """Split text into sentences.""" # Polish sentence boundaries sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])' sentences = re.split(sentence_pattern, text) return [s.strip() for s in sentences if s.strip()] def create_chunks(text: str) -> List[ChunkData]: """ Split text into chunks of appropriate size. 
    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split large paragraphs by sentences
    - Maintain overlap between chunks
    """
    if not text:
        return []

    chunks = []
    current_chunk = []  # accumulates paragraphs or sentences for the next chunk
    current_tokens = 0

    # Split by paragraphs
    paragraphs = text.split('\n\n')

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        para_tokens = estimate_tokens(para)

        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)
                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: keep last sentence so adjacent chunks share context
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0
                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk (no overlap on the paragraph path)
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0
            current_chunk.append(para)
            current_tokens += para_tokens

    # Save last chunk; discard if it falls below MIN_CHUNK_SIZE.
    # The join heuristic picks paragraph joining when the first piece looks
    # like a paragraph (>100 chars), sentence joining otherwise.
    if current_chunk:
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))

    return chunks


# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================

class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        self._gemini_service = None  # created lazily by the `gemini` property

    @property
    def gemini(self):
        """Lazy-load Gemini service (imported here to avoid import cost at module load)."""
        if self._gemini_service is None:
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service

    def _normalize_entity_name(self, name: str) -> str:
        """Normalize entity name for deduplication (lowercase, collapsed spaces, no diacritics)."""
        if not name:
            return ''
        # Lowercase, remove extra spaces
        normalized = name.lower().strip()
        normalized = re.sub(r'\s+', ' ', normalized)
        # Remove Polish diacritics for matching
        trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
        normalized = normalized.translate(trans)
        return normalized

    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.

        Uses normalized name for deduplication.
""" normalized = self._normalize_entity_name(name) # Try to find existing existing = self.db.query(ZOPKKnowledgeEntity).filter( ZOPKKnowledgeEntity.normalized_name == normalized ).first() if existing: # Update mention count existing.mentions_count = (existing.mentions_count or 0) + 1 existing.last_mentioned_at = datetime.now() # Update description if better if description and (not existing.description or len(description) > len(existing.description)): existing.description = description return existing # Create new entity entity = ZOPKKnowledgeEntity( entity_type=entity_type, name=name, normalized_name=normalized, description=description, short_description=description[:500] if description else None, mentions_count=1, first_mentioned_at=datetime.now(), last_mentioned_at=datetime.now() ) self.db.add(entity) self.db.flush() # Get ID return entity def _extract_with_ai( self, chunk: ChunkData, source_name: str, published_date: str ) -> Optional[Dict]: """ Extract facts, entities, and relations using Gemini AI. Returns parsed JSON or None on error. """ try: # Truncate chunk to avoid Gemini safety filter issues with long texts # Testing showed ~4000 chars triggers safety blocks, ~2000 chars works MAX_PROMPT_CHARS = 2000 chunk_text = chunk.content[:MAX_PROMPT_CHARS] if len(chunk.content) > MAX_PROMPT_CHARS: logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars") # Simplified single prompt (system prompt removed to avoid safety filter issues) prompt = EXTRACTION_USER_PROMPT.format( chunk_text=chunk_text, source_name=source_name, published_date=published_date ) # NOTE: max_tokens removed - testing showed it triggers safety filters! 
response = self.gemini.generate_text( prompt=prompt, temperature=0.1, # Low temperature for consistency user_id=self.user_id, feature='zopk_knowledge_extraction' ) if not response: logger.warning("Empty response from Gemini") return None # Parse JSON from response # Handle markdown code blocks json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response) if json_match: json_str = json_match.group(1) else: json_str = response # Clean and parse json_str = json_str.strip() data = json.loads(json_str) return data except json.JSONDecodeError as e: logger.error(f"JSON parse error: {e}") return None except Exception as e: logger.error(f"AI extraction error: {e}") return None def _save_chunk( self, news: ZOPKNews, chunk: ChunkData, extraction_data: Optional[Dict] ) -> ZOPKKnowledgeChunk: """Save chunk to database.""" db_chunk = ZOPKKnowledgeChunk( source_news_id=news.id, content=chunk.content, chunk_index=chunk.index, token_count=chunk.token_count, chunk_type='narrative', # Default summary=extraction_data.get('summary') if extraction_data else None, keywords=extraction_data.get('keywords') if extraction_data else None, language='pl', extraction_model='gemini-3-flash-preview', extracted_at=datetime.now() ) self.db.add(db_chunk) self.db.flush() return db_chunk def _save_facts( self, chunk: ZOPKKnowledgeChunk, news: ZOPKNews, facts_data: List[Dict] ) -> int: """Save extracted facts to database.""" count = 0 for fact in facts_data[:MAX_FACTS_PER_CHUNK]: try: # Parse numeric value numeric_value = fact.get('numeric_value') if numeric_value is not None: try: numeric_value = float(numeric_value) except (ValueError, TypeError): numeric_value = None # Parse date date_value = fact.get('date_value') if date_value: try: from datetime import datetime as dt date_value = dt.strptime(date_value, '%Y-%m-%d').date() except (ValueError, TypeError): date_value = None # Support both old format (full_text) and new simplified format (text) fact_text = fact.get('text') or fact.get('full_text', 
'') db_fact = ZOPKKnowledgeFact( source_chunk_id=chunk.id, source_news_id=news.id, fact_type=fact.get('type', 'statement'), subject=fact.get('subject'), predicate=fact.get('predicate'), object=fact.get('object'), full_text=fact_text, numeric_value=numeric_value, numeric_unit=fact.get('numeric_unit'), date_value=date_value, confidence_score=fact.get('confidence', 0.5) ) self.db.add(db_fact) count += 1 except Exception as e: logger.error(f"Error saving fact: {e}") continue return count def _save_entities_and_relations( self, chunk: ZOPKKnowledgeChunk, entities_data: List[Dict], relations_data: List[Dict] ) -> Tuple[int, int]: """Save entities and relations to database.""" entity_count = 0 relation_count = 0 # Map of name -> entity for relations entity_map = {} # Save entities for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]: try: entity = self._find_or_create_entity( name=ent.get('name', ''), entity_type=ent.get('type', 'organization'), description=ent.get('description') ) entity_map[ent.get('name', '').lower()] = entity # Create mention link mention = ZOPKKnowledgeEntityMention( chunk_id=chunk.id, entity_id=entity.id, mention_text=ent.get('name'), mention_type='direct', role_in_context=ent.get('role'), confidence=0.9 ) self.db.add(mention) entity_count += 1 except Exception as e: logger.error(f"Error saving entity: {e}") continue # Flush to get entity IDs self.db.flush() # Save relations for rel in relations_data: try: entity_a_name = rel.get('entity_a', '').lower() entity_b_name = rel.get('entity_b', '').lower() entity_a = entity_map.get(entity_a_name) entity_b = entity_map.get(entity_b_name) if not entity_a or not entity_b: continue # Check if relation already exists existing = self.db.query(ZOPKKnowledgeRelation).filter( ZOPKKnowledgeRelation.entity_a_id == entity_a.id, ZOPKKnowledgeRelation.entity_b_id == entity_b.id, ZOPKKnowledgeRelation.relation_type == rel.get('relation') ).first() if not existing: db_relation = ZOPKKnowledgeRelation( 
entity_a_id=entity_a.id, entity_b_id=entity_b.id, relation_type=rel.get('relation', 'cooperates_with'), evidence_text=rel.get('description'), source_chunk_id=chunk.id, confidence=0.8, strength=3 ) self.db.add(db_relation) relation_count += 1 except Exception as e: logger.error(f"Error saving relation: {e}") continue return entity_count, relation_count def extract_from_news(self, news_id: int) -> ExtractionResult: """ Extract knowledge from a single news article. Args: news_id: ID of ZOPKNews record Returns: ExtractionResult with statistics """ import time start_time = time.time() # Get news record news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first() if not news: return ExtractionResult( success=False, news_id=news_id, error=f"News record {news_id} not found" ) # Check if already extracted if news.knowledge_extracted: return ExtractionResult( success=True, news_id=news_id, error="Już wyekstrahowano" ) # Check if content is scraped if not news.full_content: return ExtractionResult( success=False, news_id=news_id, error="Brak zescrapowanej treści" ) logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...") # Create chunks chunks = create_chunks(news.full_content) if not chunks: return ExtractionResult( success=False, news_id=news_id, error="Treść za krótka do ekstrakcji" ) # Statistics chunks_created = 0 facts_created = 0 entities_created = 0 relations_created = 0 # Process each chunk for chunk in chunks: try: # Extract with AI extraction_data = self._extract_with_ai( chunk=chunk, source_name=news.source_name or 'unknown', published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown' ) # Save chunk db_chunk = self._save_chunk(news, chunk, extraction_data) chunks_created += 1 if extraction_data: # Save facts facts_created += self._save_facts( db_chunk, news, extraction_data.get('facts', []) ) # Save entities and relations ent_count, rel_count = self._save_entities_and_relations( db_chunk, 
extraction_data.get('entities', []), extraction_data.get('relations', []) ) entities_created += ent_count relations_created += rel_count except Exception as e: logger.error(f"Error processing chunk {chunk.index}: {e}") continue # Mark as extracted news.knowledge_extracted = True news.knowledge_extracted_at = datetime.now() self.db.commit() processing_time = time.time() - start_time logger.info( f"Extracted from news {news_id}: " f"{chunks_created} chunks, {facts_created} facts, " f"{entities_created} entities, {relations_created} relations " f"in {processing_time:.2f}s" ) return ExtractionResult( success=True, news_id=news_id, chunks_created=chunks_created, facts_created=facts_created, entities_created=entities_created, relations_created=relations_created, processing_time=processing_time ) def batch_extract(self, limit: int = 50, progress_callback: ProgressCallback = None) -> Dict: """ Batch extract knowledge from scraped articles. Args: limit: Maximum number of articles to process progress_callback: Optional callback for progress updates Returns: Dict with statistics """ import time logger.info(f"Starting batch extraction: limit={limit}") # Find articles ready for extraction articles = self.db.query(ZOPKNews).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.scrape_status == 'scraped', ZOPKNews.knowledge_extracted == False ).order_by( ZOPKNews.created_at.desc() ).limit(limit).all() total = len(articles) stats = { 'total': total, 'success': 0, 'failed': 0, 'chunks_created': 0, 'facts_created': 0, 'entities_created': 0, 'relations_created': 0, 'errors': [], 'processing_time': 0 } # Send initial progress if progress_callback and total > 0: progress_callback(ProgressUpdate( current=0, total=total, percent=0.0, stage='extracting', status='processing', message=f'Rozpoczynam ekstrakcję wiedzy z {total} artykułów...', details={'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0} )) start_time = time.time() for idx, article in 
enumerate(articles, 1): # Send progress update before processing if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round((idx - 1) / total * 100, 1), stage='extracting', status='processing', message=f'Analizuję przez AI: {article.title[:50]}...', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'] } )) result = self.extract_from_news(article.id) if result.success: stats['success'] += 1 stats['chunks_created'] += result.chunks_created stats['facts_created'] += result.facts_created stats['entities_created'] += result.entities_created stats['relations_created'] += result.relations_created if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='extracting', status='success', message=f'✓ Wyekstrahowano: {result.chunks_created} chunks, {result.facts_created} faktów, {result.entities_created} encji', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'], 'new_chunks': result.chunks_created, 'new_facts': result.facts_created, 'new_entities': result.entities_created } )) else: stats['failed'] += 1 if result.error: stats['errors'].append({ 'id': article.id, 'title': article.title[:100], 'error': result.error }) if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='extracting', status='failed', message=f'✗ Błąd ekstrakcji: {result.error[:50]}...' 
if result.error else '✗ Błąd', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'error': result.error } )) stats['processing_time'] = round(time.time() - start_time, 2) # Send completion progress if progress_callback: progress_callback(ProgressUpdate( current=total, total=total, percent=100.0, stage='extracting', status='complete', message=f'Zakończono: {stats["success"]}/{total} artykułów. ' f'Utworzono: {stats["chunks_created"]} chunks, {stats["facts_created"]} faktów, ' f'{stats["entities_created"]} encji', details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'], 'relations': stats['relations_created'], 'processing_time': stats['processing_time'] } )) logger.info( f"Batch extraction complete: {stats['success']}/{stats['total']} success " f"in {stats['processing_time']}s" ) return stats def get_extraction_statistics(self) -> Dict: """Get knowledge extraction statistics.""" from sqlalchemy import func # Articles stats total_approved = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status.in_(['approved', 'auto_approved']) ).scalar() scraped = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.scrape_status == 'scraped' ).scalar() # Pending scrape: approved but not yet scraped pending_scrape = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.scrape_status.in_(['pending', None]) ).scalar() extracted = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.knowledge_extracted == True ).scalar() pending_extract = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.scrape_status == 'scraped', ZOPKNews.knowledge_extracted == False ).scalar() # Knowledge base stats total_chunks = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar() total_facts = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar() 
total_entities = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar() total_relations = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar() # Embeddings stats chunks_with_embeddings = self.db.query(func.count(ZOPKKnowledgeChunk.id)).filter( ZOPKKnowledgeChunk.embedding.isnot(None) ).scalar() chunks_without_embeddings = (total_chunks or 0) - (chunks_with_embeddings or 0) # Top entities by mentions top_entities = self.db.query( ZOPKKnowledgeEntity.name, ZOPKKnowledgeEntity.entity_type, ZOPKKnowledgeEntity.mentions_count ).order_by( ZOPKKnowledgeEntity.mentions_count.desc() ).limit(10).all() return { 'articles': { 'total_approved': total_approved or 0, 'scraped': scraped or 0, 'pending_scrape': pending_scrape or 0, 'extracted': extracted or 0, 'pending_extract': pending_extract or 0 }, 'knowledge_base': { 'total_chunks': total_chunks or 0, 'total_facts': total_facts or 0, 'total_entities': total_entities or 0, 'total_relations': total_relations or 0, 'chunks_with_embeddings': chunks_with_embeddings or 0, 'chunks_without_embeddings': chunks_without_embeddings or 0 }, 'top_entities': [ {'name': e[0], 'type': e[1], 'mentions': e[2]} for e in top_entities ] } # ============================================================ # SEMANTIC SEARCH (FAZA 2) # ============================================================ def search_knowledge( db_session, query: str, limit: int = 5, min_similarity: float = 0.3, user_id: Optional[int] = None ) -> List[Dict]: """ Semantic search in ZOPK knowledge base. 
Args: db_session: SQLAlchemy session query: Search query limit: Max results to return min_similarity: Minimum cosine similarity (0-1) user_id: User ID for cost tracking Returns: List of matching chunks with similarity scores """ from gemini_service import GeminiService import json # Generate query embedding gemini = GeminiService() query_embedding = gemini.generate_embedding( text=query, task_type='retrieval_query', user_id=user_id, feature='zopk_knowledge_search' ) if not query_embedding: logger.warning("Failed to generate query embedding") return [] # Search in database # Note: This uses PostgreSQL pgvector for efficient similarity search # For now, we'll do a simpler approach with JSON embeddings chunks = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.embedding.isnot(None) ).all() results = [] for chunk in chunks: try: # Parse stored embedding if isinstance(chunk.embedding, str): chunk_embedding = json.loads(chunk.embedding) else: chunk_embedding = chunk.embedding if not chunk_embedding: continue # Calculate cosine similarity similarity = _cosine_similarity(query_embedding, chunk_embedding) if similarity >= min_similarity: results.append({ 'chunk_id': chunk.id, 'content': chunk.content[:500], 'summary': chunk.summary, 'keywords': chunk.keywords, 'similarity': round(similarity, 4), 'source_news_id': chunk.source_news_id, 'importance': chunk.importance_score }) except Exception as e: logger.error(f"Error processing chunk {chunk.id}: {e}") continue # Sort by similarity results.sort(key=lambda x: x['similarity'], reverse=True) return results[:limit] def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float: """Calculate cosine similarity between two vectors.""" import math if len(vec1) != len(vec2): return 0.0 dot_product = sum(a * b for a, b in zip(vec1, vec2)) norm1 = math.sqrt(sum(a * a for a in vec1)) norm2 = math.sqrt(sum(b * b for b in vec2)) if norm1 == 0 or norm2 == 0: return 0.0 return dot_product / (norm1 * norm2) def 
get_relevant_facts( db_session, query: str, limit: int = 10 ) -> List[Dict]: """ Get facts relevant to a query. Uses keyword matching for now, can be enhanced with embeddings. """ from sqlalchemy import or_ # Simple keyword search keywords = query.lower().split() # NOTE: Removed is_verified filter - auto-extracted facts are usable # Future: add manual verification workflow and re-enable filter facts = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.confidence_score >= 0.3 # Minimum confidence threshold ).all() results = [] for fact in facts: score = 0 fact_text = (fact.full_text or '').lower() for keyword in keywords: if keyword in fact_text: score += 1 if score > 0: # Get source URL from related news source_url = '' source_name = '' source_date = '' if fact.source_news: source_url = fact.source_news.url or '' source_name = fact.source_news.source_name or fact.source_news.source_domain or '' if fact.source_news.published_at: source_date = fact.source_news.published_at.strftime('%Y-%m-%d') results.append({ 'fact_id': fact.id, 'fact_type': fact.fact_type, 'full_text': fact.full_text, 'subject': fact.subject, 'numeric_value': float(fact.numeric_value) if fact.numeric_value else None, 'numeric_unit': fact.numeric_unit, 'confidence': float(fact.confidence_score) if fact.confidence_score else None, 'relevance_score': score, 'source_url': source_url, 'source_name': source_name, 'source_date': source_date }) # Sort by relevance results.sort(key=lambda x: x['relevance_score'], reverse=True) return results[:limit] def generate_chunk_embeddings( db_session, limit: int = 100, user_id: Optional[int] = None, progress_callback: ProgressCallback = None ) -> Dict: """ Generate embeddings for chunks that don't have them. 
Args: db_session: SQLAlchemy session limit: Max chunks to process user_id: User ID for cost tracking progress_callback: Optional callback for progress updates Returns: Dict with statistics """ import json import time from gemini_service import GeminiService gemini = GeminiService() # Find chunks without embeddings chunks = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.embedding.is_(None) ).limit(limit).all() total = len(chunks) stats = { 'total': total, 'success': 0, 'failed': 0, 'processing_time': 0 } # Send initial progress if progress_callback and total > 0: progress_callback(ProgressUpdate( current=0, total=total, percent=0.0, stage='embedding', status='processing', message=f'Rozpoczynam generowanie embeddingów dla {total} chunks...', details={'success': 0, 'failed': 0} )) start_time = time.time() for idx, chunk in enumerate(chunks, 1): # Send progress update before processing if progress_callback: # Get article title from chunk's source news article_title = None if chunk.source_news: article_title = chunk.source_news.title[:80] progress_callback(ProgressUpdate( current=idx, total=total, percent=round((idx - 1) / total * 100, 1), stage='embedding', status='processing', message=f'Generuję embedding {idx}/{total}: {chunk.summary[:40] if chunk.summary else "chunk"}...', article_id=chunk.source_news_id, article_title=article_title, details={ 'success': stats['success'], 'failed': stats['failed'], 'chunk_id': chunk.id } )) try: embedding = gemini.generate_embedding( text=chunk.content, task_type='retrieval_document', title=chunk.summary, user_id=user_id, feature='zopk_chunk_embedding' ) if embedding: # Store as JSON string chunk.embedding = json.dumps(embedding) stats['success'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='success', message=f'✓ Wygenerowano embedding (768 dim)', article_id=chunk.source_news_id, details={ 'success': 
stats['success'], 'failed': stats['failed'], 'chunk_id': chunk.id } )) else: stats['failed'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='failed', message='✗ Nie udało się wygenerować embeddingu', article_id=chunk.source_news_id, details={'success': stats['success'], 'failed': stats['failed']} )) except Exception as e: logger.error(f"Error generating embedding for chunk {chunk.id}: {e}") stats['failed'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='failed', message=f'✗ Błąd: {str(e)[:50]}...', article_id=chunk.source_news_id, details={'success': stats['success'], 'failed': stats['failed'], 'error': str(e)} )) db_session.commit() stats['processing_time'] = round(time.time() - start_time, 2) # Send completion progress if progress_callback: progress_callback(ProgressUpdate( current=total, total=total, percent=100.0, stage='embedding', status='complete', message=f'Zakończono: {stats["success"]}/{total} embeddingów wygenerowanych', details={ 'success': stats['success'], 'failed': stats['failed'], 'processing_time': stats['processing_time'] } )) logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success") return stats # ============================================================ # STANDALONE FUNCTIONS FOR CRON/CLI # ============================================================ def extract_pending_articles(db_session, limit: int = 50) -> Dict: """ Convenience function for cron jobs. Usage: from zopk_knowledge_service import extract_pending_articles result = extract_pending_articles(db_session, limit=50) """ service = ZOPKKnowledgeService(db_session) return service.batch_extract(limit=limit) def get_knowledge_stats(db_session) -> Dict: """ Get knowledge extraction statistics for monitoring. 
""" service = ZOPKKnowledgeService(db_session) return service.get_extraction_statistics() # ============================================================ # ADMIN PANEL - LIST FUNCTIONS # ============================================================ def list_chunks( db_session, page: int = 1, per_page: int = 20, source_news_id: Optional[int] = None, has_embedding: Optional[bool] = None, is_verified: Optional[bool] = None ) -> Dict: """ List knowledge chunks with pagination and filtering. Args: db_session: Database session page: Page number (1-based) per_page: Items per page source_news_id: Filter by source article has_embedding: Filter by embedding status is_verified: Filter by verification status Returns: { 'chunks': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int } """ from sqlalchemy import func query = db_session.query(ZOPKKnowledgeChunk) # Apply filters if source_news_id: query = query.filter(ZOPKKnowledgeChunk.source_news_id == source_news_id) if has_embedding is not None: if has_embedding: query = query.filter(ZOPKKnowledgeChunk.embedding.isnot(None)) else: query = query.filter(ZOPKKnowledgeChunk.embedding.is_(None)) if is_verified is not None: query = query.filter(ZOPKKnowledgeChunk.is_verified == is_verified) # Get total count total = query.count() # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get chunks with source news info chunks = query.order_by( ZOPKKnowledgeChunk.created_at.desc() ).offset(offset).limit(per_page).all() return { 'chunks': [ { 'id': c.id, 'content': c.content[:300] + '...' 
if len(c.content) > 300 else c.content, 'full_content': c.content, 'summary': c.summary, 'chunk_type': c.chunk_type, 'chunk_index': c.chunk_index, 'token_count': c.token_count, 'importance_score': c.importance_score, 'confidence_score': float(c.confidence_score) if c.confidence_score else None, 'has_embedding': c.embedding is not None, 'is_verified': c.is_verified, 'source_news_id': c.source_news_id, 'source_title': c.source_news.title if c.source_news else None, 'source_url': c.source_news.url if c.source_news else None, 'created_at': c.created_at.isoformat() if c.created_at else None, 'keywords': c.keywords if isinstance(c.keywords, list) else [] } for c in chunks ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages } def list_facts( db_session, page: int = 1, per_page: int = 20, fact_type: Optional[str] = None, is_verified: Optional[bool] = None, source_news_id: Optional[int] = None ) -> Dict: """ List knowledge facts with pagination and filtering. Args: db_session: Database session page: Page number (1-based) per_page: Items per page fact_type: Filter by fact type (statistic, event, statement, decision, milestone) is_verified: Filter by verification status source_news_id: Filter by source article Returns: { 'facts': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int, 'fact_types': [...] 
- available types for filtering } """ from sqlalchemy import func, distinct query = db_session.query(ZOPKKnowledgeFact) # Apply filters if fact_type: query = query.filter(ZOPKKnowledgeFact.fact_type == fact_type) if is_verified is not None: query = query.filter(ZOPKKnowledgeFact.is_verified == is_verified) if source_news_id: query = query.filter(ZOPKKnowledgeFact.source_news_id == source_news_id) # Get total count total = query.count() # Get available fact types fact_types = db_session.query( distinct(ZOPKKnowledgeFact.fact_type) ).filter( ZOPKKnowledgeFact.fact_type.isnot(None) ).all() fact_types = [f[0] for f in fact_types if f[0]] # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get facts facts = query.order_by( ZOPKKnowledgeFact.created_at.desc() ).offset(offset).limit(per_page).all() return { 'facts': [ { 'id': f.id, 'fact_type': f.fact_type, 'subject': f.subject, 'predicate': f.predicate, 'object': f.object, 'full_text': f.full_text, 'numeric_value': float(f.numeric_value) if f.numeric_value else None, 'numeric_unit': f.numeric_unit, 'date_value': f.date_value.isoformat() if f.date_value else None, 'confidence_score': float(f.confidence_score) if f.confidence_score else None, 'is_verified': f.is_verified, 'source_news_id': f.source_news_id, 'source_chunk_id': f.source_chunk_id, 'source_title': f.source_news.title if f.source_news else None, 'source_url': f.source_news.url if f.source_news else None, 'entities_involved': f.entities_involved if isinstance(f.entities_involved, list) else [], 'created_at': f.created_at.isoformat() if f.created_at else None } for f in facts ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages, 'fact_types': fact_types } def list_entities( db_session, page: int = 1, per_page: int = 20, entity_type: Optional[str] = None, is_verified: Optional[bool] = None, min_mentions: Optional[int] = None ) -> Dict: """ List knowledge entities with pagination and filtering. 
Args: db_session: Database session page: Page number (1-based) per_page: Items per page entity_type: Filter by entity type (company, person, place, organization, project, technology) is_verified: Filter by verification status min_mentions: Filter by minimum mention count Returns: { 'entities': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int, 'entity_types': [...] - available types for filtering } """ from sqlalchemy import func, distinct query = db_session.query(ZOPKKnowledgeEntity) # Exclude merged entities query = query.filter(ZOPKKnowledgeEntity.merged_into_id.is_(None)) # Apply filters if entity_type: query = query.filter(ZOPKKnowledgeEntity.entity_type == entity_type) if is_verified is not None: query = query.filter(ZOPKKnowledgeEntity.is_verified == is_verified) if min_mentions: query = query.filter(ZOPKKnowledgeEntity.mentions_count >= min_mentions) # Get total count total = query.count() # Get available entity types entity_types = db_session.query( distinct(ZOPKKnowledgeEntity.entity_type) ).filter( ZOPKKnowledgeEntity.entity_type.isnot(None) ).all() entity_types = [e[0] for e in entity_types if e[0]] # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get entities sorted by mentions entities = query.order_by( ZOPKKnowledgeEntity.mentions_count.desc() ).offset(offset).limit(per_page).all() return { 'entities': [ { 'id': e.id, 'name': e.name, 'normalized_name': e.normalized_name, 'entity_type': e.entity_type, 'description': e.description, 'short_description': e.short_description, 'aliases': e.aliases if isinstance(e.aliases, list) else [], 'mentions_count': e.mentions_count or 0, 'is_verified': e.is_verified, 'company_id': e.company_id, 'external_url': e.external_url, 'first_mentioned_at': e.first_mentioned_at.isoformat() if e.first_mentioned_at else None, 'last_mentioned_at': e.last_mentioned_at.isoformat() if e.last_mentioned_at else None, 'created_at': e.created_at.isoformat() if e.created_at 
else None } for e in entities ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages, 'entity_types': entity_types } def get_chunk_detail(db_session, chunk_id: int) -> Optional[Dict]: """Get detailed information about a single chunk.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return None # Get facts from this chunk facts = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.source_chunk_id == chunk_id ).all() # Get entity mentions mentions = db_session.query(ZOPKKnowledgeEntityMention).filter( ZOPKKnowledgeEntityMention.chunk_id == chunk_id ).all() return { 'id': chunk.id, 'content': chunk.content, 'content_clean': chunk.content_clean, 'summary': chunk.summary, 'chunk_type': chunk.chunk_type, 'chunk_index': chunk.chunk_index, 'token_count': chunk.token_count, 'importance_score': chunk.importance_score, 'confidence_score': float(chunk.confidence_score) if chunk.confidence_score else None, 'has_embedding': chunk.embedding is not None, 'is_verified': chunk.is_verified, 'keywords': chunk.keywords if isinstance(chunk.keywords, list) else [], 'context_date': chunk.context_date.isoformat() if chunk.context_date else None, 'context_location': chunk.context_location, 'extraction_model': chunk.extraction_model, 'extracted_at': chunk.extracted_at.isoformat() if chunk.extracted_at else None, 'created_at': chunk.created_at.isoformat() if chunk.created_at else None, 'source_news': { 'id': chunk.source_news.id, 'title': chunk.source_news.title, 'url': chunk.source_news.url, 'source_name': chunk.source_news.source_name } if chunk.source_news else None, 'facts': [ { 'id': f.id, 'fact_type': f.fact_type, 'full_text': f.full_text, 'is_verified': f.is_verified } for f in facts ], 'entity_mentions': [ { 'id': m.id, 'entity_id': m.entity_id, 'entity_name': m.entity.name if m.entity else None, 'entity_type': m.entity.entity_type if m.entity else None, 'mention_text': m.mention_text } for m in 
mentions ] } def update_chunk_verification(db_session, chunk_id: int, is_verified: bool, user_id: int) -> bool: """Update chunk verification status.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return False chunk.is_verified = is_verified chunk.verified_by = user_id chunk.verified_at = datetime.now() db_session.commit() return True def update_fact_verification(db_session, fact_id: int, is_verified: bool) -> bool: """Update fact verification status.""" fact = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.id == fact_id ).first() if not fact: return False fact.is_verified = is_verified db_session.commit() return True def update_entity_verification(db_session, entity_id: int, is_verified: bool) -> bool: """Update entity verification status.""" entity = db_session.query(ZOPKKnowledgeEntity).filter( ZOPKKnowledgeEntity.id == entity_id ).first() if not entity: return False entity.is_verified = is_verified db_session.commit() return True def delete_chunk(db_session, chunk_id: int) -> bool: """Delete a chunk and its associated facts and mentions.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return False # Delete associated facts db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.source_chunk_id == chunk_id ).delete() # Delete associated mentions db_session.query(ZOPKKnowledgeEntityMention).filter( ZOPKKnowledgeEntityMention.chunk_id == chunk_id ).delete() # Delete chunk db_session.delete(chunk) db_session.commit() return True # ============================================================ # DUPLICATE ENTITY DETECTION AND MERGING # ============================================================ def find_duplicate_entities( db_session, entity_type: Optional[str] = None, min_similarity: float = 0.5, limit: int = 100 ) -> List[Dict]: """ Find potential duplicate entities using fuzzy matching. 
Uses PostgreSQL pg_trgm extension for similarity matching. Returns pairs of entities that might be duplicates. Args: db_session: SQLAlchemy session entity_type: Filter by entity type (company, person, etc.) min_similarity: Minimum similarity threshold (0.0-1.0) limit: Maximum number of pairs to return Returns: List of dicts with duplicate pairs: [ { 'entity1': {...}, 'entity2': {...}, 'similarity': 0.85, 'match_type': 'fuzzy' # or 'substring' } ] """ from sqlalchemy import text # Build query with pg_trgm similarity # Use conditional SQL with COALESCE to avoid f-string interpolation query = text(""" SELECT e1.id as id1, e1.name as name1, e1.entity_type as type1, e1.mentions_count as mentions1, e1.is_verified as verified1, e2.id as id2, e2.name as name2, e2.entity_type as type2, e2.mentions_count as mentions2, e2.is_verified as verified2, similarity(LOWER(e1.name), LOWER(e2.name)) as sim, CASE WHEN LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%' OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%' THEN 'substring' ELSE 'fuzzy' END as match_type FROM zopk_knowledge_entities e1 JOIN zopk_knowledge_entities e2 ON e1.id < e2.id AND e1.entity_type = e2.entity_type WHERE ( similarity(LOWER(e1.name), LOWER(e2.name)) > :min_sim OR LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%' OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%' ) AND (:entity_type IS NULL OR e1.entity_type = :entity_type) ORDER BY sim DESC, e1.entity_type, GREATEST(e1.mentions_count, e2.mentions_count) DESC LIMIT :limit """) params = {'min_sim': min_similarity, 'limit': limit, 'entity_type': entity_type} result = db_session.execute(query, params) duplicates = [] for row in result: duplicates.append({ 'entity1': { 'id': row.id1, 'name': row.name1, 'entity_type': row.type1, 'mentions_count': row.mentions1, 'is_verified': row.verified1 }, 'entity2': { 'id': row.id2, 'name': row.name2, 'entity_type': row.type2, 'mentions_count': row.mentions2, 'is_verified': row.verified2 }, 'similarity': float(row.sim) if 
row.sim else 0.0, 'match_type': row.match_type }) return duplicates def merge_entities( db_session, primary_id: int, duplicate_id: int, new_name: Optional[str] = None ) -> Dict: """ Merge two entities - keep primary, delete duplicate. Transfers all relationships from duplicate to primary: - Entity mentions - Facts (subject/object references) - Relations (source/target) - Updates mentions_count Args: db_session: SQLAlchemy session primary_id: ID of entity to keep duplicate_id: ID of entity to merge and delete new_name: Optional new canonical name for primary Returns: Dict with merge results: { 'success': True, 'primary_id': 123, 'deleted_id': 456, 'transfers': { 'mentions': 15, 'facts_subject': 3, 'facts_object': 2, 'relations_source': 1, 'relations_target': 0 } } """ from sqlalchemy import text # Get both entities primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id) duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id) if not primary: return {'success': False, 'error': f'Primary entity {primary_id} not found'} if not duplicate: return {'success': False, 'error': f'Duplicate entity {duplicate_id} not found'} if primary.entity_type != duplicate.entity_type: return {'success': False, 'error': 'Cannot merge entities of different types'} transfers = { 'mentions': 0, 'facts': 0, 'relations_source': 0, 'relations_target': 0 } try: # 1. Transfer mentions result = db_session.execute(text(""" UPDATE zopk_knowledge_entity_mentions SET entity_id = :primary_id WHERE entity_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['mentions'] = result.rowcount # 2. 
Transfer facts - update entities_involved JSONB # Replace duplicate entity ID with primary ID in the JSONB array result = db_session.execute(text(""" UPDATE zopk_knowledge_facts SET entities_involved = ( SELECT jsonb_agg( CASE WHEN (elem->>'id')::int = :duplicate_id THEN jsonb_set(elem, '{id}', to_jsonb(:primary_id)) ELSE elem END ) FROM jsonb_array_elements(entities_involved::jsonb) AS elem ) WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb) """), { 'primary_id': primary_id, 'duplicate_id': duplicate_id, 'entity_json': f'[{{"id": {duplicate_id}}}]' }) transfers['facts'] = result.rowcount # 4. Transfer relations (entity_a) result = db_session.execute(text(""" UPDATE zopk_knowledge_relations SET entity_a_id = :primary_id WHERE entity_a_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['relations_source'] = result.rowcount # 5. Transfer relations (entity_b) result = db_session.execute(text(""" UPDATE zopk_knowledge_relations SET entity_b_id = :primary_id WHERE entity_b_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['relations_target'] = result.rowcount # 6. Update primary entity primary.mentions_count += duplicate.mentions_count if new_name: primary.canonical_name = new_name # Merge aliases if duplicate.aliases: existing_aliases = primary.aliases or [] new_aliases = duplicate.aliases # Add duplicate name as alias if duplicate.name not in existing_aliases: existing_aliases.append(duplicate.name) # Add duplicate's aliases for alias in new_aliases: if alias not in existing_aliases: existing_aliases.append(alias) primary.aliases = existing_aliases # 7. 
Delete duplicate db_session.delete(duplicate) db_session.commit() return { 'success': True, 'primary_id': primary_id, 'deleted_id': duplicate_id, 'new_mentions_count': primary.mentions_count, 'transfers': transfers } except Exception as e: db_session.rollback() logger.error(f"Error merging entities: {e}") return {'success': False, 'error': str(e)} def get_entity_merge_preview( db_session, primary_id: int, duplicate_id: int ) -> Dict: """ Preview what would happen if two entities are merged. Returns counts of items that would be transferred. """ from sqlalchemy import text, func primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id) duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id) if not primary or not duplicate: return {'error': 'Entity not found'} # Count items that would be transferred mentions = db_session.query(func.count(ZOPKKnowledgeEntityMention.id)).filter( ZOPKKnowledgeEntityMention.entity_id == duplicate_id ).scalar() or 0 # Facts use entities_involved (JSONB) not FK columns, so count via JSONB query # Count facts where duplicate entity is in entities_involved array facts_with_entity = db_session.execute(text(""" SELECT COUNT(*) FROM zopk_knowledge_facts WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb) """), {'entity_json': f'[{{"id": {duplicate_id}}}]'}).scalar() or 0 relations_source = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter( ZOPKKnowledgeRelation.entity_a_id == duplicate_id ).scalar() or 0 relations_target = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter( ZOPKKnowledgeRelation.entity_b_id == duplicate_id ).scalar() or 0 return { 'primary': { 'id': primary.id, 'name': primary.name, 'entity_type': primary.entity_type, 'mentions_count': primary.mentions_count, 'aliases': primary.aliases or [] }, 'duplicate': { 'id': duplicate.id, 'name': duplicate.name, 'entity_type': duplicate.entity_type, 'mentions_count': duplicate.mentions_count, 'aliases': duplicate.aliases or [] }, 
'transfers': { 'mentions': mentions, 'facts': facts_with_entity, 'relations_source': relations_source, 'relations_target': relations_target, 'total': mentions + facts_with_entity + relations_source + relations_target }, 'result': { 'new_mentions_count': primary.mentions_count + duplicate.mentions_count } } # ============================================================ # FACT DEDUPLICATION # ============================================================ def find_duplicate_facts( db_session, min_similarity: float = 0.7, limit: int = 100, fact_type: Optional[str] = None ) -> List[Dict]: """Find potential duplicate facts using text similarity. Uses pg_trgm % operator with GiST index for fast similarity search. """ from sqlalchemy import text # Set similarity threshold and use % operator (uses GiST index) db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"), {'threshold': min_similarity}) query = text(""" SELECT f1.id as id1, f1.full_text as text1, f1.fact_type as type1, f1.is_verified as verified1, f1.confidence_score as score1, f2.id as id2, f2.full_text as text2, f2.fact_type as type2, f2.is_verified as verified2, f2.confidence_score as score2, similarity(f1.full_text, f2.full_text) as sim FROM zopk_knowledge_facts f1 JOIN zopk_knowledge_facts f2 ON f1.id < f2.id WHERE f1.full_text % f2.full_text AND (:fact_type IS NULL OR f1.fact_type = :fact_type) ORDER BY sim DESC, COALESCE(GREATEST(f1.confidence_score, f2.confidence_score), 0) DESC LIMIT :limit """) params = {'limit': limit, 'fact_type': fact_type} result = db_session.execute(query, params) duplicates = [] for row in result: duplicates.append({ 'fact1': { 'id': row.id1, 'text': row.text1, 'fact_type': row.type1, 'is_verified': row.verified1, 'confidence_score': float(row.score1) if row.score1 else 0 }, 'fact2': { 'id': row.id2, 'text': row.text2, 'fact_type': row.type2, 'is_verified': row.verified2, 'confidence_score': float(row.score2) if row.score2 else 0 }, 'similarity': float(row.sim) }) 
return duplicates def merge_facts(db_session, primary_id: int, duplicate_id: int, new_text: Optional[str] = None) -> Dict: """Merge duplicate fact into primary.""" primary = db_session.query(ZOPKKnowledgeFact).get(primary_id) duplicate = db_session.query(ZOPKKnowledgeFact).get(duplicate_id) if not primary: return {'success': False, 'error': f'Primary fact {primary_id} not found'} if not duplicate: return {'success': False, 'error': f'Duplicate fact {duplicate_id} not found'} try: if new_text: primary.full_text = new_text if duplicate.importance_score and (not primary.importance_score or duplicate.importance_score > primary.importance_score): primary.importance_score = duplicate.importance_score if duplicate.confidence_score and (not primary.confidence_score or duplicate.confidence_score > primary.confidence_score): primary.confidence_score = duplicate.confidence_score if duplicate.is_verified: primary.is_verified = True db_session.delete(duplicate) db_session.commit() return {'success': True, 'primary_id': primary_id, 'deleted_id': duplicate_id} except Exception as e: db_session.rollback() return {'success': False, 'error': str(e)} # ============================================================ # AUTO-VERIFICATION # ============================================================ def auto_verify_top_entities(db_session, min_mentions: int = 5, limit: int = 100) -> Dict: """Auto-verify entities with high mention counts.""" entities = db_session.query(ZOPKKnowledgeEntity).filter( ZOPKKnowledgeEntity.is_verified == False, ZOPKKnowledgeEntity.mentions_count >= min_mentions ).order_by(ZOPKKnowledgeEntity.mentions_count.desc()).limit(limit).all() for entity in entities: entity.is_verified = True db_session.commit() return {'success': True, 'verified_count': len(entities), 'min_mentions': min_mentions} def auto_verify_top_facts(db_session, min_importance: float = 0.7, limit: int = 200) -> Dict: """Auto-verify facts with high confidence scores (≥70% by default).""" # Note: Table 
uses confidence_score, not importance_score facts = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.is_verified == False, ZOPKKnowledgeFact.confidence_score >= min_importance ).order_by(ZOPKKnowledgeFact.confidence_score.desc()).limit(limit).all() for fact in facts: fact.is_verified = True db_session.commit() return {'success': True, 'verified_count': len(facts), 'min_confidence': min_importance} def find_similar_to_verified_facts( db_session, min_similarity: float = 0.8, limit: int = 100 ) -> List[Dict]: """ Find unverified facts that are similar to already verified facts. Uses pg_trgm similarity search to "learn" from verified examples. Returns list of suggestions with similarity scores. """ from sqlalchemy import text, func # Check if we have any verified facts to learn from verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter( ZOPKKnowledgeFact.is_verified == True ).scalar() or 0 if verified_count == 0: return [] # Set similarity threshold db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"), {'threshold': min_similarity}) # Find unverified facts similar to verified ones query = text(""" SELECT DISTINCT ON (unverified.id) unverified.id as fact_id, unverified.full_text as fact_text, unverified.fact_type, unverified.confidence_score, verified.id as similar_to_id, verified.full_text as similar_to_text, similarity(unverified.full_text, verified.full_text) as sim_score FROM zopk_knowledge_facts unverified JOIN zopk_knowledge_facts verified ON verified.is_verified = TRUE AND unverified.full_text % verified.full_text WHERE unverified.is_verified = FALSE ORDER BY unverified.id, sim_score DESC LIMIT :limit """) result = db_session.execute(query, {'limit': limit}) suggestions = [] for row in result: suggestions.append({ 'fact_id': row.fact_id, 'fact_text': row.fact_text, 'fact_type': row.fact_type, 'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5, 'similar_to_id': row.similar_to_id, 
'similar_to_text': row.similar_to_text, 'similarity': float(row.sim_score) }) return suggestions def auto_verify_similar_to_verified( db_session, min_similarity: float = 0.8, limit: int = 100 ) -> Dict: """ Auto-verify facts that are similar to already verified facts. This enables "learning" from manual verifications. Args: min_similarity: Minimum similarity threshold (0.8 = 80% similar) limit: Maximum number of facts to verify at once Returns: Dict with success status and count of verified facts """ from sqlalchemy import text, func # Check if we have any verified facts to learn from verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter( ZOPKKnowledgeFact.is_verified == True ).scalar() or 0 if verified_count == 0: return { 'success': False, 'error': 'Brak zweryfikowanych faktów do nauki. Najpierw zweryfikuj kilka faktów ręcznie.', 'verified_count': 0 } # Set similarity threshold db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"), {'threshold': min_similarity}) # First, find matching facts with their details and similarity scores find_query = text(""" SELECT DISTINCT ON (unverified.id) unverified.id as fact_id, unverified.full_text as fact_text, unverified.fact_type, unverified.confidence_score, verified.id as pattern_id, verified.full_text as pattern_text, similarity(unverified.full_text, verified.full_text) as sim_score FROM zopk_knowledge_facts unverified JOIN zopk_knowledge_facts verified ON verified.is_verified = TRUE AND unverified.full_text % verified.full_text WHERE unverified.is_verified = FALSE ORDER BY unverified.id, sim_score DESC LIMIT :limit """) result = db_session.execute(find_query, {'limit': limit}) facts_to_verify = [] for row in result: facts_to_verify.append({ 'fact_id': row.fact_id, 'fact_text': row.fact_text, 'fact_type': row.fact_type, 'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5, 'pattern_id': row.pattern_id, 'pattern_text': row.pattern_text, 'similarity': 
round(float(row.sim_score) * 100) }) if not facts_to_verify: return { 'success': True, 'verified_count': 0, 'verified_facts': [], 'min_similarity': min_similarity, 'learned_from': verified_count } # Now verify the found facts fact_ids = [f['fact_id'] for f in facts_to_verify] update_query = text(""" UPDATE zopk_knowledge_facts SET is_verified = TRUE WHERE id = ANY(:ids) """) db_session.execute(update_query, {'ids': fact_ids}) db_session.commit() return { 'success': True, 'verified_count': len(facts_to_verify), 'verified_facts': facts_to_verify, 'min_similarity': min_similarity, 'learned_from': verified_count } # ============================================================ # DASHBOARD STATS # ============================================================ def get_knowledge_dashboard_stats(db_session) -> Dict: """Get comprehensive stats for knowledge dashboard.""" from sqlalchemy import func, text chunks_total = db_session.query(func.count(ZOPKKnowledgeChunk.id)).scalar() or 0 chunks_verified = db_session.query(func.count(ZOPKKnowledgeChunk.id)).filter(ZOPKKnowledgeChunk.is_verified == True).scalar() or 0 chunks_with_embedding = db_session.query(func.count(ZOPKKnowledgeChunk.id)).filter(ZOPKKnowledgeChunk.embedding.isnot(None)).scalar() or 0 entities_total = db_session.query(func.count(ZOPKKnowledgeEntity.id)).scalar() or 0 entities_verified = db_session.query(func.count(ZOPKKnowledgeEntity.id)).filter(ZOPKKnowledgeEntity.is_verified == True).scalar() or 0 facts_total = db_session.query(func.count(ZOPKKnowledgeFact.id)).scalar() or 0 facts_verified = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(ZOPKKnowledgeFact.is_verified == True).scalar() or 0 news_total = db_session.execute(text("SELECT COUNT(*) FROM zopk_news WHERE status IN ('approved', 'auto_approved')")).scalar() or 0 news_with_extraction = db_session.execute(text(''' SELECT COUNT(DISTINCT n.id) FROM zopk_news n JOIN zopk_knowledge_chunks c ON c.source_news_id = n.id WHERE n.status IN ('approved', 
'auto_approved') ''')).scalar() or 0 entity_types = db_session.execute(text('SELECT entity_type, COUNT(*) FROM zopk_knowledge_entities GROUP BY entity_type ORDER BY 2 DESC')).fetchall() fact_types = db_session.execute(text('SELECT fact_type, COUNT(*) FROM zopk_knowledge_facts GROUP BY fact_type ORDER BY 2 DESC')).fetchall() top_entities = db_session.query(ZOPKKnowledgeEntity).order_by(ZOPKKnowledgeEntity.mentions_count.desc()).limit(10).all() return { 'chunks': {'total': chunks_total, 'verified': chunks_verified, 'with_embedding': chunks_with_embedding, 'verified_pct': round(100 * chunks_verified / chunks_total, 1) if chunks_total else 0}, 'entities': {'total': entities_total, 'verified': entities_verified, 'verified_pct': round(100 * entities_verified / entities_total, 1) if entities_total else 0, 'by_type': [{'type': r[0], 'count': r[1]} for r in entity_types]}, 'facts': {'total': facts_total, 'verified': facts_verified, 'verified_pct': round(100 * facts_verified / facts_total, 1) if facts_total else 0, 'by_type': [{'type': r[0] or 'unknown', 'count': r[1]} for r in fact_types]}, 'news': {'total': news_total, 'with_extraction': news_with_extraction, 'pending': news_total - news_with_extraction}, 'top_entities': [{'id': e.id, 'name': e.name, 'type': e.entity_type, 'mentions': e.mentions_count} for e in top_entities] } # ============================================================ # TIMELINE SUGGESTIONS (Auto-populate from Knowledge Base) # ============================================================ def get_timeline_suggestions( db_session, limit: int = 50, only_verified: bool = True ) -> Dict: """ Get milestone facts from knowledge base that could become timeline milestones. Finds verified milestone facts that are NOT yet linked to any timeline milestone. Groups similar facts and ranks by confidence score. 
Args: db_session: Database session limit: Max suggestions to return only_verified: Only include verified facts Returns: { 'success': True, 'suggestions': [...], 'total_milestone_facts': int, 'already_in_timeline': int } """ from sqlalchemy import text, func from database import ZOPKMilestone try: # Count total milestone facts total_query = text(""" SELECT COUNT(*) FROM zopk_knowledge_facts WHERE fact_type = 'milestone' """) total_milestone_facts = db_session.execute(total_query).scalar() or 0 # Count facts already linked to timeline linked_query = text(""" SELECT COUNT(DISTINCT f.id) FROM zopk_knowledge_facts f JOIN zopk_milestones m ON m.source_news_id = f.source_news_id WHERE f.fact_type = 'milestone' """) already_linked = db_session.execute(linked_query).scalar() or 0 # Get milestone facts not yet in timeline # Prioritize: verified, high confidence, has numeric value (dates/amounts) suggestions_query = text(""" SELECT DISTINCT ON (f.id) f.id as fact_id, f.full_text, f.subject, f.predicate, f.object, f.confidence_score, f.numeric_value, f.numeric_unit, f.is_verified, f.source_news_id, n.title as news_title, n.published_at as news_date, n.url as news_url, n.source_name FROM zopk_knowledge_facts f LEFT JOIN zopk_news n ON n.id = f.source_news_id WHERE f.fact_type = 'milestone' AND (:only_verified = FALSE OR f.is_verified = TRUE) AND NOT EXISTS ( SELECT 1 FROM zopk_milestones m WHERE m.source_news_id = f.source_news_id AND similarity(m.title, f.full_text) > 0.5 ) ORDER BY f.id, f.confidence_score DESC NULLS LAST, f.is_verified DESC LIMIT :limit """) params = {'limit': limit, 'only_verified': bool(only_verified)} results = db_session.execute(suggestions_query, params).fetchall() suggestions = [] for row in results: # Auto-detect category based on keywords category = _detect_milestone_category(row.full_text, row.subject) # Try to extract date from text or use news date target_date = _extract_date_from_text(row.full_text) if not target_date and row.news_date: 
target_date = row.news_date.strftime('%Y-%m-%d') if hasattr(row.news_date, 'strftime') else str(row.news_date) suggestions.append({ 'fact_id': row.fact_id, 'full_text': row.full_text, 'subject': row.subject, 'predicate': row.predicate, 'object': row.object, 'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5, 'is_verified': row.is_verified, 'source_news_id': row.source_news_id, 'news_title': row.news_title, 'news_date': row.news_date.isoformat() if row.news_date else None, 'news_url': row.news_url, 'source_name': row.source_name, # Auto-suggested values for timeline 'suggested_title': _generate_milestone_title(row.full_text, row.subject), 'suggested_category': category, 'suggested_date': target_date, 'suggested_status': 'completed' if _is_past_event(row.full_text) else 'planned' }) return { 'success': True, 'suggestions': suggestions, 'total_milestone_facts': total_milestone_facts, 'already_in_timeline': already_linked, 'suggestions_count': len(suggestions) } except Exception as e: logger.error(f"Error getting timeline suggestions: {e}") return { 'success': False, 'error': str(e), 'suggestions': [] } def _detect_milestone_category(text: str, subject: str = None) -> str: """ Auto-detect milestone category based on keywords. 
Categories: nuclear, offshore, infrastructure, defense, other """ text_lower = (text or '').lower() subject_lower = (subject or '').lower() combined = f"{text_lower} {subject_lower}" # Nuclear energy keywords nuclear_keywords = [ 'jądrowa', 'jądrowy', 'atomowa', 'atomowy', 'nuclear', 'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie', 'reaktor', 'uran', 'westinghouse', 'ap1000' ] if any(kw in combined for kw in nuclear_keywords): return 'nuclear' # Offshore wind keywords offshore_keywords = [ 'offshore', 'wiatrowa', 'wiatrowy', 'morska farma', 'farma wiatrowa', 'baltic power', 'baltica', 'orlen', 'northland', 'bałtyk', 'turbina', 'mw wiatr', 'gw wiatr' ] if any(kw in combined for kw in offshore_keywords): return 'offshore' # Defense/military keywords defense_keywords = [ 'kongsberg', 'obronność', 'obronny', 'wojsko', 'wojskowy', 'mon ', 'ministerstwo obrony', 'zbrojeniowy', 'dron', 'amunicja', 'nsm', 'rakieta', 'samolot bojowy', 'okręt', 'bezpieczeństwo', 'nato', 'sojusz' ] if any(kw in combined for kw in defense_keywords): return 'defense' # Infrastructure keywords infra_keywords = [ 's6', 's7', 'via pomerania', 'droga', 'autostrada', 'ekspresowa', 'kolej', 'pkp', 'port', 'terminal', 'lotnisko', 'most', 'infrastruktura', 'budowa', 'remont', 'przebudowa', 'wodociąg', 'kanalizacja', 'oczyszczalnia' ] if any(kw in combined for kw in infra_keywords): return 'infrastructure' return 'other' def _generate_milestone_title(full_text: str, subject: str = None) -> str: """ Generate a concise title for milestone from fact text. Truncates to ~100 chars and tries to keep meaningful content. """ if not full_text: return subject or "Kamień milowy" # If text is short enough, use as is if len(full_text) <= 100: return full_text # Try to find a natural break point text = full_text[:150] # Look for sentence end for sep in ['. ', ', ', ' - ', ': ']: if sep in text: parts = text.split(sep) if len(parts[0]) >= 30: return parts[0] + ('.' 
if not parts[0].endswith('.') else '') # Just truncate with ellipsis return text[:97] + '...' def _extract_date_from_text(text: str) -> str: """ Try to extract date from milestone text. Returns ISO format date string or None. """ import re from datetime import datetime if not text: return None text_lower = text.lower() # Patterns to match patterns = [ # "w 2025 roku", "2025 r.", "rok 2025" (r'\b(20[2-3]\d)\s*(rok|r\.?)\b', lambda m: f"{m.group(1)}-01-01"), (r'\brok\s*(20[2-3]\d)\b', lambda m: f"{m.group(1)}-01-01"), # "w marcu 2025", "marzec 2025" (r'\b(stycz\w*|luty|lut\w*|marz\w*|kwie\w*|maj\w*|czerw\w*|lip\w*|sierp\w*|wrze\w*|paźdz\w*|listop\w*|grud\w*)\s*(20[2-3]\d)', lambda m: _month_to_date(m.group(1), m.group(2))), # "Q1 2025", "Q3 2026" (r'\bQ([1-4])\s*(20[2-3]\d)', lambda m: f"{m.group(2)}-{int(m.group(1))*3-2:02d}-01"), # "I kwartał 2025" (r'\b(I|II|III|IV)\s*kwarta\w*\s*(20[2-3]\d)', lambda m: _quarter_to_date(m.group(1), m.group(2))), ] for pattern, formatter in patterns: match = re.search(pattern, text_lower) if match: try: return formatter(match) except: continue return None def _month_to_date(month_name: str, year: str) -> str: """Convert Polish month name to date string.""" months = { 'stycz': '01', 'luty': '02', 'lut': '02', 'marz': '03', 'kwie': '04', 'maj': '05', 'czerw': '06', 'lip': '07', 'sierp': '08', 'wrze': '09', 'paźdz': '10', 'listop': '11', 'grud': '12' } for prefix, num in months.items(): if month_name.startswith(prefix): return f"{year}-{num}-01" return f"{year}-01-01" def _quarter_to_date(quarter: str, year: str) -> str: """Convert Roman numeral quarter to date string.""" quarters = {'I': '01', 'II': '04', 'III': '07', 'IV': '10'} month = quarters.get(quarter, '01') return f"{year}-{month}-01" def _is_past_event(text: str) -> bool: """ Detect if milestone text describes a past event (completed) or future event (planned). 
""" if not text: return False text_lower = text.lower() # Past tense indicators (Polish) past_indicators = [ 'podpisano', 'podpisał', 'zakończono', 'oddano', 'otwarto', 'uruchomiono', 'rozpoczęto', 'ogłoszono', 'przyznano', 'uzyskano', 'otrzymał', 'zdobył', 'wygrał', 'został', 'odbył się', 'odbyła się', 'miało miejsce' ] # Future tense indicators future_indicators = [ 'planowany', 'planowane', 'planowana', 'ma zostać', 'będzie', 'zostanie', 'ma być', 'powstanie', 'w przyszłości', 'do końca', 'w ciągu' ] past_count = sum(1 for ind in past_indicators if ind in text_lower) future_count = sum(1 for ind in future_indicators if ind in text_lower) return past_count > future_count def create_milestone_from_suggestion( db_session, fact_id: int, title: str, description: str = None, category: str = 'other', target_date: str = None, status: str = 'planned', source_url: str = None ) -> Dict: """ Create a timeline milestone from a knowledge fact suggestion. Args: db_session: Database session fact_id: Source fact ID title: Milestone title description: Optional description category: nuclear, offshore, infrastructure, defense, other target_date: Target date (YYYY-MM-DD format) status: planned, in_progress, completed, delayed source_url: Source article URL Returns: {'success': True, 'milestone_id': int} or {'success': False, 'error': str} """ from database import ZOPKMilestone, ZOPKKnowledgeFact from datetime import datetime try: # Get the source fact fact = db_session.query(ZOPKKnowledgeFact).get(fact_id) if not fact: return {'success': False, 'error': f'Fact {fact_id} not found'} # Parse target date parsed_date = None if target_date: try: parsed_date = datetime.strptime(target_date, '%Y-%m-%d').date() except ValueError: pass # Create milestone milestone = ZOPKMilestone( title=title, description=description or fact.full_text, category=category, target_date=parsed_date, actual_date=parsed_date if status == 'completed' else None, status=status, source_url=source_url, 
source_news_id=fact.source_news_id, is_featured=False ) db_session.add(milestone) db_session.commit() logger.info(f"Created milestone #{milestone.id} from fact #{fact_id}: {title}") return { 'success': True, 'milestone_id': milestone.id, 'title': title, 'category': category } except Exception as e: db_session.rollback() logger.error(f"Error creating milestone from fact {fact_id}: {e}") return {'success': False, 'error': str(e)} def categorize_milestones_with_ai( db_session, suggestions: List[Dict], model_name: str = "gemini-3-flash-preview" ) -> List[Dict]: """ Use Gemini AI to categorize and enhance milestone suggestions. Adds AI-improved titles, categories, and extracts dates more accurately. """ import google.generativeai as genai import json if not suggestions: return suggestions # Prepare batch for AI processing facts_text = "\n".join([ f"{i+1}. {s['full_text'][:300]}" for i, s in enumerate(suggestions[:20]) # Limit to 20 for API ]) prompt = f"""Przeanalizuj poniższe fakty o projekcie ZOPK (Zielony Okręg Przemysłowy Kaszubia) i dla każdego zwróć: - category: jedna z [nuclear, offshore, infrastructure, defense, other] - short_title: zwięzły tytuł (max 80 znaków) - target_date: data w formacie YYYY-MM-DD (jeśli można wywnioskować) - status: jeden z [completed, in_progress, planned] Kategorie: - nuclear: elektrownia jądrowa, atom, Lubiatowo-Kopalino - offshore: farmy wiatrowe, offshore wind, Baltic Power, Baltica - infrastructure: drogi S6, Via Pomerania, porty, koleje - defense: Kongsberg, przemysł zbrojeniowy, obronność, MON Fakty: {facts_text} Odpowiedz TYLKO jako JSON array: [{{"id": 1, "category": "...", "short_title": "...", "target_date": "YYYY-MM-DD lub null", "status": "..."}}]""" try: model = genai.GenerativeModel(model_name) response = model.generate_content(prompt) # Parse response response_text = response.text.strip() if response_text.startswith('```'): response_text = response_text.split('```')[1] if response_text.startswith('json'): response_text = 
response_text[4:] ai_results = json.loads(response_text) # Merge AI results with suggestions for result in ai_results: idx = result.get('id', 0) - 1 if 0 <= idx < len(suggestions): suggestions[idx]['ai_category'] = result.get('category', suggestions[idx]['suggested_category']) suggestions[idx]['ai_title'] = result.get('short_title', suggestions[idx]['suggested_title']) suggestions[idx]['ai_date'] = result.get('target_date') suggestions[idx]['ai_status'] = result.get('status', suggestions[idx]['suggested_status']) return suggestions except Exception as e: logger.warning(f"AI categorization failed: {e}") return suggestions # Return original suggestions without AI enhancement def analyze_roadmap_with_ai(db_session) -> Dict: """ AI-powered roadmap analysis: new milestones, status updates, and gap detection. Uses Gemini to analyze existing milestones against recent knowledge facts. Returns: { 'success': True, 'new_milestones': [...], 'status_updates': [...], 'gaps': [...] } """ import google.generativeai as genai import json from database import ZOPKMilestone try: # 1. Get existing milestones milestones = db_session.query(ZOPKMilestone).order_by(ZOPKMilestone.target_date).all() milestones_text = "\n".join([ f"ID:{m.id} | {m.title} | kategoria:{m.category} | status:{m.status} | data:{m.target_date}" for m in milestones ]) or "(brak kamieni milowych)" # 2. 
Get recent verified facts from knowledge base from sqlalchemy import text facts_query = text(""" SELECT f.id, f.full_text, f.fact_type, f.date_value, f.confidence_score, f.source_news_id, n.title as news_title, n.url as news_url FROM zopk_knowledge_facts f LEFT JOIN zopk_news n ON n.id = f.source_news_id WHERE f.confidence_score >= 0.5 ORDER BY f.created_at DESC LIMIT 50 """) facts = db_session.execute(facts_query).fetchall() if not facts: return { 'success': True, 'new_milestones': [], 'status_updates': [], 'gaps': [], 'message': 'Brak faktów w bazie wiedzy do analizy' } facts_text = "\n".join([ f"ID:{f.id} | typ:{f.fact_type} | data:{f.date_value} | {f.full_text[:250]}" for f in facts ]) # 3. Send to Gemini prompt = f"""Jesteś ekspertem ds. projektu ZOPK (Zielony Okręg Przemysłowy Kaszubia - Pomorze). Projekty w regionie: elektrownia jądrowa (Lubiatowo-Kopalino), farmy wiatrowe offshore (Baltic Power, Baltica), infrastruktura (S6, porty), obronność (Kongsberg). ISTNIEJĄCE KAMIENIE MILOWE ROADMAPY: {milestones_text} OSTATNIE FAKTY Z BAZY WIEDZY: {facts_text} Przeanalizuj i zwróć TYLKO JSON (bez markdown): {{ "new_milestones": [ {{"fact_id": N, "title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "target_date": "YYYY-MM-DD lub null", "status": "planned|in_progress|completed", "reason": "dlaczego to kamień milowy"}} ], "status_updates": [ {{"milestone_id": N, "current_status": "...", "suggested_status": "...", "reason": "krótkie uzasadnienie", "supporting_fact_ids": [N]}} ], "gaps": [ {{"description": "co brakuje w roadmapie", "suggested_title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "reason": "dlaczego to ważne"}} ] }} Zasady: - new_milestones: fakty które powinny być kamieniami milowymi, a NIE ma ich jeszcze w roadmapie - status_updates: istniejące kamienie milowe, których status powinien się zmienić na podstawie nowych faktów - gaps: ważne tematy dla regionu Kaszubia/Pomorze bez kamienia 
milowego, które warto dodać - Jeśli nie ma sugestii w danej kategorii, zwróć pustą listę - Tytuły pisz po polsku""" model = genai.GenerativeModel("gemini-3-flash-preview") response = model.generate_content(prompt) response_text = response.text.strip() # Strip markdown code blocks if present if response_text.startswith('```'): response_text = response_text.split('```')[1] if response_text.startswith('json'): response_text = response_text[4:] if response_text.endswith('```'): response_text = response_text[:-3] ai_result = json.loads(response_text.strip()) # Enrich new_milestones with source info facts_map = {f.id: f for f in facts} for nm in ai_result.get('new_milestones', []): fact = facts_map.get(nm.get('fact_id')) if fact: nm['full_text'] = fact.full_text nm['news_url'] = fact.news_url nm['news_title'] = fact.news_title nm['source_news_id'] = fact.source_news_id # Enrich status_updates with milestone info milestones_map = {m.id: m for m in milestones} for su in ai_result.get('status_updates', []): ms = milestones_map.get(su.get('milestone_id')) if ms: su['milestone_title'] = ms.title su['milestone_category'] = ms.category logger.info( f"AI roadmap analysis: {len(ai_result.get('new_milestones', []))} new, " f"{len(ai_result.get('status_updates', []))} updates, " f"{len(ai_result.get('gaps', []))} gaps" ) return { 'success': True, 'new_milestones': ai_result.get('new_milestones', []), 'status_updates': ai_result.get('status_updates', []), 'gaps': ai_result.get('gaps', []), 'total_milestones': len(milestones), 'total_facts_analyzed': len(facts) } except json.JSONDecodeError as e: logger.error(f"AI roadmap analysis JSON parse error: {e}") return {'success': False, 'error': f'Błąd parsowania odpowiedzi AI: {e}'} except Exception as e: logger.error(f"AI roadmap analysis error: {e}") return {'success': False, 'error': str(e)}