""" ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy. Pipeline: 1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów 2. AI Extraction - ekstrakcja faktów i encji przez Gemini 3. Entity Linking - identyfikacja i deduplikacja encji 4. Relation Extraction - wykrywanie relacji między encjami 5. Embedding Generation - wektory dla semantic search (FAZA 2) Usage: from zopk_knowledge_service import ZOPKKnowledgeService service = ZOPKKnowledgeService(db_session) result = service.extract_from_news(news_id=123) # lub batch: result = service.batch_extract(limit=50) """ import re import json import logging import hashlib from datetime import datetime from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass, field from database import ( ZOPKNews, ZOPKKnowledgeChunk, ZOPKKnowledgeEntity, ZOPKKnowledgeFact, ZOPKKnowledgeEntityMention, ZOPKKnowledgeRelation, ) # Configure logging logger = logging.getLogger(__name__) # ============================================================ # CONFIGURATION # ============================================================ # Chunk size settings # NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues # Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably MIN_CHUNK_SIZE = 200 # tokens MAX_CHUNK_SIZE = 500 # tokens (~2000 chars for Polish text) CHUNK_OVERLAP = 50 # tokens overlap between chunks (reduced proportionally) APPROX_CHARS_PER_TOKEN = 4 # Polish text approximation # AI extraction settings MAX_FACTS_PER_CHUNK = 10 MAX_ENTITIES_PER_CHUNK = 15 # Entity types ENTITY_TYPES = [ 'company', # Firma, organizacja biznesowa 'person', # Osoba 'place', # Miejsce, lokalizacja 'organization', # Instytucja rządowa, NGO 'project', # Projekt, inicjatywa 'technology', # Technologia, produkt 'event', # Wydarzenie, konferencja ] # Fact types FACT_TYPES = [ 'statistic', # Liczby, dane ilościowe 'event', # Zdarzenie z datą 'statement', # Wypowiedź, deklaracja 
'decision', # Decyzja, postanowienie 'milestone', # Kamień milowy projektu 'partnership', # Partnerstwo, współpraca 'investment', # Inwestycja ] # Relation types RELATION_TYPES = [ 'investor_in', # A inwestuje w B 'partner_of', # A jest partnerem B 'located_in', # A znajduje się w B 'manages', # A zarządza B 'works_for', # A pracuje dla B 'part_of', # A jest częścią B 'cooperates_with', # A współpracuje z B 'produces', # A produkuje B 'supplies_to', # A dostarcza do B ] # ============================================================ # AI PROMPTS # ============================================================ # Ultra-simplified prompt to avoid Gemini safety filter issues # Note: Complex JSON schemas with pipe characters were triggering filters # Note: max_tokens parameter also triggers filters - don't use it! EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON. ARTYKUŁ: {chunk_text} Zwróć JSON z następującą strukturą: {{ "facts": [ {{"text": "pełny fakt", "type": "investment"}} ], "entities": [ {{"name": "Nazwa", "type": "company"}} ], "summary": "krótkie podsumowanie" }} Typy faktów: investment, decision, event, statistic, partnership, milestone Typy encji: company, person, place, organization, project""" # System prompt is now empty - the user prompt contains all necessary instructions EXTRACTION_SYSTEM_PROMPT = "" # ============================================================ # DATA CLASSES # ============================================================ @dataclass class ChunkData: """Data for a single chunk.""" content: str index: int token_count: int @dataclass class ExtractionResult: """Result of knowledge extraction from a news article.""" success: bool news_id: int chunks_created: int = 0 facts_created: int = 0 entities_created: int = 0 relations_created: int = 0 error: Optional[str] = None processing_time: float = 0.0 # ============================================================ # CHUNKING FUNCTIONS # 
# ============================================================


def estimate_tokens(text: str) -> int:
    """Estimate the token count of *text* (rough chars/4 heuristic for Polish)."""
    if not text:
        return 0
    # Polish text: ~4 chars per token on average
    return len(text) // APPROX_CHARS_PER_TOKEN


def split_into_sentences(text: str) -> List[str]:
    """Split *text* into sentences on Polish sentence boundaries."""
    # A boundary is punctuation followed by whitespace and an uppercase
    # (including Polish diacritic) letter.
    sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    sentences = re.split(sentence_pattern, text)
    return [s.strip() for s in sentences if s.strip()]


def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.

    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split large paragraphs by sentences
    - Maintain overlap between chunks (one trailing sentence)

    Args:
        text: Full article text.

    Returns:
        List of ChunkData in document order; may be empty.
    """
    if not text:
        return []

    chunks = []
    current_chunk = []   # accumulated paragraphs/sentences for the chunk in progress
    current_tokens = 0

    # Split by paragraphs
    paragraphs = text.split('\n\n')

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        para_tokens = estimate_tokens(para)

        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)
                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: carry the last sentence into the next chunk
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0
                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk (no overlap is kept on the paragraph path)
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0
            current_chunk.append(para)
            current_tokens += para_tokens

    # Save last chunk.
    # The first-element-length heuristic guesses whether the pieces are
    # paragraphs (join with blank lines) or sentences (join with spaces).
    if current_chunk:
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        # NOTE(review): a trailing fragment below MIN_CHUNK_SIZE is silently
        # dropped - an article shorter than MIN_CHUNK_SIZE tokens therefore
        # yields NO chunks at all and extract_from_news() fails with
        # "No chunks created". Confirm this is intentional.
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))

    return chunks


# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================


class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        self._gemini_service = None  # created lazily via the `gemini` property

    @property
    def gemini(self):
        """Lazy-load and cache the Gemini service wrapper."""
        if self._gemini_service is None:
            # Imported here to avoid paying the import cost when the service
            # is only used for statistics.
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service

    def _normalize_entity_name(self, name: str) -> str:
        """Normalize an entity name for deduplication (lowercase, collapse
        whitespace, strip Polish diacritics)."""
        if not name:
            return ''
        # Lowercase, remove extra spaces
        normalized = name.lower().strip()
        normalized = re.sub(r'\s+', ' ', normalized)
        # Remove Polish diacritics so "Łódź" and "Lodz" match
        trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
        normalized = normalized.translate(trans)
        return normalized

    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find an existing entity by normalized name or create a new one.

        Side effects on an existing match: increments mentions_count,
        refreshes last_mentioned_at, and upgrades the description when the
        new one is longer. A new entity is flushed (not committed) so its
        id is available to the caller.

        Args:
            name: Entity name as extracted by the AI.
            entity_type: One of ENTITY_TYPES (not validated here).
            description: Optional free-text description.

        Returns:
            The matched or freshly created ZOPKKnowledgeEntity.
        """
        normalized = self._normalize_entity_name(name)

        # Try to find an existing entity with the same normalized name
        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()

        if existing:
            # Update mention count
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            # Update description if the new one carries more information
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing

        # Create new entity
        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Flush to obtain the generated primary key
        return entity

    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations from a chunk using Gemini AI.

        Args:
            chunk: Chunk to analyse (content may be truncated, see below).
            source_name: Article source name (passed to the prompt format).
            published_date: Publication date string (passed to the prompt format).

        Returns:
            Parsed JSON dict on success, or None on any error (errors are
            logged, never raised).
        """
        try:
            # Truncate chunk to avoid Gemini safety filter issues with long texts.
            # Testing showed ~4000 chars triggers safety blocks, ~2000 chars works.
            MAX_PROMPT_CHARS = 2000
            chunk_text = chunk.content[:MAX_PROMPT_CHARS]
            if len(chunk.content) > MAX_PROMPT_CHARS:
                logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars")

            # Simplified single prompt (system prompt removed to avoid safety filter issues).
            # NOTE(review): EXTRACTION_USER_PROMPT only contains a {chunk_text}
            # placeholder - the source_name/published_date kwargs are accepted
            # but ignored by str.format. Presumably leftovers from an earlier
            # richer prompt; confirm before removing.
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk_text,
                source_name=source_name,
                published_date=published_date
            )

            # NOTE: max_tokens removed - testing showed it triggers safety filters!
            response = self.gemini.generate_text(
                prompt=prompt,
                temperature=0.1,  # Low temperature for output consistency
                user_id=self.user_id,
                feature='zopk_knowledge_extraction'
            )

            if not response:
                logger.warning("Empty response from Gemini")
                return None

            # Parse JSON from the response, tolerating markdown code fences
            json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = response

            # Clean and parse
            json_str = json_str.strip()
            data = json.loads(json_str)

            return data

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
            return None
        except Exception as e:
            logger.error(f"AI extraction error: {e}")
            return None

    def _save_chunk(
        self,
        news: ZOPKNews,
        chunk: ChunkData,
        extraction_data: Optional[Dict]
    ) -> ZOPKKnowledgeChunk:
        """Persist a chunk row (flushed, not committed) and return it.

        extraction_data may be None when AI extraction failed - the chunk is
        still stored, just without summary/keywords.
        """
        # NOTE(review): 'keywords' is read here but the current simplified
        # prompt never asks the model for keywords, so this is likely always
        # None - verify against the prompt.
        db_chunk = ZOPKKnowledgeChunk(
            source_news_id=news.id,
            content=chunk.content,
            chunk_index=chunk.index,
            token_count=chunk.token_count,
            chunk_type='narrative',  # Default; no classification implemented yet
            summary=extraction_data.get('summary') if extraction_data else None,
            keywords=extraction_data.get('keywords') if extraction_data else None,
            language='pl',
            extraction_model='gemini-2.0-flash',
            extracted_at=datetime.now()
        )
        self.db.add(db_chunk)
        self.db.flush()
        return db_chunk

    def _save_facts(
        self,
        chunk: ZOPKKnowledgeChunk,
        news: ZOPKNews,
        facts_data: List[Dict]
    ) -> int:
        """Persist extracted facts (capped at MAX_FACTS_PER_CHUNK).

        Returns:
            Number of facts actually added; per-fact failures are logged
            and skipped.
        """
        count = 0
        for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
            try:
                # Coerce the numeric value; fall back to None on garbage input
                numeric_value = fact.get('numeric_value')
                if numeric_value is not None:
                    try:
                        numeric_value = float(numeric_value)
                    except (ValueError, TypeError):
                        numeric_value = None

                # Parse date (ISO YYYY-MM-DD); fall back to None on garbage input
                date_value = fact.get('date_value')
                if date_value:
                    try:
                        from datetime import datetime as dt
                        date_value = dt.strptime(date_value, '%Y-%m-%d').date()
                    except (ValueError, TypeError):
                        date_value = None

                # Support both old format (full_text) and new simplified format (text)
                fact_text = fact.get('text') or fact.get('full_text', '')

                db_fact = ZOPKKnowledgeFact(
                    source_chunk_id=chunk.id,
                    source_news_id=news.id,
                    fact_type=fact.get('type', 'statement'),
                    subject=fact.get('subject'),
                    predicate=fact.get('predicate'),
                    object=fact.get('object'),
                    full_text=fact_text,
                    numeric_value=numeric_value,
                    numeric_unit=fact.get('numeric_unit'),
                    date_value=date_value,
                    confidence_score=fact.get('confidence', 0.5)
                )
                self.db.add(db_fact)
                count += 1

            except Exception as e:
                logger.error(f"Error saving fact: {e}")
                continue

        return count

    def _save_entities_and_relations(
        self,
        chunk: ZOPKKnowledgeChunk,
        entities_data: List[Dict],
        relations_data: List[Dict]
    ) -> Tuple[int, int]:
        """Persist entities (with mention links) and relations for a chunk.

        Entities are deduplicated via _find_or_create_entity. Relations are
        only saved when both endpoints were extracted in the SAME chunk
        (lookup is by lowercased extracted name) and no identical
        (a, b, type) relation already exists.

        Returns:
            (entities_saved, relations_saved) tuple.
        """
        entity_count = 0
        relation_count = 0

        # Map of lowercased extracted name -> entity, used to resolve relations
        entity_map = {}

        # Save entities
        for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
            try:
                entity = self._find_or_create_entity(
                    name=ent.get('name', ''),
                    entity_type=ent.get('type', 'organization'),
                    description=ent.get('description')
                )
                entity_map[ent.get('name', '').lower()] = entity

                # Create the chunk -> entity mention link
                mention = ZOPKKnowledgeEntityMention(
                    chunk_id=chunk.id,
                    entity_id=entity.id,
                    mention_text=ent.get('name'),
                    mention_type='direct',
                    role_in_context=ent.get('role'),
                    confidence=0.9
                )
                self.db.add(mention)
                entity_count += 1

            except Exception as e:
                logger.error(f"Error saving entity: {e}")
                continue

        # Flush so entity rows have their IDs before relations reference them
        self.db.flush()

        # Save relations
        for rel in relations_data:
            try:
                entity_a_name = rel.get('entity_a', '').lower()
                entity_b_name = rel.get('entity_b', '').lower()

                entity_a = entity_map.get(entity_a_name)
                entity_b = entity_map.get(entity_b_name)

                if not entity_a or not entity_b:
                    # One endpoint was not among this chunk's entities - skip
                    continue

                # Skip if the same directed relation already exists
                existing = self.db.query(ZOPKKnowledgeRelation).filter(
                    ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
                    ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
                    ZOPKKnowledgeRelation.relation_type == rel.get('relation')
                ).first()

                if not existing:
                    db_relation = ZOPKKnowledgeRelation(
                        entity_a_id=entity_a.id,
                        entity_b_id=entity_b.id,
                        relation_type=rel.get('relation', 'cooperates_with'),
                        evidence_text=rel.get('description'),
                        source_chunk_id=chunk.id,
                        confidence=0.8,
                        strength=3
                    )
                    self.db.add(db_relation)
                    relation_count += 1

            except Exception as e:
                logger.error(f"Error saving relation: {e}")
                continue

        return entity_count, relation_count

    def extract_from_news(self, news_id: int) -> ExtractionResult:
        """
        Extract knowledge from a single news article.

        Skips (with success=True) articles already extracted; fails for
        missing records or unscraped content. Commits the session once at
        the end and marks the article as extracted even if some individual
        chunks failed.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ExtractionResult with per-article statistics
        """
        import time
        start_time = time.time()

        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
        if not news:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error=f"News record {news_id} not found"
            )

        # Check if already extracted (idempotency guard)
        if news.knowledge_extracted:
            return ExtractionResult(
                success=True,
                news_id=news_id,
                error="Already extracted"
            )

        # Check if content is scraped
        if not news.full_content:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No content scraped"
            )

        logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")

        # Create chunks
        chunks = create_chunks(news.full_content)
        if not chunks:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No chunks created"
            )

        # Statistics
        chunks_created = 0
        facts_created = 0
        entities_created = 0
        relations_created = 0

        # Process each chunk; a failing chunk is logged and skipped
        for chunk in chunks:
            try:
                # Extract with AI
                extraction_data = self._extract_with_ai(
                    chunk=chunk,
                    source_name=news.source_name or 'unknown',
                    published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
                )

                # The chunk row is saved even when extraction_data is None
                db_chunk = self._save_chunk(news, chunk, extraction_data)
                chunks_created += 1

                if extraction_data:
                    # Save facts
                    facts_created += self._save_facts(
                        db_chunk, news, extraction_data.get('facts', [])
                    )

                    # Save entities and relations
                    ent_count, rel_count = self._save_entities_and_relations(
                        db_chunk,
                        extraction_data.get('entities', []),
                        extraction_data.get('relations', [])
                    )
                    entities_created += ent_count
                    relations_created += rel_count

            except Exception as e:
                logger.error(f"Error processing chunk {chunk.index}: {e}")
                continue

        # Mark as extracted and commit everything in one transaction
        news.knowledge_extracted = True
        news.knowledge_extracted_at = datetime.now()
        self.db.commit()

        processing_time = time.time() - start_time

        logger.info(
            f"Extracted from news {news_id}: "
            f"{chunks_created} chunks, {facts_created} facts, "
            f"{entities_created} entities, {relations_created} relations "
            f"in {processing_time:.2f}s"
        )

        return ExtractionResult(
            success=True,
            news_id=news_id,
            chunks_created=chunks_created,
            facts_created=facts_created,
            entities_created=entities_created,
            relations_created=relations_created,
            processing_time=processing_time
        )

    def batch_extract(self, limit: int = 50) -> Dict:
        """
        Batch extract knowledge from scraped articles.

        Selects approved/auto-approved, scraped, not-yet-extracted articles
        (newest first) and runs extract_from_news on each.

        Args:
            limit: Maximum number of articles to process

        Returns:
            Dict with aggregate statistics and a per-article error list
        """
        import time

        logger.info(f"Starting batch extraction: limit={limit}")

        # Find articles ready for extraction
        articles = self.db.query(ZOPKNews).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).order_by(
            ZOPKNews.created_at.desc()
        ).limit(limit).all()

        stats = {
            'total': len(articles),
            'success': 0,
            'failed': 0,
            'chunks_created': 0,
            'facts_created': 0,
            'entities_created': 0,
            'relations_created': 0,
            'errors': [],
            'processing_time': 0
        }

        start_time = time.time()

        for article in articles:
            result = self.extract_from_news(article.id)

            if result.success:
                stats['success'] += 1
                stats['chunks_created'] += result.chunks_created
                stats['facts_created'] += result.facts_created
                stats['entities_created'] += result.entities_created
                stats['relations_created'] += result.relations_created
            else:
                stats['failed'] += 1
                if result.error:
                    stats['errors'].append({
                        'id': article.id,
                        'title': article.title[:100],
                        'error': result.error
                    })

        stats['processing_time'] = round(time.time() - start_time, 2)

        logger.info(
            f"Batch extraction complete: {stats['success']}/{stats['total']} success "
            f"in {stats['processing_time']}s"
        )

        return stats

    def get_extraction_statistics(self) -> Dict:
        """Aggregate knowledge-extraction statistics for monitoring.

        Returns:
            Dict with 'articles' counters, 'knowledge_base' counters, and
            the top-10 'top_entities' by mention count.
        """
        from sqlalchemy import func

        # Articles stats
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()

        scraped = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()

        extracted = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.knowledge_extracted == True
        ).scalar()

        pending = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).scalar()

        # Knowledge base stats
        chunks_count = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
        facts_count = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
        entities_count = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
        relations_count = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()

        # Top entities by mentions
        top_entities = self.db.query(
            ZOPKKnowledgeEntity.name,
            ZOPKKnowledgeEntity.entity_type,
            ZOPKKnowledgeEntity.mentions_count
        ).order_by(
            ZOPKKnowledgeEntity.mentions_count.desc()
        ).limit(10).all()

        return {
            'articles': {
                'total_approved': total_approved or 0,
                'scraped': scraped or 0,
                'extracted': extracted or 0,
                'pending': pending or 0
            },
            'knowledge_base': {
                'chunks': chunks_count or 0,
                'facts': facts_count or 0,
                'entities': entities_count or 0,
                'relations': relations_count or 0
            },
            'top_entities': [
                {'name': e[0], 'type': e[1], 'mentions': e[2]}
                for e in top_entities
            ]
        }


# ============================================================
# SEMANTIC SEARCH (PHASE 2)
# ============================================================


def search_knowledge(
    db_session,
    query: str,
    limit: int = 5,
    min_similarity: float = 0.3,
    user_id: Optional[int] = None
) -> List[Dict]:
    """
    Semantic search in the ZOPK knowledge base.

    Embeds the query via Gemini, then scores every chunk that has a stored
    embedding with in-Python cosine similarity. O(chunks) per query - see
    the inline note about pgvector.

    Args:
        db_session: SQLAlchemy session
        query: Search query
        limit: Max results to return
        min_similarity: Minimum cosine similarity (0-1)
        user_id: User ID for cost tracking

    Returns:
        List of matching chunks with similarity scores, best first
    """
    from gemini_service import GeminiService
    import json

    # Generate query embedding
    gemini = GeminiService()
    query_embedding = gemini.generate_embedding(
        text=query,
        task_type='retrieval_query',
        user_id=user_id,
        feature='zopk_knowledge_search'
    )

    if not query_embedding:
        logger.warning("Failed to generate query embedding")
        return []

    # Search in database.
    # NOTE: PostgreSQL pgvector would allow efficient in-DB similarity
    # search; for now every embedded chunk is loaded and scored in Python.
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.isnot(None)
    ).all()

    results = []
    for chunk in chunks:
        try:
            # Parse the stored embedding (JSON string or already a list)
            if isinstance(chunk.embedding, str):
                chunk_embedding = json.loads(chunk.embedding)
            else:
                chunk_embedding = chunk.embedding

            if not chunk_embedding:
                continue

            # Calculate cosine similarity
            similarity = _cosine_similarity(query_embedding, chunk_embedding)

            if similarity >= min_similarity:
                results.append({
                    'chunk_id': chunk.id,
                    'content': chunk.content[:500],  # preview only
                    'summary': chunk.summary,
                    'keywords': chunk.keywords,
                    'similarity': round(similarity, 4),
                    'source_news_id': chunk.source_news_id,
                    'importance': chunk.importance_score
                })

        except Exception as e:
            logger.error(f"Error processing chunk {chunk.id}: {e}")
            continue

    # Sort by similarity, best match first
    results.sort(key=lambda x: x['similarity'], reverse=True)

    return results[:limit]


def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    """Calculate cosine similarity between two vectors.

    Returns 0.0 for mismatched lengths or zero-norm vectors.
    """
    import math

    if len(vec1) != len(vec2):
        return 0.0

    dot_product = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))

    if norm1 == 0 or norm2 == 0:
        return 0.0

    return dot_product / (norm1 * norm2)


def get_relevant_facts(
    db_session,
    query: str,
    limit: int = 10
) -> List[Dict]:
    """
    Get facts relevant to a query.

    Uses naive keyword matching (count of query words occurring in the fact
    text) for now; can be enhanced with embeddings later.

    Args:
        db_session: SQLAlchemy session
        query: Free-text query; split on whitespace into keywords
        limit: Max facts to return

    Returns:
        List of fact dicts sorted by descending relevance_score
    """
    from sqlalchemy import or_

    # Simple keyword search
    keywords = query.lower().split()

    # NOTE(review): only verified facts are considered, but the extraction
    # pipeline never sets is_verified - this may always return an empty
    # list until a verification step exists. Confirm.
    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.is_verified == True
    ).all()

    results = []
    for fact in facts:
        # Score = number of query keywords present in the fact text
        score = 0
        fact_text = (fact.full_text or '').lower()

        for keyword in keywords:
            if keyword in fact_text:
                score += 1

        if score > 0:
            results.append({
                'fact_id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'subject': fact.subject,
                'numeric_value': float(fact.numeric_value) if fact.numeric_value else None,
                'numeric_unit': fact.numeric_unit,
                'confidence': float(fact.confidence_score) if fact.confidence_score else None,
                'relevance_score': score
            })

    # Sort by relevance
    results.sort(key=lambda x: x['relevance_score'], reverse=True)

    return results[:limit]


def generate_chunk_embeddings(db_session, limit: int = 100, user_id: Optional[int] = None) -> Dict:
    """
    Generate embeddings for chunks that don't have them yet.

    Embeddings are stored as JSON strings on the chunk row; the session is
    committed once at the end.

    Args:
        db_session: SQLAlchemy session
        limit: Max chunks to process
        user_id: User ID for cost tracking

    Returns:
        Dict with 'total', 'success', and 'failed' counters
    """
    import json
    from gemini_service import GeminiService

    gemini = GeminiService()

    # Find chunks without embeddings
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.is_(None)
    ).limit(limit).all()

    stats = {
        'total': len(chunks),
        'success': 0,
        'failed': 0
    }

    for chunk in chunks:
        try:
            embedding = gemini.generate_embedding(
                text=chunk.content,
                task_type='retrieval_document',
                title=chunk.summary,
                user_id=user_id,
                feature='zopk_chunk_embedding'
            )

            if embedding:
                # Store as JSON string
                chunk.embedding = json.dumps(embedding)
                stats['success'] += 1
            else:
                stats['failed'] += 1

        except Exception as e:
            logger.error(f"Error generating embedding for chunk {chunk.id}: {e}")
            stats['failed'] += 1

    db_session.commit()

    logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success")

    return stats


# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================


def extract_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience wrapper around ZOPKKnowledgeService.batch_extract for cron jobs.

    Usage:
        from zopk_knowledge_service import extract_pending_articles
        result = extract_pending_articles(db_session, limit=50)
    """
    service = ZOPKKnowledgeService(db_session)
    return service.batch_extract(limit=limit)


def get_knowledge_stats(db_session) -> Dict:
    """
    Get knowledge extraction statistics for monitoring dashboards.
    """
    service = ZOPKKnowledgeService(db_session)
    return service.get_extraction_statistics()