#!/usr/bin/env python3
"""
NordaGPT Context Builder
========================

Selective data loader for the Smart Router.

Instead of loading ALL data for every query, this module loads only the
categories requested by the Smart Router.

Usage:
    from context_builder import build_selective_context

    context = build_selective_context(
        data_needed=["companies_all", "events"],
        conversation_id=42,
        current_message="Szukam firmy budowlanej",
        user_context={"user_id": 5, "company_id": 12}
    )

Author: Norda Biznes Development Team
Created: 2026-03-28
"""

import logging
from datetime import datetime, date, timedelta
from typing import Dict, List, Any, Optional

from sqlalchemy import func
from sqlalchemy.orm import joinedload

from database import (
    SessionLocal,
    Company,
    Category,
    AIChatMessage,
    ZOPKNews,
    NordaEvent,
    Classified,
    ForumTopic,
    ForumReply,
    Person,
    CompanyPerson,
    CompanySocialMedia,
    GBPAudit,
    CompanyWebsiteAnalysis,
    User,
)

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_selective_context(
    data_needed: List[str],
    conversation_id: int,
    current_message: str,
    user_context: Optional[Dict] = None,
) -> Dict[str, Any]:
    """
    Build context dict for _query_ai() by loading only the requested data categories.

    Always included (regardless of data_needed):
    - basic stats: total_companies, categories
    - conversation history: last 10 messages for conversation_id

    A failure in one selective loader is logged and skipped; the session is
    rolled back afterwards so the remaining loaders can still run.

    Args:
        data_needed: List of category strings such as:
            "companies_all", "companies_filtered:IT",
            "companies_single:pixlab-sp-z-o-o", "events", "news",
            "classifieds", "forum", "company_people", "registered_users",
            "social_media", "audits". ``None`` is treated as an empty list
            and repeated entries are loaded only once.
        conversation_id: AIChatMessage conversation ID for history loading.
        current_message: The user's current message (passed through to context).
        user_context: Optional dict with extra user info (user_id, company_id, …).

    Returns:
        Context dict compatible with nordabiz_chat.py's _query_ai().
    """
    db = SessionLocal()
    try:
        context: Dict[str, Any] = {}

        # ---------------------------------------------------------------
        # ALWAYS: basic stats
        # ---------------------------------------------------------------
        _load_basic_stats(db, context)

        # ---------------------------------------------------------------
        # ALWAYS: conversation history
        # ---------------------------------------------------------------
        _load_conversation_history(db, conversation_id, context)

        # ---------------------------------------------------------------
        # SELECTIVE: load only what the router asked for
        # ---------------------------------------------------------------
        # De-duplicate while preserving order: the accumulating loaders
        # (filtered/single companies) would otherwise append the same
        # entries twice.
        requested: List[str] = []
        for category in data_needed or []:
            if category not in requested:
                requested.append(category)

        for category in requested:
            try:
                _load_category(db, category, context)
            except Exception as exc:
                logger.warning("context_builder: failed to load '%s': %s", category, exc)
                # A failed statement can leave the transaction unusable;
                # reset it so later loaders are not affected.
                _rollback_quietly(db)

        # Pass-through extras
        context['current_message'] = current_message
        if user_context:
            context['user_context'] = user_context

        return context
    finally:
        db.close()


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _rollback_quietly(db) -> None:
    """Roll back the session, logging (not raising) if the rollback itself fails."""
    try:
        db.rollback()
    except Exception as exc:
        logger.warning("context_builder: rollback failed: %s", exc)


def _company_profile_url(company) -> Optional[str]:
    """Return the public profile URL for a company, or None if it has no slug."""
    if company and company.slug:
        return f"https://nordabiznes.pl/company/{company.slug}"
    return None


def _load_basic_stats(db, context: Dict) -> None:
    """Always-loaded: total active companies and category breakdown."""
    context['total_companies'] = db.query(Company).filter_by(status='active').count()

    # One grouped query instead of one COUNT per category.
    rows = (
        db.query(Company.category_id, func.count(Company.category_id))
        .filter(Company.status == 'active')
        .group_by(Company.category_id)
        .all()
    )
    active_counts = {category_id: count for category_id, count in rows}

    context['categories'] = [
        {
            'name': cat.name,
            'slug': cat.slug,
            'company_count': active_counts.get(cat.id, 0),
        }
        for cat in db.query(Category).all()
    ]


def _load_conversation_history(db, conversation_id: int, context: Dict) -> None:
    """Always-loaded: last 10 messages in the conversation, oldest first."""
    messages = (
        db.query(AIChatMessage)
        .filter_by(conversation_id=conversation_id)
        .order_by(AIChatMessage.created_at.desc())
        .limit(10)
        .all()
    )
    # The query returns newest-first; reverse to chronological order.
    context['recent_messages'] = [
        {'role': msg.role, 'content': msg.content}
        for msg in reversed(messages)
    ]


def _load_category(db, category: str, context: Dict) -> None:
    """Dispatch a single category string to the appropriate loader.

    Categories of the form ``prefix:value`` pass the part after the first
    colon to the loader. Unknown categories are logged at debug level and
    skipped.
    """
    if category == 'companies_all':
        _load_companies_all(db, context)
    elif category.startswith('companies_filtered:'):
        cat_name = category.split(':', 1)[1]
        _load_companies_filtered(db, cat_name, context)
    elif category.startswith('companies_single:'):
        identifier = category.split(':', 1)[1]
        _load_company_single(db, identifier, context)
    elif category == 'events':
        _load_events(db, context)
    elif category == 'news':
        _load_news(db, context)
    elif category == 'classifieds':
        _load_classifieds(db, context)
    elif category == 'forum':
        _load_forum(db, context)
    elif category == 'company_people':
        _load_company_people(db, context)
    elif category == 'registered_users':
        _load_registered_users(db, context)
    elif category == 'social_media':
        _load_social_media(db, context)
    elif category == 'audits':
        _load_audits(db, context)
    else:
        logger.debug("context_builder: unknown category '%s' — skipped", category)


# ---------------------------------------------------------------------------
# Category loaders
# ---------------------------------------------------------------------------

def _load_companies_all(db, context: Dict) -> None:
    """Load all active companies in compact format."""
    companies = db.query(Company).filter_by(status='active').all()
    context['all_companies'] = [_company_to_compact_dict(c) for c in companies]


def _load_companies_filtered(db, cat_name: str, context: Dict) -> None:
    """Load active companies filtered by category name (case-insensitive).

    Falls back to matching the category slug when no category name matches.
    Results are appended to ``context['filtered_companies']`` so several
    filters can accumulate; ``context['filter_category']`` holds the most
    recently requested filter.
    """
    category = (
        db.query(Category).filter(Category.name.ilike(cat_name)).first()
        # Fallback: search by slug
        or db.query(Category).filter(Category.slug.ilike(cat_name)).first()
    )

    if category:
        companies = db.query(Company).filter_by(
            category_id=category.id, status='active'
        ).all()
    else:
        companies = []

    context.setdefault('filtered_companies', []).extend(
        _company_to_compact_dict(c) for c in companies
    )
    context['filter_category'] = cat_name


def _load_company_single(db, identifier: str, context: Dict) -> None:
    """Load a single company by slug or partial name match.

    Matches are appended to ``context['single_companies']``. A blank
    identifier is skipped: it would otherwise become ``ILIKE '%%'`` and
    match an arbitrary company.
    """
    identifier = (identifier or '').strip()
    if not identifier:
        logger.debug("context_builder: company '%s' not found", identifier)
        return

    # Try slug first (exact match)
    company = db.query(Company).filter_by(slug=identifier, status='active').first()
    if not company:
        # Partial name match
        company = (
            db.query(Company)
            .filter(
                Company.name.ilike(f'%{identifier}%'),
                Company.status == 'active',
            )
            .first()
        )

    if company:
        context.setdefault('single_companies', [])
        context['single_companies'].append(_company_to_compact_dict(company))
    else:
        logger.debug("context_builder: company '%s' not found", identifier)


def _load_events(db, context: Dict) -> None:
    """Load upcoming events (next 60 days, max 15, soonest first)."""
    today = date.today()
    cutoff = today + timedelta(days=60)
    upcoming = (
        db.query(NordaEvent)
        .filter(
            NordaEvent.event_date >= today,
            NordaEvent.event_date <= cutoff,
        )
        .order_by(NordaEvent.event_date)
        .limit(15)
        .all()
    )
    # Text fields are truncated to keep the prompt small.
    context['upcoming_events'] = [
        {
            'title': (event.title or '')[:80],
            'date': event.event_date.strftime('%Y-%m-%d') if event.event_date else '',
            'type': event.event_type or 'meeting',
            'location': (event.location or '')[:50],
            'speaker': (event.speaker_name or '')[:30],
        }
        for event in upcoming
    ]


def _load_news(db, context: Dict) -> None:
    """Load recent approved ZOPK news (last 30 days, max 10)."""
    cutoff = datetime.now() - timedelta(days=30)
    news_items = (
        db.query(ZOPKNews)
        .filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.published_at >= cutoff,
        )
        .order_by(ZOPKNews.published_at.desc())
        .limit(10)
        .all()
    )
    context['recent_news'] = [
        {
            'title': news.title,
            'description': (news.description or '')[:400],
            'summary': (news.ai_summary or '')[:300],
            'source': news.source_name or '',
            'url': news.url or '',
            'date': news.published_at.strftime('%Y-%m-%d') if news.published_at else '',
            'type': news.news_type or 'news',
            'keywords': (news.keywords or [])[:5],
        }
        for news in news_items
    ]


def _load_classifieds(db, context: Dict) -> None:
    """Load active non-test B2B classifieds (max 20, newest first)."""
    classifieds = (
        db.query(Classified)
        .filter(
            # `== True/False` is intentional: it builds a SQL expression.
            Classified.is_active == True,  # noqa: E712
            Classified.is_test == False,  # noqa: E712
        )
        .order_by(Classified.created_at.desc())
        .limit(20)
        .all()
    )
    context['classifieds'] = [
        {
            'type': c.listing_type,
            'category': c.category,
            'title': c.title,
            'description': (c.description or '')[:400],
            'company': c.company.name if c.company else '',
            'author': c.author.name if c.author else '',
            'budget': c.budget_info or '',
            'location': c.location_info or '',
            'date': c.created_at.strftime('%Y-%m-%d') if c.created_at else '',
            'views': c.views_count or 0,
            'url': f'/classifieds/{c.id}',
        }
        for c in classifieds
    ]


def _load_forum(db, context: Dict) -> None:
    """Load recent forum topics with replies (non-test, max 15).

    Each topic carries at most its 5 most recent replies. Replies without a
    ``created_at`` value sort last.
    """
    topics = (
        db.query(ForumTopic)
        .options(
            joinedload(ForumTopic.author),
            joinedload(ForumTopic.replies).joinedload(ForumReply.author),
        )
        .filter(ForumTopic.category != 'test')
        .order_by(ForumTopic.created_at.desc())
        .limit(15)
        .all()
    )

    forum_data = []
    for topic in topics:
        topic_data = {
            'title': topic.title,
            'content': (topic.content or '')[:500],
            'author': topic.author.name if topic.author else 'Anonim',
            'category': topic.category_label if hasattr(topic, 'category_label') else topic.category,
            'status': topic.status_label if hasattr(topic, 'status_label') else topic.status,
            'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '',
            'url': f'/forum/{topic.id}',
            'views': topic.views_count or 0,
            'pinned': topic.is_pinned,
            'replies_count': topic.reply_count if hasattr(topic, 'reply_count') else 0,
            'has_attachments': bool(topic.attachments),
        }
        if topic.replies:
            # The (has_date, date) key keeps None timestamps from being
            # compared with datetimes, which would raise TypeError.
            sorted_replies = sorted(
                topic.replies,
                key=lambda r: (r.created_at is not None, r.created_at),
                reverse=True,
            )
            topic_data['replies'] = [
                {
                    'author': reply.author.name if reply.author else 'Anonim',
                    'content': (reply.content or '')[:300],
                    'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else '',
                }
                for reply in sorted_replies[:5]
            ]
        forum_data.append(topic_data)

    context['forum_topics'] = forum_data


def _load_company_people(db, context: Dict) -> None:
    """Load KRS company-people relationships grouped by company.

    The result maps company name to ``{'profile': url_or_None, 'people': [...]}``.
    Rows without a linked company are grouped under 'Nieznana'.
    """
    company_people = (
        db.query(CompanyPerson)
        .options(
            joinedload(CompanyPerson.person),
            joinedload(CompanyPerson.company),
        )
        .order_by(CompanyPerson.company_id)
        .all()
    )

    people_by_company: Dict[str, Any] = {}
    for cp in company_people:
        company_name = cp.company.name if cp.company else 'Nieznana'
        if company_name not in people_by_company:
            people_by_company[company_name] = {
                'profile': _company_profile_url(cp.company),
                'people': [],
            }

        person_info: Dict[str, Any] = {
            'name': cp.person.full_name() if cp.person else '',
            'profile': f"https://nordabiznes.pl/osoba/{cp.person.id}" if cp.person else None,
            'role': (cp.role or '')[:30],
        }
        # Shares are included only when set and non-zero.
        if cp.shares_percent:
            person_info['shares'] = f"{cp.shares_percent}%"
        people_by_company[company_name]['people'].append(person_info)

    context['company_people'] = people_by_company


def _load_registered_users(db, context: Dict) -> None:
    """Load active portal users with company assignments grouped by company.

    The result maps company name to ``{'profile': url_or_None, 'users': [...]}``.
    """
    users = (
        db.query(User)
        .filter(
            User.is_active == True,  # noqa: E712
            User.company_id.isnot(None),
        )
        .options(joinedload(User.company))
        .all()
    )

    # Labels are emitted into the context as-is; unknown roles map to ''.
    role_labels = {
        'MANAGER': 'administrator profilu',
        'EMPLOYEE': 'pracownik',
        'VIEWER': 'obserwator',
    }

    users_by_company: Dict[str, Any] = {}
    for u in users:
        company_name = u.company.name if u.company else 'Nieznana'
        if company_name not in users_by_company:
            users_by_company[company_name] = {
                'profile': _company_profile_url(u.company),
                'users': [],
            }
        users_by_company[company_name]['users'].append({
            'name': u.name,
            'email': u.email,
            'portal_role': role_labels.get(u.company_role, ''),
            'member': u.is_norda_member,
            'verified': u.is_verified,
        })

    context['registered_users'] = users_by_company


def _load_social_media(db, context: Dict) -> None:
    """Load valid company social media profiles grouped by company."""
    social_items = (
        db.query(CompanySocialMedia)
        .filter(CompanySocialMedia.is_valid == True)  # noqa: E712
        .options(joinedload(CompanySocialMedia.company))
        .all()
    )

    social_by_company: Dict[str, List] = {}
    for sm in social_items:
        company_name = sm.company.name if sm.company else 'Nieznana'
        social_by_company.setdefault(company_name, []).append({
            'platform': sm.platform,
            'url': sm.url or '',
            'followers': sm.followers_count or 0,
        })

    context['company_social_media'] = social_by_company


def _load_audits(db, context: Dict) -> None:
    """Load latest GBP audits and SEO PageSpeed scores.

    Writes ``context['gbp_audits']`` (rows matching each company's most
    recent audit date) and ``context['seo_audits']`` (website analyses that
    have a PageSpeed SEO score). ``profile_url`` is '' when the company is
    missing or has no slug.
    """
    # GBP audits — one per company, most recent
    latest_subq = (
        db.query(
            GBPAudit.company_id,
            func.max(GBPAudit.audit_date).label('max_date'),
        )
        .group_by(GBPAudit.company_id)
        .subquery()
    )
    latest_audits = (
        db.query(GBPAudit)
        .join(
            latest_subq,
            (GBPAudit.company_id == latest_subq.c.company_id)
            & (GBPAudit.audit_date == latest_subq.c.max_date),
        )
        .options(joinedload(GBPAudit.company))
        .all()
    )
    context['gbp_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'score': audit.completeness_score or 0,
            'reviews': audit.review_count or 0,
            'rating': float(audit.average_rating) if audit.average_rating else 0,
            'maps_url': audit.google_maps_url or '',
            'profile_url': _company_profile_url(audit.company) or '',
        }
        for audit in latest_audits
    ]

    # SEO / PageSpeed audits
    seo_audits = (
        db.query(CompanyWebsiteAnalysis)
        .filter(CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None))
        .options(joinedload(CompanyWebsiteAnalysis.company))
        .all()
    )
    context['seo_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'seo': audit.pagespeed_seo_score or 0,
            'performance': audit.pagespeed_performance_score or 0,
            'accessibility': audit.pagespeed_accessibility_score or 0,
            'best_practices': audit.pagespeed_best_practices_score or 0,
            'overall': audit.seo_overall_score or 0,
            'url': audit.company.website if audit.company else '',
            'profile_url': _company_profile_url(audit.company) or '',
        }
        for audit in seo_audits
    ]


# ---------------------------------------------------------------------------
# Company compact format (mirrors nordabiz_chat._company_to_compact_dict)
# ---------------------------------------------------------------------------

def _company_to_compact_dict(c: Company) -> Dict[str, Any]:
    """
    Convert a Company ORM object to a compact token-efficient dict.

    Format matches nordabiz_chat.py's _company_to_compact_dict() exactly.
    ``name``, ``cat`` and ``profile`` are always present; every other key is
    added only when the source field is non-empty.
    """
    compact: Dict[str, Any] = {
        'name': c.name,
        'cat': c.category.name if c.category else None,
        'profile': f'https://nordabiznes.pl/company/{c.slug}',
    }

    # Only include non-empty fields to save tokens
    if c.description_short:
        compact['desc'] = c.description_short
    if c.description_full:
        compact['about'] = c.description_full
    if c.founding_history:
        compact['history'] = c.founding_history
    if c.core_values:
        compact['values'] = c.core_values
    if c.services_offered:
        compact['offerings'] = c.services_offered
    if c.technologies_used:
        compact['tech'] = c.technologies_used
    if c.services:
        services = [cs.service.name for cs in c.services if cs.service]
        if services:
            compact['svc'] = services
    if c.competencies:
        competencies = [cc.competency.name for cc in c.competencies if cc.competency]
        if competencies:
            compact['comp'] = competencies
    if c.website:
        compact['web'] = c.website
    if c.phone:
        compact['tel'] = c.phone
    if c.email:
        compact['mail'] = c.email
    if c.address_city:
        compact['city'] = c.address_city
    if c.year_established:
        compact['year'] = c.year_established
    if c.certifications:
        # Active certifications only, capped at three.
        certs = [cert.name for cert in c.certifications if cert.is_active]
        if certs:
            compact['cert'] = certs[:3]

    return compact