nordabiz/nordabiz_chat.py
Maciej Pienczyn 819273bb58 feat(chat): Add recommendations and news to AI chat context
- Add CompanyRecommendation and ZOPKNews imports to nordabiz_chat.py
- Fetch approved recommendations (last 20) in conversation context
- Fetch approved news from last 30 days (last 10) in context
- Serialize recommendations and news to JSON in AI prompt
- Update system prompt with data format descriptions
- Update chat template header description
- Add new suggestion chips: "Kto poleca firmę..." and "Co słychać..."

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 13:19:23 +01:00

586 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Norda Biznes AI Chat Engine
============================
Multi-turn conversational AI for company directory queries.
Features:
- Answer questions about member companies
- Find companies by service, competency, or need
- Concise, helpful responses
- Full conversation history tracking
- Cost tracking per message
Author: Norda Biznes Development Team
Created: 2025-11-23
"""
import os
import time
from datetime import datetime
from typing import Dict, List, Any, Optional
import google.generativeai as genai
import gemini_service
from search_service import search_companies
from database import (
SessionLocal,
Company,
Category,
Service,
CompanyService,
Competency,
CompanyCompetency,
Certification,
Award,
CompanyEvent,
AIChatConversation,
AIChatMessage,
CompanyRecommendation,
ZOPKNews
)
# Import feedback learning service for few-shot learning
try:
from feedback_learning_service import get_feedback_learning_service
FEEDBACK_LEARNING_AVAILABLE = True
except ImportError:
FEEDBACK_LEARNING_AVAILABLE = False
class NordaBizChatEngine:
    """
    AI chat assistant for the Norda Biznes company directory.
    Helps users find companies, services, and business partners.
    """

    def __init__(self, gemini_api_key: Optional[str] = None, use_global_service: bool = True):
        """
        Set up the chat engine.

        Args:
            gemini_api_key: Google Gemini API key (falls back to the
                GOOGLE_GEMINI_API_KEY environment variable)
            use_global_service: route generation through the shared
                gemini_service so costs are tracked centrally in the
                ai_api_costs table (default: True)
        """
        self.use_global_service = use_global_service
        self.model_name = "gemini-2.5-flash"
        # Resolve and validate the API key once for both branches.
        key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY')
        key_is_valid = bool(key) and key != 'TWOJ_KLUCZ_API_TUTAJ'
        if use_global_service:
            # Shared service performs generation and centralized cost tracking.
            self.gemini_service = gemini_service.get_gemini_service()
            self.model = None
            # A local model instance is still needed for per-message token counts.
            if key_is_valid:
                genai.configure(api_key=key)
                self.tokenizer = genai.GenerativeModel(self.model_name)
            else:
                self.tokenizer = None
        else:
            # Legacy path: direct API access, no centralized cost tracking.
            if not key_is_valid:
                raise ValueError("GOOGLE_GEMINI_API_KEY not found in environment")
            genai.configure(api_key=key)
            self.model = genai.GenerativeModel(self.model_name)
            self.tokenizer = self.model
            self.gemini_service = None
def start_conversation(
self,
user_id: int,
title: Optional[str] = None,
conversation_type: str = 'general'
) -> AIChatConversation:
"""
Start new conversation
Args:
user_id: User ID
title: Optional conversation title
conversation_type: Type of conversation (default: 'general')
Returns:
AIChatConversation: New conversation object
"""
db = SessionLocal()
try:
# Auto-generate title if not provided
if not title:
title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
conversation = AIChatConversation(
user_id=user_id,
started_at=datetime.now(),
conversation_type=conversation_type,
title=title,
is_active=True,
message_count=0,
model_name=self.model_name
)
db.add(conversation)
db.commit()
db.refresh(conversation)
return conversation
finally:
db.close()
    def send_message(
        self,
        conversation_id: int,
        user_message: str,
        user_id: Optional[int] = None
    ) -> AIChatMessage:
        """
        Send a user message, query the AI, and persist both messages.

        Args:
            conversation_id: Conversation ID
            user_message: User's message text
            user_id: User ID for cost tracking (optional)

        Returns:
            AIChatMessage: AI response message (with tokens/cost/latency)

        Raises:
            ValueError: if the conversation does not exist
        """
        db = SessionLocal()
        # Wall-clock start, used for the per-message latency metric below.
        start_time = time.time()
        try:
            # Get conversation
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id
            ).first()
            if not conversation:
                raise ValueError(f"Conversation {conversation_id} not found")
            # Persist the user message immediately.
            # NOTE(review): committed before the AI call, so it survives even if
            # generation fails below — confirm this partial-write is intended.
            user_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='user',
                content=user_message,
                edited=False,
                regenerated=False
            )
            db.add(user_msg)
            db.commit()
            # Build context from conversation history and relevant companies
            context = self._build_conversation_context(db, conversation, user_message)
            # Get AI response with cost tracking
            response = self._query_ai(
                context,
                user_message,
                user_id=user_id
            )
            # Calculate metrics for per-message tracking in AIChatMessage table
            latency_ms = int((time.time() - start_time) * 1000)
            if self.tokenizer:
                # NOTE(review): counts tokens of the raw user message only, not
                # the full prompt with company context, so cost_usd understates
                # the real spend; the centralized ai_api_costs tracking in
                # gemini_service remains the authoritative figure.
                input_tokens = self.tokenizer.count_tokens(user_message).total_tokens
                output_tokens = self.tokenizer.count_tokens(response).total_tokens
                cost_usd = self._calculate_cost(input_tokens, output_tokens)
            else:
                # Fallback if tokenizer not available
                input_tokens = len(user_message.split()) * 2  # Rough estimate
                output_tokens = len(response.split()) * 2
                cost_usd = 0.0
            # Save AI response
            ai_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='assistant',
                content=response,
                tokens_input=input_tokens,
                tokens_output=output_tokens,
                cost_usd=cost_usd,
                latency_ms=latency_ms,
                edited=False,
                regenerated=False
            )
            db.add(ai_msg)
            # Both the user and the assistant message were added in this call.
            conversation.message_count += 2
            conversation.updated_at = datetime.now()
            db.commit()
            db.refresh(ai_msg)
            return ai_msg
        finally:
            db.close()
def get_conversation_history(
self,
conversation_id: int
) -> List[Dict[str, Any]]:
"""
Get all messages in conversation
Args:
conversation_id: Conversation ID
Returns:
List of message dicts
"""
db = SessionLocal()
try:
messages = db.query(AIChatMessage).filter_by(
conversation_id=conversation_id
).order_by(AIChatMessage.created_at).all()
return [
{
'id': msg.id,
'role': msg.role,
'content': msg.content,
'created_at': msg.created_at.isoformat(),
'tokens_input': msg.tokens_input,
'tokens_output': msg.tokens_output,
'cost_usd': float(msg.cost_usd) if msg.cost_usd else 0.0,
'latency_ms': msg.latency_ms
}
for msg in messages
]
finally:
db.close()
def _build_conversation_context(
self,
db,
conversation: AIChatConversation,
current_message: str
) -> Dict[str, Any]:
"""
Build context for AI with ALL companies (not pre-filtered)
This allows AI to intelligently select relevant companies instead of
relying on keyword-based search pre-filtering.
Args:
db: Database session
conversation: Current conversation
current_message: User's current message (for reference only)
Returns:
Context dict with ALL companies and categories
"""
# Load ALL active companies - let AI do the intelligent filtering
all_companies = db.query(Company).filter_by(status='active').all()
context = {
'conversation_type': conversation.conversation_type,
'total_companies': len(all_companies)
}
# Get all categories with company counts
categories = db.query(Category).all()
context['categories'] = [
{
'name': cat.name,
'slug': cat.slug,
'company_count': db.query(Company).filter_by(category_id=cat.id, status='active').count()
}
for cat in categories
]
# Include ALL companies in compact format to minimize tokens
# AI will intelligently select the most relevant ones
context['all_companies'] = [
self._company_to_compact_dict(c)
for c in all_companies
]
# Add conversation history (last 10 messages for context)
messages = db.query(AIChatMessage).filter_by(
conversation_id=conversation.id
).order_by(AIChatMessage.created_at.desc()).limit(10).all()
context['recent_messages'] = [
{'role': msg.role, 'content': msg.content}
for msg in reversed(messages)
]
# === ETAP 1: Rekomendacje i Newsy ===
# Add approved recommendations (peer endorsements)
recommendations = db.query(CompanyRecommendation).filter_by(
status='approved'
).order_by(CompanyRecommendation.created_at.desc()).limit(20).all()
context['recommendations'] = [
{
'company': rec.company.name if rec.company else 'Nieznana',
'text': rec.recommendation_text[:200] if rec.recommendation_text else '',
'service': rec.service_category or '',
'author': rec.user.name if rec.user and rec.show_contact else 'Członek Norda Biznes'
}
for rec in recommendations
]
# Add recent approved news (last 30 days)
from datetime import timedelta
news_cutoff = datetime.now() - timedelta(days=30)
recent_news = db.query(ZOPKNews).filter(
ZOPKNews.status == 'approved',
ZOPKNews.published_at >= news_cutoff
).order_by(ZOPKNews.published_at.desc()).limit(10).all()
context['recent_news'] = [
{
'title': news.title[:100] if news.title else '',
'source': news.source_name or '',
'date': news.published_at.strftime('%Y-%m-%d') if news.published_at else '',
'type': news.news_type or 'news'
}
for news in recent_news
]
return context
def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]:
"""
Convert company to compact dictionary for AI context.
Optimized to minimize tokens while keeping all important data.
Args:
c: Company object
Returns:
Compact dict with essential company info
"""
compact = {
'name': c.name,
'cat': c.category.name if c.category else None,
}
# Only include non-empty fields to save tokens
if c.description_short:
compact['desc'] = c.description_short
if c.founding_history:
compact['history'] = c.founding_history # Owners, founders, history
if c.services:
services = [cs.service.name for cs in c.services if cs.service]
if services:
compact['svc'] = services
if c.competencies:
competencies = [cc.competency.name for cc in c.competencies if cc.competency]
if competencies:
compact['comp'] = competencies
if c.website:
compact['web'] = c.website
if c.phone:
compact['tel'] = c.phone
if c.email:
compact['mail'] = c.email
if c.address_city:
compact['city'] = c.address_city
if c.year_established:
compact['year'] = c.year_established
if c.certifications:
certs = [cert.name for cert in c.certifications if cert.is_active]
if certs:
compact['cert'] = certs[:3] # Limit to 3 certs
return compact
# Słownik synonimów i powiązanych terminów dla lepszego wyszukiwania
KEYWORD_SYNONYMS = {
# IT / Web
'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
# Budownictwo
'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
# Transport / Logistyka
'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
# Usługi
'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
# Produkcja
'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
}
def _find_relevant_companies(self, db, message: str) -> List[Company]:
"""
Find companies relevant to user's message
Uses unified SearchService with:
- Synonym expansion for better keyword matching
- NIP/REGON direct lookup
- PostgreSQL FTS with fuzzy matching (when available)
- Fallback scoring for SQLite
Args:
db: Database session
message: User's message
Returns:
List of relevant Company objects
"""
# Use unified SearchService for better search results
results = search_companies(db, message, limit=10)
# Extract Company objects from SearchResult
return [result.company for result in results]
    def _query_ai(
        self,
        context: Dict[str, Any],
        user_message: str,
        user_id: Optional[int] = None
    ) -> str:
        """
        Query Gemini with the full company database embedded in the prompt.

        Args:
            context: Context dict built by _build_conversation_context
                (all companies, categories, recommendations, news, history)
            user_message: User's message
            user_id: User ID for cost tracking

        Returns:
            AI response text
        """
        import json
        # Build system prompt with ALL companies
        recommendations_count = len(context.get('recommendations', []))
        news_count = len(context.get('recent_news', []))
        # The system prompt is deliberately in Polish (user-facing language);
        # its wording is runtime behavior — do not translate.
        system_prompt = f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.
📊 MASZ DOSTĘP DO BAZY WIEDZY:
- Liczba firm: {context['total_companies']}
- Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])}
- Rekomendacje członków: {recommendations_count}
- Ostatnie aktualności: {news_count}
🎯 TWOJA ROLA:
- Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika
- Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły
- Podajesz konkretne nazwy firm z kontaktem
- Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście
- Możesz cytować rekomendacje innych członków
- Możesz informować o aktualnych newsach
📋 FORMAT DANYCH FIRM (skróty):
- name: nazwa firmy
- cat: kategoria
- desc: krótki opis
- history: historia firmy, właściciele, założyciele
- svc: usługi
- comp: kompetencje
- web/tel/mail: kontakt
- city: miasto
- cert: certyfikaty
⭐ REKOMENDACJE - opinie członków o firmach:
- company: nazwa polecanej firmy
- text: treść rekomendacji
- service: kategoria usługi
- author: kto poleca
📰 AKTUALNOŚCI - ostatnie newsy:
- title: tytuł artykułu
- source: źródło (portal)
- date: data publikacji
⚠️ WAŻNE:
- ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne)
- Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w polu "history"
- Jeśli pytanie "kto poleca firmę X" - szukaj w rekomendacjach
- Jeśli pytanie "co słychać" - sprawdź aktualności
- Odpowiadaj PO POLSKU
"""
        # Add feedback-based learning context (few-shot examples)
        if FEEDBACK_LEARNING_AVAILABLE:
            try:
                feedback_service = get_feedback_learning_service()
                learning_context = feedback_service.format_for_prompt()
                if learning_context:
                    system_prompt += learning_context
            except Exception as e:
                # Feedback learning is best-effort: log and keep answering.
                import logging
                logging.getLogger(__name__).warning(f"Feedback learning error: {e}")
        # Add ALL companies in compact JSON format (indent=None keeps it dense)
        if context.get('all_companies'):
            system_prompt += "\n\n🏢 PEŁNA BAZA FIRM (wybierz najlepsze):\n"
            system_prompt += json.dumps(context['all_companies'], ensure_ascii=False, indent=None)
            system_prompt += "\n"
        # Add recommendations (peer endorsements)
        if context.get('recommendations'):
            system_prompt += "\n\n⭐ REKOMENDACJE CZŁONKÓW:\n"
            system_prompt += json.dumps(context['recommendations'], ensure_ascii=False, indent=None)
            system_prompt += "\n"
        # Add recent news
        if context.get('recent_news'):
            system_prompt += "\n\n📰 OSTATNIE AKTUALNOŚCI:\n"
            system_prompt += json.dumps(context['recent_news'], ensure_ascii=False, indent=None)
            system_prompt += "\n"
        # Append conversation history as a plain transcript, then the new turn.
        full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n"
        for msg in context.get('recent_messages', []):
            role_name = "Użytkownik" if msg['role'] == 'user' else "Ty"
            full_prompt += f"{role_name}: {msg['content']}\n"
        full_prompt += f"\nUżytkownik: {user_message}\nTy: "
        # Get response with automatic cost tracking to ai_api_costs table
        if self.use_global_service and self.gemini_service:
            response_text = self.gemini_service.generate_text(
                prompt=full_prompt,
                feature='ai_chat',
                user_id=user_id,
                temperature=0.7
            )
            return response_text
        else:
            # Legacy: direct API call (no centralized cost tracking)
            response = self.model.generate_content(full_prompt)
            return response.text
def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""
Calculate cost in USD
Args:
input_tokens: Number of input tokens
output_tokens: Number of output tokens
Returns:
Total cost in USD
"""
# Gemini 2.5 Flash pricing (per 1M tokens)
input_cost = (input_tokens / 1_000_000) * 0.075
output_cost = (output_tokens / 1_000_000) * 0.30
return input_cost + output_cost