- Add --company-slug argument to social_media_audit.py for easier testing - Add get_company_id_by_slug() method to SocialMediaAuditor class - Add python-dotenv support to load .env file from project root - Create verify_google_places.py script for direct API testing Note: Full verification blocked - current API key (PageSpeed) doesn't have Places API enabled. Requires enabling Places API in Google Cloud Console for project NORDABIZNES (gen-lang-client-0540794446). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1183 lines
47 KiB
Python
1183 lines
47 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Social Media & Website Audit Script for Norda Biznes
|
|
=====================================================
|
|
|
|
Performs comprehensive audit of company websites and social media presence.
|
|
Designed to run with multiple parallel workers.
|
|
|
|
Features:
|
|
- Website analysis (SSL, hosting, author, responsiveness)
|
|
- Social media discovery (FB, IG, TikTok, YouTube, LinkedIn)
|
|
- Google Reviews scraping via Brave Search
|
|
- Parallel execution support
|
|
|
|
Usage:
|
|
python social_media_audit.py --company-id 26
|
|
python social_media_audit.py --batch 1-10
|
|
python social_media_audit.py --all
|
|
|
|
Author: Claude Code
|
|
Date: 2025-12-29
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, List, Tuple, Any
|
|
from urllib.parse import urlparse
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Load environment variables from the project-root .env file (optional).
# NOTE(review): this runs before logging.basicConfig() below, so the
# "Loaded .env" message is dropped by the default WARNING root level —
# confirm whether it should be emitted after logging is configured.
try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv not installed; rely on the system environment instead.
    pass
else:
    # .env lives one directory above this script (the project root).
    env_path = Path(__file__).resolve().parent.parent / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        logging.info(f"Loaded .env from {env_path}")
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import whois
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Configure logging: timestamped INFO-level output on the root logger,
# shared by every class in this module via the `logger` below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Database configuration.
# NOTE(review): the fallback DSN embeds literal credentials; prefer
# requiring DATABASE_URL from the environment — confirm before shipping.
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:NordaBiz2025Secure@127.0.0.1:5432/nordabiz'
)

# Request configuration
REQUEST_TIMEOUT = 15  # seconds; applied to every outbound HTTP request below
# Desktop-Chrome User-Agent so audited sites serve their normal (non-bot) page.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
|
|
|
# Known Polish hosting providers (IP ranges and identifiers).
# Each value is a list of markers consumed by WebsiteAuditor._detect_hosting:
# a marker is matched either as an IP prefix (str.startswith on the resolved
# A record) or as a substring of the lowercased domain / reverse-DNS name.
# First match wins, in dict insertion order.
# NOTE(review): the very short prefixes ('34.', '35.', '52.', '54.') also
# match unrelated networks — treat Google Cloud / AWS hits as low-confidence.
HOSTING_PROVIDERS = {
    'nazwa.pl': ['nazwa.pl', '185.252.', '91.227.'],
    'home.pl': ['home.pl', '212.85.', '195.26.'],
    'OVH': ['ovh.', '51.38.', '51.68.', '51.75.', '51.77.', '51.83.', '51.89.', '51.91.', '54.36.', '54.37.', '54.38.', '135.125.', '141.94.', '141.95.', '142.4.', '144.217.', '145.239.', '147.135.', '149.202.', '151.80.', '158.69.', '164.132.', '167.114.', '176.31.', '178.32.', '185.15.', '188.165.', '192.95.', '193.70.', '194.182.', '195.154.', '198.27.', '198.50.', '198.100.', '213.186.', '213.251.', '217.182.'],
    'cyber_Folks': ['cyberfolks', 'cf.', '77.55.'],
    'Zenbox': ['zenbox', '195.181.'],
    'Linuxpl': ['linuxpl', '91.200.'],
    'Hekko': ['hekko', 'hekko.pl'],
    'Smarthost': ['smarthost'],
    'AZ.pl': ['az.pl', 'aznetwork'],
    'Aftermarket': ['aftermarket', 'aftermarket.pl'],
    'Cloudflare': ['cloudflare', '104.16.', '104.17.', '104.18.', '104.19.', '104.20.', '104.21.', '104.22.', '104.23.', '104.24.', '172.67.'],
    'Google Cloud': ['google', '34.', '35.'],
    'AWS': ['amazon', 'aws', '52.', '54.'],
    'Vercel': ['vercel', '76.76.21.'],
    'Netlify': ['netlify'],
}
|
|
|
|
# Social media patterns.
# Maps platform -> list of regexes tried in order by
# WebsiteAuditor._parse_html against the raw page HTML; the single capture
# group is the profile handle/slug. More specific patterns come first so a
# channel-style URL is preferred over a bare path.
SOCIAL_MEDIA_PATTERNS = {
    'facebook': [
        r'(?:https?://)?(?:www\.)?facebook\.com/([^/?\s"\'<>]+)',
        # fb.com short-link variant
        r'(?:https?://)?(?:www\.)?fb\.com/([^/?\s"\'<>]+)',
    ],
    'instagram': [
        r'(?:https?://)?(?:www\.)?instagram\.com/([^/?\s"\'<>]+)',
    ],
    'youtube': [
        # /channel/, /c/, /user/ and /@handle/ forms first, then bare paths.
        r'(?:https?://)?(?:www\.)?youtube\.com/(?:channel|c|user|@)/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?youtube\.com/([^/?\s"\'<>]+)',
    ],
    'linkedin': [
        # Company pages and personal profiles, incl. the Polish pl. subdomain.
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/company/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/in/([^/?\s"\'<>]+)',
    ],
    'tiktok': [
        r'(?:https?://)?(?:www\.)?tiktok\.com/@([^/?\s"\'<>]+)',
    ],
    'twitter': [
        # Matches both twitter.com and x.com handles.
        r'(?:https?://)?(?:www\.)?(?:twitter|x)\.com/([^/?\s"\'<>]+)',
    ],
}
|
|
|
|
# False positives to exclude.
# Handles that look like profile slugs but are platform infrastructure
# (share dialogs, embeds, login pages). _parse_html compares the captured
# handle, lowercased, for an exact match against these lists.
SOCIAL_MEDIA_EXCLUDE = {
    'facebook': ['sharer', 'share', 'intent', 'plugins', 'dialog', 'sharer.php', 'login', 'pages'],
    'instagram': ['explore', 'accounts', 'p', 'reel'],
    'youtube': ['embed', 'watch', 'playlist', 'results', 'feed'],
    'linkedin': ['shareArticle', 'share', 'login'],
    'tiktok': ['embed', 'video'],
    'twitter': ['intent', 'share', 'widgets.js', 'widgets', 'tweet', 'platform.twitter.com'],
}
|
|
|
|
|
|
class WebsiteAuditor:
    """Audits a single website's technical details and metadata.

    Collects SSL certificate state, hosting provider, HTML metadata
    (author / CMS generator), mobile-friendliness signals and social media
    links. All network access goes through one shared requests.Session.
    Every step is best-effort: failures are appended to the result's
    'errors' list and the partial result is still returned.
    """

    def __init__(self):
        # One shared session gives connection reuse; a browser User-Agent
        # avoids trivial bot blocking.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def audit_website(self, url: str) -> Dict[str, Any]:
        """
        Perform comprehensive website audit.

        Returns dict with:
        - http_status, load_time_ms
        - has_ssl, ssl_valid, ssl_expiry, ssl_issuer
        - hosting_provider, hosting_ip, server_software
        - site_author, site_generator
        - is_mobile_friendly, has_viewport_meta
        - last_modified_at
        - social_media_links (dict of platform -> url)
        - errors: list of human-readable failure descriptions
        """
        result = {
            'url': url,
            'http_status': None,
            'load_time_ms': None,
            'has_ssl': False,
            'ssl_valid': False,
            'ssl_expiry': None,
            'ssl_issuer': None,
            'hosting_provider': None,
            'hosting_ip': None,
            'server_software': None,
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'last_modified_at': None,
            'social_media_links': {},
            'errors': [],
        }

        if not url:
            result['errors'].append('No URL provided')
            return result

        # Normalize URL: assume HTTPS when no scheme was stored.
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        parsed = urlparse(url)
        domain = parsed.netloc

        # 1. Check SSL certificate
        try:
            result.update(self._check_ssl(domain))
        except Exception as e:
            result['errors'].append(f'SSL check failed: {str(e)}')

        # 2. Resolve IP and detect hosting.
        # BUG FIX: helpers report their own failures under an 'errors' key;
        # previously result.update() *replaced* the accumulated error list.
        # Errors are now merged with extend() so earlier entries survive.
        try:
            hosting = self._detect_hosting(domain)
            result['errors'].extend(hosting.pop('errors', []))
            result.update(hosting)
        except Exception as e:
            result['errors'].append(f'Hosting detection failed: {str(e)}')

        # 3. Fetch page and analyze
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            result['load_time_ms'] = int((time.time() - start_time) * 1000)
            result['http_status'] = response.status_code
            # Judge SSL by the final URL after any redirects.
            result['has_ssl'] = response.url.startswith('https://')

            # Server header (truncated to fit the DB column)
            result['server_software'] = response.headers.get('Server', '')[:100]

            # Last-Modified header (RFC 7231 IMF-fixdate format)
            last_mod = response.headers.get('Last-Modified')
            if last_mod:
                try:
                    result['last_modified_at'] = datetime.strptime(
                        last_mod, '%a, %d %b %Y %H:%M:%S %Z'
                    )
                except (ValueError, TypeError):
                    # Malformed header — not fatal, just skip it.
                    pass

            # Parse HTML only on success; merge helper errors, don't clobber.
            if response.status_code == 200:
                parsed_meta = self._parse_html(response.text)
                result['errors'].extend(parsed_meta.pop('errors', []))
                result.update(parsed_meta)

        except requests.exceptions.SSLError as e:
            result['errors'].append(f'SSL Error: {str(e)}')
            result['ssl_valid'] = False
            # Try HTTP fallback so sites with broken certs still get audited.
            try:
                http_url = url.replace('https://', 'http://')
                response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
                result['http_status'] = response.status_code
                result['has_ssl'] = False
                if response.status_code == 200:
                    parsed_meta = self._parse_html(response.text)
                    result['errors'].extend(parsed_meta.pop('errors', []))
                    result.update(parsed_meta)
            except Exception as e2:
                result['errors'].append(f'HTTP fallback failed: {str(e2)}')

        except requests.exceptions.RequestException as e:
            result['errors'].append(f'Request failed: {str(e)}')

        return result

    def _check_ssl(self, domain: str) -> Dict[str, Any]:
        """Check SSL certificate validity, expiry and issuer.

        Performs a real TLS handshake against port 443 using the default
        trust store; a successful handshake is what marks ssl_valid True.
        """
        result = {'ssl_valid': False, 'ssl_expiry': None, 'ssl_issuer': None}

        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    result['ssl_valid'] = True

                    # Parse expiry date (OpenSSL's fixed GMT format)
                    not_after = cert.get('notAfter')
                    if not_after:
                        result['ssl_expiry'] = datetime.strptime(
                            not_after, '%b %d %H:%M:%S %Y %Z'
                        ).date()

                    # Extract issuer (Certificate Authority)
                    issuer = cert.get('issuer')
                    if issuer:
                        # issuer is a tuple of RDN tuples like
                        # ((('organizationName', "Let's Encrypt"),),)
                        issuer_dict = {}
                        for item in issuer:
                            for key, value in item:
                                issuer_dict[key] = value
                        # Prefer Organization name, fallback to Common Name
                        issuer_name = issuer_dict.get('organizationName') or issuer_dict.get('commonName')
                        if issuer_name:
                            result['ssl_issuer'] = issuer_name[:100]  # Limit length
        except Exception:
            # Any failure (DNS, refused connection, invalid chain) means
            # "no valid SSL" for audit purposes.
            result['ssl_valid'] = False

        return result

    def _detect_hosting(self, domain: str) -> Dict[str, Any]:
        """Detect hosting provider from IP prefix, reverse DNS and WHOIS.

        Returns hosting_provider / hosting_ip plus an 'errors' list that
        the caller merges into the audit result; may also add
        'domain_registrar' when WHOIS answers.
        """
        result = {'hosting_provider': None, 'hosting_ip': None, 'errors': []}

        try:
            ip = socket.gethostbyname(domain)
            result['hosting_ip'] = ip

            # 1) Known provider IP prefixes / name fragments.
            for provider, patterns in HOSTING_PROVIDERS.items():
                for pattern in patterns:
                    if ip.startswith(pattern) or pattern in domain.lower():
                        result['hosting_provider'] = provider
                        return result

            # 2) Reverse DNS often embeds the provider's hostname.
            # (Was a bare `except:`; narrowed to OSError, which covers
            # socket.herror/gaierror.)
            try:
                reverse = socket.gethostbyaddr(ip)[0]
                for provider, patterns in HOSTING_PROVIDERS.items():
                    for pattern in patterns:
                        if pattern in reverse.lower():
                            result['hosting_provider'] = provider
                            return result
            except OSError:
                pass  # no PTR record — not worth reporting

            # 3) WHOIS registrar as a weak fallback signal.
            try:
                w = whois.whois(domain)
                if w.registrar:
                    result['domain_registrar'] = str(w.registrar)[:100]
            except Exception:
                pass  # WHOIS is flaky; best-effort only

        except Exception as e:
            result['errors'].append(f'Hosting detection: {str(e)}')

        return result

    def _parse_html(self, html: str) -> Dict[str, Any]:
        """Parse HTML for metadata and social media links.

        Heuristics, in order: meta tags, HTML comments, footer text,
        footer links, then whole-page Polish credit phrases. Returns an
        'errors' list the caller merges into the audit result.
        """
        result = {
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'social_media_links': {},
            'errors': [],
        }

        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Check viewport meta (mobile-friendly indicator)
            viewport = soup.find('meta', attrs={'name': 'viewport'})
            if viewport:
                result['has_viewport_meta'] = True
                content = viewport.get('content', '')
                if 'width=device-width' in content:
                    result['is_mobile_friendly'] = True

            # Author meta
            author = soup.find('meta', attrs={'name': 'author'})
            if author:
                result['site_author'] = author.get('content', '')[:255]

            # Generator meta (CMS)
            generator = soup.find('meta', attrs={'name': 'generator'})
            if generator:
                result['site_generator'] = generator.get('content', '')[:100]

            # Look for author in multiple places
            if not result['site_author']:
                author_found = None

                # 1. Check HTML comments for author info.
                # (Removed a dead `soup.find_all(string=lambda ...)` whose
                # result was never used and whose conditional-expression
                # precedence made the lambda wrong anyway.)
                html_comments = re.findall(r'<!--(.+?)-->', html, re.DOTALL)
                comment_patterns = [
                    r'(?:created by|designed by|developed by|made by|author)[:\s]+([^\n<>]+)',
                    r'(?:agencja|agency|studio)[:\s]+([^\n<>]+)',
                ]
                for comment in html_comments:
                    for pattern in comment_patterns:
                        match = re.search(pattern, comment, re.IGNORECASE)
                        if match:
                            author_found = match.group(1).strip()
                            break
                    if author_found:
                        break

                # 2. Check footer text
                if not author_found:
                    footer = soup.find('footer')
                    if footer:
                        footer_text = footer.get_text(separator=' ')
                        footer_patterns = [
                            r'(?:wykonanie|realizacja|created by|designed by|made by|developed by)[:\s]+([^|<>\n©]+)',
                            r'(?:projekt|design|strona)[:\s]+([^|<>\n©]+)',
                            r'(?:powered by|built with)[:\s]+([^|<>\n©]+)',
                            r'(?:agencja|agency|studio)[:\s]+([^|<>\n©]+)',
                        ]
                        for pattern in footer_patterns:
                            match = re.search(pattern, footer_text, re.IGNORECASE)
                            if match:
                                author_found = match.group(1).strip()
                                break

                        # 3. Check footer links for agency/studio domains
                        if not author_found:
                            footer_links = footer.find_all('a', href=True)
                            agency_domains = ['.pl', '.com', '.eu']
                            agency_keywords = ['studio', 'agencja', 'agency', 'design', 'web', 'digital', 'media', 'creative']
                            for link in footer_links:
                                href = link.get('href', '')
                                link_text = link.get_text().strip()
                                # Check if link looks like an agency
                                if any(kw in href.lower() or kw in link_text.lower() for kw in agency_keywords):
                                    if any(dom in href for dom in agency_domains) and 'facebook' not in href and 'instagram' not in href:
                                        # Extract link text as author if plausible length
                                        if link_text and len(link_text) > 2 and len(link_text) < 50:
                                            author_found = link_text
                                            break

                # 4. Check entire page for common Polish credit patterns
                if not author_found:
                    page_text = soup.get_text(separator=' ')
                    page_patterns = [
                        r'(?:stronę wykonała?|witrynę wykonała?|stronę stworzył[ao]?)[:\s]+([^|<>\n©.]+)',
                        r'(?:copyright|©).*?(?:by|przez)[:\s]+([^|<>\n©.]+)',
                    ]
                    for pattern in page_patterns:
                        match = re.search(pattern, page_text, re.IGNORECASE)
                        if match:
                            author_found = match.group(1).strip()
                            break

                # Clean up author name
                if author_found:
                    # Trim dash/colon decorations and collapse whitespace
                    author_found = re.sub(r'^[\s\-–—:]+', '', author_found)
                    author_found = re.sub(r'[\s\-–—:]+$', '', author_found)
                    author_found = re.sub(r'\s+', ' ', author_found)
                    # Reject if too short or implausibly long
                    if len(author_found) > 2 and len(author_found) < 100:
                        result['site_author'] = author_found[:255]

            # Extract social media links
            for platform, patterns in SOCIAL_MEDIA_PATTERNS.items():
                for pattern in patterns:
                    matches = re.findall(pattern, html, re.IGNORECASE)
                    if matches:
                        # Take the first match that isn't a known false positive
                        # (exact, case-insensitive comparison).
                        excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
                        for match in matches:
                            if match.lower() not in excludes:
                                # Construct canonical profile URL from handle
                                if platform == 'facebook':
                                    url = f'https://facebook.com/{match}'
                                elif platform == 'instagram':
                                    url = f'https://instagram.com/{match}'
                                elif platform == 'youtube':
                                    if match.startswith('@'):
                                        url = f'https://youtube.com/{match}'
                                    else:
                                        url = f'https://youtube.com/channel/{match}'
                                elif platform == 'linkedin':
                                    # NOTE(review): personal /in/ handles are
                                    # also rebuilt as /company/ URLs — confirm
                                    # whether that is intended.
                                    url = f'https://linkedin.com/company/{match}'
                                elif platform == 'tiktok':
                                    url = f'https://tiktok.com/@{match}'
                                elif platform == 'twitter':
                                    url = f'https://twitter.com/{match}'
                                else:
                                    continue

                                result['social_media_links'][platform] = url
                                break

        except Exception as e:
            result['errors'].append(f'HTML parsing: {str(e)}')

        return result
|
|
|
|
|
|
class GooglePlacesSearcher:
    """Looks up Google Business profiles via the Google Places web API.

    Two-step flow: find_place() resolves a free-text query to a place_id,
    then get_place_details() fetches rating, review count, opening hours
    and contact data for that id.
    """

    # Google Places API endpoints
    FIND_PLACE_URL = 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json'
    PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize GooglePlacesSearcher.

        Args:
            api_key: Google Places API key. Falls back to GOOGLE_PLACES_API_KEY env var.
        """
        self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def find_place(self, company_name: str, city: str = 'Wejherowo') -> Optional[str]:
        """
        Find a place by company name and city.

        Uses the findplacefromtext endpoint to resolve a business and
        return its place_id.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            place_id string if found, None otherwise.
        """
        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return None

        try:
            response = self.session.get(
                self.FIND_PLACE_URL,
                params={
                    'input': f'{company_name} {city}',
                    'inputtype': 'textquery',
                    'fields': 'place_id,name,formatted_address',
                    'language': 'pl',
                    'key': self.api_key,
                },
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            payload = response.json()

            status = payload.get('status')
            if status == 'OK' and payload.get('candidates'):
                top = payload['candidates'][0]
                logger.info(
                    f"Found place for '{company_name}': {top.get('name')} "
                    f"at {top.get('formatted_address')}"
                )
                return top.get('place_id')
            if status == 'ZERO_RESULTS':
                logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
                return None
            logger.warning(
                f"Google Places API returned status: {payload.get('status')} "
                f"for '{company_name}'"
            )
            return None

        except requests.exceptions.Timeout:
            logger.error(f"Timeout searching for '{company_name}' on Google Places")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error searching for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.error(f"Error finding place for '{company_name}': {e}")
            return None

    def get_place_details(self, place_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a place.

        Args:
            place_id: Google Place ID returned from find_place().

        Returns:
            Dict containing:
            - google_rating: rounded rating (1.0-5.0) or None
            - google_reviews_count: integer review count or None
            - opening_hours: dict with weekday_text/open_now/periods, or None
            - business_status: e.g. 'OPERATIONAL', 'CLOSED_TEMPORARILY', or None
            - formatted_phone: phone number or None
            - website: website URL or None
        """
        details = {
            'google_rating': None,
            'google_reviews_count': None,
            'opening_hours': None,
            'business_status': None,
            'formatted_phone': None,
            'website': None,
        }

        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return details
        if not place_id:
            return details

        try:
            response = self.session.get(
                self.PLACE_DETAILS_URL,
                params={
                    'place_id': place_id,
                    # Only the fields the audit persists (billing is per-field).
                    'fields': ','.join([
                        'rating',
                        'user_ratings_total',
                        'opening_hours',
                        'business_status',
                        'formatted_phone_number',
                        'website',
                        'name',
                    ]),
                    'language': 'pl',
                    'key': self.api_key,
                },
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            payload = response.json()

            if payload.get('status') == 'OK' and payload.get('result'):
                info = payload['result']

                if 'rating' in info:
                    details['google_rating'] = round(float(info['rating']), 1)
                if 'user_ratings_total' in info:
                    details['google_reviews_count'] = int(info['user_ratings_total'])
                if 'opening_hours' in info:
                    oh = info['opening_hours']
                    details['opening_hours'] = {
                        'weekday_text': oh.get('weekday_text', []),
                        'open_now': oh.get('open_now'),
                        'periods': oh.get('periods', []),
                    }
                if 'business_status' in info:
                    details['business_status'] = info['business_status']
                if 'formatted_phone_number' in info:
                    details['formatted_phone'] = info['formatted_phone_number']
                if 'website' in info:
                    details['website'] = info['website']

                logger.info(
                    f"Retrieved details for {info.get('name')}: "
                    f"rating={details['google_rating']}, "
                    f"reviews={details['google_reviews_count']}"
                )
            else:
                logger.warning(
                    f"Google Places API returned status: {payload.get('status')} "
                    f"for place_id: {place_id}"
                )

        except requests.exceptions.Timeout:
            logger.error(f"Timeout getting details for place_id: {place_id}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error getting place details: {e}")
        except Exception as e:
            logger.error(f"Error getting place details for {place_id}: {e}")

        return details
|
|
|
|
|
|
class BraveSearcher:
    """Finds social media profiles and Google review data via Brave Search."""

    def __init__(self, api_key: Optional[str] = None):
        # Subscription token for the Brave Search API (optional).
        self.api_key = api_key or os.getenv('BRAVE_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def search_social_media(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, str]:
        """
        Search for company social media profiles.
        Returns dict of platform -> url.
        """
        found = {}

        queries = [
            ('facebook', f'{company_name} {city} facebook'),
            ('instagram', f'{company_name} instagram'),
            ('tiktok', f'{company_name} tiktok'),
            ('youtube', f'{company_name} youtube kanał'),
            ('linkedin', f'{company_name} linkedin'),
        ]

        for platform, query in queries:
            try:
                hit = self._search_brave(query, platform)
                if hit:
                    found[platform] = hit
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                logger.warning(f'Brave search failed for {platform}: {e}')

        return found

    def search_google_reviews(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, Any]:
        """
        Search for Google reviews, preferring the Google Places API.

        When GOOGLE_PLACES_API_KEY is set, delegates to GooglePlacesSearcher
        (find_place + get_place_details); otherwise falls back to parsing
        Brave Search snippets.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            Dict with google_rating, google_reviews_count, opening_hours
            and business_status (each None when unavailable).
        """
        review_data = {
            'google_rating': None,
            'google_reviews_count': None,
            'opening_hours': None,
            'business_status': None,
        }

        try:
            places_key = os.getenv('GOOGLE_PLACES_API_KEY')

            if places_key:
                # Accurate path: resolve the place, then pull its details.
                searcher = GooglePlacesSearcher(api_key=places_key)
                pid = searcher.find_place(company_name, city)
                if pid:
                    info = searcher.get_place_details(pid)
                    review_data['google_rating'] = info.get('google_rating')
                    review_data['google_reviews_count'] = info.get('google_reviews_count')
                    review_data['opening_hours'] = info.get('opening_hours')
                    review_data['business_status'] = info.get('business_status')
                    logger.info(
                        f"Google reviews for '{company_name}': "
                        f"rating={review_data['google_rating']}, "
                        f"reviews={review_data['google_reviews_count']}, "
                        f"status={review_data['business_status']}"
                    )
                else:
                    logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
            elif self.api_key:
                # Fallback: scrape rating snippets out of Brave results.
                extracted = self._search_brave_for_reviews(company_name, city)
                if extracted:
                    review_data.update(extracted)
            else:
                logger.warning(
                    'Neither GOOGLE_PLACES_API_KEY nor BRAVE_API_KEY configured. '
                    'Cannot retrieve Google reviews data.'
                )

        except Exception as e:
            logger.warning(f'Google reviews search failed for {company_name}: {e}')

        return review_data

    def _search_brave_for_reviews(self, company_name: str, city: str) -> Optional[Dict[str, Any]]:
        """
        Fallback: extract Google rating/review counts from Brave snippets.

        Args:
            company_name: Name of the company.
            city: City for location context.

        Returns:
            Dict with google_rating and google_reviews_count, or None if not found.
        """
        if not self.api_key:
            return None

        try:
            response = self.session.get(
                'https://api.search.brave.com/res/v1/web/search',
                headers={
                    'Accept': 'application/json',
                    'Accept-Encoding': 'gzip',
                    'X-Subscription-Token': self.api_key,
                },
                params={
                    'q': f'{company_name} {city} opinie google',
                    'count': 10,
                    'country': 'pl',
                    'search_lang': 'pl',
                    'ui_lang': 'pl-PL',
                },
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            payload = response.json()

            # Google Business snippets look like "4,5 (123 opinii)" or
            # "4.5 · 123 reviews"; scan every hit for such a fragment.
            rating_patterns = [
                r'(\d+[,\.]\d)\s*[·\(]\s*(\d+)\s*(?:opinii|recenzji|reviews)',
                r'ocena[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:opinii|recenzji)',
                r'rating[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:reviews|opinii)',
            ]

            for hit in payload.get('web', {}).get('results', []):
                snippet = hit.get('description', '') + ' ' + hit.get('title', '')
                for pattern in rating_patterns:
                    match = re.search(pattern, snippet, re.IGNORECASE)
                    if match:
                        # Polish decimal comma -> dot before parsing.
                        return {
                            'google_rating': round(float(match.group(1).replace(',', '.')), 1),
                            'google_reviews_count': int(match.group(2)),
                        }

            logger.info(f"No Google reviews data found in Brave results for '{company_name}'")
            return None

        except requests.exceptions.Timeout:
            logger.warning(f"Timeout searching Brave for '{company_name}' reviews")
            return None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Brave API request failed for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing Brave results for '{company_name}': {e}")
            return None

    def _search_brave(self, query: str, platform: str) -> Optional[str]:
        """
        Perform Brave search and extract relevant URL.
        Note: This is a placeholder - actual implementation would use Brave API.
        """
        # Placeholder for Brave Search API integration
        # In production, this would call the Brave Search API
        return None
|
|
|
|
|
|
class SocialMediaAuditor:
|
|
"""Main auditor class that coordinates website and social media auditing."""
|
|
|
|
def __init__(self, database_url: str = DATABASE_URL):
|
|
self.engine = create_engine(database_url)
|
|
self.Session = sessionmaker(bind=self.engine)
|
|
self.website_auditor = WebsiteAuditor()
|
|
self.brave_searcher = BraveSearcher()
|
|
|
|
# Initialize Google Places searcher if API key is available
|
|
google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
|
|
if google_places_api_key:
|
|
self.google_places_searcher = GooglePlacesSearcher(api_key=google_places_api_key)
|
|
logger.info('Google Places API key found - using Places API for reviews')
|
|
else:
|
|
self.google_places_searcher = None
|
|
logger.info('GOOGLE_PLACES_API_KEY not set - falling back to Brave Search for reviews')
|
|
|
|
def get_companies(self, company_ids: Optional[List[int]] = None,
|
|
batch_start: Optional[int] = None,
|
|
batch_end: Optional[int] = None) -> List[Dict]:
|
|
"""Fetch companies from database."""
|
|
with self.Session() as session:
|
|
if company_ids:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
WHERE id = ANY(:ids)
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query, {'ids': company_ids})
|
|
elif batch_start is not None and batch_end is not None:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
OFFSET :offset LIMIT :limit
|
|
""")
|
|
result = session.execute(query, {
|
|
'offset': batch_start - 1,
|
|
'limit': batch_end - batch_start + 1
|
|
})
|
|
else:
|
|
query = text("""
|
|
SELECT id, name, slug, website, address_city
|
|
FROM companies
|
|
ORDER BY id
|
|
""")
|
|
result = session.execute(query)
|
|
|
|
return [dict(row._mapping) for row in result]
|
|
|
|
def get_company_id_by_slug(self, slug: str) -> Optional[int]:
|
|
"""Get company ID by slug."""
|
|
with self.Session() as session:
|
|
query = text("""
|
|
SELECT id FROM companies WHERE slug = :slug
|
|
""")
|
|
result = session.execute(query, {'slug': slug})
|
|
row = result.fetchone()
|
|
if row:
|
|
return row[0]
|
|
return None
|
|
|
|
def audit_company(self, company: Dict) -> Dict[str, Any]:
|
|
"""
|
|
Perform full audit for a single company.
|
|
|
|
Returns comprehensive audit result.
|
|
"""
|
|
logger.info(f"Auditing company: {company['name']} (ID: {company['id']})")
|
|
|
|
result = {
|
|
'company_id': company['id'],
|
|
'company_name': company['name'],
|
|
'audit_date': datetime.now(),
|
|
'website': {},
|
|
'social_media': {},
|
|
'google_reviews': {},
|
|
'errors': [],
|
|
}
|
|
|
|
# 1. Website audit
|
|
if company.get('website'):
|
|
try:
|
|
result['website'] = self.website_auditor.audit_website(company['website'])
|
|
except Exception as e:
|
|
result['errors'].append(f'Website audit failed: {str(e)}')
|
|
else:
|
|
result['website'] = {'errors': ['No website URL']}
|
|
|
|
# 2. Social media from website
|
|
website_social = result['website'].get('social_media_links', {})
|
|
|
|
# 3. Search for additional social media via Brave
|
|
city = company.get('address_city', 'Wejherowo')
|
|
try:
|
|
brave_social = self.brave_searcher.search_social_media(company['name'], city)
|
|
# Merge, website takes precedence
|
|
for platform, url in brave_social.items():
|
|
if platform not in website_social:
|
|
website_social[platform] = url
|
|
except Exception as e:
|
|
result['errors'].append(f'Brave search failed: {str(e)}')
|
|
|
|
result['social_media'] = website_social
|
|
|
|
# 4. Google reviews search - prefer Google Places API if available
|
|
try:
|
|
if self.google_places_searcher:
|
|
# Use Google Places API directly for accurate data
|
|
place_id = self.google_places_searcher.find_place(company['name'], city)
|
|
if place_id:
|
|
details = self.google_places_searcher.get_place_details(place_id)
|
|
result['google_reviews'] = {
|
|
'google_rating': details.get('google_rating'),
|
|
'google_reviews_count': details.get('google_reviews_count'),
|
|
'opening_hours': details.get('opening_hours'),
|
|
'business_status': details.get('business_status'),
|
|
}
|
|
else:
|
|
result['google_reviews'] = {
|
|
'google_rating': None,
|
|
'google_reviews_count': None,
|
|
'opening_hours': None,
|
|
'business_status': None,
|
|
}
|
|
else:
|
|
# Fallback to Brave Search
|
|
result['google_reviews'] = self.brave_searcher.search_google_reviews(
|
|
company['name'], city
|
|
)
|
|
except Exception as e:
|
|
result['errors'].append(f'Google reviews search failed: {str(e)}')
|
|
|
|
return result
|
|
|
|
def save_audit_result(self, result: Dict) -> bool:
    """Persist one audit result to the database.

    Upserts a single row into ``company_website_analysis`` (one row per
    company) and one row per discovered profile into
    ``company_social_media``.

    Args:
        result: Payload produced by ``audit_company``. Must contain
            ``company_id`` and ``audit_date``; ``website``,
            ``social_media`` and ``google_reviews`` are optional dicts.

    Returns:
        True when the transaction committed, False on any error. Errors
        are logged and swallowed so that batch runs keep going.
    """
    try:
        with self.Session() as session:
            company_id = result['company_id']
            website = result.get('website', {})

            # Upsert website analysis. ON CONFLICT keeps exactly one row
            # per company, refreshing it on every re-audit.
            # NOTE: is_responsive is included in the UPDATE set (it was
            # previously only set on INSERT, leaving stale values on
            # re-audits).
            upsert_website = text("""
                INSERT INTO company_website_analysis (
                    company_id, analyzed_at, website_url, http_status_code,
                    load_time_ms, has_ssl, ssl_expires_at, ssl_issuer, is_responsive,
                    is_mobile_friendly, has_viewport_meta, last_modified_at,
                    hosting_provider, hosting_ip, server_software, site_author,
                    cms_detected, google_rating, google_reviews_count,
                    audit_source, audit_version
                ) VALUES (
                    :company_id, :analyzed_at, :website_url, :http_status_code,
                    :load_time_ms, :has_ssl, :ssl_expires_at, :ssl_issuer, :is_responsive,
                    :is_mobile_friendly, :has_viewport_meta, :last_modified_at,
                    :hosting_provider, :hosting_ip, :server_software, :site_author,
                    :cms_detected, :google_rating, :google_reviews_count,
                    :audit_source, :audit_version
                )
                ON CONFLICT (company_id) DO UPDATE SET
                    analyzed_at = EXCLUDED.analyzed_at,
                    http_status_code = EXCLUDED.http_status_code,
                    load_time_ms = EXCLUDED.load_time_ms,
                    has_ssl = EXCLUDED.has_ssl,
                    ssl_expires_at = EXCLUDED.ssl_expires_at,
                    ssl_issuer = EXCLUDED.ssl_issuer,
                    is_responsive = EXCLUDED.is_responsive,
                    is_mobile_friendly = EXCLUDED.is_mobile_friendly,
                    has_viewport_meta = EXCLUDED.has_viewport_meta,
                    last_modified_at = EXCLUDED.last_modified_at,
                    hosting_provider = EXCLUDED.hosting_provider,
                    hosting_ip = EXCLUDED.hosting_ip,
                    server_software = EXCLUDED.server_software,
                    site_author = EXCLUDED.site_author,
                    cms_detected = EXCLUDED.cms_detected,
                    google_rating = EXCLUDED.google_rating,
                    google_reviews_count = EXCLUDED.google_reviews_count,
                    audit_source = EXCLUDED.audit_source,
                    audit_version = EXCLUDED.audit_version
            """)

            google_reviews = result.get('google_reviews', {})

            session.execute(upsert_website, {
                'company_id': company_id,
                'analyzed_at': result['audit_date'],
                'website_url': website.get('url'),
                'http_status_code': website.get('http_status'),
                'load_time_ms': website.get('load_time_ms'),
                'has_ssl': website.get('has_ssl', False),
                'ssl_expires_at': website.get('ssl_expiry'),
                'ssl_issuer': website.get('ssl_issuer'),
                # No separate responsiveness signal yet; mirror the
                # mobile-friendly flag into both columns.
                'is_responsive': website.get('is_mobile_friendly', False),
                'is_mobile_friendly': website.get('is_mobile_friendly', False),
                'has_viewport_meta': website.get('has_viewport_meta', False),
                'last_modified_at': website.get('last_modified_at'),
                'hosting_provider': website.get('hosting_provider'),
                'hosting_ip': website.get('hosting_ip'),
                'server_software': website.get('server_software'),
                'site_author': website.get('site_author'),
                'cms_detected': website.get('site_generator'),
                'google_rating': google_reviews.get('google_rating'),
                'google_reviews_count': google_reviews.get('google_reviews_count'),
                'audit_source': 'automated',
                'audit_version': '1.0',
            })

            # Save social media profiles. The statement is loop-invariant,
            # so build it once instead of once per platform.
            upsert_social = text("""
                INSERT INTO company_social_media (
                    company_id, platform, url, verified_at, source, is_valid
                ) VALUES (
                    :company_id, :platform, :url, :verified_at, :source, :is_valid
                )
                ON CONFLICT (company_id, platform, url) DO UPDATE SET
                    verified_at = EXCLUDED.verified_at,
                    source = EXCLUDED.source,
                    is_valid = EXCLUDED.is_valid
            """)

            for platform, url in result.get('social_media', {}).items():
                session.execute(upsert_social, {
                    'company_id': company_id,
                    'platform': platform,
                    'url': url,
                    'verified_at': result['audit_date'],
                    'source': 'website_scrape',
                    'is_valid': True,
                })

            session.commit()
            logger.info(f"Saved audit for company {company_id}")
            return True

    except Exception as e:
        logger.error(f"Failed to save audit result: {e}")
        return False
def run_audit(self, company_ids: Optional[List[int]] = None,
              batch_start: Optional[int] = None,
              batch_end: Optional[int] = None,
              dry_run: bool = False) -> Dict[str, Any]:
    """
    Audit the selected companies and return a summary of the run.

    Companies are selected either explicitly (``company_ids``) or as an
    ID range (``batch_start``/``batch_end``); with neither given, all
    companies are audited. In ``dry_run`` mode results are printed as
    JSON instead of being written to the database.

    Returns a dict with ``total``, ``success``, ``failed`` and a
    per-company ``results`` list.
    """
    targets = self.get_companies(company_ids, batch_start, batch_end)

    summary: Dict[str, Any] = {
        'total': len(targets),
        'success': 0,
        'failed': 0,
        'results': [],
    }

    for entry in targets:
        try:
            audit = self.audit_company(entry)

            if dry_run:
                # Preview mode: show the payload instead of persisting it.
                summary['success'] += 1
                print(json.dumps(audit, default=str, indent=2))
            elif self.save_audit_result(audit):
                summary['success'] += 1
            else:
                summary['failed'] += 1

            summary['results'].append({
                'company_id': entry['id'],
                'company_name': entry['name'],
                'status': 'success',
                'social_media_found': len(audit.get('social_media', {})),
            })

        except Exception as e:
            logger.error(f"Audit failed for company {entry['id']}: {e}")
            summary['failed'] += 1
            summary['results'].append({
                'company_id': entry['id'],
                'company_name': entry['name'],
                'status': 'failed',
                'error': str(e),
            })

    return summary
def main():
    """CLI entry point: parse arguments, run the audit, print a summary.

    Exactly one selection mode is expected: --company-id, --company-slug,
    --batch START-END, or --all. Exits non-zero on bad input or when no
    mode is given.
    """
    parser = argparse.ArgumentParser(description='Social Media & Website Audit')
    parser.add_argument('--company-id', type=int, help='Audit single company by ID')
    parser.add_argument('--company-slug', type=str, help='Audit single company by slug')
    parser.add_argument('--batch', type=str, help='Audit batch of companies (e.g., 1-10)')
    parser.add_argument('--all', action='store_true', help='Audit all companies')
    parser.add_argument('--dry-run', action='store_true', help='Print results without saving')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    auditor = SocialMediaAuditor()

    # Use `is not None` so a (theoretical) company ID of 0 is not
    # silently treated as "option not given".
    if args.company_id is not None:
        summary = auditor.run_audit(company_ids=[args.company_id], dry_run=args.dry_run)
    elif args.company_slug:
        # Look up company ID by slug
        company_id = auditor.get_company_id_by_slug(args.company_slug)
        if company_id:
            summary = auditor.run_audit(company_ids=[company_id], dry_run=args.dry_run)
        else:
            print(f"Error: Company with slug '{args.company_slug}' not found")
            sys.exit(1)
    elif args.batch:
        # Validate the range format here instead of letting a raw
        # ValueError traceback escape (e.g. --batch abc or 1-2-3).
        try:
            start, end = map(int, args.batch.split('-'))
        except ValueError:
            parser.error("--batch must be in the form START-END, e.g. 1-10")
        summary = auditor.run_audit(batch_start=start, batch_end=end, dry_run=args.dry_run)
    elif args.all:
        summary = auditor.run_audit(dry_run=args.dry_run)
    else:
        parser.print_help()
        sys.exit(1)

    print("\n" + "=" * 60)
    print("AUDIT SUMMARY")
    print("=" * 60)
    print(f"Total companies: {summary['total']}")
    print(f"Successful: {summary['success']}")
    print(f"Failed: {summary['failed']}")
    print("=" * 60)
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == '__main__':
    main()