""" ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy. Pipeline: 1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów 2. AI Extraction - ekstrakcja faktów i encji przez Gemini 3. Entity Linking - identyfikacja i deduplikacja encji 4. Relation Extraction - wykrywanie relacji między encjami 5. Embedding Generation - wektory dla semantic search (FAZA 2) Usage: from zopk_knowledge_service import ZOPKKnowledgeService service = ZOPKKnowledgeService(db_session) result = service.extract_from_news(news_id=123) # lub batch: result = service.batch_extract(limit=50) """ import re import json import logging import hashlib from datetime import datetime from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field # Import progress tracking from scraper from zopk_content_scraper import ProgressUpdate, ProgressCallback from database import ( ZOPKNews, ZOPKKnowledgeChunk, ZOPKKnowledgeEntity, ZOPKKnowledgeFact, ZOPKKnowledgeEntityMention, ZOPKKnowledgeRelation, ) # Configure logging logger = logging.getLogger(__name__) # ============================================================ # CONFIGURATION # ============================================================ # Chunk size settings # NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues # Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably MIN_CHUNK_SIZE = 200 # tokens MAX_CHUNK_SIZE = 500 # tokens (~2000 chars for Polish text) CHUNK_OVERLAP = 50 # tokens overlap between chunks (reduced proportionally) APPROX_CHARS_PER_TOKEN = 4 # Polish text approximation # AI extraction settings MAX_FACTS_PER_CHUNK = 10 MAX_ENTITIES_PER_CHUNK = 15 # Entity types ENTITY_TYPES = [ 'company', # Firma, organizacja biznesowa 'person', # Osoba 'place', # Miejsce, lokalizacja 'organization', # Instytucja rządowa, NGO 'project', # Projekt, inicjatywa 'technology', # Technologia, produkt 'event', # Wydarzenie, konferencja ] # Fact types FACT_TYPES = [ 'statistic', # Liczby, dane ilościowe 'event', # Zdarzenie z datą 'statement', # Wypowiedź, deklaracja 'decision', # Decyzja, postanowienie 'milestone', # Kamień milowy projektu 'partnership', # Partnerstwo, współpraca 'investment', # Inwestycja ] # Relation types RELATION_TYPES = [ 'investor_in', # A inwestuje w B 'partner_of', # A jest partnerem B 'located_in', # A znajduje się w B 'manages', # A zarządza B 'works_for', # A pracuje dla B 'part_of', # A jest częścią B 'cooperates_with', # A współpracuje z B 'produces', # A produkuje B 'supplies_to', # A dostarcza do B ] # ============================================================ # AI PROMPTS # ============================================================ # Ultra-simplified prompt to avoid Gemini safety filter issues # Note: Complex JSON schemas with pipe characters were triggering filters # Note: max_tokens parameter also triggers filters - don't use it! EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON. 
ARTYKUŁ: {chunk_text}

Zwróć JSON z następującą strukturą:
{{
  "facts": [
    {{"text": "pełny fakt", "type": "investment"}}
  ],
  "entities": [
    {{"name": "Nazwa", "type": "company"}}
  ],
  "summary": "krótkie podsumowanie"
}}

Typy faktów: investment, decision, event, statistic, partnership, milestone
Typy encji: company, person, place, organization, project"""

# System prompt is now empty - the user prompt contains all necessary instructions
EXTRACTION_SYSTEM_PROMPT = ""

# ============================================================
# DATA CLASSES
# ============================================================

@dataclass
class ChunkData:
    """Data for a single chunk."""
    content: str
    index: int
    token_count: int


@dataclass
class ExtractionResult:
    """Result of knowledge extraction from a news article."""
    success: bool
    news_id: int
    chunks_created: int = 0
    facts_created: int = 0
    entities_created: int = 0
    relations_created: int = 0
    error: Optional[str] = None
    processing_time: float = 0.0

# ============================================================
# CHUNKING FUNCTIONS
# ============================================================

def estimate_tokens(text: str) -> int:
    """Estimate token count for text."""
    if not text:
        return 0
    # Polish text: ~4 chars per token on average
    return len(text) // APPROX_CHARS_PER_TOKEN


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences."""
    # Polish sentence boundaries
    sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    sentences = re.split(sentence_pattern, text)
    return [s.strip() for s in sentences if s.strip()]


def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.

    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split large paragraphs by sentences
    - Maintain overlap between chunks
    """
    if not text:
        return []

    chunks = []
    current_chunk = []
    current_tokens = 0

    # Split by paragraphs
    paragraphs = text.split('\n\n')

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        para_tokens = estimate_tokens(para)

        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)

                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: keep last sentence
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0

                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0

            current_chunk.append(para)
            current_tokens += para_tokens

    # Save last chunk
    if current_chunk:
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))

    return chunks

# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================

class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.
    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        self._gemini_service = None

    @property
    def gemini(self):
        """Lazy-load Gemini service."""
        if self._gemini_service is None:
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service

    def _normalize_entity_name(self, name: str) -> str:
        """Normalize entity name for deduplication."""
        if not name:
            return ''
        # Lowercase, remove extra spaces
        normalized = name.lower().strip()
        normalized = re.sub(r'\s+', ' ', normalized)
        # Remove Polish diacritics for matching
        trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
        normalized = normalized.translate(trans)
        return normalized

    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.

        Uses normalized name for deduplication.
        """
        normalized = self._normalize_entity_name(name)

        # Try to find existing
        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()

        if existing:
            # Update mention count
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            # Update description if better
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing

        # Create new entity
        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Get ID
        return entity

    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations using Gemini AI.

        Returns parsed JSON or None on error.
        """
        try:
            # Truncate chunk to avoid Gemini safety filter issues with long texts
            # Testing showed ~4000 chars triggers safety blocks, ~2000 chars works
            MAX_PROMPT_CHARS = 2000
            chunk_text = chunk.content[:MAX_PROMPT_CHARS]
            if len(chunk.content) > MAX_PROMPT_CHARS:
                logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars")

            # Simplified single prompt (system prompt removed to avoid safety filter issues)
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk_text,
                source_name=source_name,
                published_date=published_date
            )

            # NOTE: max_tokens removed - testing showed it triggers safety filters!
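            # Illustrative only - a reply matching the simplified prompt above is
            # expected to look roughly like the sketch below (shape assumed from the
            # prompt template, not a captured Gemini response); the markdown/JSON
            # handling further down parses exactly this structure:
            #   {
            #     "facts": [{"text": "Firma X zainwestowała 5 mln zł", "type": "investment"}],
            #     "entities": [{"name": "Firma X", "type": "company"}],
            #     "summary": "krótkie podsumowanie artykułu"
            #   }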
response = self.gemini.generate_text( prompt=prompt, temperature=0.1, # Low temperature for consistency user_id=self.user_id, feature='zopk_knowledge_extraction' ) if not response: logger.warning("Empty response from Gemini") return None # Parse JSON from response # Handle markdown code blocks json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response) if json_match: json_str = json_match.group(1) else: json_str = response # Clean and parse json_str = json_str.strip() data = json.loads(json_str) return data except json.JSONDecodeError as e: logger.error(f"JSON parse error: {e}") return None except Exception as e: logger.error(f"AI extraction error: {e}") return None def _save_chunk( self, news: ZOPKNews, chunk: ChunkData, extraction_data: Optional[Dict] ) -> ZOPKKnowledgeChunk: """Save chunk to database.""" db_chunk = ZOPKKnowledgeChunk( source_news_id=news.id, content=chunk.content, chunk_index=chunk.index, token_count=chunk.token_count, chunk_type='narrative', # Default summary=extraction_data.get('summary') if extraction_data else None, keywords=extraction_data.get('keywords') if extraction_data else None, language='pl', extraction_model='gemini-2.0-flash', extracted_at=datetime.now() ) self.db.add(db_chunk) self.db.flush() return db_chunk def _save_facts( self, chunk: ZOPKKnowledgeChunk, news: ZOPKNews, facts_data: List[Dict] ) -> int: """Save extracted facts to database.""" count = 0 for fact in facts_data[:MAX_FACTS_PER_CHUNK]: try: # Parse numeric value numeric_value = fact.get('numeric_value') if numeric_value is not None: try: numeric_value = float(numeric_value) except (ValueError, TypeError): numeric_value = None # Parse date date_value = fact.get('date_value') if date_value: try: from datetime import datetime as dt date_value = dt.strptime(date_value, '%Y-%m-%d').date() except (ValueError, TypeError): date_value = None # Support both old format (full_text) and new simplified format (text) fact_text = fact.get('text') or fact.get('full_text', '') db_fact = ZOPKKnowledgeFact( source_chunk_id=chunk.id, source_news_id=news.id, fact_type=fact.get('type', 'statement'), subject=fact.get('subject'), predicate=fact.get('predicate'), object=fact.get('object'), full_text=fact_text, numeric_value=numeric_value, numeric_unit=fact.get('numeric_unit'), date_value=date_value, confidence_score=fact.get('confidence', 0.5) ) self.db.add(db_fact) count += 1 except Exception as e: logger.error(f"Error saving fact: {e}") continue return count def _save_entities_and_relations( self, chunk: ZOPKKnowledgeChunk, entities_data: List[Dict], relations_data: List[Dict] ) -> Tuple[int, int]: """Save entities and relations to database.""" entity_count = 0 relation_count = 0 # Map of name -> entity for relations entity_map = {} # Save entities for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]: try: entity = self._find_or_create_entity( name=ent.get('name', ''), entity_type=ent.get('type', 'organization'), description=ent.get('description') ) entity_map[ent.get('name', '').lower()] = entity # Create mention link mention = ZOPKKnowledgeEntityMention( chunk_id=chunk.id, entity_id=entity.id, mention_text=ent.get('name'), mention_type='direct', role_in_context=ent.get('role'), confidence=0.9 ) self.db.add(mention) entity_count += 1 except Exception as e: logger.error(f"Error saving entity: {e}") continue # Flush to get entity IDs self.db.flush() # Save relations for rel in relations_data: try: entity_a_name = rel.get('entity_a', '').lower() entity_b_name = rel.get('entity_b', '').lower() entity_a = 
entity_map.get(entity_a_name) entity_b = entity_map.get(entity_b_name) if not entity_a or not entity_b: continue # Check if relation already exists existing = self.db.query(ZOPKKnowledgeRelation).filter( ZOPKKnowledgeRelation.entity_a_id == entity_a.id, ZOPKKnowledgeRelation.entity_b_id == entity_b.id, ZOPKKnowledgeRelation.relation_type == rel.get('relation') ).first() if not existing: db_relation = ZOPKKnowledgeRelation( entity_a_id=entity_a.id, entity_b_id=entity_b.id, relation_type=rel.get('relation', 'cooperates_with'), evidence_text=rel.get('description'), source_chunk_id=chunk.id, confidence=0.8, strength=3 ) self.db.add(db_relation) relation_count += 1 except Exception as e: logger.error(f"Error saving relation: {e}") continue return entity_count, relation_count def extract_from_news(self, news_id: int) -> ExtractionResult: """ Extract knowledge from a single news article. Args: news_id: ID of ZOPKNews record Returns: ExtractionResult with statistics """ import time start_time = time.time() # Get news record news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first() if not news: return ExtractionResult( success=False, news_id=news_id, error=f"News record {news_id} not found" ) # Check if already extracted if news.knowledge_extracted: return ExtractionResult( success=True, news_id=news_id, error="Already extracted" ) # Check if content is scraped if not news.full_content: return ExtractionResult( success=False, news_id=news_id, error="No content scraped" ) logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...") # Create chunks chunks = create_chunks(news.full_content) if not chunks: return ExtractionResult( success=False, news_id=news_id, error="No chunks created" ) # Statistics chunks_created = 0 facts_created = 0 entities_created = 0 relations_created = 0 # Process each chunk for chunk in chunks: try: # Extract with AI extraction_data = self._extract_with_ai( chunk=chunk, source_name=news.source_name or 'unknown', published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown' ) # Save chunk db_chunk = self._save_chunk(news, chunk, extraction_data) chunks_created += 1 if extraction_data: # Save facts facts_created += self._save_facts( db_chunk, news, extraction_data.get('facts', []) ) # Save entities and relations ent_count, rel_count = self._save_entities_and_relations( db_chunk, extraction_data.get('entities', []), extraction_data.get('relations', []) ) entities_created += ent_count relations_created += rel_count except Exception as e: logger.error(f"Error processing chunk {chunk.index}: {e}") continue # Mark as extracted news.knowledge_extracted = True news.knowledge_extracted_at = datetime.now() self.db.commit() processing_time = time.time() - start_time logger.info( f"Extracted from news {news_id}: " f"{chunks_created} chunks, {facts_created} facts, " f"{entities_created} entities, {relations_created} relations " f"in {processing_time:.2f}s" ) return ExtractionResult( success=True, news_id=news_id, chunks_created=chunks_created, facts_created=facts_created, entities_created=entities_created, relations_created=relations_created, processing_time=processing_time ) def batch_extract(self, limit: int = 50, progress_callback: ProgressCallback = None) -> Dict: """ Batch extract knowledge from scraped articles. 
Args: limit: Maximum number of articles to process progress_callback: Optional callback for progress updates Returns: Dict with statistics """ import time logger.info(f"Starting batch extraction: limit={limit}") # Find articles ready for extraction articles = self.db.query(ZOPKNews).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.scrape_status == 'scraped', ZOPKNews.knowledge_extracted == False ).order_by( ZOPKNews.created_at.desc() ).limit(limit).all() total = len(articles) stats = { 'total': total, 'success': 0, 'failed': 0, 'chunks_created': 0, 'facts_created': 0, 'entities_created': 0, 'relations_created': 0, 'errors': [], 'processing_time': 0 } # Send initial progress if progress_callback and total > 0: progress_callback(ProgressUpdate( current=0, total=total, percent=0.0, stage='extracting', status='processing', message=f'Rozpoczynam ekstrakcję wiedzy z {total} artykułów...', details={'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0} )) start_time = time.time() for idx, article in enumerate(articles, 1): # Send progress update before processing if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round((idx - 1) / total * 100, 1), stage='extracting', status='processing', message=f'Analizuję przez AI: {article.title[:50]}...', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'] } )) result = self.extract_from_news(article.id) if result.success: stats['success'] += 1 stats['chunks_created'] += result.chunks_created stats['facts_created'] += result.facts_created stats['entities_created'] += result.entities_created stats['relations_created'] += result.relations_created if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='extracting', status='success', message=f'✓ Wyekstrahowano: {result.chunks_created} chunks, {result.facts_created} faktów, {result.entities_created} encji', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'], 'new_chunks': result.chunks_created, 'new_facts': result.facts_created, 'new_entities': result.entities_created } )) else: stats['failed'] += 1 if result.error: stats['errors'].append({ 'id': article.id, 'title': article.title[:100], 'error': result.error }) if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='extracting', status='failed', message=f'✗ Błąd ekstrakcji: {result.error[:50]}...' if result.error else '✗ Błąd', article_id=article.id, article_title=article.title[:80], details={ 'success': stats['success'], 'failed': stats['failed'], 'error': result.error } )) stats['processing_time'] = round(time.time() - start_time, 2) # Send completion progress if progress_callback: progress_callback(ProgressUpdate( current=total, total=total, percent=100.0, stage='extracting', status='complete', message=f'Zakończono: {stats["success"]}/{total} artykułów. 
' f'Utworzono: {stats["chunks_created"]} chunks, {stats["facts_created"]} faktów, ' f'{stats["entities_created"]} encji', details={ 'success': stats['success'], 'failed': stats['failed'], 'chunks': stats['chunks_created'], 'facts': stats['facts_created'], 'entities': stats['entities_created'], 'relations': stats['relations_created'], 'processing_time': stats['processing_time'] } )) logger.info( f"Batch extraction complete: {stats['success']}/{stats['total']} success " f"in {stats['processing_time']}s" ) return stats def get_extraction_statistics(self) -> Dict: """Get knowledge extraction statistics.""" from sqlalchemy import func # Articles stats total_approved = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status.in_(['approved', 'auto_approved']) ).scalar() scraped = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.scrape_status == 'scraped' ).scalar() # Pending scrape: approved but not yet scraped pending_scrape = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.scrape_status.in_(['pending', None]) ).scalar() extracted = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.knowledge_extracted == True ).scalar() pending_extract = self.db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.scrape_status == 'scraped', ZOPKNews.knowledge_extracted == False ).scalar() # Knowledge base stats total_chunks = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar() total_facts = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar() total_entities = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar() total_relations = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar() # Embeddings stats chunks_with_embeddings = self.db.query(func.count(ZOPKKnowledgeChunk.id)).filter( ZOPKKnowledgeChunk.embedding.isnot(None) ).scalar() chunks_without_embeddings = (total_chunks or 0) - (chunks_with_embeddings or 0) # Top entities by mentions top_entities = self.db.query( ZOPKKnowledgeEntity.name, ZOPKKnowledgeEntity.entity_type, ZOPKKnowledgeEntity.mentions_count ).order_by( ZOPKKnowledgeEntity.mentions_count.desc() ).limit(10).all() return { 'articles': { 'total_approved': total_approved or 0, 'scraped': scraped or 0, 'pending_scrape': pending_scrape or 0, 'extracted': extracted or 0, 'pending_extract': pending_extract or 0 }, 'knowledge_base': { 'total_chunks': total_chunks or 0, 'total_facts': total_facts or 0, 'total_entities': total_entities or 0, 'total_relations': total_relations or 0, 'chunks_with_embeddings': chunks_with_embeddings or 0, 'chunks_without_embeddings': chunks_without_embeddings or 0 }, 'top_entities': [ {'name': e[0], 'type': e[1], 'mentions': e[2]} for e in top_entities ] } # ============================================================ # SEMANTIC SEARCH (FAZA 2) # ============================================================ def search_knowledge( db_session, query: str, limit: int = 5, min_similarity: float = 0.3, user_id: Optional[int] = None ) -> List[Dict]: """ Semantic search in ZOPK knowledge base. 
Args: db_session: SQLAlchemy session query: Search query limit: Max results to return min_similarity: Minimum cosine similarity (0-1) user_id: User ID for cost tracking Returns: List of matching chunks with similarity scores """ from gemini_service import GeminiService import json # Generate query embedding gemini = GeminiService() query_embedding = gemini.generate_embedding( text=query, task_type='retrieval_query', user_id=user_id, feature='zopk_knowledge_search' ) if not query_embedding: logger.warning("Failed to generate query embedding") return [] # Search in database # Note: This uses PostgreSQL pgvector for efficient similarity search # For now, we'll do a simpler approach with JSON embeddings chunks = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.embedding.isnot(None) ).all() results = [] for chunk in chunks: try: # Parse stored embedding if isinstance(chunk.embedding, str): chunk_embedding = json.loads(chunk.embedding) else: chunk_embedding = chunk.embedding if not chunk_embedding: continue # Calculate cosine similarity similarity = _cosine_similarity(query_embedding, chunk_embedding) if similarity >= min_similarity: results.append({ 'chunk_id': chunk.id, 'content': chunk.content[:500], 'summary': chunk.summary, 'keywords': chunk.keywords, 'similarity': round(similarity, 4), 'source_news_id': chunk.source_news_id, 'importance': chunk.importance_score }) except Exception as e: logger.error(f"Error processing chunk {chunk.id}: {e}") continue # Sort by similarity results.sort(key=lambda x: x['similarity'], reverse=True) return results[:limit] def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float: """Calculate cosine similarity between two vectors.""" import math if len(vec1) != len(vec2): return 0.0 dot_product = sum(a * b for a, b in zip(vec1, vec2)) norm1 = math.sqrt(sum(a * a for a in vec1)) norm2 = math.sqrt(sum(b * b for b in vec2)) if norm1 == 0 or norm2 == 0: return 0.0 return dot_product / (norm1 * norm2) def get_relevant_facts( db_session, query: str, limit: int = 10 ) -> List[Dict]: """ Get facts relevant to a query. Uses keyword matching for now, can be enhanced with embeddings. 
""" from sqlalchemy import or_ # Simple keyword search keywords = query.lower().split() # NOTE: Removed is_verified filter - auto-extracted facts are usable # Future: add manual verification workflow and re-enable filter facts = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.confidence_score >= 0.3 # Minimum confidence threshold ).all() results = [] for fact in facts: score = 0 fact_text = (fact.full_text or '').lower() for keyword in keywords: if keyword in fact_text: score += 1 if score > 0: # Get source URL from related news source_url = '' source_name = '' source_date = '' if fact.source_news: source_url = fact.source_news.url or '' source_name = fact.source_news.source_name or fact.source_news.source_domain or '' if fact.source_news.published_at: source_date = fact.source_news.published_at.strftime('%Y-%m-%d') results.append({ 'fact_id': fact.id, 'fact_type': fact.fact_type, 'full_text': fact.full_text, 'subject': fact.subject, 'numeric_value': float(fact.numeric_value) if fact.numeric_value else None, 'numeric_unit': fact.numeric_unit, 'confidence': float(fact.confidence_score) if fact.confidence_score else None, 'relevance_score': score, 'source_url': source_url, 'source_name': source_name, 'source_date': source_date }) # Sort by relevance results.sort(key=lambda x: x['relevance_score'], reverse=True) return results[:limit] def generate_chunk_embeddings( db_session, limit: int = 100, user_id: Optional[int] = None, progress_callback: ProgressCallback = None ) -> Dict: """ Generate embeddings for chunks that don't have them. Args: db_session: SQLAlchemy session limit: Max chunks to process user_id: User ID for cost tracking progress_callback: Optional callback for progress updates Returns: Dict with statistics """ import json import time from gemini_service import GeminiService gemini = GeminiService() # Find chunks without embeddings chunks = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.embedding.is_(None) ).limit(limit).all() total = len(chunks) stats = { 'total': total, 'success': 0, 'failed': 0, 'processing_time': 0 } # Send initial progress if progress_callback and total > 0: progress_callback(ProgressUpdate( current=0, total=total, percent=0.0, stage='embedding', status='processing', message=f'Rozpoczynam generowanie embeddingów dla {total} chunks...', details={'success': 0, 'failed': 0} )) start_time = time.time() for idx, chunk in enumerate(chunks, 1): # Send progress update before processing if progress_callback: # Get article title from chunk's source news article_title = None if chunk.source_news: article_title = chunk.source_news.title[:80] progress_callback(ProgressUpdate( current=idx, total=total, percent=round((idx - 1) / total * 100, 1), stage='embedding', status='processing', message=f'Generuję embedding {idx}/{total}: {chunk.summary[:40] if chunk.summary else "chunk"}...', article_id=chunk.source_news_id, article_title=article_title, details={ 'success': stats['success'], 'failed': stats['failed'], 'chunk_id': chunk.id } )) try: embedding = gemini.generate_embedding( text=chunk.content, task_type='retrieval_document', title=chunk.summary, user_id=user_id, feature='zopk_chunk_embedding' ) if embedding: # Store as JSON string chunk.embedding = json.dumps(embedding) stats['success'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='success', message=f'✓ Wygenerowano embedding (768 dim)', article_id=chunk.source_news_id, details={ 
'success': stats['success'], 'failed': stats['failed'], 'chunk_id': chunk.id } )) else: stats['failed'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='failed', message='✗ Nie udało się wygenerować embeddingu', article_id=chunk.source_news_id, details={'success': stats['success'], 'failed': stats['failed']} )) except Exception as e: logger.error(f"Error generating embedding for chunk {chunk.id}: {e}") stats['failed'] += 1 if progress_callback: progress_callback(ProgressUpdate( current=idx, total=total, percent=round(idx / total * 100, 1), stage='embedding', status='failed', message=f'✗ Błąd: {str(e)[:50]}...', article_id=chunk.source_news_id, details={'success': stats['success'], 'failed': stats['failed'], 'error': str(e)} )) db_session.commit() stats['processing_time'] = round(time.time() - start_time, 2) # Send completion progress if progress_callback: progress_callback(ProgressUpdate( current=total, total=total, percent=100.0, stage='embedding', status='complete', message=f'Zakończono: {stats["success"]}/{total} embeddingów wygenerowanych', details={ 'success': stats['success'], 'failed': stats['failed'], 'processing_time': stats['processing_time'] } )) logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success") return stats # ============================================================ # STANDALONE FUNCTIONS FOR CRON/CLI # ============================================================ def extract_pending_articles(db_session, limit: int = 50) -> Dict: """ Convenience function for cron jobs. Usage: from zopk_knowledge_service import extract_pending_articles result = extract_pending_articles(db_session, limit=50) """ service = ZOPKKnowledgeService(db_session) return service.batch_extract(limit=limit) def get_knowledge_stats(db_session) -> Dict: """ Get knowledge extraction statistics for monitoring. """ service = ZOPKKnowledgeService(db_session) return service.get_extraction_statistics() # ============================================================ # ADMIN PANEL - LIST FUNCTIONS # ============================================================ def list_chunks( db_session, page: int = 1, per_page: int = 20, source_news_id: Optional[int] = None, has_embedding: Optional[bool] = None, is_verified: Optional[bool] = None ) -> Dict: """ List knowledge chunks with pagination and filtering. Args: db_session: Database session page: Page number (1-based) per_page: Items per page source_news_id: Filter by source article has_embedding: Filter by embedding status is_verified: Filter by verification status Returns: { 'chunks': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int } """ from sqlalchemy import func query = db_session.query(ZOPKKnowledgeChunk) # Apply filters if source_news_id: query = query.filter(ZOPKKnowledgeChunk.source_news_id == source_news_id) if has_embedding is not None: if has_embedding: query = query.filter(ZOPKKnowledgeChunk.embedding.isnot(None)) else: query = query.filter(ZOPKKnowledgeChunk.embedding.is_(None)) if is_verified is not None: query = query.filter(ZOPKKnowledgeChunk.is_verified == is_verified) # Get total count total = query.count() # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get chunks with source news info chunks = query.order_by( ZOPKKnowledgeChunk.created_at.desc() ).offset(offset).limit(per_page).all() return { 'chunks': [ { 'id': c.id, 'content': c.content[:300] + '...' 
if len(c.content) > 300 else c.content, 'full_content': c.content, 'summary': c.summary, 'chunk_type': c.chunk_type, 'chunk_index': c.chunk_index, 'token_count': c.token_count, 'importance_score': c.importance_score, 'confidence_score': float(c.confidence_score) if c.confidence_score else None, 'has_embedding': c.embedding is not None, 'is_verified': c.is_verified, 'source_news_id': c.source_news_id, 'source_title': c.source_news.title if c.source_news else None, 'source_url': c.source_news.url if c.source_news else None, 'created_at': c.created_at.isoformat() if c.created_at else None, 'keywords': c.keywords if isinstance(c.keywords, list) else [] } for c in chunks ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages } def list_facts( db_session, page: int = 1, per_page: int = 20, fact_type: Optional[str] = None, is_verified: Optional[bool] = None, source_news_id: Optional[int] = None ) -> Dict: """ List knowledge facts with pagination and filtering. Args: db_session: Database session page: Page number (1-based) per_page: Items per page fact_type: Filter by fact type (statistic, event, statement, decision, milestone) is_verified: Filter by verification status source_news_id: Filter by source article Returns: { 'facts': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int, 'fact_types': [...] - available types for filtering } """ from sqlalchemy import func, distinct query = db_session.query(ZOPKKnowledgeFact) # Apply filters if fact_type: query = query.filter(ZOPKKnowledgeFact.fact_type == fact_type) if is_verified is not None: query = query.filter(ZOPKKnowledgeFact.is_verified == is_verified) if source_news_id: query = query.filter(ZOPKKnowledgeFact.source_news_id == source_news_id) # Get total count total = query.count() # Get available fact types fact_types = db_session.query( distinct(ZOPKKnowledgeFact.fact_type) ).filter( ZOPKKnowledgeFact.fact_type.isnot(None) ).all() fact_types = [f[0] for f in fact_types if f[0]] # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get facts facts = query.order_by( ZOPKKnowledgeFact.created_at.desc() ).offset(offset).limit(per_page).all() return { 'facts': [ { 'id': f.id, 'fact_type': f.fact_type, 'subject': f.subject, 'predicate': f.predicate, 'object': f.object, 'full_text': f.full_text, 'numeric_value': float(f.numeric_value) if f.numeric_value else None, 'numeric_unit': f.numeric_unit, 'date_value': f.date_value.isoformat() if f.date_value else None, 'confidence_score': float(f.confidence_score) if f.confidence_score else None, 'is_verified': f.is_verified, 'source_news_id': f.source_news_id, 'source_chunk_id': f.source_chunk_id, 'source_title': f.source_news.title if f.source_news else None, 'source_url': f.source_news.url if f.source_news else None, 'entities_involved': f.entities_involved if isinstance(f.entities_involved, list) else [], 'created_at': f.created_at.isoformat() if f.created_at else None } for f in facts ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages, 'fact_types': fact_types } def list_entities( db_session, page: int = 1, per_page: int = 20, entity_type: Optional[str] = None, is_verified: Optional[bool] = None, min_mentions: Optional[int] = None ) -> Dict: """ List knowledge entities with pagination and filtering. 
Args: db_session: Database session page: Page number (1-based) per_page: Items per page entity_type: Filter by entity type (company, person, place, organization, project, technology) is_verified: Filter by verification status min_mentions: Filter by minimum mention count Returns: { 'entities': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int, 'entity_types': [...] - available types for filtering } """ from sqlalchemy import func, distinct query = db_session.query(ZOPKKnowledgeEntity) # Exclude merged entities query = query.filter(ZOPKKnowledgeEntity.merged_into_id.is_(None)) # Apply filters if entity_type: query = query.filter(ZOPKKnowledgeEntity.entity_type == entity_type) if is_verified is not None: query = query.filter(ZOPKKnowledgeEntity.is_verified == is_verified) if min_mentions: query = query.filter(ZOPKKnowledgeEntity.mentions_count >= min_mentions) # Get total count total = query.count() # Get available entity types entity_types = db_session.query( distinct(ZOPKKnowledgeEntity.entity_type) ).filter( ZOPKKnowledgeEntity.entity_type.isnot(None) ).all() entity_types = [e[0] for e in entity_types if e[0]] # Calculate pagination pages = (total + per_page - 1) // per_page offset = (page - 1) * per_page # Get entities sorted by mentions entities = query.order_by( ZOPKKnowledgeEntity.mentions_count.desc() ).offset(offset).limit(per_page).all() return { 'entities': [ { 'id': e.id, 'name': e.name, 'normalized_name': e.normalized_name, 'entity_type': e.entity_type, 'description': e.description, 'short_description': e.short_description, 'aliases': e.aliases if isinstance(e.aliases, list) else [], 'mentions_count': e.mentions_count or 0, 'is_verified': e.is_verified, 'company_id': e.company_id, 'external_url': e.external_url, 'first_mentioned_at': e.first_mentioned_at.isoformat() if e.first_mentioned_at else None, 'last_mentioned_at': e.last_mentioned_at.isoformat() if e.last_mentioned_at else None, 'created_at': e.created_at.isoformat() if e.created_at else None } for e in entities ], 'total': total, 'page': page, 'per_page': per_page, 'pages': pages, 'entity_types': entity_types } def get_chunk_detail(db_session, chunk_id: int) -> Optional[Dict]: """Get detailed information about a single chunk.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return None # Get facts from this chunk facts = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.source_chunk_id == chunk_id ).all() # Get entity mentions mentions = db_session.query(ZOPKKnowledgeEntityMention).filter( ZOPKKnowledgeEntityMention.chunk_id == chunk_id ).all() return { 'id': chunk.id, 'content': chunk.content, 'content_clean': chunk.content_clean, 'summary': chunk.summary, 'chunk_type': chunk.chunk_type, 'chunk_index': chunk.chunk_index, 'token_count': chunk.token_count, 'importance_score': chunk.importance_score, 'confidence_score': float(chunk.confidence_score) if chunk.confidence_score else None, 'has_embedding': chunk.embedding is not None, 'is_verified': chunk.is_verified, 'keywords': chunk.keywords if isinstance(chunk.keywords, list) else [], 'context_date': chunk.context_date.isoformat() if chunk.context_date else None, 'context_location': chunk.context_location, 'extraction_model': chunk.extraction_model, 'extracted_at': chunk.extracted_at.isoformat() if chunk.extracted_at else None, 'created_at': chunk.created_at.isoformat() if chunk.created_at else None, 'source_news': { 'id': chunk.source_news.id, 'title': chunk.source_news.title, 
'url': chunk.source_news.url, 'source_name': chunk.source_news.source_name } if chunk.source_news else None, 'facts': [ { 'id': f.id, 'fact_type': f.fact_type, 'full_text': f.full_text, 'is_verified': f.is_verified } for f in facts ], 'entity_mentions': [ { 'id': m.id, 'entity_id': m.entity_id, 'entity_name': m.entity.name if m.entity else None, 'entity_type': m.entity.entity_type if m.entity else None, 'mention_text': m.mention_text } for m in mentions ] } def update_chunk_verification(db_session, chunk_id: int, is_verified: bool, user_id: int) -> bool: """Update chunk verification status.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return False chunk.is_verified = is_verified chunk.verified_by = user_id chunk.verified_at = datetime.now() db_session.commit() return True def update_fact_verification(db_session, fact_id: int, is_verified: bool) -> bool: """Update fact verification status.""" fact = db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.id == fact_id ).first() if not fact: return False fact.is_verified = is_verified db_session.commit() return True def update_entity_verification(db_session, entity_id: int, is_verified: bool) -> bool: """Update entity verification status.""" entity = db_session.query(ZOPKKnowledgeEntity).filter( ZOPKKnowledgeEntity.id == entity_id ).first() if not entity: return False entity.is_verified = is_verified db_session.commit() return True def delete_chunk(db_session, chunk_id: int) -> bool: """Delete a chunk and its associated facts and mentions.""" chunk = db_session.query(ZOPKKnowledgeChunk).filter( ZOPKKnowledgeChunk.id == chunk_id ).first() if not chunk: return False # Delete associated facts db_session.query(ZOPKKnowledgeFact).filter( ZOPKKnowledgeFact.source_chunk_id == chunk_id ).delete() # Delete associated mentions db_session.query(ZOPKKnowledgeEntityMention).filter( ZOPKKnowledgeEntityMention.chunk_id == chunk_id ).delete() # Delete chunk db_session.delete(chunk) db_session.commit() return True # ============================================================ # DUPLICATE ENTITY DETECTION AND MERGING # ============================================================ def find_duplicate_entities( db_session, entity_type: Optional[str] = None, min_similarity: float = 0.5, limit: int = 100 ) -> List[Dict]: """ Find potential duplicate entities using fuzzy matching. Uses PostgreSQL pg_trgm extension for similarity matching. Returns pairs of entities that might be duplicates. Args: db_session: SQLAlchemy session entity_type: Filter by entity type (company, person, etc.) 
min_similarity: Minimum similarity threshold (0.0-1.0) limit: Maximum number of pairs to return Returns: List of dicts with duplicate pairs: [ { 'entity1': {...}, 'entity2': {...}, 'similarity': 0.85, 'match_type': 'fuzzy' # or 'substring' } ] """ from sqlalchemy import text # Build query with pg_trgm similarity type_filter = f"AND e1.entity_type = '{entity_type}'" if entity_type else "" query = text(f""" SELECT e1.id as id1, e1.name as name1, e1.entity_type as type1, e1.mentions_count as mentions1, e1.is_verified as verified1, e2.id as id2, e2.name as name2, e2.entity_type as type2, e2.mentions_count as mentions2, e2.is_verified as verified2, similarity(LOWER(e1.name), LOWER(e2.name)) as sim, CASE WHEN LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%' OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%' THEN 'substring' ELSE 'fuzzy' END as match_type FROM zopk_knowledge_entities e1 JOIN zopk_knowledge_entities e2 ON e1.id < e2.id AND e1.entity_type = e2.entity_type WHERE ( similarity(LOWER(e1.name), LOWER(e2.name)) > :min_sim OR LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%' OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%' ) {type_filter} ORDER BY sim DESC, e1.entity_type, GREATEST(e1.mentions_count, e2.mentions_count) DESC LIMIT :limit """) result = db_session.execute(query, {'min_sim': min_similarity, 'limit': limit}) duplicates = [] for row in result: duplicates.append({ 'entity1': { 'id': row.id1, 'name': row.name1, 'entity_type': row.type1, 'mentions_count': row.mentions1, 'is_verified': row.verified1 }, 'entity2': { 'id': row.id2, 'name': row.name2, 'entity_type': row.type2, 'mentions_count': row.mentions2, 'is_verified': row.verified2 }, 'similarity': float(row.sim) if row.sim else 0.0, 'match_type': row.match_type }) return duplicates def merge_entities( db_session, primary_id: int, duplicate_id: int, new_name: Optional[str] = None ) -> Dict: """ Merge two entities - keep primary, delete duplicate. Transfers all relationships from duplicate to primary: - Entity mentions - Facts (subject/object references) - Relations (source/target) - Updates mentions_count Args: db_session: SQLAlchemy session primary_id: ID of entity to keep duplicate_id: ID of entity to merge and delete new_name: Optional new canonical name for primary Returns: Dict with merge results: { 'success': True, 'primary_id': 123, 'deleted_id': 456, 'transfers': { 'mentions': 15, 'facts_subject': 3, 'facts_object': 2, 'relations_source': 1, 'relations_target': 0 } } """ from sqlalchemy import text # Get both entities primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id) duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id) if not primary: return {'success': False, 'error': f'Primary entity {primary_id} not found'} if not duplicate: return {'success': False, 'error': f'Duplicate entity {duplicate_id} not found'} if primary.entity_type != duplicate.entity_type: return {'success': False, 'error': 'Cannot merge entities of different types'} transfers = { 'mentions': 0, 'facts': 0, 'relations_source': 0, 'relations_target': 0 } try: # 1. Transfer mentions result = db_session.execute(text(""" UPDATE zopk_knowledge_entity_mentions SET entity_id = :primary_id WHERE entity_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['mentions'] = result.rowcount # 2. 
Transfer facts - update entities_involved JSONB # Replace duplicate entity ID with primary ID in the JSONB array result = db_session.execute(text(""" UPDATE zopk_knowledge_facts SET entities_involved = ( SELECT jsonb_agg( CASE WHEN (elem->>'id')::int = :duplicate_id THEN jsonb_set(elem, '{id}', to_jsonb(:primary_id)) ELSE elem END ) FROM jsonb_array_elements(entities_involved::jsonb) AS elem ) WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb) """), { 'primary_id': primary_id, 'duplicate_id': duplicate_id, 'entity_json': f'[{{"id": {duplicate_id}}}]' }) transfers['facts'] = result.rowcount # 4. Transfer relations (source) result = db_session.execute(text(""" UPDATE zopk_knowledge_relations SET source_entity_id = :primary_id WHERE source_entity_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['relations_source'] = result.rowcount # 5. Transfer relations (target) result = db_session.execute(text(""" UPDATE zopk_knowledge_relations SET target_entity_id = :primary_id WHERE target_entity_id = :duplicate_id """), {'primary_id': primary_id, 'duplicate_id': duplicate_id}) transfers['relations_target'] = result.rowcount # 6. Update primary entity primary.mentions_count += duplicate.mentions_count if new_name: primary.canonical_name = new_name # Merge aliases if duplicate.aliases: existing_aliases = primary.aliases or [] new_aliases = duplicate.aliases # Add duplicate name as alias if duplicate.name not in existing_aliases: existing_aliases.append(duplicate.name) # Add duplicate's aliases for alias in new_aliases: if alias not in existing_aliases: existing_aliases.append(alias) primary.aliases = existing_aliases # 7. Delete duplicate db_session.delete(duplicate) db_session.commit() return { 'success': True, 'primary_id': primary_id, 'deleted_id': duplicate_id, 'new_mentions_count': primary.mentions_count, 'transfers': transfers } except Exception as e: db_session.rollback() logger.error(f"Error merging entities: {e}") return {'success': False, 'error': str(e)} def get_entity_merge_preview( db_session, primary_id: int, duplicate_id: int ) -> Dict: """ Preview what would happen if two entities are merged. Returns counts of items that would be transferred. 
""" from sqlalchemy import text, func primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id) duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id) if not primary or not duplicate: return {'error': 'Entity not found'} # Count items that would be transferred mentions = db_session.query(func.count(ZOPKKnowledgeEntityMention.id)).filter( ZOPKKnowledgeEntityMention.entity_id == duplicate_id ).scalar() or 0 # Facts use entities_involved (JSONB) not FK columns, so count via JSONB query # Count facts where duplicate entity is in entities_involved array facts_with_entity = db_session.execute(text(""" SELECT COUNT(*) FROM zopk_knowledge_facts WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb) """), {'entity_json': f'[{{"id": {duplicate_id}}}]'}).scalar() or 0 relations_source = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter( ZOPKKnowledgeRelation.source_entity_id == duplicate_id ).scalar() or 0 relations_target = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter( ZOPKKnowledgeRelation.target_entity_id == duplicate_id ).scalar() or 0 return { 'primary': { 'id': primary.id, 'name': primary.name, 'entity_type': primary.entity_type, 'mentions_count': primary.mentions_count, 'aliases': primary.aliases or [] }, 'duplicate': { 'id': duplicate.id, 'name': duplicate.name, 'entity_type': duplicate.entity_type, 'mentions_count': duplicate.mentions_count, 'aliases': duplicate.aliases or [] }, 'transfers': { 'mentions': mentions, 'facts': facts_with_entity, 'relations_source': relations_source, 'relations_target': relations_target, 'total': mentions + facts_with_entity + relations_source + relations_target }, 'result': { 'new_mentions_count': primary.mentions_count + duplicate.mentions_count } }