#!/usr/bin/env python3 """ Norda Biznes AI Chat Engine ============================ Multi-turn conversational AI for company directory queries. Features: - Answer questions about member companies - Find companies by service, competency, or need - Concise, helpful responses - Full conversation history tracking - Cost tracking per message Author: Norda Biznes Development Team Created: 2025-11-23 """ import os import time import logging from datetime import datetime from typing import Dict, List, Any, Optional import google.generativeai as genai import gemini_service from search_service import search_companies # Module logger logger = logging.getLogger(__name__) from database import ( SessionLocal, Company, Category, Service, CompanyService, Competency, CompanyCompetency, Certification, Award, CompanyEvent, AIChatConversation, AIChatMessage, CompanyRecommendation, ZOPKNews, # Etap 2: Tablica B2B, Kalendarz, Forum Classified, NordaEvent, ForumTopic, ForumReply, # Etap 3: Osoby, Social Media, Audyty Person, CompanyPerson, CompanySocialMedia, GBPAudit, CompanyWebsiteAnalysis ) # Import feedback learning service for few-shot learning try: from feedback_learning_service import get_feedback_learning_service FEEDBACK_LEARNING_AVAILABLE = True except ImportError: FEEDBACK_LEARNING_AVAILABLE = False # Import ZOPK knowledge service for semantic search try: from zopk_knowledge_service import search_knowledge, get_relevant_facts ZOPK_KNOWLEDGE_AVAILABLE = True except ImportError: ZOPK_KNOWLEDGE_AVAILABLE = False # Import sensitive data sanitization service (RODO compliance) try: from sensitive_data_service import sanitize_message, SensitiveDataType SENSITIVE_DATA_SERVICE_AVAILABLE = True except ImportError: SENSITIVE_DATA_SERVICE_AVAILABLE = False logger.warning("Sensitive data service not available - messages will not be sanitized") class NordaBizChatEngine: """ AI Chat Assistant for Norda Biznes company directory Helps users find companies, services, and business partners. """ def __init__(self, gemini_api_key: Optional[str] = None, use_global_service: bool = True, model: str = None): """ Initialize Norda Biznes Chat Engine Args: gemini_api_key: Google Gemini API key (uses env var if not provided) use_global_service: Use global gemini_service for automatic cost tracking (default: True) model: Model key ('3-flash', '3-pro') - if provided, creates new service with this model """ self.use_global_service = use_global_service self.requested_model = model if use_global_service: if model: # Create new service with requested model from gemini_service import GeminiService self.gemini_service = GeminiService(model=model) self.model_name = self.gemini_service.model_name else: # Use global gemini_service for automatic cost tracking to ai_api_costs table self.gemini_service = gemini_service.get_gemini_service() # Get model name from global service (currently Gemini 3 Flash Preview) self.model_name = self.gemini_service.model_name if self.gemini_service else "gemini-3-flash-preview" self.model = None # Initialize tokenizer for cost calculation (still needed for per-message tracking) api_key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY') if api_key and api_key != 'TWOJ_KLUCZ_API_TUTAJ': genai.configure(api_key=api_key) self.tokenizer = genai.GenerativeModel(self.model_name) else: self.tokenizer = None else: # Legacy: direct API access (no centralized cost tracking) api_key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY') if not api_key or api_key == 'TWOJ_KLUCZ_API_TUTAJ': raise ValueError("GOOGLE_GEMINI_API_KEY not found in environment") genai.configure(api_key=api_key) self.model_name = "gemini-3-flash-preview" self.model = genai.GenerativeModel(self.model_name) self.tokenizer = self.model self.gemini_service = None def start_conversation( self, user_id: int, title: Optional[str] = None, conversation_type: str = 'general' ) -> AIChatConversation: """ Start new conversation Args: user_id: User ID title: Optional conversation title conversation_type: Type of conversation (default: 'general') Returns: AIChatConversation: New conversation object """ db = SessionLocal() try: # Auto-generate title if not provided if not title: title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}" conversation = AIChatConversation( user_id=user_id, started_at=datetime.now(), conversation_type=conversation_type, title=title, is_active=True, message_count=0, model_name=self.model_name ) db.add(conversation) db.commit() db.refresh(conversation) return conversation finally: db.close() def send_message( self, conversation_id: int, user_message: str, user_id: int, thinking_level: str = 'high' ) -> AIChatMessage: """ Send message and get AI response SECURITY: Validates that user_id owns the conversation before allowing messages. This is defense-in-depth - API routes should also validate ownership. Args: conversation_id: Conversation ID user_message: User's message text user_id: User ID (required for ownership validation and cost tracking) thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high') Returns: AIChatMessage: AI response message Raises: ValueError: If conversation not found PermissionError: If user doesn't own the conversation """ db = SessionLocal() start_time = time.time() try: # SECURITY: Get conversation with ownership check conversation = db.query(AIChatConversation).filter_by( id=conversation_id ).first() if not conversation: raise ValueError(f"Conversation {conversation_id} not found") # SECURITY: Verify user owns this conversation (defense in depth) if conversation.user_id != user_id: logger.warning( f"SECURITY: User {user_id} attempted to access conversation {conversation_id} " f"owned by user {conversation.user_id}" ) raise PermissionError("Access denied: You don't own this conversation") # RODO/GDPR: Sanitize user message - remove sensitive data before storage # Note: NIP and email are NOT considered sensitive (public business data) sanitized_message = user_message sensitive_data_found = [] if SENSITIVE_DATA_SERVICE_AVAILABLE: sanitized_message, sensitive_data_found = sanitize_message(user_message) if sensitive_data_found: logger.info( f"RODO: Sanitized {len(sensitive_data_found)} sensitive items in message " f"from user {user_id}: {[m.data_type.value for m in sensitive_data_found]}" ) # Save user message (sanitized for storage, original for AI context) user_msg = AIChatMessage( conversation_id=conversation_id, created_at=datetime.now(), role='user', content=sanitized_message, # Store sanitized version edited=False, regenerated=False ) db.add(user_msg) db.commit() # Build context from conversation history and relevant companies # Use ORIGINAL message for AI (so it can understand the question) # but the sanitized version is what gets stored in DB context = self._build_conversation_context(db, conversation, user_message) # Get AI response with cost tracking response = self._query_ai( context, user_message, user_id=user_id, thinking_level=thinking_level ) # Calculate metrics for per-message tracking in AIChatMessage table latency_ms = int((time.time() - start_time) * 1000) if self.tokenizer: input_tokens = self.tokenizer.count_tokens(user_message).total_tokens output_tokens = self.tokenizer.count_tokens(response).total_tokens cost_usd = self._calculate_cost(input_tokens, output_tokens) else: # Fallback if tokenizer not available input_tokens = len(user_message.split()) * 2 # Rough estimate output_tokens = len(response.split()) * 2 cost_usd = 0.0 # Save AI response ai_msg = AIChatMessage( conversation_id=conversation_id, created_at=datetime.now(), role='assistant', content=response, tokens_input=input_tokens, tokens_output=output_tokens, cost_usd=cost_usd, latency_ms=latency_ms, edited=False, regenerated=False ) db.add(ai_msg) # Update conversation conversation.message_count += 2 conversation.updated_at = datetime.now() db.commit() db.refresh(ai_msg) return ai_msg finally: db.close() def get_conversation_history( self, conversation_id: int, user_id: Optional[int] = None ) -> List[Dict[str, Any]]: """ Get all messages in conversation SECURITY: If user_id is provided, validates ownership before returning messages. This is defense-in-depth - API routes should also validate ownership. Args: conversation_id: Conversation ID user_id: User ID for ownership validation (optional for backwards compatibility, but strongly recommended for security) Returns: List of message dicts Raises: ValueError: If conversation not found PermissionError: If user_id provided and user doesn't own the conversation """ db = SessionLocal() try: # SECURITY: Verify ownership if user_id provided if user_id is not None: conversation = db.query(AIChatConversation).filter_by( id=conversation_id ).first() if not conversation: raise ValueError(f"Conversation {conversation_id} not found") if conversation.user_id != user_id: logger.warning( f"SECURITY: User {user_id} attempted to read history of conversation {conversation_id} " f"owned by user {conversation.user_id}" ) raise PermissionError("Access denied: You don't own this conversation") messages = db.query(AIChatMessage).filter_by( conversation_id=conversation_id ).order_by(AIChatMessage.created_at).all() return [ { 'id': msg.id, 'role': msg.role, 'content': msg.content, 'created_at': msg.created_at.isoformat(), 'tokens_input': msg.tokens_input, 'tokens_output': msg.tokens_output, 'cost_usd': float(msg.cost_usd) if msg.cost_usd else 0.0, 'latency_ms': msg.latency_ms } for msg in messages ] finally: db.close() def _build_conversation_context( self, db, conversation: AIChatConversation, current_message: str ) -> Dict[str, Any]: """ Build context for AI with ALL companies (not pre-filtered) This allows AI to intelligently select relevant companies instead of relying on keyword-based search pre-filtering. Args: db: Database session conversation: Current conversation current_message: User's current message (for reference only) Returns: Context dict with ALL companies and categories """ # Load ALL active companies - let AI do the intelligent filtering all_companies = db.query(Company).filter_by(status='active').all() context = { 'conversation_type': conversation.conversation_type, 'total_companies': len(all_companies) } # Get all categories with company counts categories = db.query(Category).all() context['categories'] = [ { 'name': cat.name, 'slug': cat.slug, 'company_count': db.query(Company).filter_by(category_id=cat.id, status='active').count() } for cat in categories ] # Include ALL companies in compact format to minimize tokens # AI will intelligently select the most relevant ones context['all_companies'] = [ self._company_to_compact_dict(c) for c in all_companies ] # Add conversation history (last 10 messages for context) messages = db.query(AIChatMessage).filter_by( conversation_id=conversation.id ).order_by(AIChatMessage.created_at.desc()).limit(10).all() context['recent_messages'] = [ {'role': msg.role, 'content': msg.content} for msg in reversed(messages) ] # === ETAP 1: Rekomendacje i Newsy === # Add approved recommendations (peer endorsements) recommendations = db.query(CompanyRecommendation).filter_by( status='approved' ).order_by(CompanyRecommendation.created_at.desc()).limit(20).all() context['recommendations'] = [ { 'company': rec.company.name if rec.company else 'Nieznana', 'text': rec.recommendation_text[:200] if rec.recommendation_text else '', 'service': rec.service_category or '', 'author': rec.user.name if rec.user and rec.show_contact else 'Członek Norda Biznes' } for rec in recommendations ] # Add recent approved news (last 30 days) from datetime import timedelta news_cutoff = datetime.now() - timedelta(days=30) recent_news = db.query(ZOPKNews).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.published_at >= news_cutoff ).order_by(ZOPKNews.published_at.desc()).limit(10).all() context['recent_news'] = [ { 'title': news.title, 'description': news.description[:400] if news.description else '', # Opis/lead 'summary': news.ai_summary[:300] if news.ai_summary else '', # AI streszczenie 'source': news.source_name or '', 'url': news.url or '', # Link do artykułu 'date': news.published_at.strftime('%Y-%m-%d') if news.published_at else '', 'type': news.news_type or 'news', 'keywords': news.keywords[:5] if news.keywords else [] # Słowa kluczowe } for news in recent_news ] # === ZOPK KNOWLEDGE BASE (semantic search) === # Detect if question is about ZOPK topics if self._is_zopk_query(current_message): zopk_knowledge = self._get_zopk_knowledge_context(db, current_message) context['zopk_knowledge'] = zopk_knowledge # === ETAP 2: Tablica B2B, Kalendarz, Forum === # Add upcoming events (next 60 days) from datetime import date event_cutoff = date.today() + timedelta(days=60) upcoming_events = db.query(NordaEvent).filter( NordaEvent.event_date >= date.today(), NordaEvent.event_date <= event_cutoff ).order_by(NordaEvent.event_date).limit(15).all() context['upcoming_events'] = [ { 'title': event.title[:80] if event.title else '', 'date': event.event_date.strftime('%Y-%m-%d') if event.event_date else '', 'type': event.event_type or 'meeting', 'location': event.location[:50] if event.location else '', 'speaker': event.speaker_name[:30] if event.speaker_name else '' } for event in upcoming_events ] # Add active B2B classifieds (non-test only) active_classifieds = db.query(Classified).filter( Classified.is_active == True, Classified.is_test == False ).order_by(Classified.created_at.desc()).limit(20).all() context['classifieds'] = [ { 'type': c.listing_type, # szukam/oferuje 'category': c.category, 'title': c.title, 'description': c.description[:400] if c.description else '', # Pełny opis 'company': c.company.name if c.company else '', 'author': c.author.name if c.author else '', 'budget': c.budget_info or '', 'location': c.location_info or '', 'date': c.created_at.strftime('%Y-%m-%d') if c.created_at else '', 'views': c.views_count or 0, 'url': f'/classifieds/{c.id}' } for c in active_classifieds ] # Add recent forum topics with FULL content, authors and replies from sqlalchemy.orm import joinedload as jl forum_topics = db.query(ForumTopic).options( jl(ForumTopic.author), jl(ForumTopic.replies).joinedload(ForumReply.author) ).filter( ForumTopic.category != 'test' ).order_by(ForumTopic.created_at.desc()).limit(15).all() context['forum_topics'] = [] for topic in forum_topics: topic_data = { 'title': topic.title, 'content': topic.content[:500] if topic.content else '', # Treść tematu 'author': topic.author.name if topic.author else 'Anonim', 'category': topic.category_label, 'status': topic.status_label, 'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '', 'url': f'/forum/{topic.id}', # Link do tematu 'views': topic.views_count or 0, # Popularność 'pinned': topic.is_pinned, # Czy przypięty (ważny) 'replies_count': topic.reply_count, 'has_attachments': len(topic.attachments) > 0 if topic.attachments else False } # Dodaj odpowiedzi (max 5 ostatnich per temat) if topic.replies: topic_data['replies'] = [ { 'author': reply.author.name if reply.author else 'Anonim', 'content': reply.content[:300] if reply.content else '', 'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else '' } for reply in sorted(topic.replies, key=lambda r: r.created_at, reverse=True)[:5] ] context['forum_topics'].append(topic_data) # === ETAP 3: Osoby (zarząd/wspólnicy), Social Media, Audyty === # Add company people (zarząd, wspólnicy) - grouped by company from sqlalchemy.orm import joinedload company_people = db.query(CompanyPerson).options( joinedload(CompanyPerson.person), joinedload(CompanyPerson.company) ).order_by(CompanyPerson.company_id).all() # Group people by company for compact representation people_by_company = {} for cp in company_people: company_name = cp.company.name if cp.company else 'Nieznana' company_profile = f"https://nordabiznes.pl/company/{cp.company.slug}" if cp.company and cp.company.slug else None if company_name not in people_by_company: people_by_company[company_name] = {'profile': company_profile, 'people': []} person_info = { 'name': cp.person.full_name() if cp.person else '', 'profile': f"https://nordabiznes.pl/osoba/{cp.person.id}" if cp.person else None, 'role': cp.role[:30] if cp.role else '' } if cp.shares_percent: person_info['shares'] = f"{cp.shares_percent}%" people_by_company[company_name]['people'].append(person_info) context['company_people'] = people_by_company # Add social media summary per company (platforms and followers) social_media = db.query(CompanySocialMedia).filter( CompanySocialMedia.is_valid == True ).options(joinedload(CompanySocialMedia.company)).all() # Group social media by company social_by_company = {} for sm in social_media: company_name = sm.company.name if sm.company else 'Nieznana' if company_name not in social_by_company: social_by_company[company_name] = [] social_by_company[company_name].append({ 'platform': sm.platform, 'url': sm.url or '', 'followers': sm.followers_count or 0 }) context['company_social_media'] = social_by_company # Add latest GBP audit scores (one per company, most recent) from sqlalchemy import func # Subquery to get max audit_date per company latest_audit_subq = db.query( GBPAudit.company_id, func.max(GBPAudit.audit_date).label('max_date') ).group_by(GBPAudit.company_id).subquery() latest_audits = db.query(GBPAudit).join( latest_audit_subq, (GBPAudit.company_id == latest_audit_subq.c.company_id) & (GBPAudit.audit_date == latest_audit_subq.c.max_date) ).options(joinedload(GBPAudit.company)).all() context['gbp_audits'] = [ { 'company': audit.company.name if audit.company else '', 'score': audit.completeness_score or 0, 'reviews': audit.review_count or 0, 'rating': float(audit.average_rating) if audit.average_rating else 0, 'maps_url': audit.google_maps_url or '', 'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else '' } for audit in latest_audits ] # Add SEO audits (PageSpeed scores) for companies with website analysis seo_audits = db.query(CompanyWebsiteAnalysis).filter( CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None) ).options(joinedload(CompanyWebsiteAnalysis.company)).all() context['seo_audits'] = [ { 'company': audit.company.name if audit.company else '', 'seo': audit.pagespeed_seo_score or 0, 'performance': audit.pagespeed_performance_score or 0, 'accessibility': audit.pagespeed_accessibility_score or 0, 'best_practices': audit.pagespeed_best_practices_score or 0, 'overall': audit.seo_overall_score or 0, 'url': audit.company.website if audit.company else '', 'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else '' } for audit in seo_audits ] return context def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]: """ Convert company to compact dictionary for AI context. Optimized to minimize tokens while keeping all important data. Args: c: Company object Returns: Compact dict with essential company info """ compact = { 'name': c.name, 'cat': c.category.name if c.category else None, 'profile': f'https://nordabiznes.pl/company/{c.slug}', } # Only include non-empty fields to save tokens if c.description_short: compact['desc'] = c.description_short if c.founding_history: compact['history'] = c.founding_history # Owners, founders, history if c.services: services = [cs.service.name for cs in c.services if cs.service] if services: compact['svc'] = services if c.competencies: competencies = [cc.competency.name for cc in c.competencies if cc.competency] if competencies: compact['comp'] = competencies if c.website: compact['web'] = c.website if c.phone: compact['tel'] = c.phone if c.email: compact['mail'] = c.email if c.address_city: compact['city'] = c.address_city if c.year_established: compact['year'] = c.year_established if c.certifications: certs = [cert.name for cert in c.certifications if cert.is_active] if certs: compact['cert'] = certs[:3] # Limit to 3 certs return compact # Słownik synonimów i powiązanych terminów dla lepszego wyszukiwania KEYWORD_SYNONYMS = { # IT / Web 'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'], 'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'], 'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'], 'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'], 'programowanie': ['software', 'kod', 'developer', 'aplikacje'], # Budownictwo 'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'], 'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'], 'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'], # Transport / Logistyka 'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'], 'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'], # Usługi 'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'], 'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'], 'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'], # Produkcja 'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'], 'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'], 'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'], } def _find_relevant_companies(self, db, message: str) -> List[Company]: """ Find companies relevant to user's message Uses unified SearchService with: - Synonym expansion for better keyword matching - NIP/REGON direct lookup - PostgreSQL FTS with fuzzy matching (when available) - Fallback scoring for SQLite Args: db: Database session message: User's message Returns: List of relevant Company objects """ # Use unified SearchService for better search results results = search_companies(db, message, limit=10) # Extract Company objects from SearchResult return [result.company for result in results] def _is_zopk_query(self, message: str) -> bool: """ Check if the message is related to ZOPK (Zielony Okręg Przemysłowy Kaszubia). ZOPK topics include: - Offshore wind energy (Baltic Power, Baltica) - Nuclear power plant (Lubiatowo-Kopalino) - Kongsberg investment in Rumia - Infrastructure (Via Pomerania, S6, Droga Czerwona) - Hydrogen, data centers """ zopk_keywords = [ # Main project 'zopk', 'zielony okręg', 'okręg przemysłowy', 'kaszubia', 'kaszub', 'projekt kaszubia', 'przemysłowy kaszubia', # Offshore wind (Polish forms + English) 'offshore', 'farmy wiatrowe', 'energetyka wiatrowa', 'bałtyk', 'baltic power', 'baltica', 'orsted', 'morska energia', 'wiatraki morskie', 'farma wiatrowa', # Nuclear - all Polish grammatical forms 'elektrownia jądrowa', 'elektrowni jądrowej', 'elektrownie jądrowe', 'jądrowa', 'jądrowej', 'jądrowe', 'jądrowy', # adjective forms 'atomowa', 'atomowej', 'atomowe', 'atom', 'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie', 'westinghouse', 'bechtel', 'turbiny', 'arabelle', # Kongsberg defense industry 'kongsberg', 'inwestycje norweskie', 'przemysł obronny', 'zbrojeniow', # Infrastructure 'via pomerania', 'droga czerwona', 's6', 'port gdynia', # Energy transition 'wodór', 'centra danych', 'samsonowicz', 'transformacja energetyczna', 'energetyka', 'energetyczny', 'energetyczna', # Organizations 'norda biznes', 'izba przedsiębiorców', 'rumia invest', 'rumia', # Roadmap and milestones 'kamienie milowe', 'roadmapa', 'timeline', 'harmonogram', 'inwestycje pomorze', 'inwestycje pomorskie', 'rozwój pomorza' ] message_lower = message.lower() return any(kw in message_lower for kw in zopk_keywords) def _get_zopk_knowledge_context(self, db, message: str) -> Dict[str, Any]: """ Get ZOPK knowledge base context for the current message. Uses semantic search to find relevant: - Knowledge chunks (text fragments with embeddings) - Facts (structured information) - Entities (companies, people, projects) Args: db: Database session message: User's question Returns: Dict with chunks, facts, entities """ from database import ZOPKKnowledgeEntity, ZOPKKnowledgeChunk, ZOPKNews context = { 'chunks': [], 'facts': [], 'entities': [] } # Check if knowledge service is available if not ZOPK_KNOWLEDGE_AVAILABLE: logger.warning("ZOPK knowledge service not available") return context try: # Semantic search in knowledge chunks chunks = search_knowledge( db, query=message, limit=5, min_similarity=0.3, user_id=None # Don't track cost for context building ) # Enrich chunks with source information for c in chunks: chunk_data = { 'content': c['content'][:400], # Limit length 'summary': c.get('summary', ''), 'similarity': c.get('similarity', 0), 'source': 'nieznane', 'date': '' } # Get source news info if available if c.get('source_news_id'): news = db.query(ZOPKNews).filter( ZOPKNews.id == c['source_news_id'] ).first() if news: chunk_data['source'] = news.source_name or news.source_domain or 'nieznane' chunk_data['source_url'] = news.url or '' if news.published_at: chunk_data['date'] = news.published_at.strftime('%Y-%m-%d') context['chunks'].append(chunk_data) # Get relevant facts with source information facts = get_relevant_facts(db, query=message, limit=5) context['facts'] = [ { 'fact': f['full_text'], 'type': f['fact_type'], 'confidence': f.get('confidence', 0), 'value': f.get('numeric_value'), 'unit': f.get('numeric_unit'), 'source_url': f.get('source_url', ''), 'source_name': f.get('source_name', ''), 'source_date': f.get('source_date', '') } for f in facts ] # Get top mentioned entities (always include for context) top_entities = db.query(ZOPKKnowledgeEntity).filter( ZOPKKnowledgeEntity.mentions_count > 1 ).order_by( ZOPKKnowledgeEntity.mentions_count.desc() ).limit(10).all() context['entities'] = [ { 'name': e.name, 'type': e.entity_type, 'description': e.short_description or '', 'mentions': e.mentions_count } for e in top_entities ] except Exception as e: logger.error(f"Error getting ZOPK knowledge context: {e}") # Return empty context on error, don't break chat return context def _query_ai( self, context: Dict[str, Any], user_message: str, user_id: Optional[int] = None, thinking_level: str = 'high' ) -> str: """ Query Gemini AI with full company database context Args: context: Context dict with ALL companies user_message: User's message user_id: User ID for cost tracking thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high') Returns: AI response text """ import json # Build system prompt with ALL companies recommendations_count = len(context.get('recommendations', [])) news_count = len(context.get('recent_news', [])) events_count = len(context.get('upcoming_events', [])) classifieds_count = len(context.get('classifieds', [])) forum_count = len(context.get('forum_topics', [])) people_companies_count = len(context.get('company_people', {})) social_companies_count = len(context.get('company_social_media', {})) gbp_audits_count = len(context.get('gbp_audits', [])) seo_audits_count = len(context.get('seo_audits', [])) system_prompt = f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa. 📊 MASZ DOSTĘP DO BAZY WIEDZY: - Liczba firm: {context['total_companies']} - Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])} - Rekomendacje członków: {recommendations_count} - Ostatnie aktualności: {news_count} - Nadchodzące wydarzenia: {events_count} - Ogłoszenia B2B: {classifieds_count} - Tematy na forum: {forum_count} - Firmy z danymi KRS (zarząd/wspólnicy): {people_companies_count} - Firmy z Social Media: {social_companies_count} - Audyty Google Business: {gbp_audits_count} - Audyty SEO (PageSpeed): {seo_audits_count} 🎯 TWOJA ROLA: - Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika - Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły - Podajesz konkretne nazwy firm z kontaktem - Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście - Możesz cytować rekomendacje innych członków - Możesz informować o aktualnych newsach, wydarzeniach, ogłoszeniach i dyskusjach na forum 📋 FORMAT DANYCH FIRM (skróty): - name: nazwa firmy - cat: kategoria - profile: link do profilu firmy na nordabiznes.pl - desc: krótki opis - history: historia firmy, właściciele, założyciele - svc: usługi - comp: kompetencje - web/tel/mail: kontakt - city: miasto - cert: certyfikaty ⭐ REKOMENDACJE - opinie członków o firmach: - company: nazwa polecanej firmy - text: treść rekomendacji - service: kategoria usługi - author: kto poleca 📰 AKTUALNOŚCI - ostatnie newsy: - title: tytuł artykułu - source: źródło (portal) - date: data publikacji 📅 KALENDARZ - nadchodzące wydarzenia Norda Biznes: - title: nazwa wydarzenia - date: data (YYYY-MM-DD) - type: typ (meeting, networking, webinar) - location: miejsce - speaker: prelegent (jeśli jest) 📋 TABLICA B2B - ogłoszenia członków: - type: "szukam" lub "oferuje" - category: uslugi/produkty/wspolpraca/praca/inne - title: tytuł ogłoszenia - company: firma ogłaszająca - location: lokalizacja 💬 FORUM - dyskusje społeczności (pełna treść!): - title: tytuł tematu - content: treść tematu (do 500 znaków) - author: imię i nazwisko autora tematu - category: Propozycja funkcji/Błąd/Pytanie/Ogłoszenie - status: Nowy/W realizacji/Rozwiązany/Odrzucony - date: data utworzenia - url: link do tematu na nordabiznes.pl - views: liczba wyświetleń (popularność) - pinned: czy temat jest przypięty (ważny) - replies_count: liczba odpowiedzi - replies: lista odpowiedzi (autor, treść, data) - max 5 ostatnich 👥 ZARZĄD I WSPÓLNICY - dane KRS firm (pogrupowane po firmie): - name: imię i nazwisko osoby - role: funkcja (Prezes Zarządu, Członek Zarządu, Wspólnik, Prokurent) - shares: procent udziałów (tylko dla wspólników) 📱 SOCIAL MEDIA - profile firm (pogrupowane po firmie): - platform: facebook, instagram, linkedin, youtube, twitter, tiktok - url: link do profilu - followers: liczba obserwujących 🏪 AUDYT GOOGLE BUSINESS - wyniki audytu profili Google: - company: nazwa firmy - score: wynik kompletności profilu (0-100) - reviews: liczba recenzji - rating: średnia ocena (1-5) - maps_url: link do profilu Google Maps - profile_url: link do profilu firmy na nordabiznes.pl 🔍 AUDYT SEO (PageSpeed) - wyniki analizy stron www firm: - company: nazwa firmy - seo: wynik SEO (0-100) - performance: wydajność strony (0-100) - accessibility: dostępność (0-100) - best_practices: najlepsze praktyki (0-100) - overall: ogólny wynik SEO (0-100) - url: adres strony www - profile_url: link do profilu firmy na nordabiznes.pl ⚠️ WAŻNE: - ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne) 🚫 DANE WRAŻLIWE - BEZWZGLĘDNY ZAKAZ: NIE odpowiadaj na pytania o: - PESEL (numer identyfikacyjny) - Numery dowodów osobistych - Numery paszportów - Numery kart kredytowych/debetowych - Hasła lub dane logowania - Numery kont bankowych / IBAN Jeśli użytkownik pyta o PESEL lub inne dane wrażliwe (nawet zamaskowane jako [PESEL UKRYTY]): - ODPOWIEDZ: "Przepraszam, nie mogę podawać informacji o numerach PESEL ani innych danych wrażliwych. Jest to niezgodne z RODO." - NIE wymyślaj żadnych połączeń między numerami a osobami - NIE zgaduj czyj to może być PESEL - NIE sugeruj żadnych osób z bazy danych To jest wymóg prawny (RODO/GDPR) - nie ma żadnych wyjątków. 🔗 KLIKALNE LINKI (BEZWZGLĘDNIE OBOWIĄZKOWE!): ⚠️ KRYTYCZNE - KAŻDA nazwa firmy MUSI być linkiem markdown: - ✅ JEDYNY PRAWIDŁOWY FORMAT: [Nazwa Firmy](URL z pola profile) - ❌ NIEDOPUSZCZALNE: Nazwa Firmy (bez linku) - ❌ NIEDOPUSZCZALNE: **Nazwa Firmy** (pogrubienie bez linku) - ❌ NIEDOPUSZCZALNE: "Nazwa Firmy" (cudzysłowy bez linku) Przykład: [Pixlab Softwarehouse](https://nordabiznes.pl/company/pixlab-sp-z-o-o) 👤 OSOBY - każda osoba (zarząd/wspólnik) też MUSI być linkiem: - ✅ PRAWIDŁOWO: [Michał Bogdan Roszman](https://nordabiznes.pl/osoba/123) - ❌ BŁĘDNIE: **Michał Bogdan Roszman** (pogrubienie bez linku) - ❌ BŁĘDNIE: Michał Bogdan Roszman (tekst bez linku) W sekcji ZARZĄD I WSPÓLNICY każda osoba ma pole "profile" z URL - UŻYJ GO! Inne linki które MUSISZ dołączać gdy dostępne: • Strona www firmy (pole "web" lub "url") • Profil Google Maps (pole "maps_url") • Profile social media (pole "url") • FORUM: [tytuł tematu](/forum/ID) - np. [moje uwagi do CRM](/forum/18) • B2B: [tytuł ogłoszenia](/ogloszenia/ID) • AKTUALNOŚCI: [tytuł](/news/ID) - Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w ZARZĄD I WSPÓLNICY lub w polu "history" - Jeśli pytanie "kto jest prezesem firmy X" - szukaj w ZARZĄD I WSPÓLNICY - Jeśli pytanie "kto poleca firmę X" - szukaj w rekomendacjach - Jeśli pytanie "co słychać" - sprawdź aktualności i wydarzenia - Jeśli pytanie "kiedy następne spotkanie" - sprawdź kalendarz - Jeśli pytanie "kto szuka/oferuje X" - sprawdź tablicę B2B - Jeśli pytanie o dyskusje/tematy - sprawdź forum i ZAWSZE podaj link z pola "url" - Jeśli pytanie o social media/followers - sprawdź SOCIAL MEDIA (dołącz linki!) - Jeśli pytanie o Google opinie/recenzje - sprawdź AUDYT GOOGLE BUSINESS (dołącz link do Maps!) - Jeśli pytanie o SEO/wydajność strony/PageSpeed - sprawdź AUDYT SEO (dołącz link do strony!) - Odpowiadaj PO POLSKU ✍️ FORMATOWANIE ODPOWIEDZI: 🎯 PRECYZJA: Jeśli użytkownik pyta o konkretną liczbę (np. "wymień 5"), ZAWSZE podaj DOKŁADNIE tyle ile prosi! - "wymień 5" → podaj dokładnie 5 elementów - "podaj 3 firmy" → podaj dokładnie 3 firmy - NIGDY nie podawaj mniej niż użytkownik prosi! 📝 FORMAT LIST - każdy element w JEDNEJ LINII ze szczegółami po przecinku: PRAWIDŁOWO: 1. **Chwila dla Biznesu** (29.01.2026) - Hotel Olimp, Wejherowo, networking 2. **Rada Izby NORDA** (04.02.2026) - biuro Norda Biznes, spotkanie zarządu 3. **Chwila dla Biznesu** (26.02.2026) - Hotel Olimp, Wejherowo, networking BŁĘDNIE (NIE RÓB - resetuje numerację): 1. **Chwila dla Biznesu** Data: 29.01.2026 Miejsce: Hotel Olimp 1. **Rada Izby NORDA** Data: 04.02.2026 - Używaj **pogrubienia** dla nazw firm i tytułów - Wszystkie szczegóły elementu w JEDNEJ linii (po myślniku lub w nawiasie) - Numeracja MUSI być sekwencyjna: 1. 2. 3. 4. 5. (nie 1. 1. 1. 1.) """ # Add thinking level specific instructions if thinking_level == 'high': system_prompt += """ 🧠 TRYB GŁĘBOKIEJ ANALIZY - WYMAGANIA: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ W tym trybie użytkownik oczekuje SZCZEGÓŁOWEJ odpowiedzi: 1. ROZBUDOWANA analiza - podaj WIĘCEJ informacji niż standardowo 2. STRUKTURA - używaj list punktowanych/numerowanych (1. 2. 3.) 3. KONTEKST - dodaj tło, wyjaśnienia, powiązania między informacjami 4. WSZYSTKIE LINKI - każda firma, osoba, temat forum MUSI mieć link markdown 5. CYTATY - jeśli są rekomendacje lub opinie, cytuj je 6. WNIOSKI - na końcu możesz dodać krótkie podsumowanie lub wniosek Przykład odpowiedzi w trybie głębokiej analizy: "Pomysły [Jacka Pomieczyńskiego](link) w temacie **moje uwagi do CRM** są bardzo konkretne: 1. **Rozbudowa paska nawigacji** - dodanie zakładek: NordaGPT, B2B, Lokalne Projekty 2. **Usprawnienie kalendarza** - podział na przeszłe/przyszłe wydarzenia 3. **Identyfikacja wizualna** - logo Norda Biznes w interfejsie W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwagi do CRM](/forum/18)" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ elif thinking_level == 'minimal': system_prompt += """ ⚡ TRYB SZYBKI - odpowiadaj zwięźle ale z PEŁNYMI linkami do firm i tematów. """ # Add feedback-based learning context (few-shot examples) if FEEDBACK_LEARNING_AVAILABLE: try: feedback_service = get_feedback_learning_service() learning_context = feedback_service.format_for_prompt() if learning_context: system_prompt += learning_context except Exception as e: # Don't fail if feedback learning has issues import logging logging.getLogger(__name__).warning(f"Feedback learning error: {e}") # Add ALL companies in compact JSON format if context.get('all_companies'): system_prompt += "\n\n🏢 PEŁNA BAZA FIRM (wybierz najlepsze):\n" system_prompt += json.dumps(context['all_companies'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add recommendations (peer endorsements) if context.get('recommendations'): system_prompt += "\n\n⭐ REKOMENDACJE CZŁONKÓW:\n" system_prompt += json.dumps(context['recommendations'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add recent news if context.get('recent_news'): system_prompt += "\n\n📰 OSTATNIE AKTUALNOŚCI:\n" system_prompt += json.dumps(context['recent_news'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add ZOPK Knowledge Base context (semantic search results) if context.get('zopk_knowledge'): zopk = context['zopk_knowledge'] system_prompt += "\n\n🌍 BAZA WIEDZY ZOPK (Zielony Okręg Przemysłowy Kaszubia):\n" system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" # Collect all sources for citations at the end sources_for_citation = [] # Add knowledge chunks (most relevant excerpts) if zopk.get('chunks'): system_prompt += "\n📄 FRAGMENTY WIEDZY (semantycznie dopasowane):\n" for i, chunk in enumerate(zopk['chunks'][:5], 1): source_name = chunk.get('source', 'nieznane') source_url = chunk.get('source_url', '') source_date = chunk.get('date', '') system_prompt += f"\n[{i}] {chunk.get('summary', '')}\n" if source_url: system_prompt += f" Źródło: [{source_name}]({source_url}) ({source_date})\n" if source_url and source_name: sources_for_citation.append({ 'name': source_name, 'url': source_url, 'date': source_date }) else: system_prompt += f" Źródło: {source_name} ({source_date})\n" if chunk.get('content'): content_preview = chunk['content'][:300] if len(chunk['content']) > 300: content_preview += "..." system_prompt += f" Treść: {content_preview}\n" # Add verified facts with source links if zopk.get('facts'): system_prompt += "\n📌 ZWERYFIKOWANE FAKTY:\n" for fact in zopk['facts'][:10]: confidence_stars = "★" * int(fact.get('confidence', 0) * 5) source_name = fact.get('source_name', '') source_url = fact.get('source_url', '') source_date = fact.get('source_date', '') system_prompt += f"• {fact.get('fact', '')} [{confidence_stars}]" if source_name and source_url: system_prompt += f" ([{source_name}]({source_url}), {source_date})" sources_for_citation.append({ 'name': source_name, 'url': source_url, 'date': source_date }) system_prompt += "\n" if fact.get('value') and fact.get('unit'): system_prompt += f" Wartość: {fact['value']} {fact['unit']}\n" # Add key entities if zopk.get('entities'): system_prompt += "\n🏢 KLUCZOWE PODMIOTY ZOPK:\n" for entity in zopk['entities'][:8]: entity_icon = { 'organization': '🏛️', 'company': '🏢', 'person': '👤', 'location': '📍', 'place': '📍', 'project': '🎯', 'technology': '⚡' }.get(entity.get('type', ''), '•') system_prompt += f"{entity_icon} {entity.get('name', '')} ({entity.get('type', '')})" if entity.get('description'): system_prompt += f" - {entity['description']}" if entity.get('mentions'): system_prompt += f" [{entity['mentions']} wzmianek]" system_prompt += "\n" # Add available sources for citation if sources_for_citation: # Deduplicate sources by URL unique_sources = {s['url']: s for s in sources_for_citation if s.get('url')}.values() system_prompt += "\n📚 DOSTĘPNE ŹRÓDŁA DO CYTOWANIA:\n" for src in list(unique_sources)[:5]: system_prompt += f"- [{src['name']}]({src['url']}) ({src['date']})\n" system_prompt += "\n🎯 ZASADY ODPOWIEDZI O ZOPK:\n" system_prompt += "1. Odpowiadaj na podstawie bazy wiedzy (NIE WYMYŚLAJ faktów)\n" system_prompt += "2. FORMATUJ odpowiedzi używając:\n" system_prompt += " - **Pogrubienia** dla kluczowych informacji\n" system_prompt += " - Listy punktowane dla wielu faktów\n" system_prompt += " - Nagłówki dla sekcji (## Inwestycje, ## Terminarz)\n" system_prompt += "3. CYTUJ źródła w tekście: \"Według [nazwa portalu](URL) z dnia RRRR-MM-DD...\"\n" system_prompt += "4. NA KOŃCU odpowiedzi DODAJ sekcję:\n" system_prompt += " 📚 **Źródła:**\n" system_prompt += " - [Nazwa portalu](URL) - krótki opis (data)\n" system_prompt += "5. Podawaj konkretne daty i liczby gdy dostępne\n" system_prompt += "6. Jeśli brak informacji w bazie - powiedz wprost: \"Nie mam tej informacji w bazie wiedzy ZOPK\"\n" system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" # Add upcoming events (Etap 2) if context.get('upcoming_events'): system_prompt += "\n\n📅 KALENDARZ WYDARZEŃ:\n" system_prompt += json.dumps(context['upcoming_events'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add B2B classifieds (Etap 2) if context.get('classifieds'): system_prompt += "\n\n📋 TABLICA B2B:\n" system_prompt += json.dumps(context['classifieds'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add forum topics (Etap 2) if context.get('forum_topics'): system_prompt += "\n\n💬 FORUM:\n" system_prompt += json.dumps(context['forum_topics'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add company people - zarząd/wspólnicy (Etap 3) if context.get('company_people'): system_prompt += "\n\n👥 ZARZĄD I WSPÓLNICY:\n" system_prompt += json.dumps(context['company_people'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add social media per company (Etap 3) if context.get('company_social_media'): system_prompt += "\n\n📱 SOCIAL MEDIA:\n" system_prompt += json.dumps(context['company_social_media'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add GBP audits (Etap 3) if context.get('gbp_audits'): system_prompt += "\n\n🏪 AUDYTY GOOGLE BUSINESS:\n" system_prompt += json.dumps(context['gbp_audits'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add SEO audits (PageSpeed scores) if context.get('seo_audits'): system_prompt += "\n\n🔍 AUDYTY SEO (PageSpeed):\n" system_prompt += json.dumps(context['seo_audits'], ensure_ascii=False, indent=None) system_prompt += "\n" # Add conversation history full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n" for msg in context.get('recent_messages', []): role_name = "Użytkownik" if msg['role'] == 'user' else "Ty" full_prompt += f"{role_name}: {msg['content']}\n" full_prompt += f"\nUżytkownik: {user_message}\nTy: " # Get response with automatic cost tracking to ai_api_costs table if self.use_global_service and self.gemini_service: response_text = self.gemini_service.generate_text( prompt=full_prompt, feature='ai_chat', user_id=user_id, temperature=0.7, thinking_level=thinking_level ) # Post-process to ensure links are added even if AI didn't format them return self._postprocess_links(response_text, context) else: # Legacy: direct API call (no centralized cost tracking) response = self.model.generate_content(full_prompt) # Post-process to ensure links are added even if AI didn't format them return self._postprocess_links(response.text, context) def _postprocess_links(self, text: str, context: Dict) -> str: """ Post-process AI response to add markdown links for companies and people. This ensures consistent linking regardless of AI behavior. Args: text: AI response text context: Context dict with company_people data Returns: Text with names replaced by markdown links """ import re # Build lookup dict: name -> url name_to_url = {} # Extract companies and people from company_people context company_people = context.get('company_people', {}) for company_name, data in company_people.items(): # Add company if data.get('profile'): name_to_url[company_name] = data['profile'] # Add people - normalize to Title Case (DB stores UPPERCASE) for person in data.get('people', []): if person.get('name') and person.get('profile'): # Convert "MICHAŁ BOGDAN ROSZMAN" to "Michał Bogdan Roszman" normalized_name = person['name'].title() name_to_url[normalized_name] = person['profile'] # Also extract from companies list (context['companies'] has profile URLs) # Companies format: list of dicts with 'name' and 'profile' # This is populated by _company_to_compact_dict # Sort by name length (longest first) to avoid partial replacements sorted_names = sorted(name_to_url.keys(), key=len, reverse=True) for name in sorted_names: url = name_to_url[name] if not name or not url: continue # Skip if already a markdown link # Pattern: [Name](url) - already linked already_linked = re.search(r'\[' + re.escape(name) + r'\]\([^)]+\)', text) if already_linked: continue # Replace **Name** (bold) with [Name](url) bold_pattern = r'\*\*' + re.escape(name) + r'\*\*' if re.search(bold_pattern, text): text = re.sub(bold_pattern, f'[{name}]({url})', text, count=1) continue # Replace plain "Name" at word boundaries (but not if already in link) # Be careful not to replace inside existing markdown plain_pattern = r'(? float: """ Calculate cost in USD Args: input_tokens: Number of input tokens output_tokens: Number of output tokens Returns: Total cost in USD """ # Gemini 2.5 Flash pricing (per 1M tokens) input_cost = (input_tokens / 1_000_000) * 0.075 output_cost = (output_tokens / 1_000_000) * 0.30 return input_cost + output_cost