nordabiz/zopk_knowledge_service.py
Maciej Pienczyn 1b4cd31c41 feat(zopk): Knowledge Base + NordaGPT integration (FAZY 0-3)
FAZA 0 - Web Scraping:
- Migracja 015: pola full_content, scrape_status w zopk_news
- zopk_content_scraper.py: scraper z rate limiting i selektorami

FAZA 1 - Knowledge Extraction:
- zopk_knowledge_service.py: chunking, facts, entities extraction
- Endpointy /admin/zopk/knowledge/extract

FAZA 2 - Embeddings:
- gemini_service.py: generate_embedding(), generate_embeddings_batch()
- Model text-embedding-004 (768 dimensions)

FAZA 3 - NordaGPT Integration:
- nordabiz_chat.py: _is_zopk_query(), _get_zopk_knowledge_context()
- System prompt z bazą wiedzy ZOPK
- Semantic search w kontekście chatu

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 20:15:30 +01:00

1040 lines
32 KiB
Python

"""
ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy.
Pipeline:
1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów
2. AI Extraction - ekstrakcja faktów i encji przez Gemini
3. Entity Linking - identyfikacja i deduplikacja encji
4. Relation Extraction - wykrywanie relacji między encjami
5. Embedding Generation - wektory dla semantic search (FAZA 2)
Usage:
from zopk_knowledge_service import ZOPKKnowledgeService
service = ZOPKKnowledgeService(db_session)
result = service.extract_from_news(news_id=123)
# lub batch:
result = service.batch_extract(limit=50)
"""
import re
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field
from database import (
ZOPKNews,
ZOPKKnowledgeChunk,
ZOPKKnowledgeEntity,
ZOPKKnowledgeFact,
ZOPKKnowledgeEntityMention,
ZOPKKnowledgeRelation,
)
# Configure logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURATION
# ============================================================

# Chunk size settings (sizes are approximate token counts, see estimate_tokens).
MIN_CHUNK_SIZE = 200  # tokens; the final chunk is dropped if below this size
MAX_CHUNK_SIZE = 1000  # tokens; chunks are flushed before exceeding this
CHUNK_OVERLAP = 100  # tokens overlap between chunks
# NOTE(review): CHUNK_OVERLAP is not referenced by create_chunks(); the actual
# overlap there is "carry the last sentence forward" — confirm whether this
# constant is still meant to be used.
APPROX_CHARS_PER_TOKEN = 4  # Polish text approximation

# AI extraction settings: hard caps applied when persisting extraction output.
MAX_FACTS_PER_CHUNK = 10
MAX_ENTITIES_PER_CHUNK = 15

# Entity types the extraction prompt is allowed to emit.
ENTITY_TYPES = [
    'company',       # Company, business organization
    'person',        # Person
    'place',         # Place, location
    'organization',  # Government institution, NGO
    'project',       # Project, initiative
    'technology',    # Technology, product
    'event',         # Event, conference
]

# Fact types the extraction prompt is allowed to emit.
FACT_TYPES = [
    'statistic',    # Numbers, quantitative data
    'event',        # Occurrence with a date
    'statement',    # Statement, declaration
    'decision',     # Decision, resolution
    'milestone',    # Project milestone
    'partnership',  # Partnership, cooperation
    'investment',   # Investment
]

# Relation types (directed: entity A <relation> entity B).
RELATION_TYPES = [
    'investor_in',      # A invests in B
    'partner_of',       # A is a partner of B
    'located_in',       # A is located in B
    'manages',          # A manages B
    'works_for',        # A works for B
    'part_of',          # A is part of B
    'cooperates_with',  # A cooperates with B
    'produces',         # A produces B
    'supplies_to',      # A supplies to B
]
# ============================================================
# AI PROMPTS
# ============================================================
# Both prompts are deliberately written in Polish: the source articles and the
# target knowledge base are Polish, and extraction quality depends on the
# prompt matching the article language. Do not translate these strings.

# System prompt: frames the model as a ZOPK-domain knowledge-extraction expert
# and enumerates what to extract (facts, entities, relations).
EXTRACTION_SYSTEM_PROMPT = """Jesteś ekspertem ds. ekstrakcji wiedzy z artykułów o projekcie Zielony Okręg Przemysłowy Kaszubia (ZOPK).
ZOPK to strategiczny projekt transformacji energetycznej i przemysłowej Pomorza, obejmujący:
- Morską energetykę wiatrową (Baltic Power, Baltica, F.E.W. Baltic)
- Elektrownię jądrową (Lubiatowo-Kopalino, PEJ)
- Inwestycje norweskie Kongsberg w Rumi
- Infrastrukturę: Via Pomerania, S6, Droga Czerwona
- Centra danych, laboratoria wodorowe
Twoim zadaniem jest wyodrębnić z tekstu:
1. FAKTY - konkretne, weryfikowalne informacje (liczby, daty, decyzje)
2. ENCJE - nazwy własne (firmy, osoby, miejsca, projekty)
3. RELACJE - powiązania między encjami"""

# User prompt template, .format()-ed with chunk_text / source_name /
# published_date in _extract_with_ai(). Double braces ({{ }}) escape the
# literal JSON braces from str.format(). The type/relation enumerations must
# stay in sync with FACT_TYPES, ENTITY_TYPES and RELATION_TYPES above.
EXTRACTION_USER_PROMPT = """Przeanalizuj poniższy fragment artykułu i wyodrębnij strukturalne informacje.
TEKST:
{chunk_text}
ŹRÓDŁO: {source_name} ({published_date})
Zwróć JSON w formacie:
{{
"facts": [
{{
"type": "statistic|event|statement|decision|milestone|partnership|investment",
"subject": "podmiot faktu",
"predicate": "czasownik/relacja",
"object": "dopełnienie/informacja",
"full_text": "pełne zdanie opisujące fakt",
"numeric_value": null lub liczba,
"numeric_unit": null lub "PLN|EUR|MW|jobs|%",
"date_value": null lub "YYYY-MM-DD",
"confidence": 0.0-1.0
}}
],
"entities": [
{{
"type": "company|person|place|organization|project|technology|event",
"name": "nazwa encji",
"description": "krótki opis (1 zdanie)",
"role": "rola w kontekście (np. inwestor, koordynator)"
}}
],
"relations": [
{{
"entity_a": "nazwa encji A",
"entity_b": "nazwa encji B",
"relation": "investor_in|partner_of|located_in|manages|works_for|part_of|cooperates_with|produces|supplies_to",
"description": "opis relacji"
}}
],
"summary": "1-2 zdaniowe podsumowanie fragmentu",
"keywords": ["słowo1", "słowo2", "słowo3"]
}}
ZASADY:
- Wyodrębniaj TYLKO informacje wprost zawarte w tekście
- Nie dodawaj informacji z własnej wiedzy
- Podawaj confidence score dla każdego faktu (1.0 = pewny, 0.5 = prawdopodobny)
- Dla liczb zawsze podawaj jednostkę (PLN, EUR, MW, jobs, %)
- Dla dat używaj formatu YYYY-MM-DD
- Nazwy encji pisz dokładnie jak w tekście"""
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class ChunkData:
    """In-memory representation of one text chunk before it is persisted."""
    content: str      # chunk text
    index: int        # 0-based position of the chunk within the source article
    token_count: int  # estimated token count (see estimate_tokens)
@dataclass
class ExtractionResult:
    """Result of knowledge extraction from a news article."""
    success: bool                 # overall outcome of the extraction
    news_id: int                  # ZOPKNews.id the extraction ran against
    chunks_created: int = 0       # chunks persisted
    facts_created: int = 0        # facts persisted
    entities_created: int = 0     # entity mentions persisted
    relations_created: int = 0    # relations persisted
    error: Optional[str] = None   # error/skip reason (also set on some successes, e.g. "Already extracted")
    processing_time: float = 0.0  # wall-clock seconds
# ============================================================
# CHUNKING FUNCTIONS
# ============================================================
def estimate_tokens(text: str) -> int:
    """Approximate token count of *text* (Polish averages ~4 chars/token)."""
    return len(text) // APPROX_CHARS_PER_TOKEN if text else 0
def split_into_sentences(text: str) -> List[str]:
    """Break *text* into sentences at Polish sentence boundaries.

    A boundary is whitespace that follows '.', '!' or '?' and precedes an
    uppercase letter (Polish diacritics included). Empty fragments are
    discarded, so empty input yields an empty list.
    """
    boundary = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    fragments = (piece.strip() for piece in re.split(boundary, text))
    return [piece for piece in fragments if piece]
def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.
    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split large paragraphs by sentences
    - Maintain overlap between chunks

    Returns [] for empty input.
    NOTE(review): only the FINAL chunk is checked against MIN_CHUNK_SIZE, so an
    article shorter than MIN_CHUNK_SIZE tokens in total yields no chunks at all
    (extract_from_news then reports "No chunks created") — confirm intended.
    NOTE(review): the module-level CHUNK_OVERLAP constant is not used here; the
    actual overlap is "carry the last sentence forward" in the sentence branch.
    """
    if not text:
        return []
    chunks = []
    current_chunk = []   # accumulated paragraphs OR sentences for the chunk in progress
    current_tokens = 0   # running estimated token count of current_chunk
    # Split by paragraphs
    paragraphs = text.split('\n\n')
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        para_tokens = estimate_tokens(para)
        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)
                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk (sentences are rejoined with spaces)
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: keep last sentence so context carries across chunks
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0
                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk (paragraphs are rejoined with blank lines)
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0
            current_chunk.append(para)
            current_tokens += para_tokens
    # Save last chunk
    if current_chunk:
        # Heuristic separator choice: a long first element (>100 chars) is
        # assumed to be a paragraph, a short one a sentence.
        # NOTE(review): a trailing chunk that mixed both branches may be
        # rejoined with the "wrong" separator — confirm acceptable.
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))
    return chunks
# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================
class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication

    All writes go through the injected SQLAlchemy session; extract_from_news()
    commits once per article, everything else only flushes.
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking (forwarded to Gemini calls)
        """
        self.db = db_session
        self.user_id = user_id
        # Gemini client is created lazily on first use — see the `gemini` property.
        self._gemini_service = None

    @property
    def gemini(self):
        """Lazy-load Gemini service (imported here to avoid a hard import-time dependency)."""
        if self._gemini_service is None:
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service

    def _normalize_entity_name(self, name: str) -> str:
        """Normalize entity name for deduplication.

        Lowercases, collapses whitespace and strips Polish diacritics so that
        spelling variants of the same entity map to one key.
        NOTE(review): diacritic stripping can merge genuinely distinct names
        (e.g. "Łoś" vs "Los") — confirm this is acceptable.
        """
        if not name:
            return ''
        # Lowercase, remove extra spaces
        normalized = name.lower().strip()
        normalized = re.sub(r'\s+', ' ', normalized)
        # Remove Polish diacritics for matching
        trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
        normalized = normalized.translate(trans)
        return normalized

    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.
        Uses normalized name for deduplication.

        Side effects: bumps mentions_count / last_mentioned_at on an existing
        entity, or adds + flushes a new one (flush populates its id).
        NOTE(review): datetime.now() is timezone-naive throughout — confirm
        the DB columns expect local naive timestamps.
        """
        normalized = self._normalize_entity_name(name)
        # Try to find existing
        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()
        if existing:
            # Update mention count
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            # Update description if better (longer descriptions are assumed richer)
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing
        # Create new entity
        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Get ID
        return entity

    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations using Gemini AI.
        Returns parsed JSON or None on error (empty response, bad JSON, or
        any exception from the Gemini call — all are logged, never raised).
        """
        try:
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk.content,
                source_name=source_name,
                published_date=published_date
            )
            response = self.gemini.generate_text(
                prompt=prompt,
                system_prompt=EXTRACTION_SYSTEM_PROMPT,
                temperature=0.1,  # Low temperature for consistency
                max_tokens=2000,
                user_id=self.user_id,
                feature='zopk_knowledge_extraction'
            )
            if not response:
                logger.warning("Empty response from Gemini")
                return None
            # Parse JSON from response
            # Handle markdown code blocks (``` or ```json fences around the payload)
            json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = response
            # Clean and parse
            json_str = json_str.strip()
            data = json.loads(json_str)
            return data
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
            return None
        except Exception as e:
            logger.error(f"AI extraction error: {e}")
            return None

    def _save_chunk(
        self,
        news: ZOPKNews,
        chunk: ChunkData,
        extraction_data: Optional[Dict]
    ) -> ZOPKKnowledgeChunk:
        """Save chunk to database (added + flushed so the id is available).

        The chunk is saved even when AI extraction failed (extraction_data is
        None); summary/keywords are then left empty.
        """
        db_chunk = ZOPKKnowledgeChunk(
            source_news_id=news.id,
            content=chunk.content,
            chunk_index=chunk.index,
            token_count=chunk.token_count,
            chunk_type='narrative',  # Default
            summary=extraction_data.get('summary') if extraction_data else None,
            keywords=extraction_data.get('keywords') if extraction_data else None,
            language='pl',
            # NOTE(review): model name is hardcoded here; confirm it tracks the
            # model actually configured in gemini_service.
            extraction_model='gemini-2.0-flash',
            extracted_at=datetime.now()
        )
        self.db.add(db_chunk)
        self.db.flush()
        return db_chunk

    def _save_facts(
        self,
        chunk: ZOPKKnowledgeChunk,
        news: ZOPKNews,
        facts_data: List[Dict]
    ) -> int:
        """Save extracted facts to database.

        At most MAX_FACTS_PER_CHUNK facts are stored; malformed numeric/date
        values are coerced to None rather than rejected, and a failure on one
        fact is logged and skipped. Returns the number of facts added.
        """
        count = 0
        for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
            try:
                # Parse numeric value (AI may return strings or garbage)
                numeric_value = fact.get('numeric_value')
                if numeric_value is not None:
                    try:
                        numeric_value = float(numeric_value)
                    except (ValueError, TypeError):
                        numeric_value = None
                # Parse date (prompt requests YYYY-MM-DD)
                date_value = fact.get('date_value')
                if date_value:
                    try:
                        from datetime import datetime as dt
                        date_value = dt.strptime(date_value, '%Y-%m-%d').date()
                    except (ValueError, TypeError):
                        date_value = None
                db_fact = ZOPKKnowledgeFact(
                    source_chunk_id=chunk.id,
                    source_news_id=news.id,
                    fact_type=fact.get('type', 'statement'),
                    subject=fact.get('subject'),
                    predicate=fact.get('predicate'),
                    object=fact.get('object'),
                    full_text=fact.get('full_text', ''),
                    numeric_value=numeric_value,
                    numeric_unit=fact.get('numeric_unit'),
                    date_value=date_value,
                    confidence_score=fact.get('confidence', 0.5)
                )
                self.db.add(db_fact)
                count += 1
            except Exception as e:
                logger.error(f"Error saving fact: {e}")
                continue
        return count

    def _save_entities_and_relations(
        self,
        chunk: ZOPKKnowledgeChunk,
        entities_data: List[Dict],
        relations_data: List[Dict]
    ) -> Tuple[int, int]:
        """Save entities and relations to database.

        Entities are deduplicated via _find_or_create_entity and linked to the
        chunk through mention rows. Relations are only stored when BOTH
        endpoints were among this chunk's extracted entities (matched by
        lowercased surface name) and no identical relation already exists.
        Returns (entities_saved, relations_saved).
        """
        entity_count = 0
        relation_count = 0
        # Map of name -> entity for relations (keyed by lowercased surface form)
        entity_map = {}
        # Save entities (capped at MAX_ENTITIES_PER_CHUNK)
        for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
            try:
                entity = self._find_or_create_entity(
                    name=ent.get('name', ''),
                    entity_type=ent.get('type', 'organization'),
                    description=ent.get('description')
                )
                entity_map[ent.get('name', '').lower()] = entity
                # Create mention link between this chunk and the entity
                mention = ZOPKKnowledgeEntityMention(
                    chunk_id=chunk.id,
                    entity_id=entity.id,
                    mention_text=ent.get('name'),
                    mention_type='direct',
                    role_in_context=ent.get('role'),
                    confidence=0.9
                )
                self.db.add(mention)
                entity_count += 1
            except Exception as e:
                logger.error(f"Error saving entity: {e}")
                continue
        # Flush to get entity IDs
        self.db.flush()
        # Save relations
        for rel in relations_data:
            try:
                entity_a_name = rel.get('entity_a', '').lower()
                entity_b_name = rel.get('entity_b', '').lower()
                entity_a = entity_map.get(entity_a_name)
                entity_b = entity_map.get(entity_b_name)
                if not entity_a or not entity_b:
                    # Endpoint not extracted in this chunk — drop the relation
                    continue
                # Check if relation already exists (direction-sensitive)
                existing = self.db.query(ZOPKKnowledgeRelation).filter(
                    ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
                    ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
                    ZOPKKnowledgeRelation.relation_type == rel.get('relation')
                ).first()
                if not existing:
                    db_relation = ZOPKKnowledgeRelation(
                        entity_a_id=entity_a.id,
                        entity_b_id=entity_b.id,
                        relation_type=rel.get('relation', 'cooperates_with'),
                        evidence_text=rel.get('description'),
                        source_chunk_id=chunk.id,
                        confidence=0.8,
                        strength=3
                    )
                    self.db.add(db_relation)
                    relation_count += 1
            except Exception as e:
                logger.error(f"Error saving relation: {e}")
                continue
        return entity_count, relation_count

    def extract_from_news(self, news_id: int) -> ExtractionResult:
        """
        Extract knowledge from a single news article.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ExtractionResult with statistics. success=False when the record is
            missing, has no scraped content, or chunking produced nothing;
            success=True with error="Already extracted" when skipped.
        """
        import time
        start_time = time.time()
        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
        if not news:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error=f"News record {news_id} not found"
            )
        # Check if already extracted (idempotency guard)
        if news.knowledge_extracted:
            return ExtractionResult(
                success=True,
                news_id=news_id,
                error="Already extracted"
            )
        # Check if content is scraped
        if not news.full_content:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No content scraped"
            )
        logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")
        # Create chunks
        chunks = create_chunks(news.full_content)
        if not chunks:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="No chunks created"
            )
        # Statistics
        chunks_created = 0
        facts_created = 0
        entities_created = 0
        relations_created = 0
        # Process each chunk
        for chunk in chunks:
            try:
                # Extract with AI (returns None on failure; chunk is still saved)
                extraction_data = self._extract_with_ai(
                    chunk=chunk,
                    source_name=news.source_name or 'unknown',
                    published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
                )
                # Save chunk
                db_chunk = self._save_chunk(news, chunk, extraction_data)
                chunks_created += 1
                if extraction_data:
                    # Save facts
                    facts_created += self._save_facts(
                        db_chunk,
                        news,
                        extraction_data.get('facts', [])
                    )
                    # Save entities and relations
                    ent_count, rel_count = self._save_entities_and_relations(
                        db_chunk,
                        extraction_data.get('entities', []),
                        extraction_data.get('relations', [])
                    )
                    entities_created += ent_count
                    relations_created += rel_count
            except Exception as e:
                logger.error(f"Error processing chunk {chunk.index}: {e}")
                continue
        # Mark as extracted
        # NOTE(review): the article is marked extracted even when every chunk's
        # AI call failed — such articles will never be retried. Confirm intended.
        news.knowledge_extracted = True
        news.knowledge_extracted_at = datetime.now()
        self.db.commit()  # single commit covering chunks, facts, entities, relations
        processing_time = time.time() - start_time
        logger.info(
            f"Extracted from news {news_id}: "
            f"{chunks_created} chunks, {facts_created} facts, "
            f"{entities_created} entities, {relations_created} relations "
            f"in {processing_time:.2f}s"
        )
        return ExtractionResult(
            success=True,
            news_id=news_id,
            chunks_created=chunks_created,
            facts_created=facts_created,
            entities_created=entities_created,
            relations_created=relations_created,
            processing_time=processing_time
        )

    def batch_extract(self, limit: int = 50) -> Dict:
        """
        Batch extract knowledge from scraped articles.

        Picks approved/auto-approved articles that are scraped but not yet
        extracted, newest first, and runs extract_from_news() on each.

        Args:
            limit: Maximum number of articles to process

        Returns:
            Dict with aggregate statistics (counts, per-article errors,
            total processing_time in seconds).
        """
        import time
        logger.info(f"Starting batch extraction: limit={limit}")
        # Find articles ready for extraction
        articles = self.db.query(ZOPKNews).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).order_by(
            ZOPKNews.created_at.desc()
        ).limit(limit).all()
        stats = {
            'total': len(articles),
            'success': 0,
            'failed': 0,
            'chunks_created': 0,
            'facts_created': 0,
            'entities_created': 0,
            'relations_created': 0,
            'errors': [],
            'processing_time': 0
        }
        start_time = time.time()
        for article in articles:
            result = self.extract_from_news(article.id)
            if result.success:
                stats['success'] += 1
                stats['chunks_created'] += result.chunks_created
                stats['facts_created'] += result.facts_created
                stats['entities_created'] += result.entities_created
                stats['relations_created'] += result.relations_created
            else:
                stats['failed'] += 1
                if result.error:
                    stats['errors'].append({
                        'id': article.id,
                        'title': article.title[:100],
                        'error': result.error
                    })
        stats['processing_time'] = round(time.time() - start_time, 2)
        logger.info(
            f"Batch extraction complete: {stats['success']}/{stats['total']} success "
            f"in {stats['processing_time']}s"
        )
        return stats

    def get_extraction_statistics(self) -> Dict:
        """Get knowledge extraction statistics.

        Returns a dict with article pipeline counts ('articles'), knowledge
        base row counts ('knowledge_base') and the 10 most-mentioned entities
        ('top_entities'). Read-only; performs COUNT queries only.
        """
        from sqlalchemy import func
        # Articles stats
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()
        scraped = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()
        extracted = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.knowledge_extracted == True
        ).scalar()
        pending = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).scalar()
        # Knowledge base stats
        chunks_count = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
        facts_count = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
        entities_count = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
        relations_count = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()
        # Top entities by mentions
        top_entities = self.db.query(
            ZOPKKnowledgeEntity.name,
            ZOPKKnowledgeEntity.entity_type,
            ZOPKKnowledgeEntity.mentions_count
        ).order_by(
            ZOPKKnowledgeEntity.mentions_count.desc()
        ).limit(10).all()
        return {
            'articles': {
                'total_approved': total_approved or 0,
                'scraped': scraped or 0,
                'extracted': extracted or 0,
                'pending': pending or 0
            },
            'knowledge_base': {
                'chunks': chunks_count or 0,
                'facts': facts_count or 0,
                'entities': entities_count or 0,
                'relations': relations_count or 0
            },
            'top_entities': [
                {'name': e[0], 'type': e[1], 'mentions': e[2]}
                for e in top_entities
            ]
        }
# ============================================================
# SEMANTIC SEARCH (FAZA 2)
# ============================================================
def search_knowledge(
    db_session,
    query: str,
    limit: int = 5,
    min_similarity: float = 0.3,
    user_id: Optional[int] = None
) -> List[Dict]:
    """
    Semantic search over the ZOPK knowledge base.

    Embeds *query* with Gemini, then brute-force scans every chunk that has a
    stored embedding and ranks by cosine similarity. Embeddings are stored as
    JSON strings for now; a pgvector index would make this efficient at scale.

    Args:
        db_session: SQLAlchemy session
        query: Search query text
        limit: Max results to return
        min_similarity: Minimum cosine similarity (0-1) to include a chunk
        user_id: User ID for cost tracking

    Returns:
        List of chunk dicts sorted by descending similarity (empty list when
        the query embedding cannot be generated).
    """
    from gemini_service import GeminiService
    import json

    # Embed the query first; without a vector there is nothing to compare.
    query_embedding = GeminiService().generate_embedding(
        text=query,
        task_type='retrieval_query',
        user_id=user_id,
        feature='zopk_knowledge_search'
    )
    if not query_embedding:
        logger.warning("Failed to generate query embedding")
        return []

    # Candidate set: every chunk with a stored embedding.
    candidates = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.isnot(None)
    ).all()

    matches = []
    for candidate in candidates:
        try:
            # Embeddings may be stored as a JSON string or already decoded.
            stored = candidate.embedding
            vector = json.loads(stored) if isinstance(stored, str) else stored
            if not vector:
                continue
            score = _cosine_similarity(query_embedding, vector)
            if score < min_similarity:
                continue
            matches.append({
                'chunk_id': candidate.id,
                'content': candidate.content[:500],
                'summary': candidate.summary,
                'keywords': candidate.keywords,
                'similarity': round(score, 4),
                'source_news_id': candidate.source_news_id,
                'importance': candidate.importance_score
            })
        except Exception as e:
            logger.error(f"Error processing chunk {candidate.id}: {e}")
            continue

    # Best matches first, capped at the requested limit.
    matches.sort(key=lambda m: m['similarity'], reverse=True)
    return matches[:limit]
def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors."""
import math
if len(vec1) != len(vec2):
return 0.0
dot_product = sum(a * b for a, b in zip(vec1, vec2))
norm1 = math.sqrt(sum(a * a for a in vec1))
norm2 = math.sqrt(sum(b * b for b in vec2))
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def get_relevant_facts(
    db_session,
    query: str,
    limit: int = 10
) -> List[Dict]:
    """
    Get verified facts relevant to a query, ranked by keyword overlap.

    Uses naive keyword matching for now; can be enhanced with embeddings.

    Args:
        db_session: SQLAlchemy session
        query: Free-text query (lowercased and split on whitespace)
        limit: Maximum number of facts to return

    Returns:
        List of fact dicts sorted by descending relevance_score, where the
        score is the number of query keywords found in the fact's full_text.
    """
    # Simple keyword search (removed an unused `sqlalchemy.or_` import here).
    keywords = query.lower().split()
    if not keywords:
        # Nothing can match an empty keyword list — skip the full-table scan.
        return []
    # Only verified facts are eligible.
    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.is_verified == True
    ).all()
    results = []
    for fact in facts:
        fact_text = (fact.full_text or '').lower()
        # Relevance = how many query keywords appear in the fact text.
        score = sum(1 for keyword in keywords if keyword in fact_text)
        if score > 0:
            results.append({
                'fact_id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'subject': fact.subject,
                'numeric_value': float(fact.numeric_value) if fact.numeric_value else None,
                'numeric_unit': fact.numeric_unit,
                'confidence': float(fact.confidence_score) if fact.confidence_score else None,
                'relevance_score': score
            })
    # Sort by relevance (stable sort keeps DB order among ties)
    results.sort(key=lambda x: x['relevance_score'], reverse=True)
    return results[:limit]
def generate_chunk_embeddings(db_session, limit: int = 100, user_id: Optional[int] = None) -> Dict:
    """
    Backfill embeddings for knowledge chunks that are still missing one.

    Args:
        db_session: SQLAlchemy session
        limit: Max chunks to process in this run
        user_id: User ID for cost tracking

    Returns:
        Dict with 'total', 'success' and 'failed' counts. Commits once at the
        end of the run.
    """
    import json
    from gemini_service import GeminiService

    gemini = GeminiService()
    # Only chunks without a stored embedding are candidates.
    pending = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.is_(None)
    ).limit(limit).all()

    stats = {'total': len(pending), 'success': 0, 'failed': 0}
    for chunk in pending:
        try:
            vector = gemini.generate_embedding(
                text=chunk.content,
                task_type='retrieval_document',
                title=chunk.summary,
                user_id=user_id,
                feature='zopk_chunk_embedding'
            )
            if vector:
                # Persist as a JSON array string (no pgvector column yet).
                chunk.embedding = json.dumps(vector)
                stats['success'] += 1
            else:
                stats['failed'] += 1
        except Exception as e:
            logger.error(f"Error generating embedding for chunk {chunk.id}: {e}")
            stats['failed'] += 1

    db_session.commit()
    logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success")
    return stats
# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================
def extract_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Cron-friendly wrapper: batch-extract knowledge from pending articles.

    Usage:
        from zopk_knowledge_service import extract_pending_articles
        result = extract_pending_articles(db_session, limit=50)
    """
    return ZOPKKnowledgeService(db_session).batch_extract(limit=limit)
def get_knowledge_stats(db_session) -> Dict:
    """Monitoring wrapper around ZOPKKnowledgeService.get_extraction_statistics()."""
    return ZOPKKnowledgeService(db_session).get_extraction_statistics()