""" GBP Audit Service for Norda Biznes Hub ======================================= Google Business Profile completeness audit service with: - Field-by-field completeness checking - Weighted scoring algorithm - AI-powered recommendations (via Gemini) - Historical tracking Inspired by Localo.com audit features. Author: Norda Biznes Development Team Created: 2026-01-08 """ import json import logging from dataclasses import dataclass, field from datetime import datetime from decimal import Decimal from typing import Dict, List, Optional, Any from sqlalchemy.orm import Session from database import Company, GBPAudit, CompanyWebsiteAnalysis, SessionLocal import gemini_service # Configure logging logger = logging.getLogger(__name__) # Field weights for completeness scoring (total = 100) FIELD_WEIGHTS = { 'name': 10, # Business name - essential 'address': 10, # Full address - essential for local SEO 'phone': 8, # Contact phone - important 'website': 8, # Business website - important 'hours': 8, # Opening hours - important for customers 'categories': 10, # Business categories - essential for discovery 'photos': 15, # Photos - high impact on engagement 'description': 12, # Business description - important for SEO 'services': 10, # Services list - important for discovery 'reviews': 9, # Review presence and rating - trust factor } # Photo requirements for optimal GBP profile PHOTO_REQUIREMENTS = { 'minimum': 3, # Minimum photos for basic completeness 'recommended': 10, # Recommended for good profile 'optimal': 25, # Optimal for excellent profile } # Review thresholds REVIEW_THRESHOLDS = { 'minimum': 1, # At least 1 review 'good': 5, # Good number of reviews 'excellent': 20, # Excellent review count } @dataclass class FieldStatus: """Status of a single GBP field""" field_name: str status: str # 'complete', 'partial', 'missing' value: Optional[Any] = None score: float = 0.0 max_score: float = 0.0 recommendation: Optional[str] = None details: Optional[Dict[str, Any]] = None # Additional field-specific details @dataclass class AuditResult: """Complete GBP audit result""" company_id: int completeness_score: int fields: Dict[str, FieldStatus] = field(default_factory=dict) recommendations: List[Dict[str, Any]] = field(default_factory=list) photo_count: int = 0 logo_present: bool = False cover_photo_present: bool = False review_count: int = 0 average_rating: Optional[Decimal] = None google_place_id: Optional[str] = None google_maps_url: Optional[str] = None audit_errors: Optional[str] = None class GBPAuditService: """Service for auditing Google Business Profile completeness""" def __init__(self, db: Session): """ Initialize GBP Audit service. Args: db: SQLAlchemy database session """ self.db = db def audit_company(self, company_id: int) -> AuditResult: """ Run full GBP audit for a company. Args: company_id: ID of the company to audit Returns: AuditResult with completeness score and field details """ company = self.db.query(Company).filter(Company.id == company_id).first() if not company: raise ValueError(f"Company with id {company_id} not found") # Get latest website analysis for Google Business data website_analysis = self.db.query(CompanyWebsiteAnalysis).filter( CompanyWebsiteAnalysis.company_id == company_id ).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first() # Audit each field fields = {} total_score = 0.0 recommendations = [] # Name check fields['name'] = self._check_name(company) total_score += fields['name'].score # Address check fields['address'] = self._check_address(company) total_score += fields['address'].score # Phone check fields['phone'] = self._check_phone(company) total_score += fields['phone'].score # Website check fields['website'] = self._check_website(company) total_score += fields['website'].score # Hours check (from website analysis if available) fields['hours'] = self._check_hours(company, website_analysis) total_score += fields['hours'].score # Categories check fields['categories'] = self._check_categories(company) total_score += fields['categories'].score # Photos check (from website analysis) fields['photos'] = self._check_photos(company, website_analysis) total_score += fields['photos'].score # Description check fields['description'] = self._check_description(company) total_score += fields['description'].score # Services check fields['services'] = self._check_services(company) total_score += fields['services'].score # Reviews check (from website analysis) fields['reviews'] = self._check_reviews(company, website_analysis) total_score += fields['reviews'].score # Build recommendations from fields with issues for field_name, field_status in fields.items(): if field_status.recommendation: priority = self._get_priority(field_status) recommendations.append({ 'priority': priority, 'field': field_name, 'recommendation': field_status.recommendation, 'impact': FIELD_WEIGHTS.get(field_name, 0) }) # Sort recommendations by priority and impact priority_order = {'high': 0, 'medium': 1, 'low': 2} recommendations.sort(key=lambda x: (priority_order.get(x['priority'], 3), -x['impact'])) # Extract Google Business data from website analysis google_place_id = None google_maps_url = None review_count = 0 average_rating = None if website_analysis: google_place_id = website_analysis.google_place_id review_count = website_analysis.google_reviews_count or 0 average_rating = website_analysis.google_rating # Create result result = AuditResult( company_id=company_id, completeness_score=round(total_score), fields=fields, recommendations=recommendations, photo_count=fields['photos'].value if isinstance(fields['photos'].value, int) else 0, logo_present=False, # Would need specific logo detection cover_photo_present=False, # Would need specific cover detection review_count=review_count, average_rating=average_rating, google_place_id=google_place_id, google_maps_url=google_maps_url ) return result def save_audit(self, result: AuditResult, source: str = 'manual') -> GBPAudit: """ Save audit result to database. Args: result: AuditResult to save source: Audit source ('manual', 'automated', 'api') Returns: Saved GBPAudit record """ # Convert fields to JSON-serializable format fields_status = {} for name, field_status in result.fields.items(): # Keep dict values as-is for JSON serialization (e.g., opening hours) # Convert other complex types to string if field_status.value is None: serialized_value = None elif isinstance(field_status.value, (dict, list)): serialized_value = field_status.value # Keep as dict/list for JSON elif isinstance(field_status.value, (int, float, bool)): serialized_value = field_status.value # Keep primitives as-is else: serialized_value = str(field_status.value) fields_status[name] = { 'status': field_status.status, 'value': serialized_value, 'score': field_status.score, 'max_score': field_status.max_score } # Create audit record audit = GBPAudit( company_id=result.company_id, audit_date=datetime.now(), completeness_score=result.completeness_score, fields_status=fields_status, recommendations=result.recommendations, has_name=result.fields.get('name', FieldStatus('name', 'missing')).status == 'complete', has_address=result.fields.get('address', FieldStatus('address', 'missing')).status == 'complete', has_phone=result.fields.get('phone', FieldStatus('phone', 'missing')).status == 'complete', has_website=result.fields.get('website', FieldStatus('website', 'missing')).status == 'complete', has_hours=result.fields.get('hours', FieldStatus('hours', 'missing')).status == 'complete', has_categories=result.fields.get('categories', FieldStatus('categories', 'missing')).status == 'complete', has_photos=result.fields.get('photos', FieldStatus('photos', 'missing')).status in ['complete', 'partial'], has_description=result.fields.get('description', FieldStatus('description', 'missing')).status == 'complete', has_services=result.fields.get('services', FieldStatus('services', 'missing')).status == 'complete', has_reviews=result.fields.get('reviews', FieldStatus('reviews', 'missing')).status in ['complete', 'partial'], photo_count=result.photo_count, logo_present=result.logo_present, cover_photo_present=result.cover_photo_present, review_count=result.review_count, average_rating=result.average_rating, google_place_id=result.google_place_id, google_maps_url=result.google_maps_url, audit_source=source, audit_version='1.0', audit_errors=result.audit_errors ) self.db.add(audit) self.db.commit() self.db.refresh(audit) logger.info(f"GBP audit saved for company {result.company_id}: score={result.completeness_score}") return audit def get_latest_audit(self, company_id: int) -> Optional[GBPAudit]: """ Get the most recent audit for a company. Args: company_id: Company ID Returns: Latest GBPAudit or None """ return self.db.query(GBPAudit).filter( GBPAudit.company_id == company_id ).order_by(GBPAudit.audit_date.desc()).first() def get_audit_history(self, company_id: int, limit: int = 10) -> List[GBPAudit]: """ Get audit history for a company. Args: company_id: Company ID limit: Maximum number of audits to return Returns: List of GBPAudit records ordered by date descending """ return self.db.query(GBPAudit).filter( GBPAudit.company_id == company_id ).order_by(GBPAudit.audit_date.desc()).limit(limit).all() # === Field Check Methods === def _check_name(self, company: Company) -> FieldStatus: """Check business name completeness""" max_score = FIELD_WEIGHTS['name'] if company.name and len(company.name.strip()) >= 3: return FieldStatus( field_name='name', status='complete', value=company.name, score=max_score, max_score=max_score ) return FieldStatus( field_name='name', status='missing', score=0, max_score=max_score, recommendation='Dodaj nazwę firmy do wizytówki Google. Nazwa powinna być oficjalną nazwą firmy.' ) def _check_address(self, company: Company) -> FieldStatus: """Check address completeness""" max_score = FIELD_WEIGHTS['address'] # Check all address components has_street = bool(company.address_street) has_city = bool(company.address_city) has_postal = bool(company.address_postal) if has_street and has_city and has_postal: return FieldStatus( field_name='address', status='complete', value=company.address_full or f"{company.address_street}, {company.address_postal} {company.address_city}", score=max_score, max_score=max_score ) if has_city or has_street: partial_score = max_score * 0.5 return FieldStatus( field_name='address', status='partial', value=company.address_city or company.address_street, score=partial_score, max_score=max_score, recommendation='Uzupełnij pełny adres firmy (ulica, kod pocztowy, miasto) dla lepszej widoczności w mapach.' ) return FieldStatus( field_name='address', status='missing', score=0, max_score=max_score, recommendation='Dodaj adres firmy do wizytówki Google. Pełny adres jest kluczowy dla lokalnego SEO.' ) def _check_phone(self, company: Company) -> FieldStatus: """Check phone number presence""" max_score = FIELD_WEIGHTS['phone'] if company.phone and len(company.phone.strip()) >= 9: return FieldStatus( field_name='phone', status='complete', value=company.phone, score=max_score, max_score=max_score ) # Check contacts relationship for additional phones if hasattr(company, 'contacts') and company.contacts: phones = [c for c in company.contacts if c.contact_type == 'phone'] if phones: return FieldStatus( field_name='phone', status='complete', value=phones[0].value, score=max_score, max_score=max_score ) return FieldStatus( field_name='phone', status='missing', score=0, max_score=max_score, recommendation='Dodaj numer telefonu do wizytówki. Klienci oczekują możliwości bezpośredniego kontaktu.' ) def _check_website(self, company: Company) -> FieldStatus: """Check website presence""" max_score = FIELD_WEIGHTS['website'] if company.website and company.website.strip().startswith(('http://', 'https://')): return FieldStatus( field_name='website', status='complete', value=company.website, score=max_score, max_score=max_score ) if company.website: # Has website but might not be properly formatted return FieldStatus( field_name='website', status='partial', value=company.website, score=max_score * 0.7, max_score=max_score, recommendation='Upewnij się, że adres strony internetowej zawiera protokół (https://).' ) return FieldStatus( field_name='website', status='missing', score=0, max_score=max_score, recommendation='Dodaj stronę internetową firmy. Link do strony zwiększa wiarygodność i ruch.' ) def _check_hours(self, company: Company, analysis: Optional[CompanyWebsiteAnalysis]) -> FieldStatus: """Check opening hours presence""" max_score = FIELD_WEIGHTS['hours'] # Check if we have opening hours from Google Business Profile if analysis and analysis.google_opening_hours: return FieldStatus( field_name='hours', status='complete', value=analysis.google_opening_hours, score=max_score, max_score=max_score ) return FieldStatus( field_name='hours', status='missing', score=0, max_score=max_score, recommendation='Dodaj godziny otwarcia firmy. Klienci chcą wiedzieć, kiedy mogą Cię odwiedzić.' ) def _check_categories(self, company: Company) -> FieldStatus: """Check business category completeness""" max_score = FIELD_WEIGHTS['categories'] # Check if company has a category assigned if company.category_id and company.category: return FieldStatus( field_name='categories', status='complete', value=company.category.name if company.category else None, score=max_score, max_score=max_score ) return FieldStatus( field_name='categories', status='missing', score=0, max_score=max_score, recommendation='Wybierz główną kategorię działalności. Kategoria pomaga klientom znaleźć Twoją firmę.' ) def _check_photos(self, company: Company, analysis: Optional[CompanyWebsiteAnalysis]) -> FieldStatus: """Check photo completeness""" max_score = FIELD_WEIGHTS['photos'] # Get Google Business Profile photo count from website analysis photo_count = 0 if analysis and analysis.google_photos_count: photo_count = analysis.google_photos_count if photo_count >= PHOTO_REQUIREMENTS['recommended']: return FieldStatus( field_name='photos', status='complete', value=photo_count, score=max_score, max_score=max_score ) if photo_count >= PHOTO_REQUIREMENTS['minimum']: partial_score = max_score * (photo_count / PHOTO_REQUIREMENTS['recommended']) return FieldStatus( field_name='photos', status='partial', value=photo_count, score=min(partial_score, max_score * 0.7), max_score=max_score, recommendation=f'Dodaj więcej zdjęć firmy. Zalecane minimum to {PHOTO_REQUIREMENTS["recommended"]} zdjęć.' ) return FieldStatus( field_name='photos', status='missing', value=photo_count, score=0, max_score=max_score, recommendation='Dodaj zdjęcia firmy (logo, wnętrze, zespół, produkty). Wizytówki ze zdjęciami mają 42% więcej zapytań o wskazówki dojazdu.' ) def _check_description(self, company: Company) -> FieldStatus: """Check business description completeness""" max_score = FIELD_WEIGHTS['description'] # Check short and full descriptions desc = company.description_full or company.description_short if desc and len(desc.strip()) >= 100: return FieldStatus( field_name='description', status='complete', value=desc[:100] + '...' if len(desc) > 100 else desc, score=max_score, max_score=max_score ) if desc and len(desc.strip()) >= 30: return FieldStatus( field_name='description', status='partial', value=desc, score=max_score * 0.5, max_score=max_score, recommendation='Rozbuduj opis firmy. Dobry opis powinien mieć minimum 100-200 znaków i zawierać słowa kluczowe.' ) return FieldStatus( field_name='description', status='missing', score=0, max_score=max_score, recommendation='Dodaj szczegółowy opis firmy. Opisz czym się zajmujesz, jakie usługi oferujesz i co Cię wyróżnia.' ) def _check_services(self, company: Company) -> FieldStatus: """Check services list completeness""" max_score = FIELD_WEIGHTS['services'] # Check company services relationship service_count = 0 if hasattr(company, 'services') and company.services: service_count = len(company.services) # Also check services_offered text field has_services_text = bool(company.services_offered and len(company.services_offered.strip()) > 10) if service_count >= 3 or has_services_text: return FieldStatus( field_name='services', status='complete', value=service_count if service_count else 'W opisie', score=max_score, max_score=max_score ) if service_count >= 1: return FieldStatus( field_name='services', status='partial', value=service_count, score=max_score * 0.5, max_score=max_score, recommendation='Dodaj więcej usług do wizytówki. Zalecane jest minimum 3-5 głównych usług.' ) return FieldStatus( field_name='services', status='missing', score=0, max_score=max_score, recommendation='Dodaj listę usług lub produktów. Pomaga to klientom zrozumieć Twoją ofertę.' ) def _check_reviews(self, company: Company, analysis: Optional[CompanyWebsiteAnalysis]) -> FieldStatus: """ Check reviews presence and quality. Scoring logic (max 9 points): ============================= COMPLETE (9/9 pts): - 5+ opinii AND ocena >= 4.0 PARTIAL (3-8/9 pts): - Bazowo: 3 pkt za pierwszą opinię - +1 pkt za każdą kolejną opinię (do 4 opinii = 6 pkt max) - +1 pkt bonus jeśli ocena >= 4.0 (do 7 pkt max) Przykłady: - 1 opinia, ocena 3.5 → 3/9 pkt - 1 opinia, ocena 5.0 → 4/9 pkt (3 + 1 bonus) - 3 opinie, ocena 4.2 → 6/9 pkt (3 + 2 + 1 bonus) - 4 opinie, ocena 4.5 → 7/9 pkt (3 + 3 + 1 bonus) MISSING (0/9 pts): - Brak opinii """ max_score = FIELD_WEIGHTS['reviews'] review_count = 0 rating = None if analysis: review_count = analysis.google_reviews_count or 0 rating = analysis.google_rating # COMPLETE: 5+ reviews with good rating (4.0+) → full 9/9 points if review_count >= REVIEW_THRESHOLDS['good'] and rating and float(rating) >= 4.0: return FieldStatus( field_name='reviews', status='complete', value=f'{review_count} opinii, ocena {rating}', score=max_score, max_score=max_score, details={ 'review_count': review_count, 'rating': float(rating), 'base_score': 3, 'additional_score': min(review_count - 1, 4), 'rating_bonus': 1, 'breakdown': f'3 pkt (1. opinia) + {min(review_count - 1, 4)} pkt (kolejne) + 1 pkt (ocena) = {max_score}/{max_score}' } ) # PARTIAL: 1-4 reviews → proportional scoring if review_count >= REVIEW_THRESHOLDS['minimum']: # Base: 3 pts for first review + 1 pt per additional review (max 6 pts for 4 reviews) base_score = 3 additional_score = min(review_count - 1, 4) partial_score = base_score + additional_score # Bonus: +1 pt for good rating (>= 4.0) rating_bonus = 0 if rating and float(rating) >= 4.0: rating_bonus = 1 partial_score = min(partial_score + rating_bonus, max_score - 2) # max 7 pts # Build breakdown string breakdown_parts = [f'3 pkt (1. opinia)'] if additional_score > 0: breakdown_parts.append(f'{additional_score} pkt (kolejne)') if rating_bonus > 0: breakdown_parts.append(f'1 pkt (ocena)') breakdown = ' + '.join(breakdown_parts) + f' = {partial_score}/{max_score}' return FieldStatus( field_name='reviews', status='partial', value=f'{review_count} opinii' + (f', ocena {rating}' if rating else ''), score=partial_score, max_score=max_score, recommendation='Zachęcaj klientów do zostawiania opinii. Więcej pozytywnych recenzji zwiększa zaufanie.', details={ 'review_count': review_count, 'rating': float(rating) if rating else None, 'base_score': base_score, 'additional_score': additional_score, 'rating_bonus': rating_bonus, 'breakdown': breakdown } ) # MISSING: no reviews → 0/9 points return FieldStatus( field_name='reviews', status='missing', value=review_count, score=0, max_score=max_score, recommendation='Zbieraj opinie od klientów. Wizytówki z opiniami są bardziej wiarygodne i lepiej widoczne.', details={ 'review_count': 0, 'rating': None, 'breakdown': '0 pkt (brak opinii)' } ) def _get_priority(self, field_status: FieldStatus) -> str: """Determine recommendation priority based on field importance and status""" weight = FIELD_WEIGHTS.get(field_status.field_name, 0) if field_status.status == 'missing': if weight >= 10: return 'high' elif weight >= 8: return 'medium' else: return 'low' elif field_status.status == 'partial': if weight >= 10: return 'medium' else: return 'low' return 'low' # === AI-Powered Recommendations === def generate_ai_recommendations( self, company: Company, result: AuditResult, user_id: Optional[int] = None ) -> List[Dict[str, Any]]: """ Generate AI-powered recommendations using Gemini. Args: company: Company being audited result: AuditResult from the audit user_id: Optional user ID for cost tracking Returns: List of AI-generated recommendation dicts with keys: - priority: 'high', 'medium', 'low' - field: field name this applies to - recommendation: AI-generated recommendation text - action_steps: list of specific action steps - expected_impact: description of expected improvement """ service = gemini_service.get_gemini_service() if not service: logger.warning("Gemini service not available - using static recommendations") return result.recommendations try: # Build context for AI prompt = self._build_ai_recommendation_prompt(company, result) # Call Gemini with cost tracking response_text = service.generate_text( prompt=prompt, feature='gbp_audit_ai', user_id=user_id, temperature=0.7, max_tokens=2000 ) # Parse AI response ai_recommendations = self._parse_ai_recommendations(response_text, result) logger.info( f"AI recommendations generated for company {company.id}: " f"{len(ai_recommendations)} recommendations" ) return ai_recommendations except Exception as e: logger.error(f"AI recommendation generation failed: {e}") # Fall back to static recommendations return result.recommendations def _build_ai_recommendation_prompt( self, company: Company, result: AuditResult ) -> str: """ Build prompt for Gemini to generate personalized recommendations. Args: company: Company being audited result: AuditResult with field statuses Returns: Formatted prompt string """ # Build field status summary field_summary = [] for field_name, field_status in result.fields.items(): status_emoji = { 'complete': '✅', 'partial': '⚠️', 'missing': '❌' }.get(field_status.status, '❓') field_summary.append( f"- {field_name}: {status_emoji} {field_status.status} " f"({field_status.score:.1f}/{field_status.max_score:.1f} pkt)" ) # Get category info category_name = company.category.name if company.category else 'Nieznana' prompt = f"""Jesteś ekspertem od Google Business Profile (Wizytówki Google) i lokalnego SEO. FIRMA: {company.name} BRANŻA: {category_name} MIASTO: {company.address_city or 'Nieznane'} WYNIK AUDYTU: {result.completeness_score}/100 STATUS PÓL WIZYTÓWKI: {chr(10).join(field_summary)} LICZBA ZDJĘĆ: {result.photo_count} LICZBA OPINII: {result.review_count} OCENA: {result.average_rating or 'Brak'} ZADANIE: Wygeneruj 3-5 spersonalizowanych rekomendacji dla tej firmy, aby poprawić jej wizytówkę Google. WYMAGANIA: 1. Każda rekomendacja powinna być konkretna i dostosowana do branży firmy 2. Skup się na polach z najniższymi wynikami 3. Podaj praktyczne kroki do wykonania 4. Używaj języka polskiego ZWRÓĆ ODPOWIEDŹ W FORMACIE JSON (TYLKO JSON, BEZ MARKDOWN): [ {{ "priority": "high|medium|low", "field": "nazwa_pola", "recommendation": "Krótki opis co poprawić", "action_steps": ["Krok 1", "Krok 2", "Krok 3"], "expected_impact": "Opis spodziewanej poprawy" }} ] Priorytety: - high: kluczowe pola (name, address, categories, description) - medium: ważne pola (phone, website, photos, services) - low: dodatkowe pola (hours, reviews) Odpowiedź (TYLKO JSON):""" return prompt def _parse_ai_recommendations( self, response_text: str, fallback_result: AuditResult ) -> List[Dict[str, Any]]: """ Parse AI response into structured recommendations. Args: response_text: Raw text from Gemini fallback_result: AuditResult to use for fallback Returns: List of recommendation dicts """ try: # Clean up response - remove markdown code blocks if present cleaned = response_text.strip() if cleaned.startswith('```'): # Remove markdown code block markers lines = cleaned.split('\n') # Find JSON content between ``` markers json_lines = [] in_json = False for line in lines: if line.startswith('```') and not in_json: in_json = True continue elif line.startswith('```') and in_json: break elif in_json: json_lines.append(line) cleaned = '\n'.join(json_lines) # Parse JSON recommendations = json.loads(cleaned) # Validate and enhance recommendations valid_recommendations = [] valid_priorities = {'high', 'medium', 'low'} valid_fields = set(FIELD_WEIGHTS.keys()) for rec in recommendations: if not isinstance(rec, dict): continue # Validate priority priority = rec.get('priority', 'medium') if priority not in valid_priorities: priority = 'medium' # Validate field field = rec.get('field', 'general') if field not in valid_fields: field = 'general' # Get impact score from field weights impact = FIELD_WEIGHTS.get(field, 5) valid_recommendations.append({ 'priority': priority, 'field': field, 'recommendation': rec.get('recommendation', ''), 'action_steps': rec.get('action_steps', []), 'expected_impact': rec.get('expected_impact', ''), 'impact': impact, 'source': 'ai' }) if valid_recommendations: # Sort by priority and impact priority_order = {'high': 0, 'medium': 1, 'low': 2} valid_recommendations.sort( key=lambda x: (priority_order.get(x['priority'], 3), -x['impact']) ) return valid_recommendations except json.JSONDecodeError as e: logger.warning(f"Failed to parse AI recommendations JSON: {e}") except Exception as e: logger.warning(f"Error processing AI recommendations: {e}") # Return fallback recommendations with source marker fallback = [] for rec in fallback_result.recommendations: rec_copy = dict(rec) rec_copy['source'] = 'static' rec_copy['action_steps'] = [] rec_copy['expected_impact'] = '' fallback.append(rec_copy) return fallback def audit_with_ai( self, company_id: int, user_id: Optional[int] = None ) -> AuditResult: """ Run full GBP audit with AI-powered recommendations. Args: company_id: ID of the company to audit user_id: Optional user ID for cost tracking Returns: AuditResult with AI-enhanced recommendations """ # Run standard audit result = self.audit_company(company_id) # Get company for AI context company = self.db.query(Company).filter(Company.id == company_id).first() if not company: return result # Generate AI recommendations ai_recommendations = self.generate_ai_recommendations( company=company, result=result, user_id=user_id ) # Replace static recommendations with AI-generated ones result.recommendations = ai_recommendations return result # === Convenience Functions === def audit_company(db: Session, company_id: int, save: bool = True) -> AuditResult: """ Audit a company's GBP completeness. Args: db: Database session company_id: Company ID to audit save: Whether to save audit to database Returns: AuditResult with completeness score and recommendations """ service = GBPAuditService(db) result = service.audit_company(company_id) if save: service.save_audit(result) return result def get_company_audit(db: Session, company_id: int) -> Optional[GBPAudit]: """ Get the latest audit for a company. Args: db: Database session company_id: Company ID Returns: Latest GBPAudit or None """ service = GBPAuditService(db) return service.get_latest_audit(company_id) def audit_company_with_ai( db: Session, company_id: int, save: bool = True, user_id: Optional[int] = None ) -> AuditResult: """ Audit a company's GBP completeness with AI-powered recommendations. Args: db: Database session company_id: Company ID to audit save: Whether to save audit to database user_id: Optional user ID for cost tracking Returns: AuditResult with AI-enhanced recommendations """ service = GBPAuditService(db) result = service.audit_with_ai(company_id, user_id=user_id) if save: service.save_audit(result, source='ai') return result def batch_audit_companies( db: Session, company_ids: Optional[List[int]] = None, save: bool = True ) -> Dict[int, AuditResult]: """ Audit multiple companies. Args: db: Database session company_ids: List of company IDs (None = all active companies) save: Whether to save audits to database Returns: Dict mapping company_id to AuditResult """ service = GBPAuditService(db) # Get companies to audit if company_ids is None: companies = db.query(Company).filter(Company.status == 'active').all() company_ids = [c.id for c in companies] results = {} for company_id in company_ids: try: result = service.audit_company(company_id) if save: service.save_audit(result, source='automated') results[company_id] = result except Exception as e: logger.error(f"Failed to audit company {company_id}: {e}") return results # === Google Places API Integration === def fetch_google_business_data( db: Session, company_id: int, force_refresh: bool = False ) -> Dict[str, Any]: """ Fetch fresh Google Business Profile data from Google Places API. This function searches for the company on Google Places, retrieves detailed business information, and updates the CompanyWebsiteAnalysis record. Args: db: Database session company_id: Company ID to fetch data for force_refresh: If True, fetch even if recent data exists Returns: Dict with: - success: bool - steps: List of step results with status - data: Fetched Google data (if successful) - error: Error message (if failed) """ import os import requests from datetime import datetime, timedelta result = { 'success': False, 'steps': [], 'data': {}, 'error': None } # Get company company = db.query(Company).filter(Company.id == company_id).first() if not company: result['error'] = f'Firma o ID {company_id} nie znaleziona' return result # Check if we have recent data (less than 24 hours old) if not force_refresh: existing = db.query(CompanyWebsiteAnalysis).filter( CompanyWebsiteAnalysis.company_id == company_id ).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first() if existing and existing.analyzed_at: age = datetime.now() - existing.analyzed_at if age < timedelta(hours=24) and existing.google_place_id: result['success'] = True result['steps'].append({ 'step': 'cache_check', 'status': 'skipped', 'message': f'Dane pobrano {age.seconds // 3600}h temu - używam cache' }) result['data'] = { 'google_place_id': existing.google_place_id, 'google_rating': float(existing.google_rating) if existing.google_rating else None, 'google_reviews_count': existing.google_reviews_count, 'google_photos_count': existing.google_photos_count, 'google_opening_hours': existing.google_opening_hours, 'cached': True } return result # Get API key api_key = os.getenv('GOOGLE_PLACES_API_KEY') if not api_key: result['error'] = 'Brak klucza API Google Places (GOOGLE_PLACES_API_KEY)' result['steps'].append({ 'step': 'api_key_check', 'status': 'error', 'message': result['error'] }) return result result['steps'].append({ 'step': 'api_key_check', 'status': 'complete', 'message': 'Klucz API skonfigurowany' }) # Step 1: Search for place result['steps'].append({ 'step': 'find_place', 'status': 'in_progress', 'message': f'Szukam firmy "{company.name}" w Google Maps...' }) city = company.address_city or 'Wejherowo' search_query = f'{company.name} {city}' try: find_response = requests.get( 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json', params={ 'input': search_query, 'inputtype': 'textquery', 'fields': 'place_id,name,formatted_address', 'language': 'pl', 'key': api_key, }, timeout=15 ) find_response.raise_for_status() find_data = find_response.json() if find_data.get('status') != 'OK' or not find_data.get('candidates'): result['steps'][-1]['status'] = 'warning' result['steps'][-1]['message'] = f'Nie znaleziono firmy w Google Maps' result['error'] = 'Firma nie ma profilu Google Business lub nazwa jest inna niż w Google' return result candidate = find_data['candidates'][0] place_id = candidate.get('place_id') google_name = candidate.get('name') google_address = candidate.get('formatted_address') result['steps'][-1]['status'] = 'complete' result['steps'][-1]['message'] = f'Znaleziono: {google_name}' result['data']['google_place_id'] = place_id result['data']['google_name'] = google_name result['data']['google_address'] = google_address except requests.exceptions.Timeout: result['steps'][-1]['status'] = 'error' result['steps'][-1]['message'] = 'Timeout - Google API nie odpowiada' result['error'] = 'Timeout podczas wyszukiwania w Google Places API' return result except Exception as e: result['steps'][-1]['status'] = 'error' result['steps'][-1]['message'] = f'Błąd: {str(e)}' result['error'] = str(e) return result # Step 2: Get place details result['steps'].append({ 'step': 'get_details', 'status': 'in_progress', 'message': 'Pobieram szczegóły wizytówki...' }) try: fields = [ 'rating', 'user_ratings_total', 'opening_hours', 'business_status', 'formatted_phone_number', 'website', 'photos', ] details_response = requests.get( 'https://maps.googleapis.com/maps/api/place/details/json', params={ 'place_id': place_id, 'fields': ','.join(fields), 'language': 'pl', 'key': api_key, }, timeout=15 ) details_response.raise_for_status() details_data = details_response.json() if details_data.get('status') != 'OK': result['steps'][-1]['status'] = 'warning' result['steps'][-1]['message'] = f'Nie udało się pobrać szczegółów' result['error'] = f'Google Places API: {details_data.get("status")}' return result place = details_data.get('result', {}) # Extract data rating = place.get('rating') reviews_count = place.get('user_ratings_total') photos = place.get('photos', []) photos_count = len(photos) if photos else 0 opening_hours = place.get('opening_hours', {}) business_status = place.get('business_status') phone = place.get('formatted_phone_number') website = place.get('website') result['data']['google_rating'] = rating result['data']['google_reviews_count'] = reviews_count result['data']['google_photos_count'] = photos_count result['data']['google_opening_hours'] = opening_hours result['data']['google_business_status'] = business_status result['data']['google_phone'] = phone result['data']['google_website'] = website result['steps'][-1]['status'] = 'complete' details_msg = [] if rating: details_msg.append(f'Ocena: {rating}') if reviews_count: details_msg.append(f'{reviews_count} opinii') if photos_count: details_msg.append(f'{photos_count} zdjęć') result['steps'][-1]['message'] = ', '.join(details_msg) if details_msg else 'Pobrano dane' except requests.exceptions.Timeout: result['steps'][-1]['status'] = 'error' result['steps'][-1]['message'] = 'Timeout podczas pobierania szczegółów' result['error'] = 'Timeout podczas pobierania szczegółów z Google Places API' return result except Exception as e: result['steps'][-1]['status'] = 'error' result['steps'][-1]['message'] = f'Błąd: {str(e)}' result['error'] = str(e) return result # Step 3: Save to database result['steps'].append({ 'step': 'save_data', 'status': 'in_progress', 'message': 'Zapisuję dane w bazie...' }) try: # Get or create CompanyWebsiteAnalysis record analysis = db.query(CompanyWebsiteAnalysis).filter( CompanyWebsiteAnalysis.company_id == company_id ).first() if not analysis: analysis = CompanyWebsiteAnalysis( company_id=company_id, url=company.website, analyzed_at=datetime.now() ) db.add(analysis) # Update Google fields analysis.google_place_id = place_id analysis.google_rating = rating analysis.google_reviews_count = reviews_count analysis.google_photos_count = photos_count analysis.google_opening_hours = opening_hours if opening_hours else None analysis.google_business_status = business_status analysis.analyzed_at = datetime.now() db.commit() result['steps'][-1]['status'] = 'complete' result['steps'][-1]['message'] = 'Dane zapisane pomyślnie' result['success'] = True except Exception as e: db.rollback() result['steps'][-1]['status'] = 'error' result['steps'][-1]['message'] = f'Błąd zapisu: {str(e)}' result['error'] = f'Błąd zapisu do bazy danych: {str(e)}' return result logger.info( f"Google data fetched for company {company_id}: " f"rating={rating}, reviews={reviews_count}, photos={photos_count}" ) return result # === Main for Testing === if __name__ == '__main__': import sys # Test the service logging.basicConfig(level=logging.INFO) # Check for --ai flag to test AI recommendations use_ai = '--ai' in sys.argv db = SessionLocal() try: # Get first active company company = db.query(Company).filter(Company.status == 'active').first() if company: print(f"\nAuditing company: {company.name} (ID: {company.id})") print("-" * 50) if use_ai: print("\n[AI MODE] Generating AI-powered recommendations...") result = audit_company_with_ai(db, company.id, save=False) else: result = audit_company(db, company.id, save=False) print(f"\nCompleteness Score: {result.completeness_score}/100") print(f"\nField Status:") for name, field in result.fields.items(): status_icon = {'complete': '✅', 'partial': '⚠️', 'missing': '❌'}.get(field.status, '?') print(f" {status_icon} {name}: {field.status} ({field.score:.1f}/{field.max_score:.1f})") print(f"\nRecommendations ({len(result.recommendations)}):") for rec in result.recommendations[:5]: source = rec.get('source', 'static') source_label = '[AI]' if source == 'ai' else '[STATIC]' print(f"\n {source_label} [{rec['priority'].upper()}] {rec['field']}:") print(f" {rec['recommendation']}") # Print AI-specific fields if present if rec.get('action_steps'): print(" Action steps:") for step in rec['action_steps']: print(f" • {step}") if rec.get('expected_impact'): print(f" Expected impact: {rec['expected_impact']}") else: print("No active companies found") print("\n" + "-" * 50) print("Usage: python gbp_audit_service.py [--ai]") print(" --ai Generate AI-powered recommendations using Gemini") finally: db.close()