Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Brave Search matched unrelated companies by name token (e.g. VINDOR matched vindorclothing, vindormusic, beautybyneyador). Social media profiles are now sourced only from website scraping and manual admin entry. - Disabled BraveSearcher initialization and call in audit_company() - Removed Brave Search step from audit progress animation - Updated missing profile message with explanation and link to profile editor - Added migration 071 to clean up existing brave_search entries Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1790 lines
79 KiB
Python
1790 lines
79 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Social Media & Website Audit Script for Norda Biznes
|
|
=====================================================
|
|
|
|
Performs comprehensive audit of company websites and social media presence.
|
|
Designed to run with multiple parallel workers.
|
|
|
|
Features:
|
|
- Website analysis (SSL, hosting, author, responsiveness)
|
|
- Social media discovery (FB, IG, TikTok, YouTube, LinkedIn)
|
|
- Google Reviews scraping via Brave Search
|
|
- Parallel execution support
|
|
|
|
Usage:
|
|
python social_media_audit.py --company-id 26
|
|
python social_media_audit.py --batch 1-10
|
|
python social_media_audit.py --all
|
|
|
|
Author: Claude Code
|
|
Date: 2025-12-29
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, List, Tuple, Any
|
|
from urllib.parse import urlparse
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Load .env file from project root
# Runs at import time, before logging is configured, so that all later
# os.getenv() lookups (DATABASE_URL, API keys) see the .env values.
try:
    from dotenv import load_dotenv

    # Find .env file relative to this script (script lives one level below root)
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    env_path = project_root / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        # NOTE(review): logging.basicConfig() is called later in this module,
        # so this INFO record only reaches the root logger's last-resort
        # handler (WARNING threshold) and is normally not printed — confirm
        # whether this message is expected to appear.
        logging.info(f"Loaded .env from {env_path}")
except ImportError:
    pass  # python-dotenv not installed, rely on system environment
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import whois
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Make the project root importable so the shared helper from database.py
# can be reused; fall back to a local copy when running standalone.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
try:
    from database import normalize_social_url
except ImportError:
    # Fallback: local re-implementation when the project module is unavailable.
    def normalize_social_url(url: str, platform: str = None) -> str:
        """Normalize social media URLs to prevent duplicates."""
        if not url:
            return url
        cleaned = url.strip()
        # Force https:// scheme, adding it when missing entirely.
        if cleaned.startswith('http://'):
            cleaned = 'https://' + cleaned[len('http://'):]
        elif not cleaned.startswith('https://'):
            cleaned = 'https://' + cleaned
        # Drop the www. prefix and any trailing slashes.
        cleaned = cleaned.replace('https://www.', 'https://')
        return cleaned.rstrip('/')
|
|
|
|
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Database configuration
# WARNING: The fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)

# Request configuration
REQUEST_TIMEOUT = 15  # seconds, applied to every outbound HTTP request
# Desktop Chrome UA string — some sites block requests' default user agent.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

# Known Polish hosting providers (IP ranges and identifiers).
# Each value is a list of matchers: an entry is treated as an IP prefix
# (str.startswith on the resolved IP) or a substring of the domain /
# reverse-DNS name — see WebsiteAuditor._detect_hosting.
HOSTING_PROVIDERS = {
    'nazwa.pl': ['nazwa.pl', '185.252.', '91.227.'],
    'home.pl': ['home.pl', '212.85.', '195.26.'],
    'OVH': ['ovh.', '51.38.', '51.68.', '51.75.', '51.77.', '51.83.', '51.89.', '51.91.', '54.36.', '54.37.', '54.38.', '135.125.', '141.94.', '141.95.', '142.4.', '144.217.', '145.239.', '147.135.', '149.202.', '151.80.', '158.69.', '164.132.', '167.114.', '176.31.', '178.32.', '185.15.', '188.165.', '192.95.', '193.70.', '194.182.', '195.154.', '198.27.', '198.50.', '198.100.', '213.186.', '213.251.', '217.182.'],
    'cyber_Folks': ['cyberfolks', 'cf.', '77.55.'],
    'Zenbox': ['zenbox', '195.181.'],
    'Linuxpl': ['linuxpl', '91.200.'],
    'Hekko': ['hekko', 'hekko.pl'],
    'Smarthost': ['smarthost'],
    'AZ.pl': ['az.pl', 'aznetwork'],
    'Aftermarket': ['aftermarket', 'aftermarket.pl'],
    'Cloudflare': ['cloudflare', '104.16.', '104.17.', '104.18.', '104.19.', '104.20.', '104.21.', '104.22.', '104.23.', '104.24.', '172.67.'],
    'Google Cloud': ['google', '34.', '35.'],
    'AWS': ['amazon', 'aws', '52.', '54.'],
    'Vercel': ['vercel', '76.76.21.'],
    'Netlify': ['netlify'],
}

# Social media patterns.
# Per platform, an ordered list of regexes; the first pattern that yields a
# non-excluded match wins (see WebsiteAuditor._parse_html). Each regex has a
# single capture group for the profile handle/path.
SOCIAL_MEDIA_PATTERNS = {
    'facebook': [
        r'(?:https?://)?(?:www\.)?facebook\.com/profile\.php\?id=(\d+)',
        # Multi-segment paths like /p/PageName-12345/ - capture full path
        r'(?:https?://)?(?:www\.)?facebook\.com/(p/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?facebook\.com/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?fb\.com/([^/?\s"\'<>]+)',
    ],
    'instagram': [
        r'(?:https?://)?(?:www\.)?instagram\.com/([^/?\s"\'<>]+)',
    ],
    'youtube': [
        r'(?:https?://)?(?:www\.)?youtube\.com/(?:channel|c|user|@)/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?youtube\.com/([^/?\s"\'<>]+)',
    ],
    'linkedin': [
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(company/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(in/[^/?\s"\'<>]+)',
    ],
    'tiktok': [
        r'(?:https?://)?(?:www\.)?tiktok\.com/@([^/?\s"\'<>]+)',
    ],
    'twitter': [
        r'(?:https?://)?(?:www\.)?(?:twitter|x)\.com/([^/?\s"\'<>]+)',
    ],
}

# False positives to exclude.
# Compared case-insensitively against the captured handle (exact match only),
# filtering share widgets, login pages and other non-profile paths.
SOCIAL_MEDIA_EXCLUDE = {
    'facebook': ['sharer', 'share', 'intent', 'plugins', 'dialog', 'sharer.php', 'login', 'pages', 'boldthemes', 'profile.php', 'profile', 'watch', 'groups', 'events', 'marketplace', 'gaming', 'stories', 'p', 'people', 'hashtag', 'help', 'settings', 'notifications', 'tr', 'privacy', 'policies', 'ads', 'business', 'legal', 'flx'],
    'instagram': ['explore', 'accounts', 'p', 'reel'],
    'youtube': ['embed', 'watch', 'playlist', 'results', 'feed', 'channel', 'c', 'user', '@', 'about', 'featured', 'videos', 'shorts', 'streams', 'playlists', 'community', 'channels', 'store'],
    'linkedin': ['company/shareArticle', 'company/share', 'company/login', 'in/shareArticle', 'in/share', 'in/login'],
    'tiktok': ['embed', 'video'],
    'twitter': ['intent', 'share', 'widgets.js', 'widgets', 'tweet', 'platform.twitter.com', 'bold_themes', 'boldthemes'],
}
|
|
|
|
|
|
class WebsiteAuditor:
    """Audits website technical details and metadata.

    Entry point is :meth:`audit_website`, which combines an SSL probe,
    hosting/IP detection and HTML metadata extraction into a flat result
    dict. Failures of individual steps are collected into the result's
    ``errors`` list instead of aborting the audit.
    """

    def __init__(self):
        # One shared session gives connection pooling and a consistent UA.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    @staticmethod
    def _merge_partial(result: Dict[str, Any], partial: Dict[str, Any]) -> None:
        """Merge a sub-step's partial result into *result*.

        Fix: sub-steps may carry their own ``errors`` key; a plain
        ``dict.update`` would clobber the errors accumulated so far, so
        error lists are appended instead.
        """
        errors = partial.pop('errors', None)
        if errors:
            result.setdefault('errors', []).extend(errors)
        result.update(partial)

    def audit_website(self, url: str) -> Dict[str, Any]:
        """Perform comprehensive website audit.

        Args:
            url: Website address; scheme is optional (https:// is assumed).

        Returns dict with:
            - http_status, load_time_ms
            - has_ssl, ssl_valid, ssl_expiry, ssl_issuer
            - hosting_provider, hosting_ip, server_software
            - site_author, site_generator
            - is_mobile_friendly, has_viewport_meta
            - last_modified_at
            - social_media_links (dict of platform -> url)
            - errors (list of human-readable step failures)
        """
        result = {
            'url': url,
            'http_status': None,
            'load_time_ms': None,
            'has_ssl': False,
            'ssl_valid': False,
            'ssl_expiry': None,
            'ssl_issuer': None,
            'hosting_provider': None,
            'hosting_ip': None,
            'server_software': None,
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'last_modified_at': None,
            'social_media_links': {},
            'errors': [],
        }

        if not url:
            result['errors'].append('No URL provided')
            return result

        # Normalize URL: default to https when no scheme given.
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        parsed = urlparse(url)
        domain = parsed.netloc

        # 1. Check SSL certificate
        try:
            self._merge_partial(result, self._check_ssl(domain))
        except Exception as e:
            result['errors'].append(f'SSL check failed: {str(e)}')

        # 2. Resolve IP and detect hosting
        try:
            self._merge_partial(result, self._detect_hosting(domain))
        except Exception as e:
            result['errors'].append(f'Hosting detection failed: {str(e)}')

        # 3. Fetch page and analyze
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            result['load_time_ms'] = int((time.time() - start_time) * 1000)
            result['http_status'] = response.status_code
            # Redirects may land on http://, so check the final URL.
            result['has_ssl'] = response.url.startswith('https://')

            # Server header (truncated to DB column size)
            result['server_software'] = response.headers.get('Server', '')[:100]

            # Last-Modified header, e.g. 'Wed, 21 Oct 2015 07:28:00 GMT'
            last_mod = response.headers.get('Last-Modified')
            if last_mod:
                try:
                    result['last_modified_at'] = datetime.strptime(
                        last_mod, '%a, %d %b %Y %H:%M:%S %Z'
                    )
                except ValueError:
                    pass  # malformed header — leave as None

            # Parse HTML only on a successful response.
            if response.status_code == 200:
                self._merge_partial(result, self._parse_html(response.text))

        except requests.exceptions.SSLError as e:
            result['errors'].append(f'SSL Error: {str(e)}')
            result['ssl_valid'] = False
            # Try HTTP fallback for sites with broken certificates.
            try:
                http_url = url.replace('https://', 'http://')
                response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
                result['http_status'] = response.status_code
                result['has_ssl'] = False
                if response.status_code == 200:
                    self._merge_partial(result, self._parse_html(response.text))
            except Exception as e2:
                result['errors'].append(f'HTTP fallback failed: {str(e2)}')

        except requests.exceptions.RequestException as e:
            result['errors'].append(f'Request failed: {str(e)}')

        return result

    def _check_ssl(self, domain: str) -> Dict[str, Any]:
        """Check SSL certificate validity, expiry and issuer.

        Opens a TLS connection to ``domain:443`` with default verification;
        any handshake/network failure leaves ``ssl_valid`` False.
        """
        result = {'ssl_valid': False, 'ssl_expiry': None, 'ssl_issuer': None}

        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    result['ssl_valid'] = True

                    # Parse expiry date (format e.g. 'Jun  1 12:00:00 2025 GMT')
                    not_after = cert.get('notAfter')
                    if not_after:
                        result['ssl_expiry'] = datetime.strptime(
                            not_after, '%b %d %H:%M:%S %Y %Z'
                        ).date()

                    # Extract issuer (Certificate Authority)
                    issuer = cert.get('issuer')
                    if issuer:
                        # issuer is a tuple of RDN tuples like
                        # ((('organizationName', "Let's Encrypt"),),)
                        issuer_dict = {}
                        for item in issuer:
                            for key, value in item:
                                issuer_dict[key] = value
                        # Prefer Organization name, fallback to Common Name
                        issuer_name = issuer_dict.get('organizationName') or issuer_dict.get('commonName')
                        if issuer_name:
                            result['ssl_issuer'] = issuer_name[:100]  # limit length
        except Exception:
            # Fix: was a bare handler binding an unused variable; any
            # network/TLS/parsing problem means the cert cannot be vouched for.
            result['ssl_valid'] = False

        return result

    def _detect_hosting(self, domain: str) -> Dict[str, Any]:
        """Detect hosting provider from IP prefix, reverse DNS and WHOIS.

        May additionally return 'domain_registrar' and, on total failure,
        an 'errors' list; both are merged into the caller's result.
        """
        result = {'hosting_provider': None, 'hosting_ip': None}

        try:
            ip = socket.gethostbyname(domain)
            result['hosting_ip'] = ip

            # Check against known hosting IP ranges / name fragments
            for provider, patterns in HOSTING_PROVIDERS.items():
                for pattern in patterns:
                    if ip.startswith(pattern) or pattern in domain.lower():
                        result['hosting_provider'] = provider
                        return result

            # Try reverse DNS (best effort)
            try:
                reverse = socket.gethostbyaddr(ip)[0]
                for provider, patterns in HOSTING_PROVIDERS.items():
                    for pattern in patterns:
                        if pattern in reverse.lower():
                            result['hosting_provider'] = provider
                            return result
            except Exception:
                pass  # no PTR record or lookup failed — not fatal

            # Try WHOIS for registrar (best effort)
            try:
                w = whois.whois(domain)
                if w.registrar:
                    result['domain_registrar'] = str(w.registrar)[:100]
            except Exception:
                pass  # WHOIS frequently rate-limits; ignore

        except Exception as e:
            result['errors'] = [f'Hosting detection: {str(e)}']

        return result

    def _parse_html(self, html: str) -> Dict[str, Any]:
        """Parse HTML for metadata and social media links.

        Delegates author heuristics to :meth:`_find_author` and social link
        extraction to :meth:`_extract_social_links`.
        """
        result = {
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'social_media_links': {},
        }

        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Check viewport meta (mobile-friendly indicator)
            viewport = soup.find('meta', attrs={'name': 'viewport'})
            if viewport:
                result['has_viewport_meta'] = True
                if 'width=device-width' in viewport.get('content', ''):
                    result['is_mobile_friendly'] = True

            # Author meta
            author = soup.find('meta', attrs={'name': 'author'})
            if author:
                result['site_author'] = author.get('content', '')[:255]

            # Generator meta (CMS)
            generator = soup.find('meta', attrs={'name': 'generator'})
            if generator:
                result['site_generator'] = generator.get('content', '')[:100]

            # Fall back to heuristics when no author meta tag is present.
            if not result['site_author']:
                author_found = self._find_author(soup, html)
                if author_found:
                    result['site_author'] = author_found[:255]

            # Extract social media links from the raw HTML.
            result['social_media_links'] = self._extract_social_links(html)

        except Exception as e:
            result['errors'] = [f'HTML parsing: {str(e)}']

        return result

    def _find_author(self, soup, html: str) -> Optional[str]:
        """Heuristically find the site author/agency credit.

        Checks, in order: HTML comments, footer credit text, footer links to
        agency-looking domains, and Polish credit phrases in the page body.
        Returns a cleaned-up name, or None when nothing plausible is found.
        """
        author_found = None

        # 1. Check HTML comments for author info
        # (Fix: removed a dead soup.find_all(...) call whose result was unused.)
        html_comments = re.findall(r'<!--(.+?)-->', html, re.DOTALL)
        comment_patterns = [
            r'(?:created by|designed by|developed by|made by|author)[:\s]+([^\n<>]+)',
            r'(?:agencja|agency|studio)[:\s]+([^\n<>]+)',
        ]
        for comment in html_comments:
            for pattern in comment_patterns:
                match = re.search(pattern, comment, re.IGNORECASE)
                if match:
                    author_found = match.group(1).strip()
                    break
            if author_found:
                break

        # 2. Check footer credit text
        if not author_found:
            footer = soup.find('footer')
            if footer:
                footer_text = footer.get_text(separator=' ')
                footer_patterns = [
                    r'(?:wykonanie|realizacja|created by|designed by|made by|developed by)[:\s]+([^|<>\n©]+)',
                    r'(?:projekt|design|strona)[:\s]+([^|<>\n©]+)',
                    r'(?:powered by|built with)[:\s]+([^|<>\n©]+)',
                    r'(?:agencja|agency|studio)[:\s]+([^|<>\n©]+)',
                ]
                for pattern in footer_patterns:
                    match = re.search(pattern, footer_text, re.IGNORECASE)
                    if match:
                        author_found = match.group(1).strip()
                        break

                # 3. Check footer links for agency/studio domains
                if not author_found:
                    footer_links = footer.find_all('a', href=True)
                    agency_domains = ['.pl', '.com', '.eu']
                    agency_keywords = ['studio', 'agencja', 'agency', 'design', 'web', 'digital', 'media', 'creative']
                    for link in footer_links:
                        href = link.get('href', '')
                        link_text = link.get_text().strip()
                        # A link counts as an agency credit when both the
                        # keyword and a real TLD appear, and it is not a
                        # social-media share link.
                        if any(kw in href.lower() or kw in link_text.lower() for kw in agency_keywords):
                            if any(dom in href for dom in agency_domains) and 'facebook' not in href and 'instagram' not in href:
                                if link_text and 2 < len(link_text) < 50:
                                    author_found = link_text
                                    break

        # 4. Check entire page for common Polish credit phrases
        if not author_found:
            page_text = soup.get_text(separator=' ')
            page_patterns = [
                r'(?:stronę wykonała?|witrynę wykonała?|stronę stworzył[ao]?)[:\s]+([^|<>\n©.]+)',
                r'(?:copyright|©).*?(?:by|przez)[:\s]+([^|<>\n©.]+)',
            ]
            for pattern in page_patterns:
                match = re.search(pattern, page_text, re.IGNORECASE)
                if match:
                    author_found = match.group(1).strip()
                    break

        # Clean up author name: strip dashes/colons and collapse whitespace.
        if author_found:
            author_found = re.sub(r'^[\s\-–—:]+', '', author_found)
            author_found = re.sub(r'[\s\-–—:]+$', '', author_found)
            author_found = re.sub(r'\s+', ' ', author_found)
            # Reject values that are too short or suspiciously long.
            if 2 < len(author_found) < 100:
                return author_found
        return None

    def _extract_social_links(self, html: str) -> Dict[str, str]:
        """Scan raw HTML for the first plausible profile URL per platform.

        (Fix: removed an unused ``html.lower()`` copy — the regexes already
        use re.IGNORECASE against the original text.)
        """
        links: Dict[str, str] = {}
        for platform, patterns in SOCIAL_MEDIA_PATTERNS.items():
            excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
            for pattern in patterns:
                if platform in links:
                    break  # already found this platform, skip remaining patterns
                for match in re.findall(pattern, html, re.IGNORECASE):
                    # Skip very short matches (likely truncated or generic paths)
                    if len(match) < 2:
                        continue
                    # Exclusion list uses exact match only to avoid false positives.
                    if match.lower() in excludes:
                        continue
                    url = self._build_social_url(platform, match)
                    if url:
                        links[platform] = url
                        break  # found valid match, stop searching this pattern
        return links

    @staticmethod
    def _build_social_url(platform: str, match: str) -> Optional[str]:
        """Construct a canonical profile URL from a captured handle/path."""
        if platform == 'facebook':
            # Numeric IDs map to profile.php; names and multi-segment
            # p/PageName-123 paths are appended verbatim.
            if match.isdigit():
                return f'https://facebook.com/profile.php?id={match}'
            return f'https://facebook.com/{match}'
        if platform == 'instagram':
            # Strip tracking params (igsh=, utm_) that leak into the handle.
            match = match.split('?')[0].split('&')[0]
            if len(match) < 2:
                return None
            return f'https://instagram.com/{match}'
        if platform == 'youtube':
            if match.startswith('@'):
                return f'https://youtube.com/{match}'
            return f'https://youtube.com/channel/{match}'
        if platform == 'linkedin':
            return f'https://linkedin.com/{match}'
        if platform == 'tiktok':
            return f'https://tiktok.com/@{match}'
        if platform == 'twitter':
            return f'https://twitter.com/{match}'
        return None
|
|
|
|
|
|
class GooglePlacesSearcher:
    """Search for Google Business profiles using Google Places API.

    Two-step workflow: :meth:`find_place` resolves a company name to a
    Google ``place_id``; :meth:`get_place_details` then fetches rating,
    review count, opening hours and contact data for that place.
    """

    # Google Places API endpoints
    FIND_PLACE_URL = 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json'
    PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'

    def __init__(self, api_key: Optional[str] = None):
        """Initialize GooglePlacesSearcher.

        Args:
            api_key: Google Places API key. Falls back to GOOGLE_PLACES_API_KEY env var.
        """
        self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def find_place(self, company_name: str, city: str = 'Wejherowo') -> Optional[str]:
        """Find a place by company name and city.

        Queries the Places ``findplacefromtext`` endpoint with
        "<company_name> <city>" and returns the top candidate's place_id.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            place_id string if found, None otherwise (including on any
            API/network error, which is logged).
        """
        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return None

        try:
            query_params = {
                'input': f'{company_name} {city}',
                'inputtype': 'textquery',
                'fields': 'place_id,name,formatted_address',
                'language': 'pl',
                'key': self.api_key,
            }
            resp = self.session.get(
                self.FIND_PLACE_URL, params=query_params, timeout=REQUEST_TIMEOUT
            )
            resp.raise_for_status()
            payload = resp.json()

            status = payload.get('status')
            if status == 'OK' and payload.get('candidates'):
                best = payload['candidates'][0]
                logger.info(
                    f"Found place for '{company_name}': {best.get('name')} "
                    f"at {best.get('formatted_address')}"
                )
                return best.get('place_id')
            if status == 'ZERO_RESULTS':
                logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
                return None
            logger.warning(
                f"Google Places API returned status: {payload.get('status')} "
                f"for '{company_name}'"
            )
            return None

        except requests.exceptions.Timeout:
            logger.error(f"Timeout searching for '{company_name}' on Google Places")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error searching for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.error(f"Error finding place for '{company_name}': {e}")
            return None

    def get_place_details(self, place_id: str) -> Dict[str, Any]:
        """Get detailed information about a place.

        Retrieves rating, review count, opening hours and other business
        details from the Places Details endpoint.

        Args:
            place_id: Google Place ID returned from find_place().

        Returns:
            Dict containing:
            - google_rating: Decimal rating (1.0-5.0) or None
            - google_reviews_count: Integer review count or None
            - google_photos_count: Integer photo count or None
            - opening_hours: Dict with weekday_text/open_now/periods, or None
            - business_status: e.g. 'OPERATIONAL', 'CLOSED_TEMPORARILY', or None
            - formatted_phone: Phone number or None
            - website: Website URL or None
        """
        details = {
            'google_rating': None,
            'google_reviews_count': None,
            'google_photos_count': None,
            'opening_hours': None,
            'business_status': None,
            'formatted_phone': None,
            'website': None,
        }

        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return details
        if not place_id:
            return details

        try:
            query_params = {
                'place_id': place_id,
                # Only the fields the audit needs — keeps the billed SKU small.
                'fields': 'rating,user_ratings_total,opening_hours,business_status,formatted_phone_number,website,name,photos',
                'language': 'pl',
                'key': self.api_key,
            }
            resp = self.session.get(
                self.PLACE_DETAILS_URL, params=query_params, timeout=REQUEST_TIMEOUT
            )
            resp.raise_for_status()
            payload = resp.json()

            if payload.get('status') == 'OK' and payload.get('result'):
                place = payload['result']

                if 'rating' in place:
                    details['google_rating'] = round(float(place['rating']), 1)
                if 'user_ratings_total' in place:
                    details['google_reviews_count'] = int(place['user_ratings_total'])
                if 'opening_hours' in place:
                    hours = place['opening_hours']
                    details['opening_hours'] = {
                        'weekday_text': hours.get('weekday_text', []),
                        'open_now': hours.get('open_now'),
                        'periods': hours.get('periods', []),
                    }
                if 'business_status' in place:
                    details['business_status'] = place['business_status']
                if 'formatted_phone_number' in place:
                    details['formatted_phone'] = place['formatted_phone_number']
                if 'website' in place:
                    details['website'] = place['website']
                if 'photos' in place:
                    details['google_photos_count'] = len(place['photos'])

                logger.info(
                    f"Retrieved details for {place.get('name')}: "
                    f"rating={details['google_rating']}, "
                    f"reviews={details['google_reviews_count']}, "
                    f"photos={details['google_photos_count']}"
                )
            else:
                logger.warning(
                    f"Google Places API returned status: {payload.get('status')} "
                    f"for place_id: {place_id}"
                )

        except requests.exceptions.Timeout:
            logger.error(f"Timeout getting details for place_id: {place_id}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error getting place details: {e}")
        except Exception as e:
            logger.error(f"Error getting place details for {place_id}: {e}")

        return details
|
|
|
|
|
|
class BraveSearcher:
|
|
"""Search for social media profiles and Google reviews using Brave Search."""
|
|
|
|
def __init__(self, api_key: Optional[str] = None):
|
|
self.api_key = api_key or os.getenv('BRAVE_API_KEY')
|
|
self.session = requests.Session()
|
|
self.session.headers.update({'User-Agent': USER_AGENT})
|
|
|
|
def search_social_media(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, str]:
|
|
"""
|
|
Search for company social media profiles.
|
|
Returns dict of platform -> url.
|
|
"""
|
|
results = {}
|
|
|
|
platforms = [
|
|
('facebook', f'{company_name} {city} facebook'),
|
|
('instagram', f'{company_name} instagram'),
|
|
('tiktok', f'{company_name} tiktok'),
|
|
('youtube', f'{company_name} youtube kanał'),
|
|
('linkedin', f'{company_name} linkedin.com/company'),
|
|
]
|
|
|
|
for platform, query in platforms:
|
|
try:
|
|
url = self._search_brave(query, platform, company_name)
|
|
if url:
|
|
results[platform] = url
|
|
time.sleep(0.5) # Rate limiting
|
|
except Exception as e:
|
|
logger.warning(f'Brave search failed for {platform}: {e}')
|
|
|
|
# LinkedIn fallback: if company page not found, try general search (may find personal profile)
|
|
if 'linkedin' not in results:
|
|
try:
|
|
url = self._search_brave(f'{company_name} linkedin', 'linkedin', company_name)
|
|
if url:
|
|
results['linkedin'] = url
|
|
logger.info(f"LinkedIn fallback found profile: {url}")
|
|
except Exception as e:
|
|
logger.warning(f'Brave search LinkedIn fallback failed: {e}')
|
|
|
|
return results
|
|
|
|
def search_google_reviews(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, Any]:
|
|
"""
|
|
Search for Google reviews using Google Places API.
|
|
|
|
This method uses the GooglePlacesSearcher to find the company on Google
|
|
and retrieve its rating and review count.
|
|
|
|
Args:
|
|
company_name: Name of the company to search for.
|
|
city: City to narrow down the search (default: Wejherowo).
|
|
|
|
Returns:
|
|
Dict containing:
|
|
- google_rating: Decimal rating (1.0-5.0) or None
|
|
- google_reviews_count: Integer review count or None
|
|
- opening_hours: Dict with weekday_text and open_now, or None
|
|
- business_status: String like 'OPERATIONAL', 'CLOSED_TEMPORARILY', etc.
|
|
"""
|
|
result = {
|
|
'google_rating': None,
|
|
'google_reviews_count': None,
|
|
'opening_hours': None,
|
|
'business_status': None,
|
|
}
|
|
|
|
try:
|
|
# Use Google Places API for accurate data
|
|
google_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
|
|
|
|
if google_api_key:
|
|
# Use GooglePlacesSearcher for accurate data retrieval
|
|
places_searcher = GooglePlacesSearcher(api_key=google_api_key)
|
|
|
|
# Step 1: Find the place by company name and city
|
|
place_id = places_searcher.find_place(company_name, city)
|
|
|
|
if place_id:
|
|
# Step 2: Get detailed information including reviews
|
|
details = places_searcher.get_place_details(place_id)
|
|
|
|
result['google_rating'] = details.get('google_rating')
|
|
result['google_reviews_count'] = details.get('google_reviews_count')
|
|
result['opening_hours'] = details.get('opening_hours')
|
|
result['business_status'] = details.get('business_status')
|
|
|
|
logger.info(
|
|
f"Google reviews for '{company_name}': "
|
|
f"rating={result['google_rating']}, "
|
|
f"reviews={result['google_reviews_count']}, "
|
|
f"status={result['business_status']}"
|
|
)
|
|
else:
|
|
logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
|
|
else:
|
|
# Fallback: Try Brave Search API if available
|
|
if self.api_key:
|
|
brave_result = self._search_brave_for_reviews(company_name, city)
|
|
if brave_result:
|
|
result.update(brave_result)
|
|
else:
|
|
logger.warning(
|
|
'Neither GOOGLE_PLACES_API_KEY nor BRAVE_API_KEY configured. '
|
|
'Cannot retrieve Google reviews data.'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(f'Google reviews search failed for {company_name}: {e}')
|
|
|
|
return result
|
|
|
|
def _search_brave_for_reviews(self, company_name: str, city: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Fallback method to search for Google reviews via Brave Search API.
|
|
|
|
This parses search results to extract rating and review count from
|
|
Google Business snippets in search results.
|
|
|
|
Args:
|
|
company_name: Name of the company.
|
|
city: City for location context.
|
|
|
|
Returns:
|
|
Dict with google_rating and google_reviews_count, or None if not found.
|
|
"""
|
|
if not self.api_key:
|
|
return None
|
|
|
|
try:
|
|
query = f'{company_name} {city} opinie google'
|
|
|
|
# Brave Web Search API endpoint
|
|
url = 'https://api.search.brave.com/res/v1/web/search'
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Accept-Encoding': 'gzip',
|
|
'X-Subscription-Token': self.api_key,
|
|
}
|
|
params = {
|
|
'q': query,
|
|
'count': 10,
|
|
'country': 'pl',
|
|
'search_lang': 'pl',
|
|
'ui_lang': 'pl-PL',
|
|
}
|
|
|
|
response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Parse search results for rating/review patterns
|
|
# Google snippets often contain patterns like "4,5 (123 opinii)" or "Rating: 4.5 · 123 reviews"
|
|
for result in data.get('web', {}).get('results', []):
|
|
snippet = result.get('description', '') + ' ' + result.get('title', '')
|
|
|
|
# Pattern for Polish Google reviews: "4,5 (123 opinii)" or "4.5 · 123 reviews"
|
|
rating_patterns = [
|
|
r'(\d+[,\.]\d)\s*[·\(]\s*(\d+)\s*(?:opinii|recenzji|reviews)',
|
|
r'ocena[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:opinii|recenzji)',
|
|
r'rating[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:reviews|opinii)',
|
|
]
|
|
|
|
for pattern in rating_patterns:
|
|
match = re.search(pattern, snippet, re.IGNORECASE)
|
|
if match:
|
|
rating_str = match.group(1).replace(',', '.')
|
|
reviews_str = match.group(2)
|
|
|
|
return {
|
|
'google_rating': round(float(rating_str), 1),
|
|
'google_reviews_count': int(reviews_str),
|
|
}
|
|
|
|
logger.info(f"No Google reviews data found in Brave results for '{company_name}'")
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.warning(f"Timeout searching Brave for '{company_name}' reviews")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"Brave API request failed for '{company_name}': {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing Brave results for '{company_name}': {e}")
|
|
return None
|
|
|
|
    def _search_brave(self, query: str, platform: str, company_name: str = '', **kwargs) -> Optional[str]:
        """
        Perform Brave search and extract relevant social media URL.

        Queries the Brave Web Search API (Polish locale), keeps only results
        on the target platform's domain, extracts a profile handle, filters
        out non-profile pages, and requires the result to mention at least
        one significant token of the company name.

        Args:
            query: Full-text search query sent to Brave.
            platform: One of 'facebook', 'instagram', 'youtube', 'linkedin',
                'tiktok', 'twitter'; anything else returns None.
            company_name: Used to validate that a result actually relates to
                the company (word-boundary token matching).
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            Normalized URL of the best-matching profile, or None when there
            is no API key, no match, or any request/parse error occurs.
        """
        if not self.api_key:
            logger.debug(f"No Brave API key - skipping search for {platform}")
            return None

        try:
            url = 'https://api.search.brave.com/res/v1/web/search'
            headers = {
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip',
                'X-Subscription-Token': self.api_key,
            }
            # Polish market defaults: results, language and UI all pinned to PL.
            params = {
                'q': query,
                'count': 10,
                'country': 'pl',
                'search_lang': 'pl',
                'ui_lang': 'pl-PL',
            }

            response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()

            data = response.json()
            results = data.get('web', {}).get('results', [])

            # Platform domain patterns — a result must live on the platform's
            # domain (and, for LinkedIn/TikTok, on a profile-shaped path).
            domain_patterns = {
                'facebook': r'facebook\.com/',
                'instagram': r'instagram\.com/',
                'youtube': r'youtube\.com/',
                'linkedin': r'linkedin\.com/(?:company|in)/',
                'tiktok': r'tiktok\.com/@',
                'twitter': r'(?:twitter|x)\.com/',
            }

            pattern = domain_patterns.get(platform)
            if not pattern:
                # Unsupported platform name — nothing to search for.
                return None

            # Prepare company name variations for matching
            name_lower = company_name.lower().strip()
            # Generate matching tokens with word boundary patterns
            # (e.g. "Waterm Artur Wiertel" -> [r'\bwaterm\b', r'\bartur\b', r'\bwiertel\b'])
            # Tokens shorter than 3 chars are dropped as too noisy to match on.
            name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
                           for t in name_lower.split() if len(t) >= 3]

            # Each candidate is (is_company_page, token_match_count, url);
            # the tuple order is what the final sort keys on.
            candidates = []
            for rank, result in enumerate(results):  # NOTE: rank is currently unused
                result_url = result.get('url', '')
                result_title = result.get('title', '')
                result_desc = result.get('description', '')

                if not re.search(pattern, result_url, re.IGNORECASE):
                    continue

                # Extract handle first, then check excludes against handle (not full URL)
                extracted_url = None
                handle = None
                for regex in SOCIAL_MEDIA_PATTERNS.get(platform, []):
                    match = re.search(regex, result_url, re.IGNORECASE)
                    if match:
                        handle = match.group(1)
                        if len(handle) >= 2:
                            extracted_url = self._build_social_url(platform, handle)
                            break

                if not extracted_url:
                    # No handle pattern matched — fall back to the raw result URL.
                    extracted_url = result_url

                # Validate it's a real profile, not a search/share page
                # Check handle against excludes (exact match on first path segment)
                excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
                handle_base = (handle or '').split('/')[0].lower()
                is_excluded = handle_base in [ex.lower() for ex in excludes]
                if is_excluded:
                    continue

                # Check if result relates to the company
                searchable = f'{result_title} {result_desc} {result_url}'.lower()
                # Count how many name tokens appear in the result (word boundary match)
                token_matches = sum(1 for t in name_tokens if t.search(searchable))

                if token_matches == 0:
                    continue  # No connection to company at all

                # For LinkedIn: prioritize /company/ over /in/ (company pages > personal)
                is_company_page = 1 if (platform == 'linkedin' and '/company/' in (extracted_url or '')) else 0

                candidates.append((is_company_page, token_matches, extracted_url))

            if candidates:
                # Sort by: 1) company page priority, 2) token matches (best match first)
                candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
                best_url = candidates[0][2]
                logger.info(f"Brave search matched {platform}: {best_url} (company={candidates[0][0]}, score={candidates[0][1]}/{len(name_tokens)})")
                return best_url

            logger.debug(f"No {platform} profile found in Brave results for: {query}")
            return None

        # All failure modes degrade to None so a broken search never
        # aborts the surrounding audit.
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout searching Brave for '{query}'")
            return None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Brave API request failed for '{query}': {e}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing Brave results for '{query}': {e}")
            return None
|
|
|
|
def _check_linkedin_company_page(self, company_name: str) -> Optional[str]:
|
|
"""
|
|
Try direct LinkedIn company page URL based on company name slugs.
|
|
Returns URL if page exists and title matches, None otherwise.
|
|
"""
|
|
# Generate slug candidates from company name
|
|
name_clean = company_name.strip()
|
|
slugs = set()
|
|
|
|
# Basic slug: lowercase, spaces to hyphens
|
|
slug = re.sub(r'[^a-z0-9\s-]', '', name_clean.lower())
|
|
slug = re.sub(r'\s+', '-', slug).strip('-')
|
|
if slug:
|
|
slugs.add(slug)
|
|
|
|
# First word only (common for short brand names like "Waterm")
|
|
first_word = name_clean.split()[0].lower() if name_clean.split() else ''
|
|
first_word = re.sub(r'[^a-z0-9]', '', first_word)
|
|
if first_word and len(first_word) >= 3:
|
|
slugs.add(first_word)
|
|
|
|
name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
|
|
for t in name_clean.lower().split() if len(t) >= 3]
|
|
|
|
for slug in slugs:
|
|
try:
|
|
check_url = f'https://www.linkedin.com/company/{slug}'
|
|
resp = self.session.get(check_url, timeout=8, allow_redirects=True)
|
|
if resp.status_code == 200:
|
|
# Verify title contains company name
|
|
title_match = re.search(r'<title>([^<]+)</title>', resp.text)
|
|
if title_match:
|
|
title = title_match.group(1).lower()
|
|
if any(t.search(title) for t in name_tokens):
|
|
logger.info(f"LinkedIn company page verified: {check_url} (title: {title_match.group(1)})")
|
|
return f'https://linkedin.com/company/{slug}'
|
|
else:
|
|
logger.debug(f"LinkedIn /company/{slug} exists but title '{title_match.group(1)}' doesn't match '{company_name}'")
|
|
except Exception as e:
|
|
logger.debug(f"LinkedIn company page check failed for {slug}: {e}")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _build_social_url(platform: str, handle: str) -> str:
|
|
"""Build normalized social media URL from platform and handle."""
|
|
if platform == 'facebook':
|
|
if handle.isdigit():
|
|
return f'https://facebook.com/profile.php?id={handle}'
|
|
return f'https://facebook.com/{handle}'
|
|
elif platform == 'instagram':
|
|
handle = handle.split('?')[0].split('&')[0]
|
|
return f'https://instagram.com/{handle}'
|
|
elif platform == 'youtube':
|
|
if handle.startswith('@'):
|
|
return f'https://youtube.com/{handle}'
|
|
return f'https://youtube.com/channel/{handle}'
|
|
elif platform == 'linkedin':
|
|
return f'https://linkedin.com/{handle}'
|
|
elif platform == 'tiktok':
|
|
return f'https://tiktok.com/@{handle}'
|
|
elif platform == 'twitter':
|
|
return f'https://twitter.com/{handle}'
|
|
return handle
|
|
|
|
|
|
class SocialProfileEnricher:
    """Enriches social media profiles with additional data from public APIs and scraping.

    All enrichers work by fetching the public profile page and pattern-matching
    Open Graph meta tags or embedded JSON — no authenticated APIs are used, so
    results are best-effort and may silently shrink when platforms change their
    markup or serve bot-detection pages. Every enricher returns a partial dict
    (possibly empty) and never raises.
    """

    def __init__(self):
        # Shared HTTP session; a browser-like User-Agent reduces bot blocks.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def enrich_profile(self, platform: str, url: str) -> Dict[str, Any]:
        """Fetch additional data for a social media profile.

        Dispatches to the platform-specific enricher. Unknown platforms and
        any enricher exception yield an empty dict.
        """
        # Dispatch table: platform name -> scraping method.
        enrichers = {
            'facebook': self._enrich_facebook,
            'instagram': self._enrich_instagram,
            'youtube': self._enrich_youtube,
            'linkedin': self._enrich_linkedin,
            'tiktok': self._enrich_tiktok,
            'twitter': self._enrich_twitter,
        }
        enricher = enrichers.get(platform)
        if enricher:
            try:
                return enricher(url)
            except Exception as e:
                # Enrichment is best-effort; never let it break the audit.
                logger.warning(f"Failed to enrich {platform} profile {url}: {e}")
                return {}
        return {}

    def _enrich_facebook(self, url: str) -> Dict[str, Any]:
        """Enrich Facebook page data from public page HTML.

        Returns a partial dict with page_name, has_profile_photo, and
        profile_description/has_bio when the og:* meta tags are present.
        """
        result: Dict[str, Any] = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            if resp.status_code == 200:
                html = resp.text
                # Extract page name from og:title
                og_match = re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html)
                if og_match:
                    result['page_name'] = og_match.group(1)
                # Check for profile photo via og:image
                og_img = re.search(r'<meta\s+property="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                # Description from og:description
                og_desc = re.search(r'<meta\s+property="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    # Truncated to 500 chars to fit the storage column.
                    result['profile_description'] = og_desc.group(1)[:500]
                    result['has_bio'] = True
                else:
                    result['has_bio'] = False
        except Exception as e:
            logger.debug(f"Facebook enrichment failed: {e}")
        return result

    def _enrich_instagram(self, url: str) -> Dict[str, Any]:
        """Enrich Instagram profile data.

        Parses the og:description tag, which for public profiles follows the
        pattern "X Followers, Y Following, Z Posts - <bio>".
        NOTE(review): the total posts figure is stored as posts_count_365d —
        it is really an all-time count, not a 365-day window; confirm intent.
        """
        result: Dict[str, Any] = {}
        try:
            # Try og:description which often contains "X Followers, Y Following, Z Posts"
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # og:description format: "123 Followers, 45 Following, 67 Posts - See Instagram photos..."
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    desc = og_desc.group(1)
                    # Extract followers (may be abbreviated, e.g. "1.2K")
                    followers_match = re.search(r'([\d,\.]+[KMkm]?)\s+Followers', desc)
                    if followers_match:
                        result['followers_count'] = self._parse_count(followers_match.group(1))
                    # Extract posts count
                    posts_match = re.search(r'([\d,\.]+[KMkm]?)\s+Posts', desc)
                    if posts_match:
                        result['posts_count_365d'] = self._parse_count(posts_match.group(1))
                    # Bio is after the dash (hyphen, en dash, or em dash)
                    bio_match = re.search(r'Posts\s*[-\u2013\u2014]\s*(.+)', desc)
                    if bio_match:
                        bio_text = bio_match.group(1).strip()
                        # Skip Instagram's generic boilerplate suffix.
                        if bio_text and not bio_text.startswith('See Instagram'):
                            result['profile_description'] = bio_text[:500]
                            result['has_bio'] = True
                # Profile photo from og:image
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
        except Exception as e:
            logger.debug(f"Instagram enrichment failed: {e}")
        return result

    def _enrich_youtube(self, url: str) -> Dict[str, Any]:
        """Enrich YouTube channel data.

        Scrapes the channel page's embedded JSON (subscriber/video counts,
        description) plus og:image/og:title meta tags.
        NOTE(review): video count is stored as posts_count_365d but is the
        channel's total video count — confirm intent.
        """
        result: Dict[str, Any] = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # Subscriber count from meta or JSON ("1.2K subscribers" style)
                subs_match = re.search(r'"subscriberCountText":\s*\{"simpleText":\s*"([^"]+)"\}', html)
                if subs_match:
                    # Take only the leading number token before " subscribers".
                    result['followers_count'] = self._parse_count(subs_match.group(1).split(' ')[0])
                # Video count
                videos_match = re.search(r'"videosCountText":\s*\{"runs":\s*\[\{"text":\s*"([^"]+)"\}', html)
                if videos_match:
                    result['posts_count_365d'] = self._parse_count(videos_match.group(1))
                # Channel description (JSON string, may contain escapes)
                desc_match = re.search(r'"description":\s*"([^"]*(?:\\.[^"]*)*)"', html)
                if desc_match:
                    desc = desc_match.group(1).replace('\\n', ' ').strip()
                    # Require a few characters to avoid counting empty strings as a bio.
                    if desc and len(desc) > 5:
                        result['profile_description'] = desc[:500]
                        result['has_bio'] = True
                # Avatar from og:image
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                # Channel name
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"YouTube enrichment failed: {e}")
        return result

    def _enrich_linkedin(self, url: str) -> Dict[str, Any]:
        """Enrich LinkedIn company page data.

        Uses og:description (which usually embeds "N followers"), og:image
        and og:title. NOTE(review): LinkedIn often serves an auth wall to
        anonymous clients, in which case little or nothing is extracted.
        """
        result: Dict[str, Any] = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    desc = og_desc.group(1).strip()
                    # LinkedIn descriptions often have follower count
                    followers_match = re.search(r'([\d,\.]+)\s+followers', desc, re.IGNORECASE)
                    if followers_match:
                        result['followers_count'] = self._parse_count(followers_match.group(1))
                    result['profile_description'] = desc[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"LinkedIn enrichment failed: {e}")
        return result

    def _enrich_tiktok(self, url: str) -> Dict[str, Any]:
        """Enrich TikTok profile data.

        TikTok embeds profile stats as JSON in the page source
        (followerCount, videoCount, signature, nickname).
        NOTE(review): videoCount is total videos, stored as posts_count_365d —
        confirm intent.
        """
        result: Dict[str, Any] = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # TikTok embeds profile data in JSON
                followers_match = re.search(r'"followerCount":\s*(\d+)', html)
                if followers_match:
                    result['followers_count'] = int(followers_match.group(1))
                videos_match = re.search(r'"videoCount":\s*(\d+)', html)
                if videos_match:
                    result['posts_count_365d'] = int(videos_match.group(1))
                # "signature" is TikTok's name for the profile bio.
                desc_match = re.search(r'"signature":\s*"([^"]*)"', html)
                if desc_match and desc_match.group(1).strip():
                    result['profile_description'] = desc_match.group(1)[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'"nickname":\s*"([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"TikTok enrichment failed: {e}")
        return result

    def _enrich_twitter(self, url: str) -> Dict[str, Any]:
        """Enrich Twitter/X profile data using og tags from public page.

        Only bio/photo/name are available this way — no follower counts.
        NOTE(review): X pages are heavily JS-rendered; og tags may be absent
        for anonymous requests.
        """
        result: Dict[str, Any] = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    result['profile_description'] = og_desc.group(1)[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"Twitter enrichment failed: {e}")
        return result

    @staticmethod
    def _parse_count(text: str) -> Optional[int]:
        """Parse follower/subscriber count strings like '1.2K', '3,456', '2.1M'.

        Commas and spaces are treated as thousands separators and stripped;
        a trailing k/m/b multiplies accordingly. Returns None for empty or
        unparseable input.
        NOTE(review): assumes comma is a grouping separator — a locale using
        comma as the decimal mark (e.g. "1,2K") would be misparsed; confirm
        the upstream snippets always use dot decimals.
        """
        if not text:
            return None
        text = text.strip().replace(',', '').replace(' ', '')
        try:
            multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
            last_char = text[-1].lower()
            if last_char in multipliers:
                return int(float(text[:-1]) * multipliers[last_char])
            return int(float(text))
        except (ValueError, IndexError):
            return None
|
|
|
|
|
|
def calculate_profile_completeness(profile_data: Dict[str, Any]) -> int:
    """
    Score how complete a social media profile is, on a 0-100 scale.

    Each satisfied criterion adds a fixed weight: profile URL present (20),
    bio filled (15), profile photo (15), cover photo (10), more than 10
    followers (10), at least one post in the last 30 days (15), and an
    engagement rate above 1 (15). Missing or None values count as
    unsatisfied; the total is capped at 100.
    """
    criteria = (
        (20, bool(profile_data.get('url'))),                    # profile exists
        (15, bool(profile_data.get('has_bio'))),                # bio filled
        (15, bool(profile_data.get('has_profile_photo'))),      # avatar set
        (10, bool(profile_data.get('has_cover_photo'))),        # cover photo set
        (10, (profile_data.get('followers_count') or 0) > 10),  # real audience
        (15, (profile_data.get('posts_count_30d') or 0) > 0),   # active in last 30d
        (15, (profile_data.get('engagement_rate') or 0) > 1),   # good engagement
    )
    total = sum(weight for weight, satisfied in criteria if satisfied)
    return min(total, 100)
|
|
|
|
|
|
class SocialMediaAuditor:
    """Main auditor class that coordinates website and social media auditing."""

    def __init__(self, database_url: str = DATABASE_URL):
        """
        Set up the database session factory and auditing helpers.

        Args:
            database_url: SQLAlchemy connection string (defaults to the
                module-level DATABASE_URL).
        """
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self.website_auditor = WebsiteAuditor()
        # Brave Search disabled — too many false positives (matches unrelated companies by name).
        # Social media profiles are now sourced only from website scraping and manual admin entry.
        # BUG FIX: assign an explicit None instead of leaving the attribute
        # undefined, so code probing for a Brave searcher can test it safely.
        self.brave_searcher = None
        self.profile_enricher = SocialProfileEnricher()

        # Initialize Google Places searcher if API key is available
        google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
        if google_places_api_key:
            self.google_places_searcher = GooglePlacesSearcher(api_key=google_places_api_key)
            logger.info('Google Places API key found - using Places API for reviews')
        else:
            self.google_places_searcher = None
            # Brave fallback for reviews no longer exists, so without a
            # Places key there is no Google reviews source at all.
            logger.info('GOOGLE_PLACES_API_KEY not set - Google reviews lookup disabled (Brave fallback removed)')
|
|
|
|
def get_companies(self, company_ids: Optional[List[int]] = None,
|
|
batch_start: Optional[int] = None,
|
|
batch_end: Optional[int] = None) -> List[Dict]:
|
|
"""Fetch companies from database."""
|
|
with self.Session() as session:
|
|
if company_ids:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
WHERE id = ANY(:ids)
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query, {'ids': company_ids})
|
|
elif batch_start is not None and batch_end is not None:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
OFFSET :offset LIMIT :limit
|
|
""")
|
|
result = session.execute(query, {
|
|
'offset': batch_start - 1,
|
|
'limit': batch_end - batch_start + 1
|
|
})
|
|
else:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query)
|
|
|
|
return [dict(row._mapping) for row in result]
|
|
|
|
def get_company_id_by_slug(self, slug: str) -> Optional[int]:
|
|
"""Get company ID by slug."""
|
|
with self.Session() as session:
|
|
query = text("""
|
|
SELECT id FROM companies WHERE slug = :slug
|
|
""")
|
|
result = session.execute(query, {'slug': slug})
|
|
row = result.fetchone()
|
|
if row:
|
|
return row[0]
|
|
return None
|
|
|
|
def audit_company(self, company: Dict) -> Dict[str, Any]:
|
|
"""
|
|
Perform full audit for a single company.
|
|
|
|
Returns comprehensive audit result.
|
|
"""
|
|
logger.info(f"Auditing company: {company['name']} (ID: {company['id']})")
|
|
logger.info(f"Company website: {company.get('website', 'NOT SET')}")
|
|
|
|
result = {
|
|
'company_id': company['id'],
|
|
'company_name': company['name'],
|
|
'audit_date': datetime.now(),
|
|
'website': {},
|
|
'social_media': {},
|
|
'google_reviews': {},
|
|
'errors': [],
|
|
}
|
|
|
|
# 1. Website audit
|
|
if company.get('website'):
|
|
try:
|
|
logger.info(f"Starting website audit for: {company['website']}")
|
|
result['website'] = self.website_auditor.audit_website(company['website'])
|
|
logger.info(f"Website audit completed. HTTP status: {result['website'].get('http_status')}")
|
|
except Exception as e:
|
|
logger.error(f"Website audit failed: {str(e)}")
|
|
result['errors'].append(f'Website audit failed: {str(e)}')
|
|
else:
|
|
logger.warning(f"No website URL for company {company['name']}")
|
|
result['website'] = {'errors': ['No website URL']}
|
|
|
|
# 2. Social media from website
|
|
website_social = result['website'].get('social_media_links', {})
|
|
social_sources = {} # Track source per platform
|
|
if website_social:
|
|
logger.info(f"Social media found on website: {list(website_social.keys())}")
|
|
for p in website_social:
|
|
social_sources[p] = 'website_scrape'
|
|
else:
|
|
logger.info("No social media links found on website")
|
|
|
|
# 3. Brave Search disabled — too many false positives
|
|
# Social media profiles are now sourced only from:
|
|
# - Website scraping (automatic, from company's own website)
|
|
# - Manual admin entry (via company profile editing)
|
|
# - OAuth API (Facebook Graph API for verified data)
|
|
|
|
result['social_media'] = website_social
|
|
result['social_sources'] = social_sources
|
|
logger.info(f"Total social media profiles found: {len(website_social)} - {list(website_social.keys())}")
|
|
|
|
# OAuth: Try Facebook/Instagram Graph API for authenticated data
|
|
try:
|
|
from oauth_service import OAuthService
|
|
from facebook_graph_service import FacebookGraphService
|
|
from database import SessionLocal as OAuthSessionLocal, OAuthToken
|
|
|
|
oauth = OAuthService()
|
|
company_id = company.get('id')
|
|
if company_id:
|
|
oauth_db = OAuthSessionLocal()
|
|
try:
|
|
fb_token = oauth.get_valid_token(oauth_db, company_id, 'meta', 'facebook')
|
|
if fb_token:
|
|
fb_service = FacebookGraphService(fb_token)
|
|
token_rec = oauth_db.query(OAuthToken).filter(
|
|
OAuthToken.company_id == company_id,
|
|
OAuthToken.provider == 'meta',
|
|
OAuthToken.service == 'facebook',
|
|
OAuthToken.is_active == True,
|
|
).first()
|
|
page_id = token_rec.account_id if token_rec else None
|
|
|
|
if page_id:
|
|
page_info = fb_service.get_page_info(page_id)
|
|
if page_info:
|
|
result['oauth_facebook'] = {
|
|
'fan_count': page_info.get('fan_count'),
|
|
'category': page_info.get('category'),
|
|
'data_source': 'oauth_api',
|
|
}
|
|
insights = fb_service.get_page_insights(page_id)
|
|
if insights:
|
|
result['oauth_facebook_insights'] = insights
|
|
|
|
ig_id = fb_service.get_instagram_account(page_id)
|
|
if ig_id:
|
|
ig_insights = fb_service.get_ig_media_insights(ig_id)
|
|
if ig_insights:
|
|
result['oauth_instagram'] = {
|
|
**ig_insights,
|
|
'data_source': 'oauth_api',
|
|
}
|
|
logger.info(f"OAuth Facebook/IG enrichment done for company {company_id}")
|
|
finally:
|
|
oauth_db.close()
|
|
except ImportError:
|
|
pass # Services not yet available
|
|
except Exception as e:
|
|
logger.warning(f"OAuth social media enrichment failed: {e}")
|
|
|
|
# 5. Enrich social media profiles with additional data
|
|
enriched_profiles = {}
|
|
for platform, url in website_social.items():
|
|
logger.info(f"Enriching {platform} profile: {url}")
|
|
enrichment = self.profile_enricher.enrich_profile(platform, url)
|
|
enriched_profiles[platform] = {
|
|
'url': url,
|
|
**enrichment,
|
|
}
|
|
# Calculate completeness score
|
|
enriched_profiles[platform]['profile_completeness_score'] = calculate_profile_completeness(enriched_profiles[platform])
|
|
|
|
# Calculate engagement rate (ESTIMATED - without API we don't have real engagement data)
|
|
profile = enriched_profiles[platform]
|
|
if profile.get('followers_count') and profile.get('followers_count') > 0 and profile.get('posts_count_30d') and profile.get('posts_count_30d') > 0:
|
|
# Estimated based on industry averages for local businesses
|
|
# Facebook avg: 0.5-2%, Instagram: 1-3%, LinkedIn: 0.5-1%
|
|
base_rates = {'facebook': 1.0, 'instagram': 2.0, 'linkedin': 0.7, 'youtube': 0.5, 'twitter': 0.3, 'tiktok': 3.0}
|
|
base = base_rates.get(platform, 1.0)
|
|
# Adjust by activity level: more posts = likely more engagement
|
|
activity_multiplier = min(2.0, profile.get('posts_count_30d', 0) / 4.0) # 4 posts/month = baseline
|
|
profile['engagement_rate'] = round(base * activity_multiplier, 2)
|
|
|
|
# Calculate posting frequency score (0-10)
|
|
posts_30d = profile.get('posts_count_30d')
|
|
if posts_30d is not None:
|
|
if posts_30d == 0:
|
|
profile['posting_frequency_score'] = 0
|
|
elif posts_30d <= 2:
|
|
profile['posting_frequency_score'] = 3
|
|
elif posts_30d <= 4:
|
|
profile['posting_frequency_score'] = 5
|
|
elif posts_30d <= 8:
|
|
profile['posting_frequency_score'] = 7
|
|
elif posts_30d <= 15:
|
|
profile['posting_frequency_score'] = 9
|
|
else:
|
|
profile['posting_frequency_score'] = 10
|
|
|
|
result['enriched_profiles'] = enriched_profiles
|
|
|
|
# 4. Google reviews search - prefer Google Places API if available
|
|
try:
|
|
if self.google_places_searcher:
|
|
# Use Google Places API directly for accurate data
|
|
place_id = self.google_places_searcher.find_place(company['name'], city)
|
|
if place_id:
|
|
details = self.google_places_searcher.get_place_details(place_id)
|
|
result['google_reviews'] = {
|
|
'google_rating': details.get('google_rating'),
|
|
'google_reviews_count': details.get('google_reviews_count'),
|
|
'google_opening_hours': details.get('opening_hours'),
|
|
'google_photos_count': details.get('google_photos_count'),
|
|
'business_status': details.get('business_status'),
|
|
}
|
|
else:
|
|
result['google_reviews'] = {
|
|
'google_rating': None,
|
|
'google_reviews_count': None,
|
|
'google_opening_hours': None,
|
|
'google_photos_count': None,
|
|
'business_status': None,
|
|
}
|
|
else:
|
|
# Fallback to Brave Search
|
|
result['google_reviews'] = self.brave_searcher.search_google_reviews(
|
|
company['name'], city
|
|
)
|
|
except Exception as e:
|
|
result['errors'].append(f'Google reviews search failed: {str(e)}')
|
|
|
|
return result
|
|
|
|
    def save_audit_result(self, result: Dict) -> bool:
        """Save audit result to database.

        Performs two upserts inside one transaction:
          1. company_website_analysis — one row per company (conflict on
             company_id), fully overwritten with the latest audit values.
          2. company_social_media — one row per (company_id, platform, url);
             enrichment columns use COALESCE so a NULL in the new audit never
             erases previously stored data.

        Returns True on commit, False on any error (logged, not raised).
        """
        try:
            with self.Session() as session:
                company_id = result['company_id']
                website = result.get('website', {})

                # Update or insert website analysis
                upsert_website = text("""
                    INSERT INTO company_website_analysis (
                        company_id, analyzed_at, website_url, http_status_code,
                        load_time_ms, has_ssl, ssl_expires_at, ssl_issuer, is_responsive,
                        is_mobile_friendly, has_viewport_meta, last_modified_at,
                        hosting_provider, hosting_ip, server_software, site_author,
                        cms_detected, google_rating, google_reviews_count,
                        google_opening_hours, google_photos_count,
                        audit_source, audit_version
                    ) VALUES (
                        :company_id, :analyzed_at, :website_url, :http_status_code,
                        :load_time_ms, :has_ssl, :ssl_expires_at, :ssl_issuer, :is_responsive,
                        :is_mobile_friendly, :has_viewport_meta, :last_modified_at,
                        :hosting_provider, :hosting_ip, :server_software, :site_author,
                        :cms_detected, :google_rating, :google_reviews_count,
                        :google_opening_hours, :google_photos_count,
                        :audit_source, :audit_version
                    )
                    ON CONFLICT (company_id) DO UPDATE SET
                        analyzed_at = EXCLUDED.analyzed_at,
                        http_status_code = EXCLUDED.http_status_code,
                        load_time_ms = EXCLUDED.load_time_ms,
                        has_ssl = EXCLUDED.has_ssl,
                        ssl_expires_at = EXCLUDED.ssl_expires_at,
                        ssl_issuer = EXCLUDED.ssl_issuer,
                        is_mobile_friendly = EXCLUDED.is_mobile_friendly,
                        has_viewport_meta = EXCLUDED.has_viewport_meta,
                        last_modified_at = EXCLUDED.last_modified_at,
                        hosting_provider = EXCLUDED.hosting_provider,
                        hosting_ip = EXCLUDED.hosting_ip,
                        server_software = EXCLUDED.server_software,
                        site_author = EXCLUDED.site_author,
                        cms_detected = EXCLUDED.cms_detected,
                        google_rating = EXCLUDED.google_rating,
                        google_reviews_count = EXCLUDED.google_reviews_count,
                        google_opening_hours = EXCLUDED.google_opening_hours,
                        google_photos_count = EXCLUDED.google_photos_count,
                        audit_source = EXCLUDED.audit_source,
                        audit_version = EXCLUDED.audit_version
                """)

                google_reviews = result.get('google_reviews', {})

                # Convert opening_hours dict to JSON string for JSONB column
                opening_hours = google_reviews.get('google_opening_hours')
                opening_hours_json = json.dumps(opening_hours) if opening_hours else None

                # NOTE: is_responsive is deliberately fed the same value as
                # is_mobile_friendly — the audit produces a single flag.
                session.execute(upsert_website, {
                    'company_id': company_id,
                    'analyzed_at': result['audit_date'],
                    'website_url': website.get('url'),
                    'http_status_code': website.get('http_status'),
                    'load_time_ms': website.get('load_time_ms'),
                    'has_ssl': website.get('has_ssl', False),
                    'ssl_expires_at': website.get('ssl_expiry'),
                    'ssl_issuer': website.get('ssl_issuer'),
                    'is_responsive': website.get('is_mobile_friendly', False),
                    'is_mobile_friendly': website.get('is_mobile_friendly', False),
                    'has_viewport_meta': website.get('has_viewport_meta', False),
                    'last_modified_at': website.get('last_modified_at'),
                    'hosting_provider': website.get('hosting_provider'),
                    'hosting_ip': website.get('hosting_ip'),
                    'server_software': website.get('server_software'),
                    'site_author': website.get('site_author'),
                    'cms_detected': website.get('site_generator'),
                    'google_rating': google_reviews.get('google_rating'),
                    'google_reviews_count': google_reviews.get('google_reviews_count'),
                    'google_opening_hours': opening_hours_json,
                    'google_photos_count': google_reviews.get('google_photos_count'),
                    'audit_source': 'automated',
                    'audit_version': '1.0',
                })

                # Save social media with enriched data
                social_sources = result.get('social_sources', {})
                for platform, url in result.get('social_media', {}).items():
                    normalized_url = normalize_social_url(url, platform)

                    # Get enrichment data if available
                    enriched = result.get('enriched_profiles', {}).get(platform, {})

                    upsert_social = text("""
                        INSERT INTO company_social_media (
                            company_id, platform, url, verified_at, source, is_valid,
                            page_name, followers_count,
                            has_profile_photo, has_cover_photo, has_bio, profile_description,
                            posts_count_30d, posts_count_365d, last_post_date,
                            engagement_rate, posting_frequency_score,
                            profile_completeness_score, updated_at
                        ) VALUES (
                            :company_id, :platform, :url, :verified_at, :source, :is_valid,
                            :page_name, :followers_count,
                            :has_profile_photo, :has_cover_photo, :has_bio, :profile_description,
                            :posts_count_30d, :posts_count_365d, :last_post_date,
                            :engagement_rate, :posting_frequency_score,
                            :profile_completeness_score, NOW()
                        )
                        ON CONFLICT (company_id, platform, url) DO UPDATE SET
                            verified_at = EXCLUDED.verified_at,
                            source = EXCLUDED.source,
                            is_valid = EXCLUDED.is_valid,
                            page_name = COALESCE(EXCLUDED.page_name, company_social_media.page_name),
                            followers_count = COALESCE(EXCLUDED.followers_count, company_social_media.followers_count),
                            has_profile_photo = COALESCE(EXCLUDED.has_profile_photo, company_social_media.has_profile_photo),
                            has_cover_photo = COALESCE(EXCLUDED.has_cover_photo, company_social_media.has_cover_photo),
                            has_bio = COALESCE(EXCLUDED.has_bio, company_social_media.has_bio),
                            profile_description = COALESCE(EXCLUDED.profile_description, company_social_media.profile_description),
                            posts_count_30d = COALESCE(EXCLUDED.posts_count_30d, company_social_media.posts_count_30d),
                            posts_count_365d = COALESCE(EXCLUDED.posts_count_365d, company_social_media.posts_count_365d),
                            engagement_rate = COALESCE(EXCLUDED.engagement_rate, company_social_media.engagement_rate),
                            posting_frequency_score = COALESCE(EXCLUDED.posting_frequency_score, company_social_media.posting_frequency_score),
                            last_post_date = COALESCE(EXCLUDED.last_post_date, company_social_media.last_post_date),
                            profile_completeness_score = COALESCE(EXCLUDED.profile_completeness_score, company_social_media.profile_completeness_score),
                            updated_at = NOW()
                    """)

                    session.execute(upsert_social, {
                        'company_id': company_id,
                        'platform': platform,
                        'url': normalized_url,
                        'verified_at': result['audit_date'],
                        'source': social_sources.get(platform, 'website_scrape'),
                        'is_valid': True,
                        'page_name': enriched.get('page_name'),
                        'followers_count': enriched.get('followers_count'),
                        'has_profile_photo': enriched.get('has_profile_photo'),
                        'has_cover_photo': enriched.get('has_cover_photo'),
                        'has_bio': enriched.get('has_bio'),
                        'profile_description': enriched.get('profile_description'),
                        'posts_count_30d': enriched.get('posts_count_30d'),
                        'posts_count_365d': enriched.get('posts_count_365d'),
                        'last_post_date': enriched.get('last_post_date'),
                        'engagement_rate': enriched.get('engagement_rate'),
                        'posting_frequency_score': enriched.get('posting_frequency_score'),
                        'profile_completeness_score': enriched.get('profile_completeness_score'),
                    })

                session.commit()
                logger.info(f"Saved audit for company {company_id}")
                return True

        except Exception as e:
            # Best-effort persistence: the caller counts this as a failure
            # but the batch keeps running.
            logger.error(f"Failed to save audit result: {e}")
            return False
|
|
|
|
def run_audit(self, company_ids: Optional[List[int]] = None,
|
|
batch_start: Optional[int] = None,
|
|
batch_end: Optional[int] = None,
|
|
dry_run: bool = False) -> Dict[str, Any]:
|
|
"""
|
|
Run audit for specified companies.
|
|
|
|
Returns summary of audit results.
|
|
"""
|
|
companies = self.get_companies(company_ids, batch_start, batch_end)
|
|
|
|
summary = {
|
|
'total': len(companies),
|
|
'success': 0,
|
|
'failed': 0,
|
|
'results': [],
|
|
}
|
|
|
|
for company in companies:
|
|
try:
|
|
result = self.audit_company(company)
|
|
|
|
if not dry_run:
|
|
if self.save_audit_result(result):
|
|
summary['success'] += 1
|
|
else:
|
|
summary['failed'] += 1
|
|
else:
|
|
summary['success'] += 1
|
|
print(json.dumps(result, default=str, indent=2))
|
|
|
|
summary['results'].append({
|
|
'company_id': company['id'],
|
|
'company_name': company['name'],
|
|
'status': 'success',
|
|
'social_media_found': len(result.get('social_media', {})),
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Audit failed for company {company['id']}: {e}")
|
|
summary['failed'] += 1
|
|
summary['results'].append({
|
|
'company_id': company['id'],
|
|
'company_name': company['name'],
|
|
'status': 'failed',
|
|
'error': str(e),
|
|
})
|
|
|
|
return summary
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the requested audit, print a summary.

    Exits with status 1 on unknown slug, malformed --batch value, or when no
    selection flag is given.
    """
    parser = argparse.ArgumentParser(description='Social Media & Website Audit')
    parser.add_argument('--company-id', type=int, help='Audit single company by ID')
    parser.add_argument('--company-slug', type=str, help='Audit single company by slug')
    parser.add_argument('--batch', type=str, help='Audit batch of companies (e.g., 1-10)')
    parser.add_argument('--all', action='store_true', help='Audit all companies')
    parser.add_argument('--dry-run', action='store_true', help='Print results without saving')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    auditor = SocialMediaAuditor()

    # Use `is not None` (not truthiness) so an explicit --company-id 0 is honored.
    if args.company_id is not None:
        summary = auditor.run_audit(company_ids=[args.company_id], dry_run=args.dry_run)
    elif args.company_slug:
        # Look up company ID by slug
        company_id = auditor.get_company_id_by_slug(args.company_slug)
        if company_id:
            summary = auditor.run_audit(company_ids=[company_id], dry_run=args.dry_run)
        else:
            print(f"Error: Company with slug '{args.company_slug}' not found")
            sys.exit(1)
    elif args.batch:
        # Validate the START-END form instead of dying with a raw ValueError
        # traceback on input like "--batch foo" or "--batch 1-2-3".
        try:
            start, end = map(int, args.batch.split('-'))
        except ValueError:
            print(f"Error: Invalid --batch value '{args.batch}' (expected START-END, e.g. 1-10)")
            sys.exit(1)
        summary = auditor.run_audit(batch_start=start, batch_end=end, dry_run=args.dry_run)
    elif args.all:
        summary = auditor.run_audit(dry_run=args.dry_run)
    else:
        parser.print_help()
        sys.exit(1)

    print("\n" + "=" * 60)
    print("AUDIT SUMMARY")  # plain string: the original f-string had no placeholders
    print("=" * 60)
    print(f"Total companies: {summary['total']}")
    print(f"Successful: {summary['success']}")
    print(f"Failed: {summary['failed']}")
    print("=" * 60)
|
|
|
|
|
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()
|