nordabiz/zopk_knowledge_service.py
Maciej Pienczyn d0dda10bd7 feat(zopk): Panel admina bazy wiedzy, poprawa odpowiedzi AI, timeline
Priorytet 1 - Panel admina bazy wiedzy ZOPK:
- /admin/zopk/knowledge - dashboard ze statystykami
- /admin/zopk/knowledge/chunks - lista chunks z filtrowaniem
- /admin/zopk/knowledge/facts - lista faktów z typami
- /admin/zopk/knowledge/entities - lista encji z mentions
- CRUD operacje: weryfikacja, usuwanie

Priorytet 2 - Poprawa jakości odpowiedzi NordaGPT:
- Linki markdown do źródeł w kontekście ZOPK
- Ulepszone formatowanie (bold, listy, nagłówki)
- Sekcja "Źródła" na końcu odpowiedzi
- Instrukcje w system prompt dla lepszej prezentacji

Priorytet 3 - Timeline ZOPK:
- Model ZOPKMilestone w database.py
- Migracja 016_zopk_milestones.sql z sample data
- Sekcja "Roadmapa ZOPK" na stronie /zopk
- Pionowa oś czasu z markerami lat
- Statusy: completed, in_progress, planned, delayed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 08:56:55 +01:00

1656 lines
54 KiB
Python

"""
ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy.
Pipeline:
1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów
2. AI Extraction - ekstrakcja faktów i encji przez Gemini
3. Entity Linking - identyfikacja i deduplikacja encji
4. Relation Extraction - wykrywanie relacji między encjami
5. Embedding Generation - wektory dla semantic search (FAZA 2)
Usage:
from zopk_knowledge_service import ZOPKKnowledgeService
service = ZOPKKnowledgeService(db_session)
result = service.extract_from_news(news_id=123)
# lub batch:
result = service.batch_extract(limit=50)
"""
import re
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
# Import progress tracking from scraper
from zopk_content_scraper import ProgressUpdate, ProgressCallback
from database import (
ZOPKNews,
ZOPKKnowledgeChunk,
ZOPKKnowledgeEntity,
ZOPKKnowledgeFact,
ZOPKKnowledgeEntityMention,
ZOPKKnowledgeRelation,
)
# Configure logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURATION
# ============================================================
# Chunk size settings
# NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues
# Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably
MIN_CHUNK_SIZE = 200  # tokens; a trailing chunk below this is dropped by create_chunks()
MAX_CHUNK_SIZE = 500  # tokens (~2000 chars for Polish text)
# NOTE(review): CHUNK_OVERLAP is not referenced by create_chunks() — overlap is
# implemented as "keep the last sentence". Confirm whether this constant is dead.
CHUNK_OVERLAP = 50  # tokens overlap between chunks (reduced proportionally)
APPROX_CHARS_PER_TOKEN = 4  # Polish text approximation
# AI extraction settings — per-chunk caps applied when persisting results
MAX_FACTS_PER_CHUNK = 10
MAX_ENTITIES_PER_CHUNK = 15
# Entity types
ENTITY_TYPES = [
    'company',       # Business company, commercial organization
    'person',        # Person
    'place',         # Place, location
    'organization',  # Government institution, NGO
    'project',       # Project, initiative
    'technology',    # Technology, product
    'event',         # Event, conference
]
# Fact types
FACT_TYPES = [
    'statistic',    # Numbers, quantitative data
    'event',        # Dated occurrence
    'statement',    # Statement, declaration
    'decision',     # Decision, resolution
    'milestone',    # Project milestone
    'partnership',  # Partnership, cooperation
    'investment',   # Investment
]
# Relation types
RELATION_TYPES = [
    'investor_in',      # A invests in B
    'partner_of',       # A is a partner of B
    'located_in',       # A is located in B
    'manages',          # A manages B
    'works_for',        # A works for B
    'part_of',          # A is part of B
    'cooperates_with',  # A cooperates with B
    'produces',         # A produces B
    'supplies_to',      # A supplies to B
]
# ============================================================
# AI PROMPTS
# ============================================================
# Ultra-simplified prompt to avoid Gemini safety filter issues
# Note: Complex JSON schemas with pipe characters were triggering filters
# Note: max_tokens parameter also triggers filters - don't use it!
# The Polish prompt text below is runtime content sent to the model — do not
# translate or reformat it. Only {chunk_text} is substituted; the doubled
# braces are literal JSON braces for str.format().
EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON.
ARTYKUŁ:
{chunk_text}
Zwróć JSON z następującą strukturą:
{{
"facts": [
{{"text": "pełny fakt", "type": "investment"}}
],
"entities": [
{{"name": "Nazwa", "type": "company"}}
],
"summary": "krótkie podsumowanie"
}}
Typy faktów: investment, decision, event, statistic, partnership, milestone
Typy encji: company, person, place, organization, project"""
# System prompt is now empty - the user prompt contains all necessary instructions
EXTRACTION_SYSTEM_PROMPT = ""
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class ChunkData:
    """In-memory chunk produced by create_chunks(), before DB persistence."""
    content: str      # chunk text
    index: int        # 0-based position of the chunk within the article
    token_count: int  # estimated tokens (see estimate_tokens)
@dataclass
class ExtractionResult:
    """Result of knowledge extraction from a single news article."""
    success: bool                 # True on success (also for already-extracted articles)
    news_id: int                  # ID of the processed ZOPKNews record
    chunks_created: int = 0       # chunks persisted
    facts_created: int = 0        # facts persisted
    entities_created: int = 0     # entity mentions persisted
    relations_created: int = 0    # relations persisted
    error: Optional[str] = None   # failure reason, or informational note
    processing_time: float = 0.0  # wall-clock seconds
# ============================================================
# CHUNKING FUNCTIONS
# ============================================================
def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text* (~4 chars per token in Polish)."""
    # Empty/None input counts as zero tokens.
    return len(text) // APPROX_CHARS_PER_TOKEN if text else 0
def split_into_sentences(text: str) -> List[str]:
    """Split *text* into sentences at Polish sentence boundaries."""
    # Boundary: terminal punctuation, whitespace, then an uppercase letter
    # (including Polish diacritics).
    boundary = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    stripped = (piece.strip() for piece in re.split(boundary, text))
    return [piece for piece in stripped if piece]
def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.

    Strategy:
        - Split by paragraphs first
        - Combine small paragraphs
        - Split large paragraphs by sentences
        - Maintain overlap between chunks (last sentence is carried over)

    Args:
        text: Full article text; paragraphs separated by blank lines.

    Returns:
        List of ChunkData. May be empty for empty input, or when the only
        remaining chunk is shorter than MIN_CHUNK_SIZE.
    """
    if not text:
        return []
    chunks = []
    current_chunk = []   # accumulates sentences or paragraphs for the chunk in progress
    current_tokens = 0   # running token estimate for current_chunk
    # Split by paragraphs
    paragraphs = text.split('\n\n')
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        para_tokens = estimate_tokens(para)
        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)
                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk (sentence fragments joined with spaces)
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: keep last sentence
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0
                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk (whole paragraphs joined with blank lines)
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0
            current_chunk.append(para)
            current_tokens += para_tokens
    # Save last chunk
    if current_chunk:
        # Heuristic: if the first element is long it is assumed to be a
        # paragraph (join with blank lines), otherwise a sentence fragment
        # (join with spaces).
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        # NOTE(review): a trailing chunk below MIN_CHUNK_SIZE is silently
        # dropped — confirm short articles losing their tail is intended.
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))
    return chunks
# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================
class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
        - Text chunking with overlap
        - AI-powered fact extraction (Gemini)
        - Named entity recognition
        - Relation extraction
        - Entity deduplication (by normalized name)
    """
    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        # Created lazily by the `gemini` property on first access.
        self._gemini_service = None
@property
def gemini(self):
"""Lazy-load Gemini service."""
if self._gemini_service is None:
from gemini_service import GeminiService
self._gemini_service = GeminiService()
return self._gemini_service
def _normalize_entity_name(self, name: str) -> str:
"""Normalize entity name for deduplication."""
if not name:
return ''
# Lowercase, remove extra spaces
normalized = name.lower().strip()
normalized = re.sub(r'\s+', ' ', normalized)
# Remove Polish diacritics for matching
trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
normalized = normalized.translate(trans)
return normalized
    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.

        Uses normalized name for deduplication. An existing entity gets its
        mention counter bumped and possibly a longer description; a new one
        is flushed so its ID is available to the caller immediately.
        """
        normalized = self._normalize_entity_name(name)
        # Try to find existing
        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()
        if existing:
            # Update mention count (guard against NULL counters)
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            # Update description if better — longer text wins
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing
        # Create new entity
        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Get ID without committing the transaction
        return entity
    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations using Gemini AI.

        Returns parsed JSON dict or None on any error (empty response,
        unparsable JSON, or exception from the Gemini call).
        """
        try:
            # Truncate chunk to avoid Gemini safety filter issues with long texts
            # Testing showed ~4000 chars triggers safety blocks, ~2000 chars works
            MAX_PROMPT_CHARS = 2000
            chunk_text = chunk.content[:MAX_PROMPT_CHARS]
            if len(chunk.content) > MAX_PROMPT_CHARS:
                logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars")
            # Simplified single prompt (system prompt removed to avoid safety filter issues)
            # NOTE: the template only uses {chunk_text}; str.format() ignores the
            # extra source_name/published_date kwargs, kept for future templates.
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk_text,
                source_name=source_name,
                published_date=published_date
            )
            # NOTE: max_tokens removed - testing showed it triggers safety filters!
            response = self.gemini.generate_text(
                prompt=prompt,
                temperature=0.1,  # Low temperature for consistency
                user_id=self.user_id,
                feature='zopk_knowledge_extraction'
            )
            if not response:
                logger.warning("Empty response from Gemini")
                return None
            # Parse JSON from response
            # Handle markdown code blocks (```json ... ```)
            json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = response
            # Clean and parse
            json_str = json_str.strip()
            data = json.loads(json_str)
            return data
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
            return None
        except Exception as e:
            logger.error(f"AI extraction error: {e}")
            return None
    def _save_chunk(
        self,
        news: ZOPKNews,
        chunk: ChunkData,
        extraction_data: Optional[Dict]
    ) -> ZOPKKnowledgeChunk:
        """
        Persist a chunk; the chunk is saved even when AI extraction failed
        (extraction_data is None), so the raw text is never lost.
        """
        db_chunk = ZOPKKnowledgeChunk(
            source_news_id=news.id,
            content=chunk.content,
            chunk_index=chunk.index,
            token_count=chunk.token_count,
            chunk_type='narrative',  # Default; no other types assigned yet
            summary=extraction_data.get('summary') if extraction_data else None,
            keywords=extraction_data.get('keywords') if extraction_data else None,
            language='pl',
            extraction_model='gemini-2.0-flash',
            extracted_at=datetime.now()
        )
        self.db.add(db_chunk)
        self.db.flush()  # Obtain ID for fact/mention foreign keys
        return db_chunk
    def _save_facts(
        self,
        chunk: ZOPKKnowledgeChunk,
        news: ZOPKNews,
        facts_data: List[Dict]
    ) -> int:
        """
        Save extracted facts to database.

        Caps at MAX_FACTS_PER_CHUNK; a failure on one fact is logged and
        skipped without aborting the rest. Returns the number saved.
        """
        count = 0
        for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
            try:
                # Parse numeric value; non-numeric input degrades to None
                numeric_value = fact.get('numeric_value')
                if numeric_value is not None:
                    try:
                        numeric_value = float(numeric_value)
                    except (ValueError, TypeError):
                        numeric_value = None
                # Parse date (expects ISO YYYY-MM-DD); invalid input degrades to None
                date_value = fact.get('date_value')
                if date_value:
                    try:
                        # Aliased import avoids shadowing the module-level `datetime`
                        from datetime import datetime as dt
                        date_value = dt.strptime(date_value, '%Y-%m-%d').date()
                    except (ValueError, TypeError):
                        date_value = None
                # Support both old format (full_text) and new simplified format (text)
                fact_text = fact.get('text') or fact.get('full_text', '')
                db_fact = ZOPKKnowledgeFact(
                    source_chunk_id=chunk.id,
                    source_news_id=news.id,
                    fact_type=fact.get('type', 'statement'),
                    subject=fact.get('subject'),
                    predicate=fact.get('predicate'),
                    object=fact.get('object'),
                    full_text=fact_text,
                    numeric_value=numeric_value,
                    numeric_unit=fact.get('numeric_unit'),
                    date_value=date_value,
                    confidence_score=fact.get('confidence', 0.5)
                )
                self.db.add(db_fact)
                count += 1
            except Exception as e:
                logger.error(f"Error saving fact: {e}")
                continue
        return count
    def _save_entities_and_relations(
        self,
        chunk: ZOPKKnowledgeChunk,
        entities_data: List[Dict],
        relations_data: List[Dict]
    ) -> Tuple[int, int]:
        """
        Save entities (with mention links) and relations to database.

        Relations can only be created between entities mentioned in the same
        chunk (looked up by lowercased name). Returns (entities, relations)
        saved counts.
        """
        entity_count = 0
        relation_count = 0
        # Map of lowercased name -> entity, used to resolve relation endpoints
        entity_map = {}
        # Save entities (capped at MAX_ENTITIES_PER_CHUNK)
        for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
            try:
                entity = self._find_or_create_entity(
                    name=ent.get('name', ''),
                    entity_type=ent.get('type', 'organization'),
                    description=ent.get('description')
                )
                entity_map[ent.get('name', '').lower()] = entity
                # Create mention link between chunk and entity
                mention = ZOPKKnowledgeEntityMention(
                    chunk_id=chunk.id,
                    entity_id=entity.id,
                    mention_text=ent.get('name'),
                    mention_type='direct',
                    role_in_context=ent.get('role'),
                    confidence=0.9
                )
                self.db.add(mention)
                entity_count += 1
            except Exception as e:
                logger.error(f"Error saving entity: {e}")
                continue
        # Flush to get entity IDs before wiring up relations
        self.db.flush()
        # Save relations
        for rel in relations_data:
            try:
                entity_a_name = rel.get('entity_a', '').lower()
                entity_b_name = rel.get('entity_b', '').lower()
                entity_a = entity_map.get(entity_a_name)
                entity_b = entity_map.get(entity_b_name)
                # Skip relations whose endpoints were not extracted as entities
                if not entity_a or not entity_b:
                    continue
                # Check if relation already exists (dedup on directed pair + type)
                existing = self.db.query(ZOPKKnowledgeRelation).filter(
                    ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
                    ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
                    ZOPKKnowledgeRelation.relation_type == rel.get('relation')
                ).first()
                if not existing:
                    db_relation = ZOPKKnowledgeRelation(
                        entity_a_id=entity_a.id,
                        entity_b_id=entity_b.id,
                        relation_type=rel.get('relation', 'cooperates_with'),
                        evidence_text=rel.get('description'),
                        source_chunk_id=chunk.id,
                        confidence=0.8,
                        strength=3
                    )
                    self.db.add(db_relation)
                    relation_count += 1
            except Exception as e:
                logger.error(f"Error saving relation: {e}")
                continue
        return entity_count, relation_count
    def extract_from_news(self, news_id: int) -> ExtractionResult:
        """
        Extract knowledge from a single news article.

        Commits the session on success; per-chunk failures are logged and
        skipped so one bad chunk does not lose the whole article.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ExtractionResult with statistics
        """
        import time
        start_time = time.time()
        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
        if not news:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error=f"News record {news_id} not found"
            )
        # Check if already extracted
        # NOTE: success=True with an informational error keeps batch runs
        # idempotent — already-processed articles are not counted as failures.
        if news.knowledge_extracted:
            return ExtractionResult(
                success=True,
                news_id=news_id,
                error="Already extracted"
            )
        # Check if content is scraped
        if not news.full_content:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No content scraped"
            )
        logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")
        # Create chunks
        chunks = create_chunks(news.full_content)
        if not chunks:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No chunks created"
            )
        # Statistics
        chunks_created = 0
        facts_created = 0
        entities_created = 0
        relations_created = 0
        # Process each chunk
        for chunk in chunks:
            try:
                # Extract with AI (may return None; chunk is still saved)
                extraction_data = self._extract_with_ai(
                    chunk=chunk,
                    source_name=news.source_name or 'unknown',
                    published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
                )
                # Save chunk
                db_chunk = self._save_chunk(news, chunk, extraction_data)
                chunks_created += 1
                if extraction_data:
                    # Save facts
                    facts_created += self._save_facts(
                        db_chunk,
                        news,
                        extraction_data.get('facts', [])
                    )
                    # Save entities and relations
                    ent_count, rel_count = self._save_entities_and_relations(
                        db_chunk,
                        extraction_data.get('entities', []),
                        extraction_data.get('relations', [])
                    )
                    entities_created += ent_count
                    relations_created += rel_count
            except Exception as e:
                logger.error(f"Error processing chunk {chunk.index}: {e}")
                continue
        # Mark as extracted and commit everything in one transaction
        news.knowledge_extracted = True
        news.knowledge_extracted_at = datetime.now()
        self.db.commit()
        processing_time = time.time() - start_time
        logger.info(
            f"Extracted from news {news_id}: "
            f"{chunks_created} chunks, {facts_created} facts, "
            f"{entities_created} entities, {relations_created} relations "
            f"in {processing_time:.2f}s"
        )
        return ExtractionResult(
            success=True,
            news_id=news_id,
            chunks_created=chunks_created,
            facts_created=facts_created,
            entities_created=entities_created,
            relations_created=relations_created,
            processing_time=processing_time
        )
    def batch_extract(self, limit: int = 50, progress_callback: ProgressCallback = None) -> Dict:
        """
        Batch extract knowledge from scraped articles.

        Selects approved, scraped, not-yet-extracted articles (newest first)
        and runs extract_from_news() on each. Progress is reported via the
        optional callback before each article and after success/failure.

        Args:
            limit: Maximum number of articles to process
            progress_callback: Optional callback for progress updates

        Returns:
            Dict with statistics (counts, per-article errors, total time)
        """
        import time
        logger.info(f"Starting batch extraction: limit={limit}")
        # Find articles ready for extraction
        articles = self.db.query(ZOPKNews).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).order_by(
            ZOPKNews.created_at.desc()
        ).limit(limit).all()
        total = len(articles)
        stats = {
            'total': total,
            'success': 0,
            'failed': 0,
            'chunks_created': 0,
            'facts_created': 0,
            'entities_created': 0,
            'relations_created': 0,
            'errors': [],
            'processing_time': 0
        }
        # Send initial progress
        if progress_callback and total > 0:
            progress_callback(ProgressUpdate(
                current=0,
                total=total,
                percent=0.0,
                stage='extracting',
                status='processing',
                message=f'Rozpoczynam ekstrakcję wiedzy z {total} artykułów...',
                details={'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0}
            ))
        start_time = time.time()
        for idx, article in enumerate(articles, 1):
            # Send progress update before processing
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round((idx - 1) / total * 100, 1),
                    stage='extracting',
                    status='processing',
                    message=f'Analizuję przez AI: {article.title[:50]}...',
                    article_id=article.id,
                    article_title=article.title[:80],
                    details={
                        'success': stats['success'],
                        'failed': stats['failed'],
                        'chunks': stats['chunks_created'],
                        'facts': stats['facts_created'],
                        'entities': stats['entities_created']
                    }
                ))
            result = self.extract_from_news(article.id)
            if result.success:
                stats['success'] += 1
                stats['chunks_created'] += result.chunks_created
                stats['facts_created'] += result.facts_created
                stats['entities_created'] += result.entities_created
                stats['relations_created'] += result.relations_created
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='extracting',
                        status='success',
                        message=f'✓ Wyekstrahowano: {result.chunks_created} chunks, {result.facts_created} faktów, {result.entities_created} encji',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'chunks': stats['chunks_created'],
                            'facts': stats['facts_created'],
                            'entities': stats['entities_created'],
                            'new_chunks': result.chunks_created,
                            'new_facts': result.facts_created,
                            'new_entities': result.entities_created
                        }
                    ))
            else:
                stats['failed'] += 1
                if result.error:
                    stats['errors'].append({
                        'id': article.id,
                        'title': article.title[:100],
                        'error': result.error
                    })
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='extracting',
                        status='failed',
                        message=f'✗ Błąd ekstrakcji: {result.error[:50]}...' if result.error else '✗ Błąd',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'error': result.error
                        }
                    ))
        stats['processing_time'] = round(time.time() - start_time, 2)
        # Send completion progress
        if progress_callback:
            progress_callback(ProgressUpdate(
                current=total,
                total=total,
                percent=100.0,
                stage='extracting',
                status='complete',
                message=f'Zakończono: {stats["success"]}/{total} artykułów. '
                f'Utworzono: {stats["chunks_created"]} chunks, {stats["facts_created"]} faktów, '
                f'{stats["entities_created"]} encji',
                details={
                    'success': stats['success'],
                    'failed': stats['failed'],
                    'chunks': stats['chunks_created'],
                    'facts': stats['facts_created'],
                    'entities': stats['entities_created'],
                    'relations': stats['relations_created'],
                    'processing_time': stats['processing_time']
                }
            ))
        logger.info(
            f"Batch extraction complete: {stats['success']}/{stats['total']} success "
            f"in {stats['processing_time']}s"
        )
        return stats
def get_extraction_statistics(self) -> Dict:
"""Get knowledge extraction statistics."""
from sqlalchemy import func
# Articles stats
total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.status.in_(['approved', 'auto_approved'])
).scalar()
scraped = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.scrape_status == 'scraped'
).scalar()
# Pending scrape: approved but not yet scraped
pending_scrape = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.status.in_(['approved', 'auto_approved']),
ZOPKNews.scrape_status.in_(['pending', None])
).scalar()
extracted = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.knowledge_extracted == True
).scalar()
pending_extract = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.scrape_status == 'scraped',
ZOPKNews.knowledge_extracted == False
).scalar()
# Knowledge base stats
total_chunks = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
total_facts = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
total_entities = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
total_relations = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()
# Embeddings stats
chunks_with_embeddings = self.db.query(func.count(ZOPKKnowledgeChunk.id)).filter(
ZOPKKnowledgeChunk.embedding.isnot(None)
).scalar()
chunks_without_embeddings = (total_chunks or 0) - (chunks_with_embeddings or 0)
# Top entities by mentions
top_entities = self.db.query(
ZOPKKnowledgeEntity.name,
ZOPKKnowledgeEntity.entity_type,
ZOPKKnowledgeEntity.mentions_count
).order_by(
ZOPKKnowledgeEntity.mentions_count.desc()
).limit(10).all()
return {
'articles': {
'total_approved': total_approved or 0,
'scraped': scraped or 0,
'pending_scrape': pending_scrape or 0,
'extracted': extracted or 0,
'pending_extract': pending_extract or 0
},
'knowledge_base': {
'total_chunks': total_chunks or 0,
'total_facts': total_facts or 0,
'total_entities': total_entities or 0,
'total_relations': total_relations or 0,
'chunks_with_embeddings': chunks_with_embeddings or 0,
'chunks_without_embeddings': chunks_without_embeddings or 0
},
'top_entities': [
{'name': e[0], 'type': e[1], 'mentions': e[2]}
for e in top_entities
]
}
# ============================================================
# SEMANTIC SEARCH (FAZA 2)
# ============================================================
def search_knowledge(
    db_session,
    query: str,
    limit: int = 5,
    min_similarity: float = 0.3,
    user_id: Optional[int] = None
) -> List[Dict]:
    """
    Semantic search in ZOPK knowledge base.

    Embeds the query via Gemini, then scores every embedded chunk in Python
    with cosine similarity.

    NOTE(review): this loads ALL embedded chunks and scores them in Python —
    O(n) per query. Fine for a small knowledge base; migrate to pgvector for
    scale (see comment below).

    Args:
        db_session: SQLAlchemy session
        query: Search query
        limit: Max results to return
        min_similarity: Minimum cosine similarity (0-1)
        user_id: User ID for cost tracking

    Returns:
        List of matching chunks with similarity scores, best first.
    """
    from gemini_service import GeminiService
    import json
    # Generate query embedding
    gemini = GeminiService()
    query_embedding = gemini.generate_embedding(
        text=query,
        task_type='retrieval_query',
        user_id=user_id,
        feature='zopk_knowledge_search'
    )
    if not query_embedding:
        logger.warning("Failed to generate query embedding")
        return []
    # Search in database
    # Note: This uses PostgreSQL pgvector for efficient similarity search
    # For now, we'll do a simpler approach with JSON embeddings
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.isnot(None)
    ).all()
    results = []
    for chunk in chunks:
        try:
            # Parse stored embedding (stored as a JSON string of floats)
            if isinstance(chunk.embedding, str):
                chunk_embedding = json.loads(chunk.embedding)
            else:
                chunk_embedding = chunk.embedding
            if not chunk_embedding:
                continue
            # Calculate cosine similarity
            similarity = _cosine_similarity(query_embedding, chunk_embedding)
            if similarity >= min_similarity:
                results.append({
                    'chunk_id': chunk.id,
                    'content': chunk.content[:500],  # preview only
                    'summary': chunk.summary,
                    'keywords': chunk.keywords,
                    'similarity': round(similarity, 4),
                    'source_news_id': chunk.source_news_id,
                    'importance': chunk.importance_score
                })
        except Exception as e:
            logger.error(f"Error processing chunk {chunk.id}: {e}")
            continue
    # Sort by similarity, descending
    results.sort(key=lambda x: x['similarity'], reverse=True)
    return results[:limit]
def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors."""
import math
if len(vec1) != len(vec2):
return 0.0
dot_product = sum(a * b for a, b in zip(vec1, vec2))
norm1 = math.sqrt(sum(a * a for a in vec1))
norm2 = math.sqrt(sum(b * b for b in vec2))
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def get_relevant_facts(
    db_session,
    query: str,
    limit: int = 10
) -> List[Dict]:
    """
    Get facts relevant to a query.

    Uses keyword matching for now, can be enhanced with embeddings.
    Relevance score is the number of query words found in the fact text;
    facts with no matches are excluded, so an empty query returns [].
    """
    from sqlalchemy import or_
    # Simple keyword search: split the query into lowercase words
    keywords = query.lower().split()
    # NOTE: Removed is_verified filter - auto-extracted facts are usable
    # Future: add manual verification workflow and re-enable filter
    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.confidence_score >= 0.3  # Minimum confidence threshold
    ).all()
    results = []
    for fact in facts:
        # Score = number of query keywords appearing in the fact text
        score = 0
        fact_text = (fact.full_text or '').lower()
        for keyword in keywords:
            if keyword in fact_text:
                score += 1
        if score > 0:
            # Get source URL from related news (for citation links in answers)
            source_url = ''
            source_name = ''
            source_date = ''
            if fact.source_news:
                source_url = fact.source_news.url or ''
                source_name = fact.source_news.source_name or fact.source_news.source_domain or ''
                if fact.source_news.published_at:
                    source_date = fact.source_news.published_at.strftime('%Y-%m-%d')
            results.append({
                'fact_id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'subject': fact.subject,
                'numeric_value': float(fact.numeric_value) if fact.numeric_value else None,
                'numeric_unit': fact.numeric_unit,
                'confidence': float(fact.confidence_score) if fact.confidence_score else None,
                'relevance_score': score,
                'source_url': source_url,
                'source_name': source_name,
                'source_date': source_date
            })
    # Sort by relevance, best first
    results.sort(key=lambda x: x['relevance_score'], reverse=True)
    return results[:limit]
def generate_chunk_embeddings(
    db_session,
    limit: int = 100,
    user_id: Optional[int] = None,
    progress_callback: ProgressCallback = None
) -> Dict:
    """
    Generate embeddings for chunks that don't have them.

    Embeddings are stored as JSON strings on the chunk. The session is
    committed once at the end, after all chunks are processed.

    Args:
        db_session: SQLAlchemy session
        limit: Max chunks to process
        user_id: User ID for cost tracking
        progress_callback: Optional callback for progress updates

    Returns:
        Dict with statistics (total/success/failed/processing_time)
    """
    import json
    import time
    from gemini_service import GeminiService
    gemini = GeminiService()
    # Find chunks without embeddings
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.is_(None)
    ).limit(limit).all()
    total = len(chunks)
    stats = {
        'total': total,
        'success': 0,
        'failed': 0,
        'processing_time': 0
    }
    # Send initial progress
    if progress_callback and total > 0:
        progress_callback(ProgressUpdate(
            current=0,
            total=total,
            percent=0.0,
            stage='embedding',
            status='processing',
            message=f'Rozpoczynam generowanie embeddingów dla {total} chunks...',
            details={'success': 0, 'failed': 0}
        ))
    start_time = time.time()
    for idx, chunk in enumerate(chunks, 1):
        # Send progress update before processing
        if progress_callback:
            # Get article title from chunk's source news
            article_title = None
            if chunk.source_news:
                article_title = chunk.source_news.title[:80]
            progress_callback(ProgressUpdate(
                current=idx,
                total=total,
                percent=round((idx - 1) / total * 100, 1),
                stage='embedding',
                status='processing',
                message=f'Generuję embedding {idx}/{total}: {chunk.summary[:40] if chunk.summary else "chunk"}...',
                article_id=chunk.source_news_id,
                article_title=article_title,
                details={
                    'success': stats['success'],
                    'failed': stats['failed'],
                    'chunk_id': chunk.id
                }
            ))
        try:
            embedding = gemini.generate_embedding(
                text=chunk.content,
                task_type='retrieval_document',
                title=chunk.summary,
                user_id=user_id,
                feature='zopk_chunk_embedding'
            )
            if embedding:
                # Store as JSON string (no pgvector column yet)
                chunk.embedding = json.dumps(embedding)
                stats['success'] += 1
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='embedding',
                        status='success',
                        message=f'✓ Wygenerowano embedding (768 dim)',
                        article_id=chunk.source_news_id,
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'chunk_id': chunk.id
                        }
                    ))
            else:
                stats['failed'] += 1
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='embedding',
                        status='failed',
                        message='✗ Nie udało się wygenerować embeddingu',
                        article_id=chunk.source_news_id,
                        details={'success': stats['success'], 'failed': stats['failed']}
                    ))
        except Exception as e:
            logger.error(f"Error generating embedding for chunk {chunk.id}: {e}")
            stats['failed'] += 1
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round(idx / total * 100, 1),
                    stage='embedding',
                    status='failed',
                    message=f'✗ Błąd: {str(e)[:50]}...',
                    article_id=chunk.source_news_id,
                    details={'success': stats['success'], 'failed': stats['failed'], 'error': str(e)}
                ))
    # Single commit for the whole batch
    db_session.commit()
    stats['processing_time'] = round(time.time() - start_time, 2)
    # Send completion progress
    if progress_callback:
        progress_callback(ProgressUpdate(
            current=total,
            total=total,
            percent=100.0,
            stage='embedding',
            status='complete',
            message=f'Zakończono: {stats["success"]}/{total} embeddingów wygenerowanych',
            details={
                'success': stats['success'],
                'failed': stats['failed'],
                'processing_time': stats['processing_time']
            }
        ))
    logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success")
    return stats
# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================
def extract_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience wrapper for cron jobs.

    Usage:
        from zopk_knowledge_service import extract_pending_articles
        result = extract_pending_articles(db_session, limit=50)
    """
    return ZOPKKnowledgeService(db_session).batch_extract(limit=limit)
def get_knowledge_stats(db_session) -> Dict:
    """Return knowledge extraction statistics for monitoring."""
    return ZOPKKnowledgeService(db_session).get_extraction_statistics()
# ============================================================
# ADMIN PANEL - LIST FUNCTIONS
# ============================================================
def list_chunks(
    db_session,
    page: int = 1,
    per_page: int = 20,
    source_news_id: Optional[int] = None,
    has_embedding: Optional[bool] = None,
    is_verified: Optional[bool] = None
) -> Dict:
    """
    List knowledge chunks with pagination and filtering.

    Args:
        db_session: Database session
        page: Page number (1-based)
        per_page: Items per page
        source_news_id: Filter by source article
        has_embedding: Filter by embedding status (None = no filter)
        is_verified: Filter by verification status (None = no filter)

    Returns:
        {
            'chunks': [...],
            'total': int,
            'page': int,
            'per_page': int,
            'pages': int
        }
    """
    from sqlalchemy import func
    query = db_session.query(ZOPKKnowledgeChunk)
    # Apply filters
    # NOTE(review): truthiness check means source_news_id=0 is treated as
    # "no filter" — harmless if IDs start at 1, but worth confirming.
    if source_news_id:
        query = query.filter(ZOPKKnowledgeChunk.source_news_id == source_news_id)
    if has_embedding is not None:
        if has_embedding:
            query = query.filter(ZOPKKnowledgeChunk.embedding.isnot(None))
        else:
            query = query.filter(ZOPKKnowledgeChunk.embedding.is_(None))
    if is_verified is not None:
        query = query.filter(ZOPKKnowledgeChunk.is_verified == is_verified)
    # Get total count (before pagination)
    total = query.count()
    # Calculate pagination (ceiling division)
    pages = (total + per_page - 1) // per_page
    offset = (page - 1) * per_page
    # Get chunks with source news info, newest first
    chunks = query.order_by(
        ZOPKKnowledgeChunk.created_at.desc()
    ).offset(offset).limit(per_page).all()
    return {
        'chunks': [
            {
                'id': c.id,
                # 'content' is a 300-char preview; 'full_content' is complete
                'content': c.content[:300] + '...' if len(c.content) > 300 else c.content,
                'full_content': c.content,
                'summary': c.summary,
                'chunk_type': c.chunk_type,
                'chunk_index': c.chunk_index,
                'token_count': c.token_count,
                'importance_score': c.importance_score,
                'confidence_score': float(c.confidence_score) if c.confidence_score else None,
                'has_embedding': c.embedding is not None,
                'is_verified': c.is_verified,
                'source_news_id': c.source_news_id,
                'source_title': c.source_news.title if c.source_news else None,
                'source_url': c.source_news.url if c.source_news else None,
                'created_at': c.created_at.isoformat() if c.created_at else None,
                'keywords': c.keywords if isinstance(c.keywords, list) else []
            }
            for c in chunks
        ],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages
    }
def list_facts(
    db_session,
    page: int = 1,
    per_page: int = 20,
    fact_type: Optional[str] = None,
    is_verified: Optional[bool] = None,
    source_news_id: Optional[int] = None
) -> Dict:
    """
    List knowledge facts with pagination and filtering.

    Args:
        db_session: Database session
        page: Page number (1-based); values < 1 are clamped to 1
        per_page: Items per page; values < 1 are clamped to 1
        fact_type: Filter by fact type (statistic, event, statement, decision, milestone)
        is_verified: Filter by verification status (None = no filter)
        source_news_id: Filter by source article

    Returns:
        {
            'facts': [...],
            'total': int,
            'page': int,
            'per_page': int,
            'pages': int,
            'fact_types': [...] - available types for filtering
        }
    """
    from sqlalchemy import distinct

    # Guard against nonsense pagination input: per_page=0 would divide by
    # zero below, and page<1 would produce a negative OFFSET.
    page = max(page, 1)
    per_page = max(per_page, 1)

    query = db_session.query(ZOPKKnowledgeFact)

    # Apply filters
    if fact_type:
        query = query.filter(ZOPKKnowledgeFact.fact_type == fact_type)
    if is_verified is not None:
        query = query.filter(ZOPKKnowledgeFact.is_verified == is_verified)
    if source_news_id:
        query = query.filter(ZOPKKnowledgeFact.source_news_id == source_news_id)

    # Get total count
    total = query.count()

    # Distinct, non-null fact types (for the filter dropdown)
    fact_types = db_session.query(
        distinct(ZOPKKnowledgeFact.fact_type)
    ).filter(
        ZOPKKnowledgeFact.fact_type.isnot(None)
    ).all()
    fact_types = [f[0] for f in fact_types if f[0]]

    # Ceiling division for the number of pages
    pages = (total + per_page - 1) // per_page
    offset = (page - 1) * per_page

    # Newest facts first
    facts = query.order_by(
        ZOPKKnowledgeFact.created_at.desc()
    ).offset(offset).limit(per_page).all()

    return {
        'facts': [
            {
                'id': f.id,
                'fact_type': f.fact_type,
                'subject': f.subject,
                'predicate': f.predicate,
                'object': f.object,
                'full_text': f.full_text,
                'numeric_value': float(f.numeric_value) if f.numeric_value else None,
                'numeric_unit': f.numeric_unit,
                'date_value': f.date_value.isoformat() if f.date_value else None,
                'confidence_score': float(f.confidence_score) if f.confidence_score else None,
                'is_verified': f.is_verified,
                'source_news_id': f.source_news_id,
                'source_chunk_id': f.source_chunk_id,
                'source_title': f.source_news.title if f.source_news else None,
                'source_url': f.source_news.url if f.source_news else None,
                'entities_involved': f.entities_involved if isinstance(f.entities_involved, list) else [],
                'created_at': f.created_at.isoformat() if f.created_at else None
            }
            for f in facts
        ],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages,
        'fact_types': fact_types
    }
def list_entities(
    db_session,
    page: int = 1,
    per_page: int = 20,
    entity_type: Optional[str] = None,
    is_verified: Optional[bool] = None,
    min_mentions: Optional[int] = None
) -> Dict:
    """
    List knowledge entities with pagination and filtering.

    Entities that were merged into another entity (merged_into_id set) are
    always excluded.

    Args:
        db_session: Database session
        page: Page number (1-based); values < 1 are clamped to 1
        per_page: Items per page; values < 1 are clamped to 1
        entity_type: Filter by entity type (company, person, place, organization, project, technology)
        is_verified: Filter by verification status (None = no filter)
        min_mentions: Filter by minimum mention count

    Returns:
        {
            'entities': [...],
            'total': int,
            'page': int,
            'per_page': int,
            'pages': int,
            'entity_types': [...] - available types for filtering
        }
    """
    from sqlalchemy import distinct

    # Guard against nonsense pagination input: per_page=0 would divide by
    # zero below, and page<1 would produce a negative OFFSET.
    page = max(page, 1)
    per_page = max(per_page, 1)

    query = db_session.query(ZOPKKnowledgeEntity)

    # Exclude merged (deduplicated) entities
    query = query.filter(ZOPKKnowledgeEntity.merged_into_id.is_(None))

    # Apply filters
    if entity_type:
        query = query.filter(ZOPKKnowledgeEntity.entity_type == entity_type)
    if is_verified is not None:
        query = query.filter(ZOPKKnowledgeEntity.is_verified == is_verified)
    if min_mentions:
        query = query.filter(ZOPKKnowledgeEntity.mentions_count >= min_mentions)

    # Get total count
    total = query.count()

    # Distinct, non-null entity types (for the filter dropdown)
    entity_types = db_session.query(
        distinct(ZOPKKnowledgeEntity.entity_type)
    ).filter(
        ZOPKKnowledgeEntity.entity_type.isnot(None)
    ).all()
    entity_types = [e[0] for e in entity_types if e[0]]

    # Ceiling division for the number of pages
    pages = (total + per_page - 1) // per_page
    offset = (page - 1) * per_page

    # Most-mentioned entities first
    entities = query.order_by(
        ZOPKKnowledgeEntity.mentions_count.desc()
    ).offset(offset).limit(per_page).all()

    return {
        'entities': [
            {
                'id': e.id,
                'name': e.name,
                'normalized_name': e.normalized_name,
                'entity_type': e.entity_type,
                'description': e.description,
                'short_description': e.short_description,
                'aliases': e.aliases if isinstance(e.aliases, list) else [],
                'mentions_count': e.mentions_count or 0,
                'is_verified': e.is_verified,
                'company_id': e.company_id,
                'external_url': e.external_url,
                'first_mentioned_at': e.first_mentioned_at.isoformat() if e.first_mentioned_at else None,
                'last_mentioned_at': e.last_mentioned_at.isoformat() if e.last_mentioned_at else None,
                'created_at': e.created_at.isoformat() if e.created_at else None
            }
            for e in entities
        ],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages,
        'entity_types': entity_types
    }
def get_chunk_detail(db_session, chunk_id: int) -> Optional[Dict]:
    """Return full details for a single chunk, or None when it does not exist.

    The result includes the chunk's own fields, its source article (if any),
    the facts extracted from it, and its entity mentions.
    """
    chunk = (
        db_session.query(ZOPKKnowledgeChunk)
        .filter(ZOPKKnowledgeChunk.id == chunk_id)
        .first()
    )
    if chunk is None:
        return None

    # Facts extracted from this chunk
    related_facts = (
        db_session.query(ZOPKKnowledgeFact)
        .filter(ZOPKKnowledgeFact.source_chunk_id == chunk_id)
        .all()
    )
    # Entity mentions located inside this chunk
    chunk_mentions = (
        db_session.query(ZOPKKnowledgeEntityMention)
        .filter(ZOPKKnowledgeEntityMention.chunk_id == chunk_id)
        .all()
    )

    source = None
    if chunk.source_news:
        source = {
            'id': chunk.source_news.id,
            'title': chunk.source_news.title,
            'url': chunk.source_news.url,
            'source_name': chunk.source_news.source_name
        }

    return {
        'id': chunk.id,
        'content': chunk.content,
        'content_clean': chunk.content_clean,
        'summary': chunk.summary,
        'chunk_type': chunk.chunk_type,
        'chunk_index': chunk.chunk_index,
        'token_count': chunk.token_count,
        'importance_score': chunk.importance_score,
        'confidence_score': float(chunk.confidence_score) if chunk.confidence_score else None,
        'has_embedding': chunk.embedding is not None,
        'is_verified': chunk.is_verified,
        'keywords': chunk.keywords if isinstance(chunk.keywords, list) else [],
        'context_date': chunk.context_date.isoformat() if chunk.context_date else None,
        'context_location': chunk.context_location,
        'extraction_model': chunk.extraction_model,
        'extracted_at': chunk.extracted_at.isoformat() if chunk.extracted_at else None,
        'created_at': chunk.created_at.isoformat() if chunk.created_at else None,
        'source_news': source,
        'facts': [
            {
                'id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'is_verified': fact.is_verified
            }
            for fact in related_facts
        ],
        'entity_mentions': [
            {
                'id': mention.id,
                'entity_id': mention.entity_id,
                'entity_name': mention.entity.name if mention.entity else None,
                'entity_type': mention.entity.entity_type if mention.entity else None,
                'mention_text': mention.mention_text
            }
            for mention in chunk_mentions
        ]
    }
def update_chunk_verification(db_session, chunk_id: int, is_verified: bool, user_id: int) -> bool:
    """Set a chunk's verification flag, recording who verified it and when.

    Returns True when the chunk exists and was updated, False otherwise.
    """
    target = (
        db_session.query(ZOPKKnowledgeChunk)
        .filter(ZOPKKnowledgeChunk.id == chunk_id)
        .first()
    )
    if target is None:
        return False
    target.is_verified = is_verified
    target.verified_by = user_id
    target.verified_at = datetime.now()
    db_session.commit()
    return True
def update_fact_verification(db_session, fact_id: int, is_verified: bool) -> bool:
    """Set a fact's verification flag.

    Returns True when the fact exists and was updated, False otherwise.
    """
    target = (
        db_session.query(ZOPKKnowledgeFact)
        .filter(ZOPKKnowledgeFact.id == fact_id)
        .first()
    )
    if target is None:
        return False
    target.is_verified = is_verified
    db_session.commit()
    return True
def update_entity_verification(db_session, entity_id: int, is_verified: bool) -> bool:
    """Set an entity's verification flag.

    Returns True when the entity exists and was updated, False otherwise.
    """
    target = (
        db_session.query(ZOPKKnowledgeEntity)
        .filter(ZOPKKnowledgeEntity.id == entity_id)
        .first()
    )
    if target is None:
        return False
    target.is_verified = is_verified
    db_session.commit()
    return True
def delete_chunk(db_session, chunk_id: int) -> bool:
    """Delete a chunk together with its derived facts and entity mentions.

    Returns True on success, False when the chunk does not exist.
    """
    target = (
        db_session.query(ZOPKKnowledgeChunk)
        .filter(ZOPKKnowledgeChunk.id == chunk_id)
        .first()
    )
    if target is None:
        return False

    # Remove dependent rows first so no orphaned facts/mentions remain.
    db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.source_chunk_id == chunk_id
    ).delete()
    db_session.query(ZOPKKnowledgeEntityMention).filter(
        ZOPKKnowledgeEntityMention.chunk_id == chunk_id
    ).delete()

    db_session.delete(target)
    db_session.commit()
    return True