nordabiz/scripts/seo_audit.py
Maciej Pienczyn 59c50e0267 fix: Handle None values in SEO audit result extraction
Bug: When page fetch fails (SSL error), result['onpage'] is None.
Using dict.get('key', {}) returns None when key exists with None value.

Fix: Use 'or {}' pattern to handle both missing keys and None values.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 18:20:19 +01:00

1290 lines
52 KiB
Python

#!/usr/bin/env python3
"""
SEO Audit Script for Norda Biznes
=================================
Performs comprehensive SEO audit of company websites using:
- Google PageSpeed Insights API (performance, accessibility, SEO scores)
- On-page SEO analysis (meta tags, headings, images, links, structured data)
- Technical SEO checks (robots.txt, sitemap, canonical, indexability)
Designed to run in batches with rate limiting for API quota management.
Usage:
python seo_audit.py --company-id 26
python seo_audit.py --batch 1-10
python seo_audit.py --all
python seo_audit.py --company-id 26 --dry-run
Exit codes:
0 - All audits completed successfully
1 - Argument error or invalid input
2 - Partial failures (some audits failed)
3 - All audits failed
4 - Database connection error
5 - API quota exceeded
Author: Claude Code
Date: 2026-01-08
"""
import os
import sys
import json
import argparse
import logging
import time as time_module
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Tuple
import requests
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
# Import SEO analysis components
from pagespeed_client import (
GooglePageSpeedClient,
PageSpeedResult,
PageSpeedAPIError,
QuotaExceededError,
Strategy,
)
from seo_analyzer import (
OnPageSEOAnalyzer,
OnPageSEOResult,
TechnicalSEOChecker,
TechnicalSEOResult,
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Exit codes (meanings documented in the module docstring's "Exit codes" section)
EXIT_SUCCESS = 0
EXIT_ARGUMENT_ERROR = 1
EXIT_PARTIAL_FAILURES = 2
EXIT_ALL_FAILED = 3
EXIT_DATABASE_ERROR = 4
EXIT_QUOTA_EXCEEDED = 5
# Database configuration
# WARNING: The fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)
# Request configuration
REQUEST_TIMEOUT = 30  # seconds, per HTTP request
# Browser-like User-Agent with an auditor suffix (presumably to avoid naive bot blocking)
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 NordaBiznes-SEO-Auditor/1.0'
# SEO Audit version for tracking
SEO_AUDIT_VERSION = '1.0.0'
class SEOAuditor:
"""
Main SEO auditor class that coordinates website SEO auditing.
Follows the same pattern as SocialMediaAuditor from social_media_audit.py.
Orchestrates PageSpeed API, on-page analysis, and technical SEO checks.
"""
def __init__(self, database_url: str = DATABASE_URL):
    """
    Set up database access, HTTP fetching, and the analysis components.

    Args:
        database_url: SQLAlchemy connection string.
    """
    # Database: engine plus a session factory bound to it.
    self.engine = create_engine(database_url)
    self.Session = sessionmaker(bind=self.engine)
    # Analysis components (PageSpeed API, on-page parser, technical checks).
    self.pagespeed_client = GooglePageSpeedClient()
    self.onpage_analyzer = OnPageSEOAnalyzer()
    self.technical_checker = TechnicalSEOChecker()
    # Reusable HTTP session for fetching company pages.
    self.session = requests.Session()
    self.session.headers.update({'User-Agent': USER_AGENT})
def get_companies(self, company_ids: Optional[List[int]] = None,
                  batch_start: Optional[int] = None,
                  batch_end: Optional[int] = None) -> List[Dict]:
    """
    Load the set of companies to audit from the database.

    Selection modes, checked in this order:
      * explicit IDs via ``company_ids``;
      * a 1-indexed window via ``batch_start``/``batch_end``;
      * otherwise every company, ordered by id.

    Args:
        company_ids: List of specific company IDs to fetch.
        batch_start: Start index for batch processing (1-indexed).
        batch_end: End index for batch processing (1-indexed).

    Returns:
        List of company dicts with id, name, slug, website, address_city.
    """
    with self.Session() as session:
        if company_ids:
            # One named bind parameter per ID keeps the IN clause
            # portable across SQLite/PostgreSQL.
            params = {f'id_{i}': cid for i, cid in enumerate(company_ids)}
            placeholders = ', '.join(f':{key}' for key in params)
            query = text(f"""
SELECT id, name, slug, website, address_city
FROM companies
WHERE id IN ({placeholders})
ORDER BY id
""")
            rows = session.execute(query, params)
        elif batch_start is not None and batch_end is not None:
            # Translate the 1-indexed inclusive window into OFFSET/LIMIT.
            query = text("""
SELECT id, name, slug, website, address_city
FROM companies
ORDER BY id
OFFSET :offset LIMIT :limit
""")
            rows = session.execute(query, {
                'offset': batch_start - 1,
                'limit': batch_end - batch_start + 1
            })
        else:
            query = text("""
SELECT id, name, slug, website, address_city
FROM companies
ORDER BY id
""")
            rows = session.execute(query)
        return [dict(row._mapping) for row in rows]
def audit_company(self, company: Dict) -> Dict[str, Any]:
    """
    Perform full SEO audit for a single company.

    Pipeline (each stage is best-effort; failures are appended to
    result['errors'] and later stages still run where possible):
      1. Fetch the homepage HTML (HTTP fallback on SSL errors).
      2. On-page SEO analysis of the fetched HTML.
      3. Technical SEO checks.
      4. PageSpeed Insights API call (skipped when quota is exhausted).
      5. Weighted overall score from whatever components succeeded.

    Args:
        company: Company dict with id, name, slug, website.

    Returns:
        Comprehensive SEO audit result dict.
    """
    logger.info(f"Auditing SEO for: {company['name']} (ID: {company['id']})")
    # Result skeleton; sections stay None when their stage fails.
    result = {
        'company_id': company['id'],
        'company_name': company['name'],
        'company_slug': company['slug'],
        'audit_date': datetime.now(),
        'audit_version': SEO_AUDIT_VERSION,
        'website_url': company.get('website'),
        'pagespeed': None,
        'onpage': None,
        'technical': None,
        'scores': {
            'pagespeed_seo': None,
            'pagespeed_performance': None,
            'pagespeed_accessibility': None,
            'pagespeed_best_practices': None,
            'overall_seo': None,
        },
        'errors': [],
    }
    website_url = company.get('website')
    # Check if company has a website
    if not website_url:
        result['errors'].append('No website URL configured')
        logger.warning(f" Company {company['id']} has no website URL")
        return result
    # Normalize URL: default to HTTPS when no scheme is given.
    if not website_url.startswith(('http://', 'https://')):
        website_url = 'https://' + website_url
    result['website_url'] = website_url
    # 1. Fetch page HTML for on-page analysis
    html_content = None
    final_url = website_url
    http_status = None
    load_time_ms = None
    try:
        logger.info(f" Fetching page: {website_url}")
        start_time = time_module.time()
        response = self.session.get(
            website_url,
            timeout=REQUEST_TIMEOUT,
            allow_redirects=True
        )
        load_time_ms = int((time_module.time() - start_time) * 1000)
        http_status = response.status_code
        final_url = response.url
        if response.status_code == 200:
            html_content = response.text
            logger.info(f" Page fetched successfully ({load_time_ms}ms)")
        else:
            result['errors'].append(f'HTTP {response.status_code}')
            logger.warning(f" HTTP {response.status_code} for {website_url}")
    except requests.exceptions.SSLError as e:
        result['errors'].append(f'SSL Error: {str(e)[:100]}')
        logger.warning(f" SSL error for {website_url}: {e}")
        # Try HTTP fallback — some sites with broken certificates still
        # serve usable content over plain HTTP.
        try:
            http_url = website_url.replace('https://', 'http://')
            response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
            http_status = response.status_code
            final_url = response.url
            if response.status_code == 200:
                html_content = response.text
        except Exception as e2:
            result['errors'].append(f'HTTP fallback failed: {str(e2)[:50]}')
    except requests.exceptions.Timeout:
        result['errors'].append(f'Timeout after {REQUEST_TIMEOUT}s')
        logger.warning(f" Timeout for {website_url}")
    except requests.exceptions.ConnectionError as e:
        result['errors'].append(f'Connection error: {str(e)[:100]}')
        logger.warning(f" Connection error for {website_url}")
    except requests.exceptions.RequestException as e:
        # Catch-all for any other requests-level failure.
        result['errors'].append(f'Request error: {str(e)[:100]}')
        logger.warning(f" Request error for {website_url}: {e}")
    # Store HTTP info (may be None if the fetch never completed)
    result['http_status'] = http_status
    result['load_time_ms'] = load_time_ms
    result['final_url'] = final_url
    # 2. On-page SEO analysis (if we have HTML)
    if html_content:
        try:
            logger.info(" Running on-page SEO analysis...")
            onpage_result = self.onpage_analyzer.analyze_html(
                html_content,
                base_url=final_url
            )
            result['onpage'] = onpage_result.to_dict()
            logger.info(f" On-page analysis complete")
        except Exception as e:
            result['errors'].append(f'On-page analysis failed: {str(e)[:100]}')
            logger.error(f" On-page analysis error: {e}")
    # 3. Technical SEO checks (robots.txt, sitemap, etc.)
    # Runs even when the page fetch failed; check_url presumably performs
    # its own requests — TODO confirm against TechnicalSEOChecker.
    try:
        logger.info(" Running technical SEO checks...")
        technical_result = self.technical_checker.check_url(final_url)
        result['technical'] = technical_result.to_dict()
        logger.info(f" Technical checks complete")
    except Exception as e:
        result['errors'].append(f'Technical checks failed: {str(e)[:100]}')
        logger.error(f" Technical checks error: {e}")
    # 4. PageSpeed Insights API (if quota available)
    try:
        remaining_quota = self.pagespeed_client.get_remaining_quota()
        if remaining_quota > 0:
            logger.info(f" Running PageSpeed Insights (quota: {remaining_quota})...")
            pagespeed_result = self.pagespeed_client.analyze_url(
                final_url,
                strategy=Strategy.MOBILE
            )
            result['pagespeed'] = pagespeed_result.to_dict()
            # Extract scores
            result['scores']['pagespeed_seo'] = pagespeed_result.scores.seo
            result['scores']['pagespeed_performance'] = pagespeed_result.scores.performance
            result['scores']['pagespeed_accessibility'] = pagespeed_result.scores.accessibility
            result['scores']['pagespeed_best_practices'] = pagespeed_result.scores.best_practices
            logger.info(f" PageSpeed complete - SEO: {pagespeed_result.scores.seo}, "
                        f"Perf: {pagespeed_result.scores.performance}")
        else:
            result['errors'].append('PageSpeed API quota exceeded')
            logger.warning(" PageSpeed quota exceeded, skipping")
    except QuotaExceededError:
        result['errors'].append('PageSpeed API quota exceeded')
        logger.warning(" PageSpeed quota exceeded")
    except PageSpeedAPIError as e:
        result['errors'].append(f'PageSpeed API error: {str(e)[:100]}')
        logger.error(f" PageSpeed error: {e}")
    except Exception as e:
        result['errors'].append(f'PageSpeed unexpected error: {str(e)[:100]}')
        logger.error(f" PageSpeed unexpected error: {e}")
    # 5. Calculate overall SEO score
    result['scores']['overall_seo'] = self._calculate_overall_score(result)
    return result
def _calculate_overall_score(self, result: Dict[str, Any]) -> Optional[int]:
"""
Calculate an overall SEO score based on all available metrics.
Args:
result: Full audit result dict.
Returns:
Overall SEO score 0-100, or None if insufficient data.
"""
scores = []
weights = []
# PageSpeed SEO score (weight: 3)
if result.get('scores', {}).get('pagespeed_seo') is not None:
scores.append(result['scores']['pagespeed_seo'])
weights.append(3)
# PageSpeed Performance (weight: 2)
if result.get('scores', {}).get('pagespeed_performance') is not None:
scores.append(result['scores']['pagespeed_performance'])
weights.append(2)
# On-page factors score (calculated from analysis)
onpage = result.get('onpage')
if onpage:
onpage_score = self._calculate_onpage_score(onpage)
if onpage_score is not None:
scores.append(onpage_score)
weights.append(2)
# Technical SEO score
technical = result.get('technical')
if technical:
technical_score = self._calculate_technical_score(technical)
if technical_score is not None:
scores.append(technical_score)
weights.append(2)
# Calculate weighted average
if scores and weights:
weighted_sum = sum(s * w for s, w in zip(scores, weights))
total_weight = sum(weights)
return int(round(weighted_sum / total_weight))
return None
def _calculate_onpage_score(self, onpage: Dict[str, Any]) -> Optional[int]:
"""Calculate on-page SEO score from analysis results."""
score = 100
deductions = 0
# Meta tags checks
meta = onpage.get('meta_tags', {})
if not meta.get('title'):
deductions += 15
elif meta.get('title_length', 0) < 30 or meta.get('title_length', 0) > 70:
deductions += 5
if not meta.get('description'):
deductions += 10
elif meta.get('description_length', 0) < 120 or meta.get('description_length', 0) > 160:
deductions += 5
if not meta.get('canonical_url'):
deductions += 5
# Headings check
headings = onpage.get('headings', {})
if headings.get('h1_count', 0) == 0:
deductions += 10
elif headings.get('h1_count', 0) > 1:
deductions += 5
if not headings.get('has_proper_hierarchy', True):
deductions += 5
# Images check
images = onpage.get('images', {})
total_images = images.get('total_images', 0)
images_without_alt = images.get('images_without_alt', 0)
if total_images > 0 and images_without_alt > 0:
alt_ratio = images_without_alt / total_images
if alt_ratio > 0.5:
deductions += 10
elif alt_ratio > 0.2:
deductions += 5
# Structured data check
structured = onpage.get('structured_data', {})
if not structured.get('has_structured_data', False):
deductions += 5
# Open Graph check
og = onpage.get('open_graph', {})
if not og.get('og_title'):
deductions += 3
return max(0, score - deductions)
def _calculate_technical_score(self, technical: Dict[str, Any]) -> Optional[int]:
"""Calculate technical SEO score from check results."""
score = 100
deductions = 0
# Robots.txt check
robots = technical.get('robots_txt', {})
if not robots.get('exists', False):
deductions += 10
elif robots.get('blocks_googlebot', False):
deductions += 20
# Sitemap check
sitemap = technical.get('sitemap', {})
if not sitemap.get('exists', False):
deductions += 10
elif not sitemap.get('is_valid_xml', False):
deductions += 5
# Redirect chain check
redirects = technical.get('redirect_chain', {})
chain_length = redirects.get('chain_length', 0)
if chain_length > 3:
deductions += 10
elif chain_length > 1:
deductions += 5
if redirects.get('has_redirect_loop', False):
deductions += 20
# Indexability check
indexability = technical.get('indexability', {})
if not indexability.get('is_indexable', True):
deductions += 15
# Canonical check
canonical = technical.get('canonical', {})
if canonical.get('has_canonical', False):
if canonical.get('points_to_different_domain', False):
deductions += 10
return max(0, score - deductions)
def save_audit_result(self, result: Dict) -> bool:
    """
    Save audit result to database.

    Uses ON CONFLICT DO UPDATE for idempotent upserts, so re-auditing a
    company overwrites its previous row instead of inserting a duplicate.

    Args:
        result: Full audit result dict.

    Returns:
        True if save was successful, False otherwise.
    """
    try:
        with self.Session() as session:
            company_id = result['company_id']
            # Extract values from result (use 'or {}' to handle None values)
            # `.get(key) or {}` covers both a missing key and a key that is
            # explicitly None (e.g. 'onpage' is None when the page fetch failed),
            # so all the nested .get() calls below are safe.
            onpage = result.get('onpage') or {}
            technical = result.get('technical') or {}
            pagespeed = result.get('pagespeed') or {}
            meta_tags = onpage.get('meta_tags') or {}
            headings = onpage.get('headings') or {}
            images = onpage.get('images') or {}
            links = onpage.get('links') or {}
            structured_data = onpage.get('structured_data') or {}
            og = onpage.get('open_graph') or {}
            tc = onpage.get('twitter_card') or {}
            robots = technical.get('robots_txt') or {}
            sitemap = technical.get('sitemap') or {}
            canonical = technical.get('canonical') or {}
            indexability = technical.get('indexability') or {}
            cwv = pagespeed.get('core_web_vitals') or {}
            ps_scores = pagespeed.get('scores') or {}
            # Upsert query for company_website_analysis
            # Uses ON CONFLICT DO UPDATE for idempotent upserts
            upsert_query = text("""
INSERT INTO company_website_analysis (
company_id, analyzed_at, website_url, final_url,
http_status_code, load_time_ms,
-- PageSpeed Insights
pagespeed_seo_score, pagespeed_performance_score,
pagespeed_accessibility_score, pagespeed_best_practices_score,
pagespeed_audits,
-- On-page SEO
meta_title, meta_description, meta_keywords,
h1_count, h2_count, h3_count, h1_text,
total_images, images_without_alt, images_with_alt,
internal_links_count, external_links_count, broken_links_count,
has_structured_data, structured_data_types, structured_data_json,
-- Technical SEO
has_canonical, canonical_url, is_indexable, noindex_reason,
has_sitemap, has_robots_txt,
viewport_configured, is_mobile_friendly,
-- Core Web Vitals
largest_contentful_paint_ms, first_input_delay_ms, cumulative_layout_shift,
-- Open Graph
has_og_tags, og_title, og_description, og_image,
has_twitter_cards,
-- Language & International
html_lang, has_hreflang,
-- Word count
word_count_homepage,
-- SEO Audit metadata
seo_audit_version, seo_audited_at, seo_audit_errors,
seo_overall_score, seo_health_score, seo_issues
) VALUES (
:company_id, :analyzed_at, :website_url, :final_url,
:http_status_code, :load_time_ms,
:pagespeed_seo_score, :pagespeed_performance_score,
:pagespeed_accessibility_score, :pagespeed_best_practices_score,
:pagespeed_audits,
:meta_title, :meta_description, :meta_keywords,
:h1_count, :h2_count, :h3_count, :h1_text,
:total_images, :images_without_alt, :images_with_alt,
:internal_links_count, :external_links_count, :broken_links_count,
:has_structured_data, :structured_data_types, :structured_data_json,
:has_canonical, :canonical_url, :is_indexable, :noindex_reason,
:has_sitemap, :has_robots_txt,
:viewport_configured, :is_mobile_friendly,
:largest_contentful_paint_ms, :first_input_delay_ms, :cumulative_layout_shift,
:has_og_tags, :og_title, :og_description, :og_image,
:has_twitter_cards,
:html_lang, :has_hreflang,
:word_count_homepage,
:seo_audit_version, :seo_audited_at, :seo_audit_errors,
:seo_overall_score, :seo_health_score, :seo_issues
)
ON CONFLICT (company_id) DO UPDATE SET
analyzed_at = EXCLUDED.analyzed_at,
website_url = EXCLUDED.website_url,
final_url = EXCLUDED.final_url,
http_status_code = EXCLUDED.http_status_code,
load_time_ms = EXCLUDED.load_time_ms,
pagespeed_seo_score = EXCLUDED.pagespeed_seo_score,
pagespeed_performance_score = EXCLUDED.pagespeed_performance_score,
pagespeed_accessibility_score = EXCLUDED.pagespeed_accessibility_score,
pagespeed_best_practices_score = EXCLUDED.pagespeed_best_practices_score,
pagespeed_audits = EXCLUDED.pagespeed_audits,
meta_title = EXCLUDED.meta_title,
meta_description = EXCLUDED.meta_description,
meta_keywords = EXCLUDED.meta_keywords,
h1_count = EXCLUDED.h1_count,
h2_count = EXCLUDED.h2_count,
h3_count = EXCLUDED.h3_count,
h1_text = EXCLUDED.h1_text,
total_images = EXCLUDED.total_images,
images_without_alt = EXCLUDED.images_without_alt,
images_with_alt = EXCLUDED.images_with_alt,
internal_links_count = EXCLUDED.internal_links_count,
external_links_count = EXCLUDED.external_links_count,
broken_links_count = EXCLUDED.broken_links_count,
has_structured_data = EXCLUDED.has_structured_data,
structured_data_types = EXCLUDED.structured_data_types,
structured_data_json = EXCLUDED.structured_data_json,
has_canonical = EXCLUDED.has_canonical,
canonical_url = EXCLUDED.canonical_url,
is_indexable = EXCLUDED.is_indexable,
noindex_reason = EXCLUDED.noindex_reason,
has_sitemap = EXCLUDED.has_sitemap,
has_robots_txt = EXCLUDED.has_robots_txt,
viewport_configured = EXCLUDED.viewport_configured,
is_mobile_friendly = EXCLUDED.is_mobile_friendly,
largest_contentful_paint_ms = EXCLUDED.largest_contentful_paint_ms,
first_input_delay_ms = EXCLUDED.first_input_delay_ms,
cumulative_layout_shift = EXCLUDED.cumulative_layout_shift,
has_og_tags = EXCLUDED.has_og_tags,
og_title = EXCLUDED.og_title,
og_description = EXCLUDED.og_description,
og_image = EXCLUDED.og_image,
has_twitter_cards = EXCLUDED.has_twitter_cards,
html_lang = EXCLUDED.html_lang,
has_hreflang = EXCLUDED.has_hreflang,
word_count_homepage = EXCLUDED.word_count_homepage,
seo_audit_version = EXCLUDED.seo_audit_version,
seo_audited_at = EXCLUDED.seo_audited_at,
seo_audit_errors = EXCLUDED.seo_audit_errors,
seo_overall_score = EXCLUDED.seo_overall_score,
seo_health_score = EXCLUDED.seo_health_score,
seo_issues = EXCLUDED.seo_issues
""")
            # Build issues list from errors
            issues = []
            for error in result.get('errors', []):
                issues.append({
                    'severity': 'error',
                    'message': error,
                })
            # Get first H1 text
            h1_texts = headings.get('h1_texts', [])
            h1_text = h1_texts[0] if h1_texts else None
            session.execute(upsert_query, {
                'company_id': company_id,
                'analyzed_at': result['audit_date'],
                'website_url': result.get('website_url'),
                'final_url': result.get('final_url'),
                'http_status_code': result.get('http_status'),
                'load_time_ms': result.get('load_time_ms'),
                # PageSpeed scores
                'pagespeed_seo_score': ps_scores.get('seo'),
                'pagespeed_performance_score': ps_scores.get('performance'),
                'pagespeed_accessibility_score': ps_scores.get('accessibility'),
                'pagespeed_best_practices_score': ps_scores.get('best_practices'),
                'pagespeed_audits': json.dumps(pagespeed.get('audits', {})) if pagespeed else None,
                # On-page SEO (long text fields truncated to column limits)
                'meta_title': meta_tags.get('title', '')[:500] if meta_tags.get('title') else None,
                'meta_description': meta_tags.get('description'),
                'meta_keywords': meta_tags.get('keywords'),
                'h1_count': headings.get('h1_count'),
                'h2_count': headings.get('h2_count'),
                'h3_count': headings.get('h3_count'),
                'h1_text': h1_text[:500] if h1_text else None,
                'total_images': images.get('total_images'),
                'images_without_alt': images.get('images_without_alt'),
                'images_with_alt': images.get('images_with_alt'),
                'internal_links_count': links.get('internal_links'),
                'external_links_count': links.get('external_links'),
                'broken_links_count': links.get('broken_links'),  # May be None if not checked
                'has_structured_data': structured_data.get('has_structured_data', False),
                'structured_data_types': structured_data.get('all_types', []),
                'structured_data_json': json.dumps(structured_data.get('json_ld_data', [])) if structured_data.get('json_ld_data') else None,
                # Technical SEO
                'has_canonical': canonical.get('has_canonical', False),
                'canonical_url': canonical.get('canonical_url', '')[:500] if canonical.get('canonical_url') else None,
                'is_indexable': indexability.get('is_indexable', True),
                'noindex_reason': indexability.get('noindex_source'),
                'has_sitemap': sitemap.get('exists', False),
                'has_robots_txt': robots.get('exists', False),
                # Viewport and mobile-friendliness derived from meta_tags
                'viewport_configured': bool(meta_tags.get('viewport')),
                'is_mobile_friendly': 'width=device-width' in (meta_tags.get('viewport') or '').lower(),
                # Core Web Vitals
                'largest_contentful_paint_ms': cwv.get('lcp_ms'),
                'first_input_delay_ms': cwv.get('fid_ms'),
                'cumulative_layout_shift': cwv.get('cls'),
                # Open Graph
                'has_og_tags': bool(og.get('og_title')),
                'og_title': og.get('og_title', '')[:500] if og.get('og_title') else None,
                'og_description': og.get('og_description'),
                'og_image': og.get('og_image', '')[:500] if og.get('og_image') else None,
                'has_twitter_cards': bool(tc.get('card_type')),
                # Language & International
                'html_lang': onpage.get('lang_attribute', '')[:10] if onpage.get('lang_attribute') else None,
                'has_hreflang': onpage.get('has_hreflang', False),  # Detected by analyzer if present
                # Word count
                'word_count_homepage': onpage.get('word_count'),
                # Audit metadata
                'seo_audit_version': result.get('audit_version'),
                'seo_audited_at': result['audit_date'],
                'seo_audit_errors': result.get('errors', []),
                'seo_overall_score': result.get('scores', {}).get('overall_seo'),
                'seo_health_score': self._calculate_onpage_score(onpage) if onpage else None,
                'seo_issues': json.dumps(issues) if issues else None,
            })
            session.commit()
            logger.info(f" Saved SEO audit for company {company_id}")
            return True
    except Exception as e:
        # Broad catch: a failed save must not abort the whole batch run.
        logger.error(f"Failed to save audit result for company {result.get('company_id')}: {e}")
        return False
def run_audit(self, company_ids: Optional[List[int]] = None,
              batch_start: Optional[int] = None,
              batch_end: Optional[int] = None,
              dry_run: bool = False) -> Dict[str, Any]:
    """
    Run SEO audit for specified companies.

    Args:
        company_ids: List of specific company IDs to audit.
        batch_start: Start index for batch processing.
        batch_end: End index for batch processing.
        dry_run: If True, print results without saving to database.

    Returns:
        Summary dict with success/failed counts and results.
    """
    start_time = time_module.time()
    companies = self.get_companies(company_ids, batch_start, batch_end)
    if not companies:
        logger.warning("No companies found matching the specified criteria")
        # NOTE(review): this early-return summary omits the ssl_errors/
        # connection_errors/quota_exceeded/quota_start keys present in the
        # full summary below; print_summary reads those via .get(), so it
        # tolerates the difference — confirm no other consumer indexes them.
        return {
            'total': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'no_website': 0,
            'unavailable': 0,
            'timeout': 0,
            'quota_remaining': self.pagespeed_client.get_remaining_quota(),
            'duration_seconds': 0,
            'results': [],
        }
    summary = {
        'total': len(companies),
        'success': 0,
        'failed': 0,
        'skipped': 0,
        'no_website': 0,  # Companies without website URL
        'unavailable': 0,  # Websites that returned 4xx/5xx
        'timeout': 0,  # Websites that timed out
        'ssl_errors': 0,  # SSL certificate issues
        'connection_errors': 0,  # Connection refused/DNS errors
        'quota_exceeded': False,
        'quota_remaining': self.pagespeed_client.get_remaining_quota(),
        'quota_start': self.pagespeed_client.get_remaining_quota(),
        'results': [],
    }
    logger.info("=" * 60)
    logger.info(f"SEO AUDIT STARTING")
    logger.info("=" * 60)
    logger.info(f"Companies to audit: {len(companies)}")
    logger.info(f"Mode: {'DRY RUN (no database writes)' if dry_run else 'LIVE'}")
    logger.info(f"PageSpeed API quota remaining: {summary['quota_remaining']}")
    logger.info("=" * 60)
    for i, company in enumerate(companies, 1):
        # Progress estimation: ETA = mean time per finished company
        # multiplied by the number still to process.
        elapsed = time_module.time() - start_time
        if i > 1:
            avg_time_per_company = elapsed / (i - 1)
            remaining_companies = len(companies) - i + 1
            eta_seconds = avg_time_per_company * remaining_companies
            eta_str = str(timedelta(seconds=int(eta_seconds)))
        else:
            eta_str = "calculating..."
        logger.info("")
        logger.info(f"[{i}/{len(companies)}] {company['name']} (ID: {company['id']}) - ETA: {eta_str}")
        # Check for quota before proceeding (audit itself still runs;
        # only the PageSpeed stage inside audit_company is skipped).
        current_quota = self.pagespeed_client.get_remaining_quota()
        if current_quota <= 0:
            logger.warning(f" PageSpeed quota exhausted, skipping PageSpeed analysis")
            summary['quota_exceeded'] = True
        try:
            result = self.audit_company(company)
            # Categorize the result based on errors
            result_status = self._categorize_result(result)
            if result_status == 'no_website':
                summary['no_website'] += 1
                summary['skipped'] += 1
                logger.info(f" → SKIPPED: No website URL configured")
            elif result_status == 'unavailable':
                summary['unavailable'] += 1
                summary['failed'] += 1
                logger.warning(f" → UNAVAILABLE: HTTP {result.get('http_status')}")
            elif result_status == 'timeout':
                summary['timeout'] += 1
                summary['failed'] += 1
                logger.warning(f" → TIMEOUT: Website did not respond")
            elif result_status == 'ssl_error':
                summary['ssl_errors'] += 1
                # Still count as success if we got data via HTTP fallback
                if result.get('onpage'):
                    summary['success'] += 1
                    logger.info(f" → SUCCESS (with SSL warning)")
                else:
                    summary['failed'] += 1
                    logger.warning(f" → FAILED: SSL error, no fallback data")
            elif result_status == 'connection_error':
                summary['connection_errors'] += 1
                summary['failed'] += 1
                logger.warning(f" → FAILED: Connection error")
            else:
                summary['success'] += 1
                score = result.get('scores', {}).get('overall_seo')
                logger.info(f" → SUCCESS: Overall SEO score: {score}")
            # Save to database or print in dry-run mode
            # (companies with no website produce nothing worth persisting)
            if not dry_run:
                if result_status not in ('no_website',):
                    if self.save_audit_result(result):
                        logger.debug(f" Saved to database")
                    else:
                        logger.error(f" Failed to save to database")
            else:
                self._print_dry_run_result(company, result)
            # Build result entry
            summary['results'].append({
                'company_id': company['id'],
                'company_name': company['name'],
                'status': result_status,
                'overall_score': result.get('scores', {}).get('overall_seo'),
                'pagespeed_seo': result.get('scores', {}).get('pagespeed_seo'),
                'http_status': result.get('http_status'),
                'load_time_ms': result.get('load_time_ms'),
                'errors_count': len(result.get('errors', [])),
                'errors': result.get('errors', []),
            })
        except QuotaExceededError:
            logger.error(f" PageSpeed API quota exceeded!")
            summary['quota_exceeded'] = True
            summary['skipped'] += 1
            summary['results'].append({
                'company_id': company['id'],
                'company_name': company['name'],
                'status': 'quota_exceeded',
                'error': 'PageSpeed API quota exceeded',
            })
        except Exception as e:
            # Any other failure is recorded and the loop continues with
            # the next company.
            logger.error(f" Unexpected error: {e}")
            summary['failed'] += 1
            summary['results'].append({
                'company_id': company['id'],
                'company_name': company['name'],
                'status': 'error',
                'error': str(e),
            })
    # Final summary
    summary['quota_remaining'] = self.pagespeed_client.get_remaining_quota()
    summary['quota_used'] = summary['quota_start'] - summary['quota_remaining']
    summary['duration_seconds'] = int(time_module.time() - start_time)
    return summary
def _categorize_result(self, result: Dict[str, Any]) -> str:
"""
Categorize audit result based on errors encountered.
Returns one of: 'success', 'no_website', 'unavailable', 'timeout',
'ssl_error', 'connection_error', 'error'
"""
errors = result.get('errors', [])
error_text = ' '.join(errors).lower()
# No website URL
if 'no website url' in error_text:
return 'no_website'
# Timeout
if 'timeout' in error_text:
return 'timeout'
# Connection errors
if 'connection error' in error_text or 'connection refused' in error_text:
return 'connection_error'
# SSL errors (without successful fallback)
if 'ssl error' in error_text:
return 'ssl_error'
# HTTP errors (4xx, 5xx)
http_status = result.get('http_status')
if http_status and http_status >= 400:
return 'unavailable'
# If we have errors but also have data, it's partial success
if errors and not result.get('onpage') and not result.get('technical'):
return 'error'
return 'success'
def _print_dry_run_result(self, company: Dict, result: Dict[str, Any]) -> None:
    """Print a formatted audit result to stdout in dry-run mode (no DB writes)."""
    print("\n" + "-" * 60)
    print(f"Company: {company['name']} (ID: {company['id']})")
    print(f"Website: {result.get('website_url') or 'Not configured'}")
    if result.get('http_status'):
        print(f"HTTP Status: {result.get('http_status')}")
    if result.get('load_time_ms'):
        print(f"Load Time: {result.get('load_time_ms')}ms")
    if result.get('final_url') and result.get('final_url') != result.get('website_url'):
        print(f"Final URL (after redirects): {result.get('final_url')}")
    # Scores section only when at least one score is set.
    scores = result.get('scores', {})
    if any(scores.values()):
        print(f"\nScores:")
        if scores.get('overall_seo') is not None:
            print(f" Overall SEO: {scores.get('overall_seo')}")
        if scores.get('pagespeed_seo') is not None:
            print(f" PageSpeed SEO: {scores.get('pagespeed_seo')}")
        if scores.get('pagespeed_performance') is not None:
            print(f" PageSpeed Performance: {scores.get('pagespeed_performance')}")
        if scores.get('pagespeed_accessibility') is not None:
            print(f" PageSpeed Accessibility: {scores.get('pagespeed_accessibility')}")
        if scores.get('pagespeed_best_practices') is not None:
            print(f" PageSpeed Best Practices: {scores.get('pagespeed_best_practices')}")
    # On-page summary
    onpage = result.get('onpage', {})
    if onpage:
        print(f"\nOn-Page SEO:")
        meta = onpage.get('meta_tags', {})
        if meta.get('title'):
            print(f" Title: {meta.get('title')[:60]}...")
        headings = onpage.get('headings', {})
        print(f" H1 count: {headings.get('h1_count', 0)}")
        images = onpage.get('images', {})
        if images.get('total_images'):
            print(f" Images: {images.get('total_images')} total, {images.get('images_without_alt', 0)} missing alt")
        structured = onpage.get('structured_data', {})
        print(f" Structured Data: {'Yes' if structured.get('has_structured_data') else 'No'}")
    # Technical SEO summary
    technical = result.get('technical', {})
    if technical:
        print(f"\nTechnical SEO:")
        robots = technical.get('robots_txt', {})
        print(f" robots.txt: {'Yes' if robots.get('exists') else 'No'}")
        sitemap = technical.get('sitemap', {})
        print(f" sitemap.xml: {'Yes' if sitemap.get('exists') else 'No'}")
        indexability = technical.get('indexability', {})
        print(f" Indexable: {'Yes' if indexability.get('is_indexable', True) else 'No'}")
    if result.get('errors'):
        print(f"\nIssues ({len(result['errors'])}):")
        for err in result['errors'][:5]:  # Show first 5 errors
            print(f"{err}")
        if len(result['errors']) > 5:
            print(f" ... and {len(result['errors']) - 5} more")
    print("-" * 60)
def parse_batch_argument(batch_str: str) -> Tuple[int, int]:
    """
    Parse a batch range given as 'START-END' (e.g. '1-10').

    Args:
        batch_str: Range string; endpoints are 1-indexed and inclusive,
            optionally padded with whitespace.

    Returns:
        Tuple of (start, end) integers.

    Raises:
        ValueError: On malformed input, non-numeric endpoints, a start
            below 1, or an end smaller than the start.
    """
    # A missing '-' yields a single part, so the length check below
    # covers that case too (with the same error message).
    parts = batch_str.split('-')
    if len(parts) != 2:
        raise ValueError(f"Invalid batch format '{batch_str}'. Use START-END (e.g., 1-10)")
    try:
        start, end = (int(part.strip()) for part in parts)
    except ValueError:
        raise ValueError(f"Invalid batch values '{batch_str}'. START and END must be numbers")
    if start < 1:
        raise ValueError(f"Invalid batch start '{start}'. Must be >= 1")
    if end < start:
        raise ValueError(f"Invalid batch range '{start}-{end}'. END must be >= START")
    return start, end
def print_summary(summary: Dict[str, Any], dry_run: bool = False) -> None:
    """
    Print a formatted, human-readable audit summary to stdout.

    Args:
        summary: Aggregate result dict from the audit run. Reads keys:
            'total', 'success', 'failed', 'skipped', 'duration_seconds',
            optional edge-case counters ('no_website', 'unavailable',
            'timeout', 'ssl_errors', 'connection_errors'), quota fields
            ('quota_start', 'quota_used', 'quota_remaining',
            'quota_exceeded'), and 'results' (list of per-company dicts
            with 'overall_score', 'status', 'company_name', 'errors').
        dry_run: When True, the header shows DRY RUN instead of LIVE.
    """
    duration = summary.get('duration_seconds', 0)
    duration_str = str(timedelta(seconds=duration))
    print("\n")
    print("=" * 70)
    print(" SEO AUDIT COMPLETE")
    print("=" * 70)
    print("")
    print(f" Mode: {'DRY RUN' if dry_run else 'LIVE'}")
    print(f" Duration: {duration_str}")
    print("")
    print("-" * 70)
    print(" RESULTS BREAKDOWN")
    print("-" * 70)
    print(f" Total companies: {summary['total']}")
    print(f" ✓ Successful: {summary['success']}")
    print(f" ✗ Failed: {summary['failed']}")
    print(f" ○ Skipped: {summary['skipped']}")
    print("")
    # Edge case breakdown (each line only shown when the counter is non-zero)
    if summary.get('no_website', 0) > 0:
        print(f" - No website: {summary['no_website']}")
    if summary.get('unavailable', 0) > 0:
        print(f" - Unavailable: {summary['unavailable']}")
    if summary.get('timeout', 0) > 0:
        print(f" - Timeout: {summary['timeout']}")
    if summary.get('ssl_errors', 0) > 0:
        print(f" - SSL errors: {summary['ssl_errors']}")
    if summary.get('connection_errors', 0) > 0:
        print(f" - Connection errors: {summary['connection_errors']}")
    print("")
    print("-" * 70)
    print(" PAGESPEED API QUOTA")
    print("-" * 70)
    print(f" Quota at start: {summary.get('quota_start', 'N/A')}")
    print(f" Quota used: {summary.get('quota_used', 'N/A')}")
    print(f" Quota remaining: {summary.get('quota_remaining', 'N/A')}")
    if summary.get('quota_exceeded'):
        print(" ⚠ WARNING: Quota was exceeded during this run!")
    # Score distribution (only for companies that actually got a score)
    results = summary.get('results', [])
    scores = [r.get('overall_score') for r in results if r.get('overall_score') is not None]
    if scores:
        avg_score = sum(scores) / len(scores)
        print("")
        print("-" * 70)
        print(" SEO SCORE DISTRIBUTION")
        print("-" * 70)
        print(f" Companies with scores: {len(scores)}")
        print(f" Average SEO score: {avg_score:.1f}")
        print(f" Highest score: {max(scores)}")
        print(f" Lowest score: {min(scores)}")
        print("")
        # Score ranges with visual bars
        excellent = sum(1 for s in scores if s >= 90)
        good = sum(1 for s in scores if 70 <= s < 90)
        fair = sum(1 for s in scores if 50 <= s < 70)
        poor = sum(1 for s in scores if s < 50)
        total = len(scores)

        def bar(count, total, max_bar=30):
            # Render a proportional bar of filled/empty glyphs.
            # BUGFIX: both glyphs were empty strings (lost to an encoding
            # mishap), so bars rendered blank; restored with block glyphs.
            if total == 0:
                return ""
            width = int((count / total) * max_bar)
            return "█" * width + "░" * (max_bar - width)

        print(f" Excellent (90-100): {excellent:3d} {bar(excellent, total)}")
        print(f" Good (70-89): {good:3d} {bar(good, total)}")
        print(f" Fair (50-69): {fair:3d} {bar(fair, total)}")
        print(f" Poor (<50): {poor:3d} {bar(poor, total)}")
    # List failed companies
    failed_results = [r for r in results if r.get('status') in ('unavailable', 'timeout', 'connection_error', 'error')]
    if failed_results:
        print("")
        print("-" * 70)
        print(" FAILED AUDITS")
        print("-" * 70)
        for r in failed_results[:10]:  # Show first 10
            # BUGFIX: 'timeout' and 'error' icons were empty strings
            # (encoding mishap); restored with plausible glyphs.
            status_icon = {
                'unavailable': '🔴',
                'timeout': '⏱',
                'connection_error': '🔌',
                'error': '❌',
            }.get(r['status'], '?')
            errors = r.get('errors', [])
            error_msg = errors[0][:50] if errors else r.get('status', 'Unknown')
            print(f" {status_icon} {r['company_name'][:30]:<30} - {error_msg}")
        if len(failed_results) > 10:
            print(f" ... and {len(failed_results) - 10} more")
    print("")
    print("=" * 70)
def main():
    """
    Main entry point for CLI usage.

    Parses command-line arguments, validates that exactly one company
    selection method was given, runs the audit, prints the summary (text
    or JSON), and exits with one of the documented exit codes.
    """
    parser = argparse.ArgumentParser(
        description='SEO Audit for Norda Biznes member websites',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python seo_audit.py --company-id 26              # Audit single company
  python seo_audit.py --batch 1-10                 # Audit companies 1-10
  python seo_audit.py --all                        # Audit all companies
  python seo_audit.py --company-id 26 --dry-run    # Test without saving
  python seo_audit.py --all --json > report.json   # Export to JSON

Exit codes:
  0 - All audits completed successfully
  1 - Argument error or invalid input
  2 - Partial failures (some audits failed)
  3 - All audits failed
  4 - Database connection error
  5 - API quota exceeded
"""
    )
    # Selection arguments (mutually exclusive in practice; enforced manually
    # below so we can print a combined, friendlier error message)
    selection = parser.add_argument_group('Company Selection (choose one)')
    selection.add_argument('--company-id', type=int, metavar='ID',
                           help='Audit single company by ID')
    selection.add_argument('--company-ids', type=str, metavar='IDS',
                           help='Audit multiple companies by IDs (comma-separated, e.g., 1,5,10)')
    selection.add_argument('--batch', type=str, metavar='RANGE',
                           help='Audit batch of companies by row offset (e.g., 1-10)')
    selection.add_argument('--all', action='store_true',
                           help='Audit all companies')
    # Options
    options = parser.add_argument_group('Options')
    options.add_argument('--dry-run', action='store_true',
                         help='Print results without saving to database')
    options.add_argument('--verbose', '-v', action='store_true',
                         help='Enable verbose/debug output')
    options.add_argument('--quiet', '-q', action='store_true',
                         help='Suppress progress output (only show summary)')
    options.add_argument('--json', action='store_true',
                         help='Output results as JSON (for scripting)')
    options.add_argument('--database-url', type=str, metavar='URL',
                         help='Database connection URL (overrides DATABASE_URL env var)')
    args = parser.parse_args()
    # Configure logging level (--quiet takes precedence over --verbose)
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    elif args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Validate that exactly one selection method is provided
    selection_count = sum([
        args.company_id is not None,
        args.company_ids is not None,
        args.batch is not None,
        args.all
    ])
    if selection_count == 0:
        parser.print_help()
        print("\n❌ Error: Please specify one of --company-id, --company-ids, --batch, or --all")
        sys.exit(EXIT_ARGUMENT_ERROR)
    if selection_count > 1:
        print("❌ Error: Please specify only one selection method (--company-id, --company-ids, --batch, or --all)")
        sys.exit(EXIT_ARGUMENT_ERROR)
    # Parse batch argument if provided
    batch_start, batch_end = None, None
    if args.batch:
        try:
            batch_start, batch_end = parse_batch_argument(args.batch)
        except ValueError as e:
            print(f"❌ Error: {e}")
            sys.exit(EXIT_ARGUMENT_ERROR)
    # Parse company IDs if provided.
    # BUGFIX: compare against None instead of truthiness so '--company-id 0'
    # is not silently ignored (previously it passed selection validation but
    # left company_ids as None, falling through to a full audit run).
    company_ids = None
    if args.company_id is not None:
        company_ids = [args.company_id]
    elif args.company_ids:
        try:
            company_ids = [int(x.strip()) for x in args.company_ids.split(',')]
            if not company_ids:
                raise ValueError("Empty list")
        except ValueError:
            print("❌ Error: Invalid --company-ids format. Use comma-separated integers (e.g., 1,5,10)")
            sys.exit(EXIT_ARGUMENT_ERROR)
    # Determine database URL (CLI flag overrides environment/config default)
    database_url = args.database_url or DATABASE_URL
    # Initialize auditor; any failure here is treated as a database error
    try:
        auditor = SEOAuditor(database_url=database_url)
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        print(f"❌ Error: Database connection failed: {e}")
        sys.exit(EXIT_DATABASE_ERROR)
    except Exception as e:
        logger.error(f"Failed to initialize auditor: {e}")
        print(f"❌ Error: Failed to initialize SEO auditor: {e}")
        sys.exit(EXIT_DATABASE_ERROR)
    # Run audit; map known failure modes to their dedicated exit codes
    try:
        summary = auditor.run_audit(
            company_ids=company_ids,
            batch_start=batch_start,
            batch_end=batch_end,
            dry_run=args.dry_run
        )
    except QuotaExceededError:
        logger.error("PageSpeed API quota exceeded")
        print("❌ Error: PageSpeed API quota exceeded. Try again tomorrow.")
        sys.exit(EXIT_QUOTA_EXCEEDED)
    except SQLAlchemyError as e:
        logger.error(f"Database error during audit: {e}")
        print(f"❌ Error: Database error: {e}")
        sys.exit(EXIT_DATABASE_ERROR)
    except Exception as e:
        logger.error(f"Unexpected error during audit: {e}")
        print(f"❌ Error: Unexpected error: {e}")
        sys.exit(EXIT_ALL_FAILED)
    # Output results: machine-readable JSON or the formatted text summary
    if args.json:
        print(json.dumps(summary, default=str, indent=2))
    else:
        print_summary(summary, dry_run=args.dry_run)
    # Determine exit code from the aggregate outcome (checked in priority
    # order: nothing found -> quota -> all failed -> partial -> success)
    if summary['total'] == 0:
        logger.warning("No companies found to audit")
        sys.exit(EXIT_ARGUMENT_ERROR)
    elif summary.get('quota_exceeded'):
        sys.exit(EXIT_QUOTA_EXCEEDED)
    elif summary['failed'] == summary['total'] - summary['skipped']:
        sys.exit(EXIT_ALL_FAILED)
    elif summary['failed'] > 0:
        sys.exit(EXIT_PARTIAL_FAILURES)
    else:
        sys.exit(EXIT_SUCCESS)
# Script entry point: delegate to main() so the module can also be imported
# (e.g. for tests) without triggering an audit run.
if __name__ == '__main__':
    main()