Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- Add direct URL check for linkedin.com/company/{slug} before Brave Search
- Prioritize /company/ over /in/ in search result ranking
- Use targeted query "company_name linkedin.com/company" first
- Fall back to personal profile search only if company page not found
- Verify page title matches company name to avoid false positives
Fixes: WATERM showed employee's personal profile instead of existing
company page at linkedin.com/company/waterm
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1804 lines
80 KiB
Python
1804 lines
80 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Social Media & Website Audit Script for Norda Biznes
|
|
=====================================================
|
|
|
|
Performs comprehensive audit of company websites and social media presence.
|
|
Designed to run with multiple parallel workers.
|
|
|
|
Features:
|
|
- Website analysis (SSL, hosting, author, responsiveness)
|
|
- Social media discovery (FB, IG, TikTok, YouTube, LinkedIn)
|
|
- Google Reviews scraping via Brave Search
|
|
- Parallel execution support
|
|
|
|
Usage:
|
|
python social_media_audit.py --company-id 26
|
|
python social_media_audit.py --batch 1-10
|
|
python social_media_audit.py --all
|
|
|
|
Author: Claude Code
|
|
Date: 2025-12-29
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, List, Tuple, Any
|
|
from urllib.parse import urlparse
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Load .env file from project root
# (must run before any os.getenv() call below reads configuration)
try:
    from dotenv import load_dotenv
    # Find .env file relative to this script
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    env_path = project_root / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        # NOTE(review): this runs before logging.basicConfig() later in the
        # module, so with the default WARNING root level this info record is
        # silently dropped - confirm whether that is intended.
        logging.info(f"Loaded .env from {env_path}")
except ImportError:
    pass  # python-dotenv not installed, rely on system environment
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import whois
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
try:
    from database import normalize_social_url
except ImportError:
    # database module not importable (e.g. script run standalone):
    # provide a local implementation with the same contract.
    def normalize_social_url(url: str, platform: str = None) -> str:
        """Normalize social media URLs to prevent duplicates.

        Forces the https scheme, drops a leading 'www.', and strips any
        trailing slash. Empty/None input is returned unchanged. The
        ``platform`` argument is accepted for signature compatibility
        with the database module's version.
        """
        if not url:
            return url
        cleaned = url.strip()
        if cleaned.startswith('http://'):
            cleaned = 'https://' + cleaned[len('http://'):]
        elif not cleaned.startswith('https://'):
            cleaned = 'https://' + cleaned
        cleaned = cleaned.replace('https://www.', 'https://')
        return cleaned.rstrip('/')
|
|
|
|
# Configure logging
# Root logger configured once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by all classes in this script.
logger = logging.getLogger(__name__)
|
|
|
|
# Database configuration
# WARNING: The fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)

# Request configuration
# Per-request timeout in seconds, applied to every HTTP call in this script.
REQUEST_TIMEOUT = 15
# Desktop Chrome user agent so audited sites serve their regular markup.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
|
|
|
# Known Polish hosting providers (IP ranges and identifiers)
# Maps provider name -> list of match fragments. A fragment matches when the
# resolved IP starts with it, or when it appears as a substring of the
# domain / reverse-DNS name (see WebsiteAuditor._detect_hosting).
# NOTE(review): iteration order matters - broad prefixes such as
# Google Cloud '34.'/'35.' or AWS '52.'/'54.' overlap more specific earlier
# entries (e.g. OVH '54.36.'), so the specific providers must stay first.
HOSTING_PROVIDERS = {
    'nazwa.pl': ['nazwa.pl', '185.252.', '91.227.'],
    'home.pl': ['home.pl', '212.85.', '195.26.'],
    'OVH': ['ovh.', '51.38.', '51.68.', '51.75.', '51.77.', '51.83.', '51.89.', '51.91.', '54.36.', '54.37.', '54.38.', '135.125.', '141.94.', '141.95.', '142.4.', '144.217.', '145.239.', '147.135.', '149.202.', '151.80.', '158.69.', '164.132.', '167.114.', '176.31.', '178.32.', '185.15.', '188.165.', '192.95.', '193.70.', '194.182.', '195.154.', '198.27.', '198.50.', '198.100.', '213.186.', '213.251.', '217.182.'],
    'cyber_Folks': ['cyberfolks', 'cf.', '77.55.'],
    'Zenbox': ['zenbox', '195.181.'],
    'Linuxpl': ['linuxpl', '91.200.'],
    'Hekko': ['hekko', 'hekko.pl'],
    'Smarthost': ['smarthost'],
    'AZ.pl': ['az.pl', 'aznetwork'],
    'Aftermarket': ['aftermarket', 'aftermarket.pl'],
    'Cloudflare': ['cloudflare', '104.16.', '104.17.', '104.18.', '104.19.', '104.20.', '104.21.', '104.22.', '104.23.', '104.24.', '172.67.'],
    'Google Cloud': ['google', '34.', '35.'],
    'AWS': ['amazon', 'aws', '52.', '54.'],
    'Vercel': ['vercel', '76.76.21.'],
    'Netlify': ['netlify'],
}
|
|
|
|
# Social media patterns
# Per-platform regex lists tried in order by WebsiteAuditor._parse_html;
# the first pattern yielding a non-excluded match wins, so more specific
# patterns (profile ids, /company/, multi-segment paths) come first.
SOCIAL_MEDIA_PATTERNS = {
    'facebook': [
        r'(?:https?://)?(?:www\.)?facebook\.com/profile\.php\?id=(\d+)',
        # Multi-segment paths like /p/PageName-12345/ - capture full path
        r'(?:https?://)?(?:www\.)?facebook\.com/(p/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?facebook\.com/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?fb\.com/([^/?\s"\'<>]+)',
    ],
    'instagram': [
        r'(?:https?://)?(?:www\.)?instagram\.com/([^/?\s"\'<>]+)',
    ],
    'youtube': [
        r'(?:https?://)?(?:www\.)?youtube\.com/(?:channel|c|user|@)/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?youtube\.com/([^/?\s"\'<>]+)',
    ],
    'linkedin': [
        # /company/ pattern listed before /in/ so company pages win over
        # personal profiles when both appear in the page.
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(company/[^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/(in/[^/?\s"\'<>]+)',
    ],
    'tiktok': [
        r'(?:https?://)?(?:www\.)?tiktok\.com/@([^/?\s"\'<>]+)',
    ],
    'twitter': [
        r'(?:https?://)?(?:www\.)?(?:twitter|x)\.com/([^/?\s"\'<>]+)',
    ],
}

# False positives to exclude
# Captured path segments that are platform features (share widgets, login
# pages, section names), not profile handles. Compared lowercase, exact match.
SOCIAL_MEDIA_EXCLUDE = {
    'facebook': ['sharer', 'share', 'intent', 'plugins', 'dialog', 'sharer.php', 'login', 'pages', 'boldthemes', 'profile.php', 'profile', 'watch', 'groups', 'events', 'marketplace', 'gaming', 'stories', 'p', 'people', 'hashtag', 'help', 'settings', 'notifications', 'tr', 'privacy', 'policies', 'ads', 'business', 'legal', 'flx'],
    'instagram': ['explore', 'accounts', 'p', 'reel'],
    'youtube': ['embed', 'watch', 'playlist', 'results', 'feed', 'channel', 'c', 'user', '@', 'about', 'featured', 'videos', 'shorts', 'streams', 'playlists', 'community', 'channels', 'store'],
    'linkedin': ['company/shareArticle', 'company/share', 'company/login', 'in/shareArticle', 'in/share', 'in/login'],
    'tiktok': ['embed', 'video'],
    'twitter': ['intent', 'share', 'widgets.js', 'widgets', 'tweet', 'platform.twitter.com', 'bold_themes', 'boldthemes'],
}
|
|
|
|
|
|
class WebsiteAuditor:
    """Audits website technical details and metadata.

    Holds one shared requests.Session; audit_website() is the entry point
    and delegates to _check_ssl, _detect_hosting and _parse_html.
    """
|
|
|
|
def __init__(self):
|
|
self.session = requests.Session()
|
|
self.session.headers.update({'User-Agent': USER_AGENT})
|
|
|
|
def audit_website(self, url: str) -> Dict[str, Any]:
|
|
"""
|
|
Perform comprehensive website audit.
|
|
|
|
Returns dict with:
|
|
- http_status, load_time_ms
|
|
- has_ssl, ssl_valid, ssl_expiry
|
|
- hosting_provider, hosting_ip, server_software
|
|
- site_author, site_generator
|
|
- is_mobile_friendly, has_viewport_meta
|
|
- last_modified_at
|
|
- social_media_links (dict of platform -> url)
|
|
"""
|
|
result = {
|
|
'url': url,
|
|
'http_status': None,
|
|
'load_time_ms': None,
|
|
'has_ssl': False,
|
|
'ssl_valid': False,
|
|
'ssl_expiry': None,
|
|
'ssl_issuer': None,
|
|
'hosting_provider': None,
|
|
'hosting_ip': None,
|
|
'server_software': None,
|
|
'site_author': None,
|
|
'site_generator': None,
|
|
'is_mobile_friendly': False,
|
|
'has_viewport_meta': False,
|
|
'last_modified_at': None,
|
|
'social_media_links': {},
|
|
'errors': [],
|
|
}
|
|
|
|
if not url:
|
|
result['errors'].append('No URL provided')
|
|
return result
|
|
|
|
# Normalize URL
|
|
if not url.startswith(('http://', 'https://')):
|
|
url = 'https://' + url
|
|
|
|
parsed = urlparse(url)
|
|
domain = parsed.netloc
|
|
|
|
# 1. Check SSL certificate
|
|
try:
|
|
result.update(self._check_ssl(domain))
|
|
except Exception as e:
|
|
result['errors'].append(f'SSL check failed: {str(e)}')
|
|
|
|
# 2. Resolve IP and detect hosting
|
|
try:
|
|
result.update(self._detect_hosting(domain))
|
|
except Exception as e:
|
|
result['errors'].append(f'Hosting detection failed: {str(e)}')
|
|
|
|
# 3. Fetch page and analyze
|
|
try:
|
|
start_time = time.time()
|
|
response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
|
|
result['load_time_ms'] = int((time.time() - start_time) * 1000)
|
|
result['http_status'] = response.status_code
|
|
result['has_ssl'] = response.url.startswith('https://')
|
|
|
|
# Server header
|
|
result['server_software'] = response.headers.get('Server', '')[:100]
|
|
|
|
# Last-Modified header
|
|
last_mod = response.headers.get('Last-Modified')
|
|
if last_mod:
|
|
try:
|
|
result['last_modified_at'] = datetime.strptime(
|
|
last_mod, '%a, %d %b %Y %H:%M:%S %Z'
|
|
)
|
|
except:
|
|
pass
|
|
|
|
# Parse HTML
|
|
if response.status_code == 200:
|
|
result.update(self._parse_html(response.text))
|
|
|
|
except requests.exceptions.SSLError as e:
|
|
result['errors'].append(f'SSL Error: {str(e)}')
|
|
result['ssl_valid'] = False
|
|
# Try HTTP fallback
|
|
try:
|
|
http_url = url.replace('https://', 'http://')
|
|
response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
|
|
result['http_status'] = response.status_code
|
|
result['has_ssl'] = False
|
|
if response.status_code == 200:
|
|
result.update(self._parse_html(response.text))
|
|
except Exception as e2:
|
|
result['errors'].append(f'HTTP fallback failed: {str(e2)}')
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
result['errors'].append(f'Request failed: {str(e)}')
|
|
|
|
return result
|
|
|
|
def _check_ssl(self, domain: str) -> Dict[str, Any]:
|
|
"""Check SSL certificate validity, expiry and issuer."""
|
|
result = {'ssl_valid': False, 'ssl_expiry': None, 'ssl_issuer': None}
|
|
|
|
try:
|
|
context = ssl.create_default_context()
|
|
with socket.create_connection((domain, 443), timeout=10) as sock:
|
|
with context.wrap_socket(sock, server_hostname=domain) as ssock:
|
|
cert = ssock.getpeercert()
|
|
result['ssl_valid'] = True
|
|
|
|
# Parse expiry date
|
|
not_after = cert.get('notAfter')
|
|
if not_after:
|
|
result['ssl_expiry'] = datetime.strptime(
|
|
not_after, '%b %d %H:%M:%S %Y %Z'
|
|
).date()
|
|
|
|
# Extract issuer (Certificate Authority)
|
|
issuer = cert.get('issuer')
|
|
if issuer:
|
|
# issuer is tuple of tuples like ((('organizationName', 'Let\'s Encrypt'),),)
|
|
issuer_dict = {}
|
|
for item in issuer:
|
|
for key, value in item:
|
|
issuer_dict[key] = value
|
|
# Prefer Organization name, fallback to Common Name
|
|
issuer_name = issuer_dict.get('organizationName') or issuer_dict.get('commonName')
|
|
if issuer_name:
|
|
result['ssl_issuer'] = issuer_name[:100] # Limit length
|
|
except Exception as e:
|
|
result['ssl_valid'] = False
|
|
|
|
return result
|
|
|
|
def _detect_hosting(self, domain: str) -> Dict[str, Any]:
|
|
"""Detect hosting provider from IP and reverse DNS."""
|
|
result = {'hosting_provider': None, 'hosting_ip': None}
|
|
|
|
try:
|
|
ip = socket.gethostbyname(domain)
|
|
result['hosting_ip'] = ip
|
|
|
|
# Check against known hosting IP ranges
|
|
for provider, patterns in HOSTING_PROVIDERS.items():
|
|
for pattern in patterns:
|
|
if ip.startswith(pattern) or pattern in domain.lower():
|
|
result['hosting_provider'] = provider
|
|
return result
|
|
|
|
# Try reverse DNS
|
|
try:
|
|
reverse = socket.gethostbyaddr(ip)[0]
|
|
for provider, patterns in HOSTING_PROVIDERS.items():
|
|
for pattern in patterns:
|
|
if pattern in reverse.lower():
|
|
result['hosting_provider'] = provider
|
|
return result
|
|
except:
|
|
pass
|
|
|
|
# Try WHOIS for registrar
|
|
try:
|
|
w = whois.whois(domain)
|
|
if w.registrar:
|
|
result['domain_registrar'] = str(w.registrar)[:100]
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
result['errors'] = [f'Hosting detection: {str(e)}']
|
|
|
|
return result
|
|
|
|
    def _parse_html(self, html: str) -> Dict[str, Any]:
        """Parse HTML for metadata and social media links.

        Extracts viewport/author/generator meta tags, hunts for a site
        author in HTML comments, the footer, footer links, and finally
        the whole page text (Polish + English phrasing), then scans the
        raw HTML for social profile URLs using SOCIAL_MEDIA_PATTERNS,
        filtering SOCIAL_MEDIA_EXCLUDE false positives. Any exception
        aborts parsing and is reported under result['errors'].
        """
        result = {
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'social_media_links': {},
        }

        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Check viewport meta (mobile-friendly indicator)
            viewport = soup.find('meta', attrs={'name': 'viewport'})
            if viewport:
                result['has_viewport_meta'] = True
                content = viewport.get('content', '')
                if 'width=device-width' in content:
                    result['is_mobile_friendly'] = True

            # Author meta
            author = soup.find('meta', attrs={'name': 'author'})
            if author:
                result['site_author'] = author.get('content', '')[:255]

            # Generator meta (CMS)
            generator = soup.find('meta', attrs={'name': 'generator'})
            if generator:
                result['site_generator'] = generator.get('content', '')[:100]

            # Look for author in multiple places (only when no author meta)
            if not result['site_author']:
                author_found = None

                # 1. Check HTML comments for author info
                # NOTE(review): `comments` below is never used afterwards -
                # the regex findall on the raw HTML is what actually feeds
                # the loop; candidate for removal.
                comments = soup.find_all(string=lambda text: isinstance(text, str) and '<!--' in str(text.parent) if text.parent else False)
                html_comments = re.findall(r'<!--(.+?)-->', html, re.DOTALL)
                for comment in html_comments:
                    comment_patterns = [
                        r'(?:created by|designed by|developed by|made by|author)[:\s]+([^\n<>]+)',
                        r'(?:agencja|agency|studio)[:\s]+([^\n<>]+)',
                    ]
                    for pattern in comment_patterns:
                        match = re.search(pattern, comment, re.IGNORECASE)
                        if match:
                            author_found = match.group(1).strip()
                            break
                    if author_found:
                        break

                # 2. Check footer text
                if not author_found:
                    footer = soup.find('footer')
                    if footer:
                        footer_text = footer.get_text(separator=' ')
                        footer_patterns = [
                            r'(?:wykonanie|realizacja|created by|designed by|made by|developed by)[:\s]+([^|<>\n©]+)',
                            r'(?:projekt|design|strona)[:\s]+([^|<>\n©]+)',
                            r'(?:powered by|built with)[:\s]+([^|<>\n©]+)',
                            r'(?:agencja|agency|studio)[:\s]+([^|<>\n©]+)',
                        ]
                        for pattern in footer_patterns:
                            match = re.search(pattern, footer_text, re.IGNORECASE)
                            if match:
                                author_found = match.group(1).strip()
                                break

                        # 3. Check footer links for agency/studio domains
                        if not author_found:
                            footer_links = footer.find_all('a', href=True)
                            agency_domains = ['.pl', '.com', '.eu']
                            agency_keywords = ['studio', 'agencja', 'agency', 'design', 'web', 'digital', 'media', 'creative']
                            for link in footer_links:
                                href = link.get('href', '')
                                link_text = link.get_text().strip()
                                # Check if link looks like an agency
                                if any(kw in href.lower() or kw in link_text.lower() for kw in agency_keywords):
                                    if any(dom in href for dom in agency_domains) and 'facebook' not in href and 'instagram' not in href:
                                        # Extract domain or link text as author
                                        if link_text and len(link_text) > 2 and len(link_text) < 50:
                                            author_found = link_text
                                            break

                # 4. Check entire page for common Polish patterns
                if not author_found:
                    page_text = soup.get_text(separator=' ')
                    page_patterns = [
                        r'(?:stronę wykonała?|witrynę wykonała?|stronę stworzył[ao]?)[:\s]+([^|<>\n©.]+)',
                        r'(?:copyright|©).*?(?:by|przez)[:\s]+([^|<>\n©.]+)',
                    ]
                    for pattern in page_patterns:
                        match = re.search(pattern, page_text, re.IGNORECASE)
                        if match:
                            author_found = match.group(1).strip()
                            break

                # Clean up author name
                if author_found:
                    # Remove common prefixes/suffixes
                    author_found = re.sub(r'^[\s\-–—:]+', '', author_found)
                    author_found = re.sub(r'[\s\-–—:]+$', '', author_found)
                    author_found = re.sub(r'\s+', ' ', author_found)
                    # Remove if too short or looks like garbage
                    if len(author_found) > 2 and len(author_found) < 100:
                        result['site_author'] = author_found[:255]

            # Extract social media links
            # NOTE(review): html_lower is computed but matching below runs on
            # the original `html` with re.IGNORECASE - confirm it is needed.
            html_lower = html.lower()
            for platform, patterns in SOCIAL_MEDIA_PATTERNS.items():
                found_for_platform = False
                for pattern in patterns:
                    if found_for_platform:
                        break  # Already found this platform, skip remaining patterns
                    matches = re.findall(pattern, html, re.IGNORECASE)
                    if matches:
                        # Get first valid match, excluding common false positives
                        for match in matches:
                            # Skip very short matches (likely truncated or generic paths)
                            if len(match) < 2:
                                continue
                            # Check against exclusion list (exact match only to avoid false positives)
                            excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
                            if match.lower() not in excludes:
                                # Construct full URL
                                if platform == 'facebook':
                                    if match.isdigit():
                                        url = f'https://facebook.com/profile.php?id={match}'
                                    elif '/' in match:
                                        # Multi-segment path (e.g. p/PageName-123)
                                        # NOTE(review): this branch and the else
                                        # below build the same URL - could be merged.
                                        url = f'https://facebook.com/{match}'
                                    else:
                                        url = f'https://facebook.com/{match}'
                                elif platform == 'instagram':
                                    # Skip Instagram handles with tracking params (igsh=, utm_)
                                    if '?' in match or '&' in match:
                                        match = match.split('?')[0].split('&')[0]
                                        if len(match) < 2:
                                            continue
                                    url = f'https://instagram.com/{match}'
                                elif platform == 'youtube':
                                    if match.startswith('@'):
                                        url = f'https://youtube.com/{match}'
                                    else:
                                        url = f'https://youtube.com/channel/{match}'
                                elif platform == 'linkedin':
                                    url = f'https://linkedin.com/{match}'
                                elif platform == 'tiktok':
                                    url = f'https://tiktok.com/@{match}'
                                elif platform == 'twitter':
                                    url = f'https://twitter.com/{match}'
                                else:
                                    continue

                                result['social_media_links'][platform] = url
                                found_for_platform = True
                                break  # Found valid match, stop searching this pattern's matches

        except Exception as e:
            result['errors'] = [f'HTML parsing: {str(e)}']

        return result
|
|
|
|
|
|
class GooglePlacesSearcher:
    """Search for Google Business profiles using Google Places API.

    find_place() resolves a company to a place_id; get_place_details()
    then fetches rating, reviews, hours and contact data.
    """

    # Google Places API configuration (legacy Places Web Service endpoints)
    FIND_PLACE_URL = 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json'
    PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'
|
|
|
|
def __init__(self, api_key: Optional[str] = None):
|
|
"""
|
|
Initialize GooglePlacesSearcher.
|
|
|
|
Args:
|
|
api_key: Google Places API key. Falls back to GOOGLE_PLACES_API_KEY env var.
|
|
"""
|
|
self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
|
|
self.session = requests.Session()
|
|
self.session.headers.update({'User-Agent': USER_AGENT})
|
|
|
|
def find_place(self, company_name: str, city: str = 'Wejherowo') -> Optional[str]:
|
|
"""
|
|
Find a place by company name and city.
|
|
|
|
Uses Google Places findplacefromtext API to search for a business
|
|
and returns the place_id if found.
|
|
|
|
Args:
|
|
company_name: Name of the company to search for.
|
|
city: City to narrow down the search (default: Wejherowo).
|
|
|
|
Returns:
|
|
place_id string if found, None otherwise.
|
|
"""
|
|
if not self.api_key:
|
|
logger.warning('Google Places API key not configured')
|
|
return None
|
|
|
|
try:
|
|
# Construct search query with company name and city
|
|
search_query = f'{company_name} {city}'
|
|
|
|
params = {
|
|
'input': search_query,
|
|
'inputtype': 'textquery',
|
|
'fields': 'place_id,name,formatted_address',
|
|
'language': 'pl',
|
|
'key': self.api_key,
|
|
}
|
|
|
|
response = self.session.get(
|
|
self.FIND_PLACE_URL,
|
|
params=params,
|
|
timeout=REQUEST_TIMEOUT
|
|
)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
if data.get('status') == 'OK' and data.get('candidates'):
|
|
candidate = data['candidates'][0]
|
|
place_id = candidate.get('place_id')
|
|
logger.info(
|
|
f"Found place for '{company_name}': {candidate.get('name')} "
|
|
f"at {candidate.get('formatted_address')}"
|
|
)
|
|
return place_id
|
|
elif data.get('status') == 'ZERO_RESULTS':
|
|
logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
|
|
return None
|
|
else:
|
|
logger.warning(
|
|
f"Google Places API returned status: {data.get('status')} "
|
|
f"for '{company_name}'"
|
|
)
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.error(f"Timeout searching for '{company_name}' on Google Places")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Request error searching for '{company_name}': {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error finding place for '{company_name}': {e}")
|
|
return None
|
|
|
|
    def get_place_details(self, place_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a place.

        Retrieves rating, review count, opening hours, and other business details
        from Google Places API. All API/parse failures are logged and the
        partially-filled (or all-None) result dict is returned.

        Args:
            place_id: Google Place ID returned from find_place().

        Returns:
            Dict containing:
            - google_rating: Decimal rating (1.0-5.0) or None
            - google_reviews_count: Integer review count or None
            - google_photos_count: Integer photo count or None
            - opening_hours: Dict with weekday_text and open_now, or None
            - business_status: String like 'OPERATIONAL', 'CLOSED_TEMPORARILY', etc.
            - formatted_phone: Phone number or None
            - website: Website URL or None
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'google_photos_count': None,
            'opening_hours': None,
            'business_status': None,
            'formatted_phone': None,
            'website': None,
        }

        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return result

        if not place_id:
            return result

        try:
            # Request fields we need for the audit
            fields = [
                'rating',
                'user_ratings_total',
                'opening_hours',
                'business_status',
                'formatted_phone_number',
                'website',
                'name',
                'photos',
            ]

            params = {
                'place_id': place_id,
                'fields': ','.join(fields),
                'language': 'pl',
                'key': self.api_key,
            }

            response = self.session.get(
                self.PLACE_DETAILS_URL,
                params=params,
                timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()

            data = response.json()

            if data.get('status') == 'OK' and data.get('result'):
                place = data['result']

                # Extract rating
                if 'rating' in place:
                    result['google_rating'] = round(float(place['rating']), 1)

                # Extract review count
                if 'user_ratings_total' in place:
                    result['google_reviews_count'] = int(place['user_ratings_total'])

                # Extract opening hours
                if 'opening_hours' in place:
                    hours = place['opening_hours']
                    result['opening_hours'] = {
                        'weekday_text': hours.get('weekday_text', []),
                        'open_now': hours.get('open_now'),
                        'periods': hours.get('periods', []),
                    }

                # Extract business status
                if 'business_status' in place:
                    result['business_status'] = place['business_status']

                # Extract phone
                if 'formatted_phone_number' in place:
                    result['formatted_phone'] = place['formatted_phone_number']

                # Extract website
                if 'website' in place:
                    result['website'] = place['website']

                # Extract photos count
                if 'photos' in place:
                    result['google_photos_count'] = len(place['photos'])

                logger.info(
                    f"Retrieved details for {place.get('name')}: "
                    f"rating={result['google_rating']}, "
                    f"reviews={result['google_reviews_count']}, "
                    f"photos={result['google_photos_count']}"
                )
            else:
                logger.warning(
                    f"Google Places API returned status: {data.get('status')} "
                    f"for place_id: {place_id}"
                )

        except requests.exceptions.Timeout:
            logger.error(f"Timeout getting details for place_id: {place_id}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error getting place details: {e}")
        except Exception as e:
            logger.error(f"Error getting place details for {place_id}: {e}")

        return result
|
|
|
|
|
|
class BraveSearcher:
    """Search for social media profiles and Google reviews using Brave Search.

    Uses the Brave Web Search API (BRAVE_API_KEY); review lookups prefer
    the Google Places API when GOOGLE_PLACES_API_KEY is configured.
    """
|
|
|
|
def __init__(self, api_key: Optional[str] = None):
|
|
self.api_key = api_key or os.getenv('BRAVE_API_KEY')
|
|
self.session = requests.Session()
|
|
self.session.headers.update({'User-Agent': USER_AGENT})
|
|
|
|
    def search_social_media(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, str]:
        """
        Search for company social media profiles.

        Runs one Brave query per platform, then applies LinkedIn-specific
        recovery: when no LinkedIn result was found, or the result is a
        personal '/in/' profile, a direct linkedin.com/company/<slug> check
        is tried first, falling back to a broader Brave query only if that
        fails and nothing was found at all.

        Returns dict of platform -> url.
        """
        results = {}

        # Per-platform query templates; the LinkedIn query targets
        # linkedin.com/company so company pages outrank personal profiles.
        platforms = [
            ('facebook', f'{company_name} {city} facebook'),
            ('instagram', f'{company_name} instagram'),
            ('tiktok', f'{company_name} tiktok'),
            ('youtube', f'{company_name} youtube kanał'),
            ('linkedin', f'{company_name} linkedin.com/company'),
        ]

        for platform, query in platforms:
            try:
                url = self._search_brave(query, platform, company_name)
                if url:
                    results[platform] = url
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                logger.warning(f'Brave search failed for {platform}: {e}')

        # LinkedIn: try direct URL check first, then Brave Search fallback
        # A '/in/' (personal) hit is treated as replaceable by a company page.
        if 'linkedin' not in results or '/in/' in results.get('linkedin', ''):
            direct_url = self._check_linkedin_company_page(company_name)
            if direct_url:
                results['linkedin'] = direct_url
                logger.info(f"LinkedIn direct URL check found company page: {direct_url}")
            elif 'linkedin' not in results:
                # Last resort: search for any LinkedIn profile (personal included)
                try:
                    url = self._search_brave(f'{company_name} linkedin', 'linkedin', company_name)
                    if url:
                        results['linkedin'] = url
                        logger.info(f"LinkedIn fallback found profile: {url}")
                except Exception as e:
                    logger.warning(f'Brave search LinkedIn fallback failed: {e}')

        return results
|
|
|
|
    def search_google_reviews(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, Any]:
        """
        Search for Google reviews using Google Places API.

        This method uses the GooglePlacesSearcher to find the company on Google
        and retrieve its rating and review count. When GOOGLE_PLACES_API_KEY is
        not set it falls back to parsing Brave Search snippets; with neither
        key configured it logs a warning and returns the all-None result.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            Dict containing:
            - google_rating: Decimal rating (1.0-5.0) or None
            - google_reviews_count: Integer review count or None
            - opening_hours: Dict with weekday_text and open_now, or None
            - business_status: String like 'OPERATIONAL', 'CLOSED_TEMPORARILY', etc.
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'opening_hours': None,
            'business_status': None,
        }

        try:
            # Use Google Places API for accurate data
            google_api_key = os.getenv('GOOGLE_PLACES_API_KEY')

            if google_api_key:
                # Use GooglePlacesSearcher for accurate data retrieval
                places_searcher = GooglePlacesSearcher(api_key=google_api_key)

                # Step 1: Find the place by company name and city
                place_id = places_searcher.find_place(company_name, city)

                if place_id:
                    # Step 2: Get detailed information including reviews
                    details = places_searcher.get_place_details(place_id)

                    result['google_rating'] = details.get('google_rating')
                    result['google_reviews_count'] = details.get('google_reviews_count')
                    result['opening_hours'] = details.get('opening_hours')
                    result['business_status'] = details.get('business_status')

                    logger.info(
                        f"Google reviews for '{company_name}': "
                        f"rating={result['google_rating']}, "
                        f"reviews={result['google_reviews_count']}, "
                        f"status={result['business_status']}"
                    )
                else:
                    logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
            else:
                # Fallback: Try Brave Search API if available
                if self.api_key:
                    brave_result = self._search_brave_for_reviews(company_name, city)
                    if brave_result:
                        result.update(brave_result)
                else:
                    logger.warning(
                        'Neither GOOGLE_PLACES_API_KEY nor BRAVE_API_KEY configured. '
                        'Cannot retrieve Google reviews data.'
                    )

        except Exception as e:
            logger.warning(f'Google reviews search failed for {company_name}: {e}')

        return result
|
|
|
|
def _search_brave_for_reviews(self, company_name: str, city: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Fallback method to search for Google reviews via Brave Search API.
|
|
|
|
This parses search results to extract rating and review count from
|
|
Google Business snippets in search results.
|
|
|
|
Args:
|
|
company_name: Name of the company.
|
|
city: City for location context.
|
|
|
|
Returns:
|
|
Dict with google_rating and google_reviews_count, or None if not found.
|
|
"""
|
|
if not self.api_key:
|
|
return None
|
|
|
|
try:
|
|
query = f'{company_name} {city} opinie google'
|
|
|
|
# Brave Web Search API endpoint
|
|
url = 'https://api.search.brave.com/res/v1/web/search'
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Accept-Encoding': 'gzip',
|
|
'X-Subscription-Token': self.api_key,
|
|
}
|
|
params = {
|
|
'q': query,
|
|
'count': 10,
|
|
'country': 'pl',
|
|
'search_lang': 'pl',
|
|
'ui_lang': 'pl-PL',
|
|
}
|
|
|
|
response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Parse search results for rating/review patterns
|
|
# Google snippets often contain patterns like "4,5 (123 opinii)" or "Rating: 4.5 · 123 reviews"
|
|
for result in data.get('web', {}).get('results', []):
|
|
snippet = result.get('description', '') + ' ' + result.get('title', '')
|
|
|
|
# Pattern for Polish Google reviews: "4,5 (123 opinii)" or "4.5 · 123 reviews"
|
|
rating_patterns = [
|
|
r'(\d+[,\.]\d)\s*[·\(]\s*(\d+)\s*(?:opinii|recenzji|reviews)',
|
|
r'ocena[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:opinii|recenzji)',
|
|
r'rating[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:reviews|opinii)',
|
|
]
|
|
|
|
for pattern in rating_patterns:
|
|
match = re.search(pattern, snippet, re.IGNORECASE)
|
|
if match:
|
|
rating_str = match.group(1).replace(',', '.')
|
|
reviews_str = match.group(2)
|
|
|
|
return {
|
|
'google_rating': round(float(rating_str), 1),
|
|
'google_reviews_count': int(reviews_str),
|
|
}
|
|
|
|
logger.info(f"No Google reviews data found in Brave results for '{company_name}'")
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.warning(f"Timeout searching Brave for '{company_name}' reviews")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"Brave API request failed for '{company_name}': {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing Brave results for '{company_name}': {e}")
|
|
return None
|
|
|
|
def _search_brave(self, query: str, platform: str, company_name: str = '') -> Optional[str]:
|
|
"""
|
|
Perform Brave search and extract relevant social media URL.
|
|
Validates results against company_name to avoid false matches.
|
|
Returns normalized URL for the platform or None.
|
|
"""
|
|
if not self.api_key:
|
|
logger.debug(f"No Brave API key - skipping search for {platform}")
|
|
return None
|
|
|
|
try:
|
|
url = 'https://api.search.brave.com/res/v1/web/search'
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Accept-Encoding': 'gzip',
|
|
'X-Subscription-Token': self.api_key,
|
|
}
|
|
params = {
|
|
'q': query,
|
|
'count': 10,
|
|
'country': 'pl',
|
|
'search_lang': 'pl',
|
|
'ui_lang': 'pl-PL',
|
|
}
|
|
|
|
response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
results = data.get('web', {}).get('results', [])
|
|
|
|
# Platform domain patterns
|
|
domain_patterns = {
|
|
'facebook': r'facebook\.com/',
|
|
'instagram': r'instagram\.com/',
|
|
'youtube': r'youtube\.com/',
|
|
'linkedin': r'linkedin\.com/(?:company|in)/',
|
|
'tiktok': r'tiktok\.com/@',
|
|
'twitter': r'(?:twitter|x)\.com/',
|
|
}
|
|
|
|
pattern = domain_patterns.get(platform)
|
|
if not pattern:
|
|
return None
|
|
|
|
# Prepare company name variations for matching
|
|
name_lower = company_name.lower().strip()
|
|
# Generate matching tokens with word boundary patterns
|
|
# (e.g. "Waterm Artur Wiertel" -> [r'\bwaterm\b', r'\bartur\b', r'\bwiertel\b'])
|
|
name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
|
|
for t in name_lower.split() if len(t) >= 3]
|
|
|
|
candidates = []
|
|
for result in results:
|
|
result_url = result.get('url', '')
|
|
result_title = result.get('title', '')
|
|
result_desc = result.get('description', '')
|
|
|
|
if not re.search(pattern, result_url, re.IGNORECASE):
|
|
continue
|
|
|
|
# Validate it's a real profile, not a search/share page
|
|
excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
|
|
is_excluded = any(ex.lower() in result_url.lower() for ex in excludes)
|
|
if is_excluded:
|
|
continue
|
|
|
|
# Check if result relates to the company
|
|
searchable = f'{result_title} {result_desc} {result_url}'.lower()
|
|
# Count how many name tokens appear in the result (word boundary match)
|
|
token_matches = sum(1 for t in name_tokens if t.search(searchable))
|
|
|
|
if token_matches == 0:
|
|
continue # No connection to company at all
|
|
|
|
# Extract handle using platform patterns
|
|
extracted_url = None
|
|
for regex in SOCIAL_MEDIA_PATTERNS.get(platform, []):
|
|
match = re.search(regex, result_url, re.IGNORECASE)
|
|
if match:
|
|
handle = match.group(1)
|
|
if len(handle) >= 2:
|
|
extracted_url = self._build_social_url(platform, handle)
|
|
break
|
|
|
|
if not extracted_url:
|
|
extracted_url = result_url
|
|
|
|
# For LinkedIn: prioritize /company/ over /in/ (company pages > personal)
|
|
is_company_page = 1 if (platform == 'linkedin' and '/company/' in (extracted_url or '')) else 0
|
|
|
|
candidates.append((is_company_page, token_matches, extracted_url))
|
|
|
|
if candidates:
|
|
# Sort by: 1) company page priority, 2) token matches (best match first)
|
|
candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
|
|
best_url = candidates[0][2]
|
|
logger.info(f"Brave search matched {platform}: {best_url} (company={candidates[0][0]}, score={candidates[0][1]}/{len(name_tokens)})")
|
|
return best_url
|
|
|
|
logger.debug(f"No {platform} profile found in Brave results for: {query}")
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.warning(f"Timeout searching Brave for '{query}'")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"Brave API request failed for '{query}': {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing Brave results for '{query}': {e}")
|
|
return None
|
|
|
|
def _check_linkedin_company_page(self, company_name: str) -> Optional[str]:
|
|
"""
|
|
Try direct LinkedIn company page URL based on company name slugs.
|
|
Returns URL if page exists and title matches, None otherwise.
|
|
"""
|
|
# Generate slug candidates from company name
|
|
name_clean = company_name.strip()
|
|
slugs = set()
|
|
|
|
# Basic slug: lowercase, spaces to hyphens
|
|
slug = re.sub(r'[^a-z0-9\s-]', '', name_clean.lower())
|
|
slug = re.sub(r'\s+', '-', slug).strip('-')
|
|
if slug:
|
|
slugs.add(slug)
|
|
|
|
# First word only (common for short brand names like "Waterm")
|
|
first_word = name_clean.split()[0].lower() if name_clean.split() else ''
|
|
first_word = re.sub(r'[^a-z0-9]', '', first_word)
|
|
if first_word and len(first_word) >= 3:
|
|
slugs.add(first_word)
|
|
|
|
name_tokens = [re.compile(r'\b' + re.escape(t) + r'\b', re.IGNORECASE)
|
|
for t in name_clean.lower().split() if len(t) >= 3]
|
|
|
|
for slug in slugs:
|
|
try:
|
|
check_url = f'https://www.linkedin.com/company/{slug}'
|
|
resp = self.session.get(check_url, timeout=8, allow_redirects=True)
|
|
if resp.status_code == 200:
|
|
# Verify title contains company name
|
|
title_match = re.search(r'<title>([^<]+)</title>', resp.text)
|
|
if title_match:
|
|
title = title_match.group(1).lower()
|
|
if any(t.search(title) for t in name_tokens):
|
|
logger.info(f"LinkedIn company page verified: {check_url} (title: {title_match.group(1)})")
|
|
return f'https://linkedin.com/company/{slug}'
|
|
else:
|
|
logger.debug(f"LinkedIn /company/{slug} exists but title '{title_match.group(1)}' doesn't match '{company_name}'")
|
|
except Exception as e:
|
|
logger.debug(f"LinkedIn company page check failed for {slug}: {e}")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _build_social_url(platform: str, handle: str) -> str:
|
|
"""Build normalized social media URL from platform and handle."""
|
|
if platform == 'facebook':
|
|
if handle.isdigit():
|
|
return f'https://facebook.com/profile.php?id={handle}'
|
|
return f'https://facebook.com/{handle}'
|
|
elif platform == 'instagram':
|
|
handle = handle.split('?')[0].split('&')[0]
|
|
return f'https://instagram.com/{handle}'
|
|
elif platform == 'youtube':
|
|
if handle.startswith('@'):
|
|
return f'https://youtube.com/{handle}'
|
|
return f'https://youtube.com/channel/{handle}'
|
|
elif platform == 'linkedin':
|
|
return f'https://linkedin.com/{handle}'
|
|
elif platform == 'tiktok':
|
|
return f'https://tiktok.com/@{handle}'
|
|
elif platform == 'twitter':
|
|
return f'https://twitter.com/{handle}'
|
|
return handle
|
|
|
|
|
|
class SocialProfileEnricher:
    """Enriches social media profiles with additional data from public APIs and scraping.

    All enrichment is best-effort scraping of public profile pages (Open Graph
    meta tags and embedded JSON). Every enricher returns a possibly-empty dict
    of profile attributes and swallows its own errors, so callers never see an
    exception from enrichment.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def enrich_profile(self, platform: str, url: str) -> Dict[str, Any]:
        """Fetch additional data for a social media profile.

        Dispatches to the platform-specific enricher; unknown platforms and
        enrichment failures both yield an empty dict.
        """
        enrichers = {
            'facebook': self._enrich_facebook,
            'instagram': self._enrich_instagram,
            'youtube': self._enrich_youtube,
            'linkedin': self._enrich_linkedin,
            'tiktok': self._enrich_tiktok,
            'twitter': self._enrich_twitter,
        }
        enricher = enrichers.get(platform)
        if enricher:
            try:
                return enricher(url)
            except Exception as e:
                logger.warning(f"Failed to enrich {platform} profile {url}: {e}")
                return {}
        return {}

    def _enrich_facebook(self, url: str) -> Dict[str, Any]:
        """Enrich Facebook page data from public page HTML."""
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            if resp.status_code == 200:
                html = resp.text
                # Extract page name from og:title.
                # CONSISTENCY FIX: accept both property= and name= meta
                # attributes, matching every sibling _enrich_* method (the old
                # regexes here only matched property=).
                og_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if og_match:
                    result['page_name'] = og_match.group(1)
                # Check for profile photo via og:image.
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                # Description from og:description.
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    result['profile_description'] = og_desc.group(1)[:500]
                    result['has_bio'] = True
                else:
                    result['has_bio'] = False
        except Exception as e:
            logger.debug(f"Facebook enrichment failed: {e}")
        return result

    def _enrich_instagram(self, url: str) -> Dict[str, Any]:
        """Enrich Instagram profile data."""
        result = {}
        try:
            # Try og:description which often contains "X Followers, Y Following, Z Posts".
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # og:description format: "123 Followers, 45 Following, 67 Posts - See Instagram photos..."
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    desc = og_desc.group(1)
                    # Extract followers.
                    followers_match = re.search(r'([\d,\.]+[KMkm]?)\s+Followers', desc)
                    if followers_match:
                        result['followers_count'] = self._parse_count(followers_match.group(1))
                    # Extract posts count.
                    # NOTE(review): this is the LIFETIME post total from the
                    # meta tag, not posts in the last 365 days - the key name
                    # overstates precision; confirm downstream usage.
                    posts_match = re.search(r'([\d,\.]+[KMkm]?)\s+Posts', desc)
                    if posts_match:
                        result['posts_count_365d'] = self._parse_count(posts_match.group(1))
                    # Bio is after the dash.
                    bio_match = re.search(r'Posts\s*[-\u2013\u2014]\s*(.+)', desc)
                    if bio_match:
                        bio_text = bio_match.group(1).strip()
                        if bio_text and not bio_text.startswith('See Instagram'):
                            result['profile_description'] = bio_text[:500]
                            result['has_bio'] = True
                # Profile photo from og:image.
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
        except Exception as e:
            logger.debug(f"Instagram enrichment failed: {e}")
        return result

    def _enrich_youtube(self, url: str) -> Dict[str, Any]:
        """Enrich YouTube channel data."""
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # Subscriber count from embedded player/page JSON.
                subs_match = re.search(r'"subscriberCountText":\s*\{"simpleText":\s*"([^"]+)"\}', html)
                if subs_match:
                    result['followers_count'] = self._parse_count(subs_match.group(1).split(' ')[0])
                # Video count (lifetime total; see NOTE in _enrich_instagram).
                videos_match = re.search(r'"videosCountText":\s*\{"runs":\s*\[\{"text":\s*"([^"]+)"\}', html)
                if videos_match:
                    result['posts_count_365d'] = self._parse_count(videos_match.group(1))
                # Channel description.
                desc_match = re.search(r'"description":\s*"([^"]*(?:\\.[^"]*)*)"', html)
                if desc_match:
                    desc = desc_match.group(1).replace('\\n', ' ').strip()
                    if desc and len(desc) > 5:
                        result['profile_description'] = desc[:500]
                        result['has_bio'] = True
                # Avatar from og:image.
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                # Channel name.
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"YouTube enrichment failed: {e}")
        return result

    def _enrich_linkedin(self, url: str) -> Dict[str, Any]:
        """Enrich LinkedIn company page data."""
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    desc = og_desc.group(1).strip()
                    # LinkedIn descriptions often have follower count.
                    followers_match = re.search(r'([\d,\.]+)\s+followers', desc, re.IGNORECASE)
                    if followers_match:
                        result['followers_count'] = self._parse_count(followers_match.group(1))
                    result['profile_description'] = desc[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"LinkedIn enrichment failed: {e}")
        return result

    def _enrich_tiktok(self, url: str) -> Dict[str, Any]:
        """Enrich TikTok profile data."""
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                # TikTok embeds profile data in JSON.
                followers_match = re.search(r'"followerCount":\s*(\d+)', html)
                if followers_match:
                    result['followers_count'] = int(followers_match.group(1))
                videos_match = re.search(r'"videoCount":\s*(\d+)', html)
                if videos_match:
                    result['posts_count_365d'] = int(videos_match.group(1))
                desc_match = re.search(r'"signature":\s*"([^"]*)"', html)
                if desc_match and desc_match.group(1).strip():
                    result['profile_description'] = desc_match.group(1)[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'"nickname":\s*"([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"TikTok enrichment failed: {e}")
        return result

    def _enrich_twitter(self, url: str) -> Dict[str, Any]:
        """Enrich Twitter/X profile data using og tags from public page."""
        result = {}
        try:
            resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                html = resp.text
                og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
                if og_desc:
                    result['profile_description'] = og_desc.group(1)[:500]
                    result['has_bio'] = True
                og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
                result['has_profile_photo'] = bool(og_img)
                name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
                if name_match:
                    result['page_name'] = name_match.group(1)
        except Exception as e:
            logger.debug(f"Twitter enrichment failed: {e}")
        return result

    @staticmethod
    def _parse_count(text: str) -> Optional[int]:
        """Parse follower/subscriber count strings like '1.2K', '3,456', '2.1M'.

        Returns None for empty or unparseable input.

        NOTE(review): European thousands notation ('1.234') parses as 1 -
        acceptable while sources emit K/M suffixes or comma separators;
        confirm if other locales appear.
        """
        if not text:
            return None
        text = text.strip().replace(',', '').replace(' ', '')
        try:
            multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
            last_char = text[-1].lower()
            if last_char in multipliers:
                return int(float(text[:-1]) * multipliers[last_char])
            return int(float(text))
        except (ValueError, IndexError):
            return None
|
|
|
|
|
|
def calculate_profile_completeness(profile_data: Dict[str, Any]) -> int:
    """Calculate profile completeness score 0-100 for a social media profile.

    Each satisfied criterion contributes a fixed number of points; the sum is
    capped at 100.
    """
    # (criterion satisfied?, points awarded)
    criteria = [
        (bool(profile_data.get('url')), 20),                        # Profile exists
        (bool(profile_data.get('has_bio')), 15),                    # Bio filled
        (bool(profile_data.get('has_profile_photo')), 15),          # Avatar
        (bool(profile_data.get('has_cover_photo')), 10),            # Cover photo
        ((profile_data.get('followers_count') or 0) > 10, 10),      # Has followers
        ((profile_data.get('posts_count_30d') or 0) > 0, 15),       # Active in last 30d
        ((profile_data.get('engagement_rate') or 0) > 1, 15),       # Good engagement
    ]
    total = sum(points for satisfied, points in criteria if satisfied)
    return min(total, 100)
|
|
|
|
|
|
class SocialMediaAuditor:
|
|
"""Main auditor class that coordinates website and social media auditing."""
|
|
|
|
def __init__(self, database_url: str = DATABASE_URL):
|
|
self.engine = create_engine(database_url)
|
|
self.Session = sessionmaker(bind=self.engine)
|
|
self.website_auditor = WebsiteAuditor()
|
|
self.brave_searcher = BraveSearcher()
|
|
self.profile_enricher = SocialProfileEnricher()
|
|
|
|
# Initialize Google Places searcher if API key is available
|
|
google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
|
|
if google_places_api_key:
|
|
self.google_places_searcher = GooglePlacesSearcher(api_key=google_places_api_key)
|
|
logger.info('Google Places API key found - using Places API for reviews')
|
|
else:
|
|
self.google_places_searcher = None
|
|
logger.info('GOOGLE_PLACES_API_KEY not set - falling back to Brave Search for reviews')
|
|
|
|
def get_companies(self, company_ids: Optional[List[int]] = None,
|
|
batch_start: Optional[int] = None,
|
|
batch_end: Optional[int] = None) -> List[Dict]:
|
|
"""Fetch companies from database."""
|
|
with self.Session() as session:
|
|
if company_ids:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
WHERE id = ANY(:ids)
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query, {'ids': company_ids})
|
|
elif batch_start is not None and batch_end is not None:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
OFFSET :offset LIMIT :limit
|
|
""")
|
|
result = session.execute(query, {
|
|
'offset': batch_start - 1,
|
|
'limit': batch_end - batch_start + 1
|
|
})
|
|
else:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query)
|
|
|
|
return [dict(row._mapping) for row in result]
|
|
|
|
def get_company_id_by_slug(self, slug: str) -> Optional[int]:
|
|
"""Get company ID by slug."""
|
|
with self.Session() as session:
|
|
query = text("""
|
|
SELECT id FROM companies WHERE slug = :slug
|
|
""")
|
|
result = session.execute(query, {'slug': slug})
|
|
row = result.fetchone()
|
|
if row:
|
|
return row[0]
|
|
return None
|
|
|
|
def audit_company(self, company: Dict) -> Dict[str, Any]:
|
|
"""
|
|
Perform full audit for a single company.
|
|
|
|
Returns comprehensive audit result.
|
|
"""
|
|
logger.info(f"Auditing company: {company['name']} (ID: {company['id']})")
|
|
logger.info(f"Company website: {company.get('website', 'NOT SET')}")
|
|
|
|
result = {
|
|
'company_id': company['id'],
|
|
'company_name': company['name'],
|
|
'audit_date': datetime.now(),
|
|
'website': {},
|
|
'social_media': {},
|
|
'google_reviews': {},
|
|
'errors': [],
|
|
}
|
|
|
|
# 1. Website audit
|
|
if company.get('website'):
|
|
try:
|
|
logger.info(f"Starting website audit for: {company['website']}")
|
|
result['website'] = self.website_auditor.audit_website(company['website'])
|
|
logger.info(f"Website audit completed. HTTP status: {result['website'].get('http_status')}")
|
|
except Exception as e:
|
|
logger.error(f"Website audit failed: {str(e)}")
|
|
result['errors'].append(f'Website audit failed: {str(e)}')
|
|
else:
|
|
logger.warning(f"No website URL for company {company['name']}")
|
|
result['website'] = {'errors': ['No website URL']}
|
|
|
|
# 2. Social media from website
|
|
website_social = result['website'].get('social_media_links', {})
|
|
social_sources = {} # Track source per platform
|
|
if website_social:
|
|
logger.info(f"Social media found on website: {list(website_social.keys())}")
|
|
for p in website_social:
|
|
social_sources[p] = 'website_scrape'
|
|
else:
|
|
logger.info("No social media links found on website")
|
|
|
|
# 3. Search for additional social media via Brave
|
|
city = company.get('address_city', 'Wejherowo')
|
|
try:
|
|
logger.info(f"Searching Brave for social media: {company['name']} in {city}")
|
|
brave_social = self.brave_searcher.search_social_media(company['name'], city)
|
|
if brave_social:
|
|
logger.info(f"Brave search found: {list(brave_social.keys())}")
|
|
else:
|
|
logger.info("Brave search found no additional social media")
|
|
# Merge, website takes precedence
|
|
for platform, url in brave_social.items():
|
|
if platform not in website_social:
|
|
website_social[platform] = url
|
|
social_sources[platform] = 'brave_search'
|
|
logger.info(f"Added {platform} from Brave search: {url}")
|
|
except Exception as e:
|
|
logger.warning(f"Brave search failed: {str(e)}")
|
|
result['errors'].append(f'Brave search failed: {str(e)}')
|
|
|
|
result['social_media'] = website_social
|
|
result['social_sources'] = social_sources
|
|
logger.info(f"Total social media profiles found: {len(website_social)} - {list(website_social.keys())}")
|
|
|
|
# OAuth: Try Facebook/Instagram Graph API for authenticated data
|
|
try:
|
|
from oauth_service import OAuthService
|
|
from facebook_graph_service import FacebookGraphService
|
|
from database import SessionLocal as OAuthSessionLocal, OAuthToken
|
|
|
|
oauth = OAuthService()
|
|
company_id = company.get('id')
|
|
if company_id:
|
|
oauth_db = OAuthSessionLocal()
|
|
try:
|
|
fb_token = oauth.get_valid_token(oauth_db, company_id, 'meta', 'facebook')
|
|
if fb_token:
|
|
fb_service = FacebookGraphService(fb_token)
|
|
token_rec = oauth_db.query(OAuthToken).filter(
|
|
OAuthToken.company_id == company_id,
|
|
OAuthToken.provider == 'meta',
|
|
OAuthToken.service == 'facebook',
|
|
OAuthToken.is_active == True,
|
|
).first()
|
|
page_id = token_rec.account_id if token_rec else None
|
|
|
|
if page_id:
|
|
page_info = fb_service.get_page_info(page_id)
|
|
if page_info:
|
|
result['oauth_facebook'] = {
|
|
'fan_count': page_info.get('fan_count'),
|
|
'category': page_info.get('category'),
|
|
'data_source': 'oauth_api',
|
|
}
|
|
insights = fb_service.get_page_insights(page_id)
|
|
if insights:
|
|
result['oauth_facebook_insights'] = insights
|
|
|
|
ig_id = fb_service.get_instagram_account(page_id)
|
|
if ig_id:
|
|
ig_insights = fb_service.get_ig_media_insights(ig_id)
|
|
if ig_insights:
|
|
result['oauth_instagram'] = {
|
|
**ig_insights,
|
|
'data_source': 'oauth_api',
|
|
}
|
|
logger.info(f"OAuth Facebook/IG enrichment done for company {company_id}")
|
|
finally:
|
|
oauth_db.close()
|
|
except ImportError:
|
|
pass # Services not yet available
|
|
except Exception as e:
|
|
logger.warning(f"OAuth social media enrichment failed: {e}")
|
|
|
|
# 5. Enrich social media profiles with additional data
|
|
enriched_profiles = {}
|
|
for platform, url in website_social.items():
|
|
logger.info(f"Enriching {platform} profile: {url}")
|
|
enrichment = self.profile_enricher.enrich_profile(platform, url)
|
|
enriched_profiles[platform] = {
|
|
'url': url,
|
|
**enrichment,
|
|
}
|
|
# Calculate completeness score
|
|
enriched_profiles[platform]['profile_completeness_score'] = calculate_profile_completeness(enriched_profiles[platform])
|
|
|
|
# Calculate engagement rate (ESTIMATED - without API we don't have real engagement data)
|
|
profile = enriched_profiles[platform]
|
|
if profile.get('followers_count') and profile.get('followers_count') > 0 and profile.get('posts_count_30d') and profile.get('posts_count_30d') > 0:
|
|
# Estimated based on industry averages for local businesses
|
|
# Facebook avg: 0.5-2%, Instagram: 1-3%, LinkedIn: 0.5-1%
|
|
base_rates = {'facebook': 1.0, 'instagram': 2.0, 'linkedin': 0.7, 'youtube': 0.5, 'twitter': 0.3, 'tiktok': 3.0}
|
|
base = base_rates.get(platform, 1.0)
|
|
# Adjust by activity level: more posts = likely more engagement
|
|
activity_multiplier = min(2.0, profile.get('posts_count_30d', 0) / 4.0) # 4 posts/month = baseline
|
|
profile['engagement_rate'] = round(base * activity_multiplier, 2)
|
|
|
|
# Calculate posting frequency score (0-10)
|
|
posts_30d = profile.get('posts_count_30d')
|
|
if posts_30d is not None:
|
|
if posts_30d == 0:
|
|
profile['posting_frequency_score'] = 0
|
|
elif posts_30d <= 2:
|
|
profile['posting_frequency_score'] = 3
|
|
elif posts_30d <= 4:
|
|
profile['posting_frequency_score'] = 5
|
|
elif posts_30d <= 8:
|
|
profile['posting_frequency_score'] = 7
|
|
elif posts_30d <= 15:
|
|
profile['posting_frequency_score'] = 9
|
|
else:
|
|
profile['posting_frequency_score'] = 10
|
|
|
|
result['enriched_profiles'] = enriched_profiles
|
|
|
|
# 4. Google reviews search - prefer Google Places API if available
|
|
try:
|
|
if self.google_places_searcher:
|
|
# Use Google Places API directly for accurate data
|
|
place_id = self.google_places_searcher.find_place(company['name'], city)
|
|
if place_id:
|
|
details = self.google_places_searcher.get_place_details(place_id)
|
|
result['google_reviews'] = {
|
|
'google_rating': details.get('google_rating'),
|
|
'google_reviews_count': details.get('google_reviews_count'),
|
|
'google_opening_hours': details.get('opening_hours'),
|
|
'google_photos_count': details.get('google_photos_count'),
|
|
'business_status': details.get('business_status'),
|
|
}
|
|
else:
|
|
result['google_reviews'] = {
|
|
'google_rating': None,
|
|
'google_reviews_count': None,
|
|
'google_opening_hours': None,
|
|
'google_photos_count': None,
|
|
'business_status': None,
|
|
}
|
|
else:
|
|
# Fallback to Brave Search
|
|
result['google_reviews'] = self.brave_searcher.search_google_reviews(
|
|
company['name'], city
|
|
)
|
|
except Exception as e:
|
|
result['errors'].append(f'Google reviews search failed: {str(e)}')
|
|
|
|
return result
|
|
|
|
def save_audit_result(self, result: Dict) -> bool:
|
|
"""Save audit result to database."""
|
|
try:
|
|
with self.Session() as session:
|
|
company_id = result['company_id']
|
|
website = result.get('website', {})
|
|
|
|
# Update or insert website analysis
|
|
upsert_website = text("""
|
|
INSERT INTO company_website_analysis (
|
|
company_id, analyzed_at, website_url, http_status_code,
|
|
load_time_ms, has_ssl, ssl_expires_at, ssl_issuer, is_responsive,
|
|
is_mobile_friendly, has_viewport_meta, last_modified_at,
|
|
hosting_provider, hosting_ip, server_software, site_author,
|
|
cms_detected, google_rating, google_reviews_count,
|
|
google_opening_hours, google_photos_count,
|
|
audit_source, audit_version
|
|
) VALUES (
|
|
:company_id, :analyzed_at, :website_url, :http_status_code,
|
|
:load_time_ms, :has_ssl, :ssl_expires_at, :ssl_issuer, :is_responsive,
|
|
:is_mobile_friendly, :has_viewport_meta, :last_modified_at,
|
|
:hosting_provider, :hosting_ip, :server_software, :site_author,
|
|
:cms_detected, :google_rating, :google_reviews_count,
|
|
:google_opening_hours, :google_photos_count,
|
|
:audit_source, :audit_version
|
|
)
|
|
ON CONFLICT (company_id) DO UPDATE SET
|
|
analyzed_at = EXCLUDED.analyzed_at,
|
|
http_status_code = EXCLUDED.http_status_code,
|
|
load_time_ms = EXCLUDED.load_time_ms,
|
|
has_ssl = EXCLUDED.has_ssl,
|
|
ssl_expires_at = EXCLUDED.ssl_expires_at,
|
|
ssl_issuer = EXCLUDED.ssl_issuer,
|
|
is_mobile_friendly = EXCLUDED.is_mobile_friendly,
|
|
has_viewport_meta = EXCLUDED.has_viewport_meta,
|
|
last_modified_at = EXCLUDED.last_modified_at,
|
|
hosting_provider = EXCLUDED.hosting_provider,
|
|
hosting_ip = EXCLUDED.hosting_ip,
|
|
server_software = EXCLUDED.server_software,
|
|
site_author = EXCLUDED.site_author,
|
|
cms_detected = EXCLUDED.cms_detected,
|
|
google_rating = EXCLUDED.google_rating,
|
|
google_reviews_count = EXCLUDED.google_reviews_count,
|
|
google_opening_hours = EXCLUDED.google_opening_hours,
|
|
google_photos_count = EXCLUDED.google_photos_count,
|
|
audit_source = EXCLUDED.audit_source,
|
|
audit_version = EXCLUDED.audit_version
|
|
""")
|
|
|
|
google_reviews = result.get('google_reviews', {})
|
|
|
|
# Convert opening_hours dict to JSON string for JSONB column
|
|
opening_hours = google_reviews.get('google_opening_hours')
|
|
opening_hours_json = json.dumps(opening_hours) if opening_hours else None
|
|
|
|
session.execute(upsert_website, {
|
|
'company_id': company_id,
|
|
'analyzed_at': result['audit_date'],
|
|
'website_url': website.get('url'),
|
|
'http_status_code': website.get('http_status'),
|
|
'load_time_ms': website.get('load_time_ms'),
|
|
'has_ssl': website.get('has_ssl', False),
|
|
'ssl_expires_at': website.get('ssl_expiry'),
|
|
'ssl_issuer': website.get('ssl_issuer'),
|
|
'is_responsive': website.get('is_mobile_friendly', False),
|
|
'is_mobile_friendly': website.get('is_mobile_friendly', False),
|
|
'has_viewport_meta': website.get('has_viewport_meta', False),
|
|
'last_modified_at': website.get('last_modified_at'),
|
|
'hosting_provider': website.get('hosting_provider'),
|
|
'hosting_ip': website.get('hosting_ip'),
|
|
'server_software': website.get('server_software'),
|
|
'site_author': website.get('site_author'),
|
|
'cms_detected': website.get('site_generator'),
|
|
'google_rating': google_reviews.get('google_rating'),
|
|
'google_reviews_count': google_reviews.get('google_reviews_count'),
|
|
'google_opening_hours': opening_hours_json,
|
|
'google_photos_count': google_reviews.get('google_photos_count'),
|
|
'audit_source': 'automated',
|
|
'audit_version': '1.0',
|
|
})
|
|
|
|
# Save social media with enriched data
|
|
social_sources = result.get('social_sources', {})
|
|
for platform, url in result.get('social_media', {}).items():
|
|
normalized_url = normalize_social_url(url, platform)
|
|
|
|
# Get enrichment data if available
|
|
enriched = result.get('enriched_profiles', {}).get(platform, {})
|
|
|
|
upsert_social = text("""
|
|
INSERT INTO company_social_media (
|
|
company_id, platform, url, verified_at, source, is_valid,
|
|
page_name, followers_count,
|
|
has_profile_photo, has_cover_photo, has_bio, profile_description,
|
|
posts_count_30d, posts_count_365d, last_post_date,
|
|
engagement_rate, posting_frequency_score,
|
|
profile_completeness_score, updated_at
|
|
) VALUES (
|
|
:company_id, :platform, :url, :verified_at, :source, :is_valid,
|
|
:page_name, :followers_count,
|
|
:has_profile_photo, :has_cover_photo, :has_bio, :profile_description,
|
|
:posts_count_30d, :posts_count_365d, :last_post_date,
|
|
:engagement_rate, :posting_frequency_score,
|
|
:profile_completeness_score, NOW()
|
|
)
|
|
ON CONFLICT (company_id, platform, url) DO UPDATE SET
|
|
verified_at = EXCLUDED.verified_at,
|
|
source = EXCLUDED.source,
|
|
is_valid = EXCLUDED.is_valid,
|
|
page_name = COALESCE(EXCLUDED.page_name, company_social_media.page_name),
|
|
followers_count = COALESCE(EXCLUDED.followers_count, company_social_media.followers_count),
|
|
has_profile_photo = COALESCE(EXCLUDED.has_profile_photo, company_social_media.has_profile_photo),
|
|
has_cover_photo = COALESCE(EXCLUDED.has_cover_photo, company_social_media.has_cover_photo),
|
|
has_bio = COALESCE(EXCLUDED.has_bio, company_social_media.has_bio),
|
|
profile_description = COALESCE(EXCLUDED.profile_description, company_social_media.profile_description),
|
|
posts_count_30d = COALESCE(EXCLUDED.posts_count_30d, company_social_media.posts_count_30d),
|
|
posts_count_365d = COALESCE(EXCLUDED.posts_count_365d, company_social_media.posts_count_365d),
|
|
engagement_rate = COALESCE(EXCLUDED.engagement_rate, company_social_media.engagement_rate),
|
|
posting_frequency_score = COALESCE(EXCLUDED.posting_frequency_score, company_social_media.posting_frequency_score),
|
|
last_post_date = COALESCE(EXCLUDED.last_post_date, company_social_media.last_post_date),
|
|
profile_completeness_score = COALESCE(EXCLUDED.profile_completeness_score, company_social_media.profile_completeness_score),
|
|
updated_at = NOW()
|
|
""")
|
|
|
|
session.execute(upsert_social, {
|
|
'company_id': company_id,
|
|
'platform': platform,
|
|
'url': normalized_url,
|
|
'verified_at': result['audit_date'],
|
|
'source': social_sources.get(platform, 'website_scrape'),
|
|
'is_valid': True,
|
|
'page_name': enriched.get('page_name'),
|
|
'followers_count': enriched.get('followers_count'),
|
|
'has_profile_photo': enriched.get('has_profile_photo'),
|
|
'has_cover_photo': enriched.get('has_cover_photo'),
|
|
'has_bio': enriched.get('has_bio'),
|
|
'profile_description': enriched.get('profile_description'),
|
|
'posts_count_30d': enriched.get('posts_count_30d'),
|
|
'posts_count_365d': enriched.get('posts_count_365d'),
|
|
'last_post_date': enriched.get('last_post_date'),
|
|
'engagement_rate': enriched.get('engagement_rate'),
|
|
'posting_frequency_score': enriched.get('posting_frequency_score'),
|
|
'profile_completeness_score': enriched.get('profile_completeness_score'),
|
|
})
|
|
|
|
session.commit()
|
|
logger.info(f"Saved audit for company {company_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save audit result: {e}")
|
|
return False
|
|
|
|
def run_audit(self, company_ids: Optional[List[int]] = None,
              batch_start: Optional[int] = None,
              batch_end: Optional[int] = None,
              dry_run: bool = False) -> Dict[str, Any]:
    """
    Run the audit for a set of companies.

    Args:
        company_ids: Explicit list of company IDs to audit, or None.
        batch_start: First company of a batch range (used with batch_end).
        batch_end: Last company of a batch range.
        dry_run: When True, print each audit result as JSON instead of
            persisting it to the database.

    Returns:
        Summary dict with 'total', 'success' and 'failed' counters plus a
        per-company 'results' list (each entry has company_id, company_name,
        status, and either social_media_found or error).
    """
    companies = self.get_companies(company_ids, batch_start, batch_end)

    summary = {
        'total': len(companies),
        'success': 0,
        'failed': 0,
        'results': [],
    }

    for company in companies:
        try:
            result = self.audit_company(company)

            if dry_run:
                saved = True  # nothing is persisted in dry-run mode
                print(json.dumps(result, default=str, indent=2))
            else:
                saved = self.save_audit_result(result)

            # BUG FIX: previously a failed save incremented 'failed' but
            # still appended a results entry with status 'success', so the
            # counters and the per-company statuses disagreed. Keep them
            # consistent here.
            if saved:
                summary['success'] += 1
                summary['results'].append({
                    'company_id': company['id'],
                    'company_name': company['name'],
                    'status': 'success',
                    'social_media_found': len(result.get('social_media', {})),
                })
            else:
                summary['failed'] += 1
                summary['results'].append({
                    'company_id': company['id'],
                    'company_name': company['name'],
                    'status': 'failed',
                    'error': 'failed to save audit result',
                })

        except Exception as e:
            # One bad company must not abort the whole batch.
            logger.error(f"Audit failed for company {company['id']}: {e}")
            summary['failed'] += 1
            summary['results'].append({
                'company_id': company['id'],
                'company_name': company['name'],
                'status': 'failed',
                'error': str(e),
            })

    return summary
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the audit, print a summary."""
    parser = argparse.ArgumentParser(description='Social Media & Website Audit')
    parser.add_argument('--company-id', type=int, help='Audit single company by ID')
    parser.add_argument('--company-slug', type=str, help='Audit single company by slug')
    parser.add_argument('--batch', type=str, help='Audit batch of companies (e.g., 1-10)')
    parser.add_argument('--all', action='store_true', help='Audit all companies')
    parser.add_argument('--dry-run', action='store_true', help='Print results without saving')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    auditor = SocialMediaAuditor()

    # Use `is not None` so an explicit `--company-id 0` is not mistaken
    # for "option absent" (0 is falsy).
    if args.company_id is not None:
        summary = auditor.run_audit(company_ids=[args.company_id], dry_run=args.dry_run)
    elif args.company_slug:
        # Look up company ID by slug
        company_id = auditor.get_company_id_by_slug(args.company_slug)
        if company_id:
            summary = auditor.run_audit(company_ids=[company_id], dry_run=args.dry_run)
        else:
            print(f"Error: Company with slug '{args.company_slug}' not found")
            sys.exit(1)
    elif args.batch:
        # Validate the START-END form up front instead of letting int()
        # raise an unhandled ValueError traceback on bad input.
        match = re.fullmatch(r'(\d+)-(\d+)', args.batch)
        if not match:
            parser.error(f"--batch must look like START-END (e.g. 1-10), got '{args.batch}'")
        start, end = int(match.group(1)), int(match.group(2))
        summary = auditor.run_audit(batch_start=start, batch_end=end, dry_run=args.dry_run)
    elif args.all:
        summary = auditor.run_audit(dry_run=args.dry_run)
    else:
        parser.print_help()
        sys.exit(1)

    print("\n" + "=" * 60)
    print("AUDIT SUMMARY")  # was an f-string with no placeholders
    print("=" * 60)
    print(f"Total companies: {summary['total']}")
    print(f"Successful: {summary['success']}")
    print(f"Failed: {summary['failed']}")
    print("=" * 60)
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|