#!/usr/bin/env python3
"""
Social Media & Website Audit Script for Norda Biznes
=====================================================

Performs comprehensive audit of company websites and social media presence.
Designed to run with multiple parallel workers.

Features:
- Website analysis (SSL, hosting, author, responsiveness)
- Social media discovery (FB, IG, TikTok, YouTube, LinkedIn)
- Google Reviews scraping via Brave Search
- Parallel execution support

Usage:
    python social_media_audit.py --company-id 26
    python social_media_audit.py --batch 1-10
    python social_media_audit.py --all

Author: Maciej Pienczyn, InPi sp. z o.o.
Date: 2025-12-29
"""
import html as html_module
import os
import sys
import json
import re
import ssl
import socket
import argparse
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple, Any
from urllib.parse import urlparse
import time
from pathlib import Path

# Load .env file from project root
try:
    from dotenv import load_dotenv

    # Find .env file relative to this script
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    env_path = project_root / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        # NOTE(review): logging.basicConfig() runs later in this module, so this
        # INFO record is emitted before any handler is configured and is likely
        # dropped by the default last-resort handler.
        logging.info(f"Loaded .env from {env_path}")
except ImportError:
    pass  # python-dotenv not installed, rely on system environment

import requests
from bs4 import BeautifulSoup
import whois
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

try:
    from database import normalize_social_url
except ImportError:
    # Fallback: define locally if import fails
    def normalize_social_url(url: str, platform: str = None) -> str:
        """Normalize social media URLs to prevent duplicates.

        Forces the https scheme, drops a leading ``www.`` and a trailing
        slash.  ``platform`` is accepted only for signature compatibility
        with the canonical implementation in ``database`` and is unused.
        """
        if not url:
            return url
        url = url.strip()
        if url.startswith('http://'):
            url = 'https://' + url[7:]
        elif not url.startswith('https://'):
            url = 'https://' + url
        url = url.replace('https://www.', 'https://')
        url = url.rstrip('/')
        return url

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Database configuration
# WARNING: The fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)

# Request configuration
REQUEST_TIMEOUT = 15
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

# Known Polish hosting providers (IP ranges and identifiers)
HOSTING_PROVIDERS = {
    'nazwa.pl': ['nazwa.pl', '185.252.', '91.227.'],
    'home.pl': ['home.pl', '212.85.', '195.26.'],
    'OVH': ['ovh.', '51.38.', '51.68.', '51.75.', '51.77.', '51.83.',
            '51.89.', '51.91.', '54.36.', '54.37.', '54.38.', '135.125.',
            '141.94.', '141.95.', '142.4.', '144.217.', '145.239.',
            '147.135.', '149.202.', '151.80.', '158.69.', '164.132.',
            '167.114.', '176.31.', '178.32.', '185.15.', '188.165.',
            '192.95.', '193.70.', '194.182.', '195.154.', '198.27.',
            '198.50.', '198.100.', '213.186.', '213.251.', '217.182.'],
    'cyber_Folks': ['cyberfolks', 'cf.', '77.55.'],
    'Zenbox': ['zenbox', '195.181.'],
    'Linuxpl': ['linuxpl', '91.200.'],
    'Hekko': ['hekko', 'hekko.pl'],
    'Smarthost': ['smarthost'],
    'AZ.pl': ['az.pl', 'aznetwork'],
    'Aftermarket': ['aftermarket', 'aftermarket.pl'],
    'Cloudflare': ['cloudflare', '104.16.', '104.17.', '104.18.', '104.19.',
                   '104.20.', '104.21.', '104.22.', '104.23.', '104.24.',
                   '172.67.'],
    'Google Cloud': ['google', '34.', '35.'],
    'AWS': ['amazon', 'aws', '52.', '54.'],
    'Vercel': ['vercel', '76.76.21.'],
    'Netlify': ['netlify'],
}

# Social media patterns
SOCIAL_MEDIA_PATTERNS = {
    'facebook': [
        r'(?:https?://)?(?:www\.)?facebook\.com/profile\.php\?id=(\d+)',
        # Multi-segment paths like /p/PageName-12345/ - capture full path
        r'(?:https?://)?(?:www\.)?facebook\.com/(p/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?facebook\.com/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?fb\.com/([^/?\s"\'<>]+)',
    ],
    'instagram': [
        r'(?:https?://)?(?:www\.)?instagram\.com/([^/?\s"\'<>]+)',
    ],
    'youtube': [
        r'(?:https?://)?(?:www\.)?youtube\.com/(?:channel|c|user|@)/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?youtube\.com/([^/?\s"\'<>]+)',
    ],
    'linkedin': [
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(company/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(in/[^/?\s"\'<>]+)',
    ],
    'tiktok': [
        r'(?:https?://)?(?:www\.)?tiktok\.com/@([^/?\s"\'<>]+)',
    ],
    'twitter': [
        r'(?:https?://)?(?:www\.)?(?:twitter|x)\.com/([^/?\s"\'<>]+)',
    ],
}
# False positives to exclude
SOCIAL_MEDIA_EXCLUDE = {
    'facebook': ['sharer', 'share', 'intent', 'plugins', 'dialog', 'sharer.php',
                 'login', 'pages', 'boldthemes', 'profile.php', 'profile',
                 'watch', 'groups', 'events', 'marketplace', 'gaming',
                 'stories', 'p', 'people', 'hashtag', 'help', 'settings',
                 'notifications', 'tr', 'privacy', 'policies', 'ads',
                 'business', 'legal', 'flx'],
    'instagram': ['explore', 'accounts', 'p', 'reel'],
    'youtube': ['embed', 'watch', 'playlist', 'results', 'feed', 'channel',
                'c', 'user', '@', 'about', 'featured', 'videos', 'shorts',
                'streams', 'playlists', 'community', 'channels', 'store'],
    'linkedin': ['company/shareArticle', 'company/share', 'company/login',
                 'in/shareArticle', 'in/share', 'in/login'],
    'tiktok': ['embed', 'video'],
    'twitter': ['intent', 'share', 'widgets.js', 'widgets', 'tweet',
                'platform.twitter.com', 'bold_themes', 'boldthemes'],
}


class WebsiteAuditor:
    """Audits website technical details and metadata."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def audit_website(self, url: str) -> Dict[str, Any]:
        """
        Perform comprehensive website audit.

        Returns dict with:
        - http_status, load_time_ms
        - has_ssl, ssl_valid, ssl_expiry
        - hosting_provider, hosting_ip, server_software
        - site_author, site_generator
        - is_mobile_friendly, has_viewport_meta
        - last_modified_at
        - social_media_links (dict of platform -> url)
        """
        result = {
            'url': url,
            'http_status': None,
            'load_time_ms': None,
            'has_ssl': False,
            'ssl_valid': False,
            'ssl_expiry': None,
            'ssl_issuer': None,
            'hosting_provider': None,
            'hosting_ip': None,
            'server_software': None,
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'last_modified_at': None,
            'social_media_links': {},
            'errors': [],
        }

        if not url:
            result['errors'].append('No URL provided')
            return result

        # Normalize URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        parsed = urlparse(url)
        domain = parsed.netloc

        # 1. Check SSL certificate
        try:
            result.update(self._check_ssl(domain))
        except Exception as e:
            result['errors'].append(f'SSL check failed: {str(e)}')

        # 2. Resolve IP and detect hosting
        try:
            result.update(self._detect_hosting(domain))
        except Exception as e:
            result['errors'].append(f'Hosting detection failed: {str(e)}')

        # 3. Fetch page and analyze
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            result['load_time_ms'] = int((time.time() - start_time) * 1000)
            result['http_status'] = response.status_code
            result['has_ssl'] = response.url.startswith('https://')

            # Server header
            result['server_software'] = response.headers.get('Server', '')[:100]

            # Last-Modified header
            last_mod = response.headers.get('Last-Modified')
            if last_mod:
                try:
                    result['last_modified_at'] = datetime.strptime(
                        last_mod, '%a, %d %b %Y %H:%M:%S %Z'
                    )
                except ValueError:
                    # Malformed Last-Modified header; ignore
                    pass

            # Parse HTML
            if response.status_code == 200:
                result.update(self._parse_html(response.text))

        except requests.exceptions.SSLError as e:
            result['errors'].append(f'SSL Error: {str(e)}')
            result['ssl_valid'] = False
            # Try HTTP fallback
            try:
                http_url = url.replace('https://', 'http://')
                response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
                result['http_status'] = response.status_code
                result['has_ssl'] = False
                if response.status_code == 200:
                    result.update(self._parse_html(response.text))
            except Exception as e2:
                result['errors'].append(f'HTTP fallback failed: {str(e2)}')
        except requests.exceptions.RequestException as e:
            result['errors'].append(f'Request failed: {str(e)}')

        return result
Fetch page and analyze try: start_time = time.time() response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True) result['load_time_ms'] = int((time.time() - start_time) * 1000) result['http_status'] = response.status_code result['has_ssl'] = response.url.startswith('https://') # Server header result['server_software'] = response.headers.get('Server', '')[:100] # Last-Modified header last_mod = response.headers.get('Last-Modified') if last_mod: try: result['last_modified_at'] = datetime.strptime( last_mod, '%a, %d %b %Y %H:%M:%S %Z' ) except: pass # Parse HTML if response.status_code == 200: result.update(self._parse_html(response.text)) except requests.exceptions.SSLError as e: result['errors'].append(f'SSL Error: {str(e)}') result['ssl_valid'] = False # Try HTTP fallback try: http_url = url.replace('https://', 'http://') response = self.session.get(http_url, timeout=REQUEST_TIMEOUT) result['http_status'] = response.status_code result['has_ssl'] = False if response.status_code == 200: result.update(self._parse_html(response.text)) except Exception as e2: result['errors'].append(f'HTTP fallback failed: {str(e2)}') except requests.exceptions.RequestException as e: result['errors'].append(f'Request failed: {str(e)}') return result def _check_ssl(self, domain: str) -> Dict[str, Any]: """Check SSL certificate validity, expiry and issuer.""" result = {'ssl_valid': False, 'ssl_expiry': None, 'ssl_issuer': None} try: context = ssl.create_default_context() with socket.create_connection((domain, 443), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() result['ssl_valid'] = True # Parse expiry date not_after = cert.get('notAfter') if not_after: result['ssl_expiry'] = datetime.strptime( not_after, '%b %d %H:%M:%S %Y %Z' ).date() # Extract issuer (Certificate Authority) issuer = cert.get('issuer') if issuer: # issuer is tuple of tuples like ((('organizationName', 'Let\'s Encrypt'),),) issuer_dict 
= {} for item in issuer: for key, value in item: issuer_dict[key] = value # Prefer Organization name, fallback to Common Name issuer_name = issuer_dict.get('organizationName') or issuer_dict.get('commonName') if issuer_name: result['ssl_issuer'] = issuer_name[:100] # Limit length except Exception as e: result['ssl_valid'] = False return result def _detect_hosting(self, domain: str) -> Dict[str, Any]: """Detect hosting provider from IP and reverse DNS.""" result = {'hosting_provider': None, 'hosting_ip': None} try: ip = socket.gethostbyname(domain) result['hosting_ip'] = ip # Check against known hosting IP ranges for provider, patterns in HOSTING_PROVIDERS.items(): for pattern in patterns: if ip.startswith(pattern) or pattern in domain.lower(): result['hosting_provider'] = provider return result # Try reverse DNS try: reverse = socket.gethostbyaddr(ip)[0] for provider, patterns in HOSTING_PROVIDERS.items(): for pattern in patterns: if pattern in reverse.lower(): result['hosting_provider'] = provider return result except: pass # Try WHOIS for registrar try: w = whois.whois(domain) if w.registrar: result['domain_registrar'] = str(w.registrar)[:100] except: pass except Exception as e: result['errors'] = [f'Hosting detection: {str(e)}'] return result def _parse_html(self, html: str) -> Dict[str, Any]: """Parse HTML for metadata and social media links.""" result = { 'site_author': None, 'site_generator': None, 'is_mobile_friendly': False, 'has_viewport_meta': False, 'social_media_links': {}, } try: soup = BeautifulSoup(html, 'html.parser') # Check viewport meta (mobile-friendly indicator) viewport = soup.find('meta', attrs={'name': 'viewport'}) if viewport: result['has_viewport_meta'] = True content = viewport.get('content', '') if 'width=device-width' in content: result['is_mobile_friendly'] = True # Author meta author = soup.find('meta', attrs={'name': 'author'}) if author: result['site_author'] = author.get('content', '')[:255] # Generator meta (CMS) generator = 
soup.find('meta', attrs={'name': 'generator'}) if generator: result['site_generator'] = generator.get('content', '')[:100] # Look for author in multiple places if not result['site_author']: author_found = None # 1. Check HTML comments for author info comments = soup.find_all(string=lambda text: isinstance(text, str) and '', html, re.DOTALL) for comment in html_comments: comment_patterns = [ r'(?:created by|designed by|developed by|made by|author)[:\s]+([^\n<>]+)', r'(?:agencja|agency|studio)[:\s]+([^\n<>]+)', ] for pattern in comment_patterns: match = re.search(pattern, comment, re.IGNORECASE) if match: author_found = match.group(1).strip() break if author_found: break # 2. Check footer text if not author_found: footer = soup.find('footer') if footer: footer_text = footer.get_text(separator=' ') footer_patterns = [ r'(?:wykonanie|realizacja|created by|designed by|made by|developed by)[:\s]+([^|<>\n©]+)', r'(?:projekt|design|strona)[:\s]+([^|<>\n©]+)', r'(?:powered by|built with)[:\s]+([^|<>\n©]+)', r'(?:agencja|agency|studio)[:\s]+([^|<>\n©]+)', ] for pattern in footer_patterns: match = re.search(pattern, footer_text, re.IGNORECASE) if match: author_found = match.group(1).strip() break # 3. Check footer links for agency/studio domains if not author_found: footer_links = footer.find_all('a', href=True) agency_domains = ['.pl', '.com', '.eu'] agency_keywords = ['studio', 'agencja', 'agency', 'design', 'web', 'digital', 'media', 'creative'] for link in footer_links: href = link.get('href', '') link_text = link.get_text().strip() # Check if link looks like an agency if any(kw in href.lower() or kw in link_text.lower() for kw in agency_keywords): if any(dom in href for dom in agency_domains) and 'facebook' not in href and 'instagram' not in href: # Extract domain or link text as author if link_text and len(link_text) > 2 and len(link_text) < 50: author_found = link_text break # 4. 
Check entire page for common Polish patterns if not author_found: page_text = soup.get_text(separator=' ') page_patterns = [ r'(?:stronę wykonała?|witrynę wykonała?|stronę stworzył[ao]?)[:\s]+([^|<>\n©.]+)', r'(?:copyright|©).*?(?:by|przez)[:\s]+([^|<>\n©.]+)', ] for pattern in page_patterns: match = re.search(pattern, page_text, re.IGNORECASE) if match: author_found = match.group(1).strip() break # Clean up author name if author_found: # Remove common prefixes/suffixes author_found = re.sub(r'^[\s\-–—:]+', '', author_found) author_found = re.sub(r'[\s\-–—:]+$', '', author_found) author_found = re.sub(r'\s+', ' ', author_found) # Remove if too short or looks like garbage if len(author_found) > 2 and len(author_found) < 100: result['site_author'] = author_found[:255] # Extract social media links html_lower = html.lower() for platform, patterns in SOCIAL_MEDIA_PATTERNS.items(): found_for_platform = False for pattern in patterns: if found_for_platform: break # Already found this platform, skip remaining patterns matches = re.findall(pattern, html, re.IGNORECASE) if matches: # Get first valid match, excluding common false positives for match in matches: # Skip very short matches (likely truncated or generic paths) if len(match) < 2: continue # Check against exclusion list (exact match only to avoid false positives) excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, []) if match.lower() not in excludes: # Construct full URL if platform == 'facebook': if match.isdigit(): url = f'https://facebook.com/profile.php?id={match}' elif '/' in match: # Multi-segment path (e.g. p/PageName-123) url = f'https://facebook.com/{match}' else: url = f'https://facebook.com/{match}' elif platform == 'instagram': # Skip Instagram handles with tracking params (igsh=, utm_) if '?' 
class GooglePlacesSearcher:
    """Search for Google Business profiles using Google Places API."""

    # Google Places API configuration
    FIND_PLACE_URL = 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json'
    PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize GooglePlacesSearcher.

        Args:
            api_key: Google Places API key. Falls back to GOOGLE_PLACES_API_KEY env var.
        """
        self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def find_place(self, company_name: str, city: str = 'Wejherowo') -> Optional[str]:
        """
        Find a place by company name and city.

        Uses Google Places findplacefromtext API to search for a business
        and returns the place_id if found.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            place_id string if found, None otherwise.
        """
        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return None

        try:
            # Construct search query with company name and city
            search_query = f'{company_name} {city}'
            params = {
                'input': search_query,
                'inputtype': 'textquery',
                'fields': 'place_id,name,formatted_address',
                'language': 'pl',
                'key': self.api_key,
            }
            response = self.session.get(
                self.FIND_PLACE_URL, params=params, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()

            if data.get('status') == 'OK' and data.get('candidates'):
                candidate = data['candidates'][0]
                place_id = candidate.get('place_id')
                logger.info(
                    f"Found place for '{company_name}': {candidate.get('name')} "
                    f"at {candidate.get('formatted_address')}"
                )
                return place_id
            elif data.get('status') == 'ZERO_RESULTS':
                logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
                return None
            else:
                logger.warning(
                    f"Google Places API returned status: {data.get('status')} "
                    f"for '{company_name}'"
                )
                return None
        except requests.exceptions.Timeout:
            logger.error(f"Timeout searching for '{company_name}' on Google Places")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error searching for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.error(f"Error finding place for '{company_name}': {e}")
            return None

    def get_place_details(self, place_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a place.

        Retrieves rating, review count, opening hours, and other business
        details from Google Places API.

        Args:
            place_id: Google Place ID returned from find_place().

        Returns:
            Dict containing:
            - google_rating: Decimal rating (1.0-5.0) or None
            - google_reviews_count: Integer review count or None
            - opening_hours: Dict with weekday_text and open_now, or None
            - business_status: String like 'OPERATIONAL', 'CLOSED_TEMPORARILY', etc.
            - formatted_phone: Phone number or None
            - website: Website URL or None
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'google_photos_count': None,
            'opening_hours': None,
            'business_status': None,
            'formatted_phone': None,
            'website': None,
        }

        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return result
        if not place_id:
            return result

        try:
            # Request fields we need for the audit
            fields = [
                'rating', 'user_ratings_total', 'opening_hours',
                'business_status', 'formatted_phone_number', 'website',
                'name', 'photos',
            ]
            params = {
                'place_id': place_id,
                'fields': ','.join(fields),
                'language': 'pl',
                'key': self.api_key,
            }
            response = self.session.get(
                self.PLACE_DETAILS_URL, params=params, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()

            if data.get('status') == 'OK' and data.get('result'):
                place = data['result']

                # Extract rating
                if 'rating' in place:
                    result['google_rating'] = round(float(place['rating']), 1)
                # Extract review count
                if 'user_ratings_total' in place:
                    result['google_reviews_count'] = int(place['user_ratings_total'])
                # Extract opening hours
                if 'opening_hours' in place:
                    hours = place['opening_hours']
                    result['opening_hours'] = {
                        'weekday_text': hours.get('weekday_text', []),
                        'open_now': hours.get('open_now'),
                        'periods': hours.get('periods', []),
                    }
                # Extract business status
                if 'business_status' in place:
                    result['business_status'] = place['business_status']
                # Extract phone
                if 'formatted_phone_number' in place:
                    result['formatted_phone'] = place['formatted_phone_number']
                # Extract website
                if 'website' in place:
                    result['website'] = place['website']
                # Extract photos count
                if 'photos' in place:
                    result['google_photos_count'] = len(place['photos'])

                logger.info(
                    f"Retrieved details for {place.get('name')}: "
                    f"rating={result['google_rating']}, "
                    f"reviews={result['google_reviews_count']}, "
                    f"photos={result['google_photos_count']}"
                )
            else:
                logger.warning(
                    f"Google Places API returned status: {data.get('status')} "
                    f"for place_id: {place_id}"
                )
        except requests.exceptions.Timeout:
            logger.error(f"Timeout getting details for place_id: {place_id}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error getting place details: {e}")
        except Exception as e:
            logger.error(f"Error getting place details for {place_id}: {e}")

        return result
class BraveSearcher:
    """Search for social media profiles and Google reviews using Brave Search."""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv('BRAVE_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def search_social_media(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, str]:
        """
        Search for company social media profiles.

        Returns dict of platform -> url.
        """
        results = {}
        platforms = [
            ('facebook', f'{company_name} {city} facebook'),
            ('instagram', f'{company_name} instagram'),
            ('tiktok', f'{company_name} tiktok'),
            ('youtube', f'{company_name} youtube kanał'),
            ('linkedin', f'{company_name} linkedin.com/company'),
        ]
        for platform, query in platforms:
            try:
                url = self._search_brave(query, platform, company_name)
                if url:
                    results[platform] = url
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                logger.warning(f'Brave search failed for {platform}: {e}')

        # LinkedIn fallback: if company page not found, try general search
        # (may find personal profile)
        if 'linkedin' not in results:
            try:
                url = self._search_brave(f'{company_name} linkedin', 'linkedin', company_name)
                if url:
                    results['linkedin'] = url
                    logger.info(f"LinkedIn fallback found profile: {url}")
            except Exception as e:
                logger.warning(f'Brave search LinkedIn fallback failed: {e}')

        return results

    def search_google_reviews(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, Any]:
        """
        Search for Google reviews using Google Places API.

        This method uses the GooglePlacesSearcher to find the company on
        Google and retrieve its rating and review count.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            Dict containing:
            - google_rating: Decimal rating (1.0-5.0) or None
            - google_reviews_count: Integer review count or None
            - opening_hours: Dict with weekday_text and open_now, or None
            - business_status: String like 'OPERATIONAL', 'CLOSED_TEMPORARILY', etc.
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'opening_hours': None,
            'business_status': None,
        }
        try:
            # Use Google Places API for accurate data
            google_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
            if google_api_key:
                # Use GooglePlacesSearcher for accurate data retrieval
                places_searcher = GooglePlacesSearcher(api_key=google_api_key)

                # Step 1: Find the place by company name and city
                place_id = places_searcher.find_place(company_name, city)
                if place_id:
                    # Step 2: Get detailed information including reviews
                    details = places_searcher.get_place_details(place_id)
                    result['google_rating'] = details.get('google_rating')
                    result['google_reviews_count'] = details.get('google_reviews_count')
                    result['opening_hours'] = details.get('opening_hours')
                    result['business_status'] = details.get('business_status')
                    logger.info(
                        f"Google reviews for '{company_name}': "
                        f"rating={result['google_rating']}, "
                        f"reviews={result['google_reviews_count']}, "
                        f"status={result['business_status']}"
                    )
                else:
                    logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
            else:
                # Fallback: Try Brave Search API if available
                if self.api_key:
                    brave_result = self._search_brave_for_reviews(company_name, city)
                    if brave_result:
                        result.update(brave_result)
                else:
                    logger.warning(
                        'Neither GOOGLE_PLACES_API_KEY nor BRAVE_API_KEY configured. '
                        'Cannot retrieve Google reviews data.'
                    )
        except Exception as e:
            logger.warning(f'Google reviews search failed for {company_name}: {e}')
        return result

    def _search_brave_for_reviews(self, company_name: str, city: str) -> Optional[Dict[str, Any]]:
        """
        Fallback method to search for Google reviews via Brave Search API.

        This parses search results to extract rating and review count from
        Google Business snippets in search results.

        Args:
            company_name: Name of the company.
            city: City for location context.

        Returns:
            Dict with google_rating and google_reviews_count, or None if not found.
        """
        if not self.api_key:
            return None
        try:
            query = f'{company_name} {city} opinie google'
            # Brave Web Search API endpoint
            url = 'https://api.search.brave.com/res/v1/web/search'
            headers = {
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip',
                'X-Subscription-Token': self.api_key,
            }
            params = {
                'q': query,
                'count': 10,
                'country': 'pl',
                'search_lang': 'pl',
                'ui_lang': 'pl-PL',
            }
            response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()

            # Parse search results for rating/review patterns.
            # Google snippets often contain patterns like "4,5 (123 opinii)"
            # or "Rating: 4.5 · 123 reviews"
            for result in data.get('web', {}).get('results', []):
                snippet = result.get('description', '') + ' ' + result.get('title', '')
                # Pattern for Polish Google reviews: "4,5 (123 opinii)" or "4.5 · 123 reviews"
                rating_patterns = [
                    r'(\d+[,\.]\d)\s*[·\(]\s*(\d+)\s*(?:opinii|recenzji|reviews)',
                    r'ocena[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:opinii|recenzji)',
                    r'rating[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:reviews|opinii)',
                ]
                for pattern in rating_patterns:
                    match = re.search(pattern, snippet, re.IGNORECASE)
                    if match:
                        rating_str = match.group(1).replace(',', '.')
                        reviews_str = match.group(2)
                        return {
                            'google_rating': round(float(rating_str), 1),
                            'google_reviews_count': int(reviews_str),
                        }

            logger.info(f"No Google reviews data found in Brave results for '{company_name}'")
            return None
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout searching Brave for '{company_name}' reviews")
            return None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Brave API request failed for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing Brave results for '{company_name}': {e}")
            return None

    def _search_brave(self, query: str, platform: str, company_name: str = '', **kwargs) -> Optional[str]:
        """
        Perform Brave search and extract relevant social media URL.

        Validates results against company_name to avoid false matches.
        Returns normalized URL for the platform or None.
        """
        if not self.api_key:
            logger.debug(f"No Brave API key - skipping search for {platform}")
            return None
        try:
            url = 'https://api.search.brave.com/res/v1/web/search'
            headers = {
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip',
                'X-Subscription-Token': self.api_key,
            }
            params = {
                'q': query,
                'count': 10,
                'country': 'pl',
                'search_lang': 'pl',
                'ui_lang': 'pl-PL',
            }
            response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            results = data.get('web', {}).get('results', [])

            # Platform domain patterns
            domain_patterns = {
                'facebook': r'facebook\.com/',
                'instagram': r'instagram\.com/',
                'youtube': r'youtube\.com/',
                'linkedin': r'linkedin\.com/(?:company|in)/',
                'tiktok': r'tiktok\.com/@',
                'twitter': r'(?:twitter|x)\.com/',
            }
            pattern = domain_patterns.get(platform)
            if not pattern:
                return None

            # Prepare company name variations for matching.
            # Generate matching tokens with word boundary patterns
            # (e.g. "Waterm Artur Wiertel" -> [r'\bwaterm\b', r'\bartur\b', r'\bwiertel\b'])
            name_lower = company_name.lower().strip()
            name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
                           for t in name_lower.split() if len(t) >= 3]

            candidates = []
            for rank, result in enumerate(results):
                result_url = result.get('url', '')
                result_title = result.get('title', '')
                result_desc = result.get('description', '')
                if not re.search(pattern, result_url, re.IGNORECASE):
                    continue

                # Extract handle first, then check excludes against handle (not full URL)
                extracted_url = None
                handle = None
                for regex in SOCIAL_MEDIA_PATTERNS.get(platform, []):
                    match = re.search(regex, result_url, re.IGNORECASE)
                    if match:
                        handle = match.group(1)
                        if len(handle) >= 2:
                            extracted_url = self._build_social_url(platform, handle)
                            break
                if not extracted_url:
                    extracted_url = result_url

                # Validate it's a real profile, not a search/share page.
                # Check handle against excludes (exact match on first path segment)
                excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
                handle_base = (handle or '').split('/')[0].lower()
                is_excluded = handle_base in [ex.lower() for ex in excludes]
                if is_excluded:
                    continue

                # Check if result relates to the company
                searchable = f'{result_title} {result_desc} {result_url}'.lower()
                # Count how many name tokens appear in the result (word boundary match)
                token_matches = sum(1 for t in name_tokens if t.search(searchable))
                if token_matches == 0:
                    continue  # No connection to company at all

                # For LinkedIn: prioritize /company/ over /in/ (company pages > personal)
                is_company_page = 1 if (platform == 'linkedin'
                                        and '/company/' in (extracted_url or '')) else 0
                candidates.append((is_company_page, token_matches, extracted_url))

            if candidates:
                # Sort by: 1) company page priority, 2) token matches (best match first)
                candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
                best_url = candidates[0][2]
                logger.info(
                    f"Brave search matched {platform}: {best_url} "
                    f"(company={candidates[0][0]}, score={candidates[0][1]}/{len(name_tokens)})"
                )
                return best_url

            logger.debug(f"No {platform} profile found in Brave results for: {query}")
            return None
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout searching Brave for '{query}'")
            return None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Brave API request failed for '{query}': {e}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing Brave results for '{query}': {e}")
            return None

    def _check_linkedin_company_page(self, company_name: str) -> Optional[str]:
        """
        Try direct LinkedIn company page URL based on company name slugs.

        Returns URL if page exists and title matches, None otherwise.
        """
        # Generate slug candidates from company name
        name_clean = company_name.strip()
        slugs = set()

        # Basic slug: lowercase, spaces to hyphens
        slug = re.sub(r'[^a-z0-9\s-]', '', name_clean.lower())
        slug = re.sub(r'\s+', '-', slug).strip('-')
        if slug:
            slugs.add(slug)

        # First word only (common for short brand names like "Waterm")
        first_word = name_clean.split()[0].lower() if name_clean.split() else ''
        first_word = re.sub(r'[^a-z0-9]', '', first_word)
        if first_word and len(first_word) >= 3:
            slugs.add(first_word)

        name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
                       for t in name_clean.lower().split() if len(t) >= 3]

        for slug in slugs:
            try:
                check_url = f'https://www.linkedin.com/company/{slug}'
                resp = self.session.get(check_url, timeout=8, allow_redirects=True)
                if resp.status_code == 200:
                    # Verify title contains company name
                    # NOTE(review): the regex literal was stripped in the
                    # recovered source; reconstructed as a <title> scan —
                    # confirm against version control.
                    title_match = re.search(r'<title>([^<]+)</title>', resp.text)
                    if title_match:
                        title = title_match.group(1).lower()
                        if any(t.search(title) for t in name_tokens):
                            logger.info(
                                f"LinkedIn company page verified: {check_url} "
                                f"(title: {title_match.group(1)})"
                            )
                            return f'https://linkedin.com/company/{slug}'
                        else:
                            logger.debug(
                                f"LinkedIn /company/{slug} exists but title "
                                f"'{title_match.group(1)}' doesn't match '{company_name}'"
                            )
            except Exception as e:
                logger.debug(f"LinkedIn company page check failed for {slug}: {e}")
        return None

    @staticmethod
    def _build_social_url(platform: str, handle: str) -> str:
        """Build normalized social media URL from platform and handle."""
        if platform == 'facebook':
            if handle.isdigit():
                return f'https://facebook.com/profile.php?id={handle}'
            return f'https://facebook.com/{handle}'
        elif platform == 'instagram':
            handle = handle.split('?')[0].split('&')[0]
            return f'https://instagram.com/{handle}'
        elif platform == 'youtube':
            if handle.startswith('@'):
                return f'https://youtube.com/{handle}'
            return f'https://youtube.com/channel/{handle}'
        elif platform == 'linkedin':
            return f'https://linkedin.com/{handle}'
        elif platform == 'tiktok':
            return f'https://tiktok.com/@{handle}'
        elif platform == 'twitter':
            return f'https://twitter.com/{handle}'
        return handle
class SocialProfileEnricher:
    """Enriches social media profiles with additional data from public APIs and scraping."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def enrich_profile(self, platform: str, url: str) -> Dict[str, Any]:
        """Fetch additional data for a social media profile.

        Dispatches to a per-platform enricher; returns {} for unknown
        platforms or on failure.
        """
        enrichers = {
            'facebook': self._enrich_facebook,
            'instagram': self._enrich_instagram,
            'youtube': self._enrich_youtube,
            'linkedin': self._enrich_linkedin,
            'tiktok': self._enrich_tiktok,
            'twitter': self._enrich_twitter,
        }
        enricher = enrichers.get(platform)
        if enricher:
            try:
                result = enricher(url)
                # Decode HTML entities in all string values (og: meta content
                # often contains entities such as &amp;)
                for key, val in result.items():
                    if isinstance(val, str):
                        result[key] = html_module.unescape(val)
                return result
            except Exception as e:
                logger.warning(f"Failed to enrich {platform} profile {url}: {e}")
                return {}
        return {}

    def _enrich_facebook(self, url: str) -> Dict[str, Any]:
        """Enrich Facebook page data from public page HTML.

        NOTE(review): the regex literal(s) in this method were stripped from
        the recovered source (text from '<' onward was eaten); the og:title
        extraction below is a best-effort reconstruction from the surviving
        comment — confirm against version control.
        """
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            if resp.status_code == 200:
                html = resp.text
                # Extract page name from og:title
                og_match = re.search(
                    r'<meta\s+property="og:title"\s+content="([^"]*)"', html
                )
                if og_match:
                    result['page_name'] = og_match.group(1)[:255]
        except Exception as e:
            logger.debug(f"Facebook enrichment failed for {url}: {e}")
        return result

    def _enrich_instagram(self, url: str) -> Dict[str, Any]:
        """Enrich Instagram profile data.

        NOTE(review): the regex literal(s) in this method were stripped from
        the recovered source; the og:description parsing below is a
        best-effort reconstruction from the surviving comments — confirm
        against version control.
        """
        result = {}
        try:
            # Try og:description which often contains "X Followers, Y Following, Z Posts"
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # og:description format: "123 Followers, 45 Following, 67 Posts - See Instagram photos..."
                og_desc = re.search(
                    r'<meta\s+property="og:description"\s+content="([^"]*)"', html
                )
                if og_desc:
                    desc = og_desc.group(1)
                    result['profile_description'] = desc[:500]
                    counts = re.search(
                        r'([\d.,]+)\s*(?:Followers|obserwuj)', desc, re.IGNORECASE
                    )
                    if counts:
                        try:
                            result['followers_count'] = int(
                                counts.group(1).replace(',', '').replace('.', '')
                            )
                        except ValueError:
                            pass
        except Exception as e:
            logger.debug(f"Instagram enrichment failed for {url}: {e}")
        return result

    def _enrich_youtube(self, url: str) -> Dict[str, Any]:
        """Enrich YouTube channel data via YouTube Data API v3.

        Falls back to scraping if API key is not available.
        """
        result = {}
        try:
            # Try YouTube Data API v3 first
            import sys
            from pathlib import Path
            sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
            from youtube_service import YouTubeService

            yt = YouTubeService()
            channel_id = yt.extract_channel_id_from_url(url)
            if channel_id:
                stats = yt.get_channel_stats(channel_id)
                if stats:
                    result['followers_count'] = stats['subscriber_count']
                    result['posts_count_365d'] = stats['video_count']
                    result['page_name'] = stats['channel_title']
                    if stats.get('channel_description'):
                        result['profile_description'] = stats['channel_description'][:500]
                        result['has_bio'] = True
                    result['has_profile_photo'] = bool(stats.get('thumbnail_url'))
                    result['has_cover_photo'] = bool(stats.get('banner_url'))
                    # Store extra data in a special key for content_types JSONB
                    result['_youtube_extra'] = {
                        'view_count': stats.get('view_count', 0),
                        'country': stats.get('country', ''),
                        'published_at': stats.get('published_at', '')[:10] if stats.get('published_at') else '',
                        'custom_url': stats.get('custom_url', ''),
                        'thumbnail_url': stats.get('thumbnail_url', ''),
                        'banner_url': stats.get('banner_url', ''),
                    }
                    # Fetch recent videos (best effort)
                    actual_channel_id = stats.get('channel_id', channel_id)
                    if actual_channel_id.startswith('UC'):
                        videos = yt.get_recent_videos(actual_channel_id, 5)
                        if videos:
                            result['_youtube_extra']['recent_videos'] = videos
                            # Last post date from most recent video
                            if videos[0].get('date'):
                                from datetime import datetime
                                try:
                                    result['last_post_date'] = datetime.strptime(
                                        videos[0]['date'], '%Y-%m-%d'
                                    )
                                except ValueError:
                                    pass
            return result
        except (ImportError, ValueError) as e:
            logger.debug(f"YouTube API not available ({e}), falling back to scraping")
        except Exception as e:
            logger.debug(f"YouTube API enrichment failed: {e}")

        # Fallback: scraping (usually returns nothing due to JS rendering)
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # NOTE(review): the recovered source was truncated/stripped
                # from this point; the og:image check is reconstructed —
                # confirm against version control.
                og_img = re.search(
                    r'<meta\s+property="og:image"\s+content="([^"]*)"', html
                )
                if og_img:
                    result['has_profile_photo'] = True
        except Exception as e:
            logger.debug(f"YouTube scraping fallback failed for {url}: {e}")
        return result

    # NOTE(review): the source chunk is cut off mid-definition of the next
    # enricher here (its signature was stripped along with a regex literal);
    # it could not be reconstructed from this view — restore _enrich_linkedin /
    # _enrich_tiktok / _enrich_twitter from version control.
LinkedIn company page data. LinkedIn aggressively blocks bots — retries with random delays to improve success rate. Returns empty dict if all attempts fail. """ import random result = {} max_retries = 3 for attempt in range(max_retries): try: if attempt > 0: delay = random.uniform(2, 5) time.sleep(delay) resp = self.session.get(url, timeout=REQUEST_TIMEOUT) if resp.status_code == 200: html = resp.text # Check if LinkedIn returned a login wall instead of data if 'authwall' in html[:2000].lower() or 'sign in' in html[:2000].lower(): logger.debug(f"LinkedIn authwall on attempt {attempt+1} for {url}") continue og_desc = re.search(r' Dict[str, Any]: """Enrich TikTok profile data.""" result = {} try: resp = self.session.get(url, timeout=REQUEST_TIMEOUT) if resp.status_code == 200: html = resp.text # TikTok embeds profile data in JSON followers_match = re.search(r'"followerCount":\s*(\d+)', html) if followers_match: result['followers_count'] = int(followers_match.group(1)) videos_match = re.search(r'"videoCount":\s*(\d+)', html) if videos_match: result['posts_count_365d'] = int(videos_match.group(1)) desc_match = re.search(r'"signature":\s*"([^"]*)"', html) if desc_match and desc_match.group(1).strip(): result['profile_description'] = desc_match.group(1)[:500] result['has_bio'] = True og_img = re.search(r' Dict[str, Any]: """Enrich Twitter/X profile data via Twitter GraphQL API (guest token). Uses Twitter's internal API with guest authentication — no paid API key needed. Rate limit: ~50 requests per 15 minutes per IP. Falls back to og tag scraping if API is unavailable. 
""" result = {} # Try Twitter GraphQL API first try: import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from twitter_service import TwitterService tw = TwitterService() username = tw.extract_username_from_url(url) if username: profile = tw.get_profile(username) if profile: result['followers_count'] = profile['followers_count'] result['page_name'] = profile['name'] if profile.get('description'): result['profile_description'] = profile['description'][:500] result['has_bio'] = True result['has_profile_photo'] = bool(profile.get('profile_image_url')) result['has_cover_photo'] = bool(profile.get('profile_banner_url')) result['posts_count_365d'] = profile['tweet_count'] # Store extra data in content_types JSONB result['_twitter_extra'] = { 'following_count': profile.get('following_count', 0), 'listed_count': profile.get('listed_count', 0), 'media_count': profile.get('media_count', 0), 'favourites_count': profile.get('favourites_count', 0), 'location': profile.get('location', ''), 'created_at': profile.get('created_at', ''), 'profile_image_url': profile.get('profile_image_url', ''), 'profile_banner_url': profile.get('profile_banner_url', ''), 'verified': profile.get('verified', False), 'url': profile.get('url', ''), } return result except (ImportError, ValueError) as e: logger.debug(f"Twitter API not available ({e}), falling back to scraping") except Exception as e: logger.debug(f"Twitter API enrichment failed: {e}") # Fallback: og tag scraping (usually returns nothing for x.com) try: resp = self.session.get(url, timeout=REQUEST_TIMEOUT) if resp.status_code == 200: html = resp.text og_desc = re.search(r' Optional[int]: """Parse follower/subscriber count strings like '1.2K', '3,456', '2.1M'.""" if not text: return None text = text.strip().replace(',', '').replace(' ', '') try: multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000} last_char = text[-1].lower() if last_char in multipliers: return int(float(text[:-1]) * 
multipliers[last_char]) return int(float(text)) except (ValueError, IndexError): return None def calculate_profile_completeness(profile_data: Dict[str, Any]) -> int: """Calculate profile completeness score 0-100 for a social media profile.""" score = 0 if profile_data.get('url'): score += 20 # Profile exists if profile_data.get('has_bio'): score += 15 # Bio filled if profile_data.get('has_profile_photo'): score += 15 # Avatar if profile_data.get('has_cover_photo'): score += 10 # Cover photo if (profile_data.get('followers_count') or 0) > 10: score += 10 # Has followers if (profile_data.get('posts_count_30d') or 0) > 0: score += 15 # Active in last 30d if (profile_data.get('engagement_rate') or 0) > 1: score += 15 # Good engagement return min(score, 100) class SocialMediaAuditor: """Main auditor class that coordinates website and social media auditing.""" def __init__(self, database_url: str = DATABASE_URL): self.engine = create_engine(database_url) self.Session = sessionmaker(bind=self.engine) self.website_auditor = WebsiteAuditor() # Brave Search disabled — too many false positives (matches unrelated companies by name) # Social media profiles are now sourced only from website scraping and manual admin entry # self.brave_searcher = BraveSearcher() self.profile_enricher = SocialProfileEnricher() # Initialize Google Places searcher if API key is available google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY') if google_places_api_key: self.google_places_searcher = GooglePlacesSearcher(api_key=google_places_api_key) logger.info('Google Places API key found - using Places API for reviews') else: self.google_places_searcher = None logger.info('GOOGLE_PLACES_API_KEY not set - falling back to Brave Search for reviews') def get_companies(self, company_ids: Optional[List[int]] = None, batch_start: Optional[int] = None, batch_end: Optional[int] = None) -> List[Dict]: """Fetch companies from database.""" with self.Session() as session: if company_ids: query = text(""" 
SELECT id, name, slug, website, address_city FROM companies WHERE id = ANY(:ids) ORDER BY id """) result = session.execute(query, {'ids': company_ids}) elif batch_start is not None and batch_end is not None: query = text(""" SELECT id, name, slug, website, address_city FROM companies ORDER BY id OFFSET :offset LIMIT :limit """) result = session.execute(query, { 'offset': batch_start - 1, 'limit': batch_end - batch_start + 1 }) else: query = text(""" SELECT id, name, slug, website, address_city FROM companies ORDER BY id """) result = session.execute(query) return [dict(row._mapping) for row in result] def get_company_id_by_slug(self, slug: str) -> Optional[int]: """Get company ID by slug.""" with self.Session() as session: query = text(""" SELECT id FROM companies WHERE slug = :slug """) result = session.execute(query, {'slug': slug}) row = result.fetchone() if row: return row[0] return None def audit_company(self, company: Dict) -> Dict[str, Any]: """ Perform full audit for a single company. Returns comprehensive audit result. """ logger.info(f"Auditing company: {company['name']} (ID: {company['id']})") logger.info(f"Company website: {company.get('website', 'NOT SET')}") result = { 'company_id': company['id'], 'company_name': company['name'], 'audit_date': datetime.now(), 'website': {}, 'social_media': {}, 'google_reviews': {}, 'errors': [], } # 1. Website audit if company.get('website'): try: logger.info(f"Starting website audit for: {company['website']}") result['website'] = self.website_auditor.audit_website(company['website']) logger.info(f"Website audit completed. HTTP status: {result['website'].get('http_status')}") except Exception as e: logger.error(f"Website audit failed: {str(e)}") result['errors'].append(f'Website audit failed: {str(e)}') else: logger.warning(f"No website URL for company {company['name']}") result['website'] = {'errors': ['No website URL']} # 2. 
Social media from website website_social = result['website'].get('social_media_links', {}) social_sources = {} # Track source per platform if website_social: logger.info(f"Social media found on website: {list(website_social.keys())}") for p in website_social: social_sources[p] = 'website_scrape' else: logger.info("No social media links found on website") # 3. Brave Search disabled — too many false positives # Social media profiles are now sourced only from: # - Website scraping (automatic, from company's own website) # - Manual admin entry (via company profile editing) # - OAuth API (Facebook Graph API for verified data) result['social_media'] = website_social result['social_sources'] = social_sources logger.info(f"Total social media profiles found: {len(website_social)} - {list(website_social.keys())}") # OAuth: Try Facebook/Instagram Graph API for authenticated data try: from oauth_service import OAuthService from facebook_graph_service import FacebookGraphService from database import SessionLocal as OAuthSessionLocal, OAuthToken oauth = OAuthService() company_id = company.get('id') if company_id: oauth_db = OAuthSessionLocal() try: fb_token = oauth.get_valid_token(oauth_db, company_id, 'meta', 'facebook') if fb_token: fb_service = FacebookGraphService(fb_token) token_rec = oauth_db.query(OAuthToken).filter( OAuthToken.company_id == company_id, OAuthToken.provider == 'meta', OAuthToken.service == 'facebook', OAuthToken.is_active == True, ).first() page_id = token_rec.account_id if token_rec else None if page_id: page_info = fb_service.get_page_info(page_id) if page_info: result['oauth_facebook'] = { 'fan_count': page_info.get('fan_count'), 'category': page_info.get('category'), 'data_source': 'oauth_api', } insights = fb_service.get_page_insights(page_id) if insights: result['oauth_facebook_insights'] = insights ig_id = fb_service.get_instagram_account(page_id) if ig_id: ig_insights = fb_service.get_ig_media_insights(ig_id) if ig_insights: 
result['oauth_instagram'] = { **ig_insights, 'data_source': 'oauth_api', } logger.info(f"OAuth Facebook/IG enrichment done for company {company_id}") finally: oauth_db.close() except ImportError: pass # Services not yet available except Exception as e: logger.warning(f"OAuth social media enrichment failed: {e}") # 5. Enrich social media profiles with additional data enriched_profiles = {} for platform, url in website_social.items(): logger.info(f"Enriching {platform} profile: {url}") enrichment = self.profile_enricher.enrich_profile(platform, url) enriched_profiles[platform] = { 'url': url, **enrichment, } # Calculate completeness score enriched_profiles[platform]['profile_completeness_score'] = calculate_profile_completeness(enriched_profiles[platform]) # Calculate engagement rate (ESTIMATED - without API we don't have real engagement data) profile = enriched_profiles[platform] if profile.get('followers_count') and profile.get('followers_count') > 0 and profile.get('posts_count_30d') and profile.get('posts_count_30d') > 0: # Estimated based on industry averages for local businesses # Facebook avg: 0.5-2%, Instagram: 1-3%, LinkedIn: 0.5-1% base_rates = {'facebook': 1.0, 'instagram': 2.0, 'linkedin': 0.7, 'youtube': 0.5, 'twitter': 0.3, 'tiktok': 3.0} base = base_rates.get(platform, 1.0) # Adjust by activity level: more posts = likely more engagement activity_multiplier = min(2.0, profile.get('posts_count_30d', 0) / 4.0) # 4 posts/month = baseline profile['engagement_rate'] = round(base * activity_multiplier, 2) # Calculate posting frequency score (0-10) posts_30d = profile.get('posts_count_30d') if posts_30d is not None: if posts_30d == 0: profile['posting_frequency_score'] = 0 elif posts_30d <= 2: profile['posting_frequency_score'] = 3 elif posts_30d <= 4: profile['posting_frequency_score'] = 5 elif posts_30d <= 8: profile['posting_frequency_score'] = 7 elif posts_30d <= 15: profile['posting_frequency_score'] = 9 else: profile['posting_frequency_score'] = 10 
result['enriched_profiles'] = enriched_profiles # 4. Google reviews search - prefer Google Places API if available try: if self.google_places_searcher: # Use Google Places API directly for accurate data place_id = self.google_places_searcher.find_place(company['name'], city) if place_id: details = self.google_places_searcher.get_place_details(place_id) result['google_reviews'] = { 'google_rating': details.get('google_rating'), 'google_reviews_count': details.get('google_reviews_count'), 'google_opening_hours': details.get('opening_hours'), 'google_photos_count': details.get('google_photos_count'), 'business_status': details.get('business_status'), } else: result['google_reviews'] = { 'google_rating': None, 'google_reviews_count': None, 'google_opening_hours': None, 'google_photos_count': None, 'business_status': None, } else: # Fallback to Brave Search result['google_reviews'] = self.brave_searcher.search_google_reviews( company['name'], city ) except Exception as e: result['errors'].append(f'Google reviews search failed: {str(e)}') return result def save_audit_result(self, result: Dict) -> bool: """Save audit result to database.""" try: with self.Session() as session: company_id = result['company_id'] website = result.get('website', {}) # Update or insert website analysis upsert_website = text(""" INSERT INTO company_website_analysis ( company_id, analyzed_at, website_url, http_status_code, load_time_ms, has_ssl, ssl_expires_at, ssl_issuer, is_responsive, is_mobile_friendly, has_viewport_meta, last_modified_at, hosting_provider, hosting_ip, server_software, site_author, cms_detected, google_rating, google_reviews_count, google_opening_hours, google_photos_count, audit_source, audit_version ) VALUES ( :company_id, :analyzed_at, :website_url, :http_status_code, :load_time_ms, :has_ssl, :ssl_expires_at, :ssl_issuer, :is_responsive, :is_mobile_friendly, :has_viewport_meta, :last_modified_at, :hosting_provider, :hosting_ip, :server_software, :site_author, 
:cms_detected, :google_rating, :google_reviews_count, :google_opening_hours, :google_photos_count, :audit_source, :audit_version ) ON CONFLICT (company_id) DO UPDATE SET analyzed_at = EXCLUDED.analyzed_at, http_status_code = EXCLUDED.http_status_code, load_time_ms = EXCLUDED.load_time_ms, has_ssl = EXCLUDED.has_ssl, ssl_expires_at = EXCLUDED.ssl_expires_at, ssl_issuer = EXCLUDED.ssl_issuer, is_mobile_friendly = EXCLUDED.is_mobile_friendly, has_viewport_meta = EXCLUDED.has_viewport_meta, last_modified_at = EXCLUDED.last_modified_at, hosting_provider = EXCLUDED.hosting_provider, hosting_ip = EXCLUDED.hosting_ip, server_software = EXCLUDED.server_software, site_author = EXCLUDED.site_author, cms_detected = EXCLUDED.cms_detected, google_rating = EXCLUDED.google_rating, google_reviews_count = EXCLUDED.google_reviews_count, google_opening_hours = EXCLUDED.google_opening_hours, google_photos_count = EXCLUDED.google_photos_count, audit_source = EXCLUDED.audit_source, audit_version = EXCLUDED.audit_version """) google_reviews = result.get('google_reviews', {}) # Convert opening_hours dict to JSON string for JSONB column opening_hours = google_reviews.get('google_opening_hours') opening_hours_json = json.dumps(opening_hours) if opening_hours else None session.execute(upsert_website, { 'company_id': company_id, 'analyzed_at': result['audit_date'], 'website_url': website.get('url'), 'http_status_code': website.get('http_status'), 'load_time_ms': website.get('load_time_ms'), 'has_ssl': website.get('has_ssl', False), 'ssl_expires_at': website.get('ssl_expiry'), 'ssl_issuer': website.get('ssl_issuer'), 'is_responsive': website.get('is_mobile_friendly', False), 'is_mobile_friendly': website.get('is_mobile_friendly', False), 'has_viewport_meta': website.get('has_viewport_meta', False), 'last_modified_at': website.get('last_modified_at'), 'hosting_provider': website.get('hosting_provider'), 'hosting_ip': website.get('hosting_ip'), 'server_software': website.get('server_software'), 
'site_author': website.get('site_author'), 'cms_detected': website.get('site_generator'), 'google_rating': google_reviews.get('google_rating'), 'google_reviews_count': google_reviews.get('google_reviews_count'), 'google_opening_hours': opening_hours_json, 'google_photos_count': google_reviews.get('google_photos_count'), 'audit_source': 'automated', 'audit_version': '1.0', }) # Save social media with enriched data social_sources = result.get('social_sources', {}) for platform, url in result.get('social_media', {}).items(): normalized_url = normalize_social_url(url, platform) # Get enrichment data if available enriched = result.get('enriched_profiles', {}).get(platform, {}) upsert_social = text(""" INSERT INTO company_social_media ( company_id, platform, url, verified_at, source, is_valid, page_name, followers_count, has_profile_photo, has_cover_photo, has_bio, profile_description, posts_count_30d, posts_count_365d, last_post_date, engagement_rate, posting_frequency_score, profile_completeness_score, updated_at ) VALUES ( :company_id, :platform, :url, :verified_at, :source, :is_valid, :page_name, :followers_count, :has_profile_photo, :has_cover_photo, :has_bio, :profile_description, :posts_count_30d, :posts_count_365d, :last_post_date, :engagement_rate, :posting_frequency_score, :profile_completeness_score, NOW() ) ON CONFLICT (company_id, platform, url) DO UPDATE SET verified_at = EXCLUDED.verified_at, is_valid = EXCLUDED.is_valid, last_checked_at = NOW(), -- Don't overwrite source if existing record is from a higher-priority source source = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.source ELSE EXCLUDED.source END, -- Don't overwrite metrics if existing record is from API (higher priority) page_name = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.page_name ELSE COALESCE(EXCLUDED.page_name, company_social_media.page_name) END, followers_count = CASE WHEN company_social_media.source IN 
('facebook_api') THEN company_social_media.followers_count ELSE COALESCE(EXCLUDED.followers_count, company_social_media.followers_count) END, has_profile_photo = COALESCE(EXCLUDED.has_profile_photo, company_social_media.has_profile_photo), has_cover_photo = COALESCE(EXCLUDED.has_cover_photo, company_social_media.has_cover_photo), has_bio = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.has_bio ELSE COALESCE(EXCLUDED.has_bio, company_social_media.has_bio) END, profile_description = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.profile_description ELSE COALESCE(EXCLUDED.profile_description, company_social_media.profile_description) END, posts_count_30d = COALESCE(EXCLUDED.posts_count_30d, company_social_media.posts_count_30d), posts_count_365d = COALESCE(EXCLUDED.posts_count_365d, company_social_media.posts_count_365d), engagement_rate = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.engagement_rate ELSE COALESCE(EXCLUDED.engagement_rate, company_social_media.engagement_rate) END, posting_frequency_score = COALESCE(EXCLUDED.posting_frequency_score, company_social_media.posting_frequency_score), last_post_date = COALESCE(EXCLUDED.last_post_date, company_social_media.last_post_date), profile_completeness_score = CASE WHEN company_social_media.source IN ('facebook_api') THEN company_social_media.profile_completeness_score ELSE COALESCE(EXCLUDED.profile_completeness_score, company_social_media.profile_completeness_score) END, updated_at = NOW() """) session.execute(upsert_social, { 'company_id': company_id, 'platform': platform, 'url': normalized_url, 'verified_at': result['audit_date'], 'source': social_sources.get(platform, 'website_scrape'), 'is_valid': True, 'page_name': enriched.get('page_name'), 'followers_count': enriched.get('followers_count'), 'has_profile_photo': enriched.get('has_profile_photo'), 'has_cover_photo': enriched.get('has_cover_photo'), 
'has_bio': enriched.get('has_bio'), 'profile_description': enriched.get('profile_description'), 'posts_count_30d': enriched.get('posts_count_30d'), 'posts_count_365d': enriched.get('posts_count_365d'), 'last_post_date': enriched.get('last_post_date'), 'engagement_rate': enriched.get('engagement_rate'), 'posting_frequency_score': enriched.get('posting_frequency_score'), 'profile_completeness_score': enriched.get('profile_completeness_score'), }) session.commit() logger.info(f"Saved audit for company {company_id}") return True except Exception as e: logger.error(f"Failed to save audit result: {e}") return False def run_audit(self, company_ids: Optional[List[int]] = None, batch_start: Optional[int] = None, batch_end: Optional[int] = None, dry_run: bool = False) -> Dict[str, Any]: """ Run audit for specified companies. Returns summary of audit results. """ companies = self.get_companies(company_ids, batch_start, batch_end) summary = { 'total': len(companies), 'success': 0, 'failed': 0, 'results': [], } for company in companies: try: result = self.audit_company(company) if not dry_run: if self.save_audit_result(result): summary['success'] += 1 else: summary['failed'] += 1 else: summary['success'] += 1 print(json.dumps(result, default=str, indent=2)) summary['results'].append({ 'company_id': company['id'], 'company_name': company['name'], 'status': 'success', 'social_media_found': len(result.get('social_media', {})), }) except Exception as e: logger.error(f"Audit failed for company {company['id']}: {e}") summary['failed'] += 1 summary['results'].append({ 'company_id': company['id'], 'company_name': company['name'], 'status': 'failed', 'error': str(e), }) return summary def main(): parser = argparse.ArgumentParser(description='Social Media & Website Audit') parser.add_argument('--company-id', type=int, help='Audit single company by ID') parser.add_argument('--company-slug', type=str, help='Audit single company by slug') parser.add_argument('--batch', type=str, help='Audit 
batch of companies (e.g., 1-10)') parser.add_argument('--all', action='store_true', help='Audit all companies') parser.add_argument('--dry-run', action='store_true', help='Print results without saving') parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) auditor = SocialMediaAuditor() if args.company_id: summary = auditor.run_audit(company_ids=[args.company_id], dry_run=args.dry_run) elif args.company_slug: # Look up company ID by slug company_id = auditor.get_company_id_by_slug(args.company_slug) if company_id: summary = auditor.run_audit(company_ids=[company_id], dry_run=args.dry_run) else: print(f"Error: Company with slug '{args.company_slug}' not found") sys.exit(1) elif args.batch: start, end = map(int, args.batch.split('-')) summary = auditor.run_audit(batch_start=start, batch_end=end, dry_run=args.dry_run) elif args.all: summary = auditor.run_audit(dry_run=args.dry_run) else: parser.print_help() sys.exit(1) print("\n" + "=" * 60) print(f"AUDIT SUMMARY") print("=" * 60) print(f"Total companies: {summary['total']}") print(f"Successful: {summary['success']}") print(f"Failed: {summary['failed']}") print("=" * 60) if __name__ == '__main__': main()