#!/usr/bin/env python3
"""
Google PageSpeed Insights API Client
=====================================

Client for interacting with Google PageSpeed Insights API with built-in:
- Rate limiting (25,000 requests/day free tier)
- Exponential backoff retry logic
- Comprehensive error handling

Usage:
    from pagespeed_client import GooglePageSpeedClient

    client = GooglePageSpeedClient()
    result = client.analyze_url('https://example.com')

Author: Maciej Pienczyn, InPi sp. z o.o.
Date: 2026-01-08
"""

import os
import json
import time
import logging
from datetime import datetime, date
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field, asdict
from enum import Enum

import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# API Configuration
PAGESPEED_API_URL = 'https://www.googleapis.com/pagespeedonline/v5/runPagespeed'
PAGESPEED_API_KEY = os.getenv('GOOGLE_PAGESPEED_API_KEY', '')

# Rate limiting configuration
DAILY_QUOTA_LIMIT = 25000    # Free tier limit
REQUESTS_PER_MINUTE = 60     # Conservative limit to avoid bursts
MIN_REQUEST_INTERVAL = 1.0   # Minimum seconds between requests

# Retry configuration
MAX_RETRIES = 3
INITIAL_BACKOFF = 1.0        # Initial backoff in seconds
MAX_BACKOFF = 60.0           # Maximum backoff in seconds
BACKOFF_MULTIPLIER = 2.0

# Request configuration
REQUEST_TIMEOUT = 60         # PageSpeed analysis can take a while
USER_AGENT = 'NordaBiznes-SEO-Auditor/1.0'


class Strategy(Enum):
    """PageSpeed analysis strategy (device type)."""
    MOBILE = 'mobile'
    DESKTOP = 'desktop'


class Category(Enum):
    """PageSpeed Lighthouse audit categories."""
    PERFORMANCE = 'performance'
    ACCESSIBILITY = 'accessibility'
    BEST_PRACTICES = 'best-practices'
    SEO = 'seo'


@dataclass
class PageSpeedScore:
    """Container for PageSpeed Lighthouse scores (0-100, None if unavailable)."""
    performance: Optional[int] = None
    accessibility: Optional[int] = None
    best_practices: Optional[int] = None
    seo: Optional[int] = None

    def to_dict(self) -> Dict[str, Optional[int]]:
        return asdict(self)


@dataclass
class CoreWebVitals:
    """Core Web Vitals metrics from PageSpeed."""
    lcp_ms: Optional[int] = None    # Largest Contentful Paint
    inp_ms: Optional[int] = None    # Interaction to Next Paint (replaced FID March 2024)
    cls: Optional[float] = None     # Cumulative Layout Shift
    fcp_ms: Optional[int] = None    # First Contentful Paint
    ttfb_ms: Optional[int] = None   # Time to First Byte

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class PageSpeedResult:
    """Complete PageSpeed analysis result."""
    url: str
    final_url: str
    strategy: str
    analyzed_at: datetime
    scores: PageSpeedScore
    core_web_vitals: CoreWebVitals
    audits: Dict[str, Any] = field(default_factory=dict)
    lighthouse_version: Optional[str] = None
    fetch_time_ms: Optional[int] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (datetime -> ISO string)."""
        result = {
            'url': self.url,
            'final_url': self.final_url,
            'strategy': self.strategy,
            'analyzed_at': self.analyzed_at.isoformat() if self.analyzed_at else None,
            'scores': self.scores.to_dict(),
            'core_web_vitals': self.core_web_vitals.to_dict(),
            'audits': self.audits,
            'lighthouse_version': self.lighthouse_version,
            'fetch_time_ms': self.fetch_time_ms,
            'error': self.error,
        }
        return result


class RateLimiter:
    """
    Simple rate limiter with daily quota tracking.

    Persists quota usage to a JSON file to track usage across script runs.
    """

    def __init__(self,
                 daily_limit: int = DAILY_QUOTA_LIMIT,
                 min_interval: float = MIN_REQUEST_INTERVAL,
                 quota_file: Optional[str] = None):
        """
        Args:
            daily_limit: Maximum requests allowed per calendar day.
            min_interval: Minimum seconds enforced between requests.
            quota_file: Path for quota persistence. Defaults to /tmp.
        """
        self.daily_limit = daily_limit
        self.min_interval = min_interval
        self.last_request_time: Optional[float] = None

        # Quota persistence file
        if quota_file:
            self.quota_file = Path(quota_file)
        else:
            # Default to /tmp (writable by any user, resets on reboot which
            # is fine for daily quota)
            self.quota_file = Path('/tmp/.pagespeed_quota.json')

        self._load_quota()

    def _load_quota(self) -> None:
        """Load quota usage from persistent storage."""
        self.today = date.today().isoformat()
        self.requests_today = 0

        if self.quota_file.exists():
            try:
                with open(self.quota_file, 'r') as f:
                    data = json.load(f)
                if data.get('date') == self.today:
                    self.requests_today = data.get('requests', 0)
                else:
                    # New day, reset counter
                    self._save_quota()
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Failed to load quota file: {e}")
                self._save_quota()
        else:
            self._save_quota()

    def _save_quota(self) -> None:
        """Persist quota usage to file."""
        try:
            with open(self.quota_file, 'w') as f:
                json.dump({
                    'date': self.today,
                    'requests': self.requests_today,
                    'limit': self.daily_limit,
                }, f)
        except IOError as e:
            logger.warning(f"Failed to save quota file: {e}")

    def _roll_over_if_new_day(self) -> None:
        """Reset the counter when the calendar day has changed."""
        today = date.today().isoformat()
        if today != self.today:
            self.today = today
            self.requests_today = 0

    def can_make_request(self) -> bool:
        """Check if we can make another request."""
        # Roll the date first so a process running past midnight does not
        # keep refusing requests based on yesterday's count.
        self._roll_over_if_new_day()
        # Check daily quota
        if self.requests_today >= self.daily_limit:
            return False
        return True

    def wait_if_needed(self) -> None:
        """Wait if necessary to respect rate limits."""
        if self.last_request_time is not None:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)

    def record_request(self) -> None:
        """Record that a request was made."""
        self.last_request_time = time.time()
        # Reset counter on a new day, then count this request (net effect is
        # identical to incrementing and re-setting to 1 on rollover).
        self._roll_over_if_new_day()
        self.requests_today += 1
        self._save_quota()
        logger.debug(f"Quota: {self.requests_today}/{self.daily_limit} requests today")

    def get_remaining_quota(self) -> int:
        """Get remaining requests for today."""
        return max(0, self.daily_limit - self.requests_today)

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get current usage statistics."""
        return {
            'date': self.today,
            'requests_today': self.requests_today,
            'daily_limit': self.daily_limit,
            'remaining': self.get_remaining_quota(),
            'usage_percent': round(self.requests_today / self.daily_limit * 100, 1),
        }


class PageSpeedAPIError(Exception):
    """Base exception for PageSpeed API errors."""
    pass


class QuotaExceededError(PageSpeedAPIError):
    """Raised when daily quota is exceeded."""
    pass


class RateLimitError(PageSpeedAPIError):
    """Raised when API returns 429 Too Many Requests."""
    pass


class GooglePageSpeedClient:
    """
    Client for Google PageSpeed Insights API.

    Features:
    - Rate limiting with daily quota tracking
    - Exponential backoff retry for transient errors
    - Comprehensive error handling
    - Support for both mobile and desktop analysis

    Usage:
        client = GooglePageSpeedClient()

        # Analyze a single URL
        result = client.analyze_url('https://example.com')

        # Analyze with both mobile and desktop
        results = client.analyze_url_both_strategies('https://example.com')

        # Check quota before batch processing
        if client.get_remaining_quota() >= 80:
            # Process all 80 companies
            pass
    """

    def __init__(self,
                 api_key: Optional[str] = None,
                 rate_limiter: Optional[RateLimiter] = None):
        """
        Initialize PageSpeed client.

        Args:
            api_key: Google PageSpeed API key. If not provided, uses
                GOOGLE_PAGESPEED_API_KEY environment variable.
            rate_limiter: Optional custom rate limiter instance.
        """
        self.api_key = api_key or PAGESPEED_API_KEY
        if not self.api_key:
            logger.warning(
                "No API key provided. PageSpeed API will work but with "
                "stricter rate limits. Set GOOGLE_PAGESPEED_API_KEY env var."
            )
        self.rate_limiter = rate_limiter or RateLimiter()
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def analyze_url(self,
                    url: str,
                    strategy: Strategy = Strategy.MOBILE,
                    categories: Optional[List[Category]] = None) -> PageSpeedResult:
        """
        Analyze a URL using PageSpeed Insights API.

        Args:
            url: The URL to analyze.
            strategy: Device strategy (mobile or desktop).
            categories: List of categories to analyze. Defaults to all.

        Returns:
            PageSpeedResult with scores and audit details.

        Raises:
            QuotaExceededError: If daily quota is exhausted.
            PageSpeedAPIError: For other API errors.
        """
        # Check quota before making request
        if not self.rate_limiter.can_make_request():
            raise QuotaExceededError(
                f"Daily quota of {self.rate_limiter.daily_limit} requests exceeded. "
                f"Try again tomorrow or use a different API key."
            )

        # Default to all categories
        if categories is None:
            categories = list(Category)

        # Build request parameters ('category' as a list becomes repeated
        # query parameters, which is what the API expects)
        params = {
            'url': url,
            'strategy': strategy.value,
            'category': [cat.value for cat in categories],
        }
        if self.api_key:
            params['key'] = self.api_key

        # Wait for rate limit
        self.rate_limiter.wait_if_needed()

        # Make request with retry logic
        response = self._make_request_with_retry(params)

        # Record successful request
        self.rate_limiter.record_request()

        # Parse response
        return self._parse_response(response, url, strategy)

    def analyze_url_both_strategies(self,
                                    url: str,
                                    categories: Optional[List[Category]] = None
                                    ) -> Dict[str, PageSpeedResult]:
        """
        Analyze URL for both mobile and desktop strategies.

        Args:
            url: The URL to analyze.
            categories: List of categories to analyze.

        Returns:
            Dict with 'mobile' and 'desktop' PageSpeedResult.
        """
        results = {}
        for strategy in [Strategy.MOBILE, Strategy.DESKTOP]:
            try:
                results[strategy.value] = self.analyze_url(url, strategy, categories)
            except PageSpeedAPIError as e:
                # One failed strategy should not abort the other; record the
                # error in a placeholder result instead.
                logger.error(f"Failed to analyze {url} ({strategy.value}): {e}")
                results[strategy.value] = PageSpeedResult(
                    url=url,
                    final_url=url,
                    strategy=strategy.value,
                    analyzed_at=datetime.now(),
                    scores=PageSpeedScore(),
                    core_web_vitals=CoreWebVitals(),
                    error=str(e),
                )
        return results

    def _make_request_with_retry(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make API request with exponential backoff retry.

        Retries on:
        - 429 Too Many Requests
        - 5xx Server Errors
        - Connection errors

        Args:
            params: Request parameters.

        Returns:
            Parsed JSON response.

        Raises:
            PageSpeedAPIError: If all retries fail.
        """
        last_error: Optional[Exception] = None
        backoff = INITIAL_BACKOFF

        for attempt in range(MAX_RETRIES + 1):
            try:
                logger.debug(f"API request attempt {attempt + 1}/{MAX_RETRIES + 1}")
                response = self.session.get(
                    PAGESPEED_API_URL,
                    params=params,
                    timeout=REQUEST_TIMEOUT,
                )

                # Handle rate limiting (429)
                if response.status_code == 429:
                    retry_after = response.headers.get('Retry-After', backoff)
                    try:
                        retry_after = float(retry_after)
                    except ValueError:
                        # Retry-After may be an HTTP date; fall back to backoff
                        retry_after = backoff

                    if attempt < MAX_RETRIES:
                        logger.warning(
                            f"Rate limited (429). Retrying in {retry_after}s "
                            f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                        )
                        time.sleep(retry_after)
                        backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                        continue
                    else:
                        raise RateLimitError(
                            f"Rate limited after {MAX_RETRIES + 1} attempts"
                        )

                # Handle server errors (5xx)
                if response.status_code >= 500:
                    if attempt < MAX_RETRIES:
                        logger.warning(
                            f"Server error ({response.status_code}). "
                            f"Retrying in {backoff}s "
                            f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                        )
                        time.sleep(backoff)
                        backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                        continue
                    else:
                        raise PageSpeedAPIError(
                            f"Server error {response.status_code} after "
                            f"{MAX_RETRIES + 1} attempts"
                        )

                # Handle client errors (4xx except 429)
                if response.status_code >= 400:
                    # Error body may not be JSON (e.g. HTML error page);
                    # don't let JSONDecodeError mask the real API error.
                    try:
                        error_data = response.json().get('error', {})
                        error_message = error_data.get('message', response.text)
                    except ValueError:
                        error_message = response.text
                    raise PageSpeedAPIError(
                        f"API error {response.status_code}: {error_message}"
                    )

                # Success
                return response.json()

            except requests.exceptions.Timeout:
                last_error = PageSpeedAPIError(
                    f"Request timed out after {REQUEST_TIMEOUT}s"
                )
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Request timeout. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

            except requests.exceptions.ConnectionError as e:
                last_error = PageSpeedAPIError(f"Connection error: {e}")
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Connection error. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

            except requests.exceptions.RequestException as e:
                last_error = PageSpeedAPIError(f"Request failed: {e}")
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Request error. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

        # All retries exhausted
        raise last_error or PageSpeedAPIError("Request failed after all retries")

    def _parse_response(self,
                        data: Dict[str, Any],
                        original_url: str,
                        strategy: Strategy) -> PageSpeedResult:
        """
        Parse PageSpeed API response into structured result.

        Args:
            data: Raw API response.
            original_url: The URL that was analyzed.
            strategy: The analysis strategy used.

        Returns:
            PageSpeedResult with parsed data.
        """
        lighthouse = data.get('lighthouseResult', {})

        # Extract scores (0-1 float -> 0-100 int)
        categories = lighthouse.get('categories', {})
        scores = PageSpeedScore(
            performance=self._extract_score(categories.get('performance')),
            accessibility=self._extract_score(categories.get('accessibility')),
            best_practices=self._extract_score(categories.get('best-practices')),
            seo=self._extract_score(categories.get('seo')),
        )

        # Extract Core Web Vitals
        audits = lighthouse.get('audits', {})
        core_web_vitals = CoreWebVitals(
            lcp_ms=self._extract_metric_ms(audits.get('largest-contentful-paint')),
            inp_ms=self._extract_metric_ms(audits.get('interaction-to-next-paint')
                                           or audits.get('max-potential-fid')),
            cls=self._extract_cls(audits.get('cumulative-layout-shift')),
            fcp_ms=self._extract_metric_ms(audits.get('first-contentful-paint')),
            ttfb_ms=self._extract_metric_ms(audits.get('server-response-time')),
        )

        # Extract relevant audits for SEO
        seo_audits = self._extract_seo_audits(audits)

        # Get timing info
        timing = lighthouse.get('timing', {})
        fetch_time = timing.get('total')

        return PageSpeedResult(
            url=original_url,
            final_url=lighthouse.get('finalUrl', original_url),
            strategy=strategy.value,
            analyzed_at=datetime.now(),
            scores=scores,
            core_web_vitals=core_web_vitals,
            audits=seo_audits,
            lighthouse_version=lighthouse.get('lighthouseVersion'),
            # 'is not None' so a legitimate 0ms timing is not dropped
            fetch_time_ms=int(fetch_time) if fetch_time is not None else None,
        )

    def _extract_score(self, category_data: Optional[Dict]) -> Optional[int]:
        """Extract score from category data (0-1 float -> 0-100 int)."""
        if not category_data:
            return None
        score = category_data.get('score')
        if score is not None:
            return int(round(score * 100))
        return None

    def _extract_metric_ms(self, audit_data: Optional[Dict]) -> Optional[int]:
        """Extract metric value in milliseconds."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is not None:
            return int(round(value))
        return None

    def _extract_cls(self, audit_data: Optional[Dict]) -> Optional[float]:
        """Extract Cumulative Layout Shift value."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is not None:
            return round(value, 3)
        return None

    def _extract_seo_audits(self, audits: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract SEO-relevant audits from Lighthouse results.

        Returns a dict with audit results organized by category.
        """
        seo_audits = {
            'meta': {},
            'crawlability': {},
            'content': {},
            'mobile': {},
            'performance': {},
        }

        # Meta tags
        meta_audits = [
            'document-title',
            'meta-description',
            'viewport',
            'hreflang',
            'canonical',
            'robots-txt',
        ]
        for audit_id in meta_audits:
            if audit_id in audits:
                audit = audits[audit_id]
                seo_audits['meta'][audit_id] = {
                    'score': audit.get('score'),
                    'title': audit.get('title'),
                    'description': audit.get('description'),
                }

        # Crawlability
        crawl_audits = [
            'is-crawlable',
            'http-status-code',
            'link-text',
            'crawlable-anchors',
        ]
        for audit_id in crawl_audits:
            if audit_id in audits:
                audit = audits[audit_id]
                seo_audits['crawlability'][audit_id] = {
                    'score': audit.get('score'),
                    'title': audit.get('title'),
                }

        # Content
        content_audits = [
            'image-alt',
            'structured-data',
            'font-size',
            'tap-targets',
        ]
        for audit_id in content_audits:
            if audit_id in audits:
                audit = audits[audit_id]
                seo_audits['content'][audit_id] = {
                    'score': audit.get('score'),
                    'title': audit.get('title'),
                }

        # Mobile
        mobile_audits = [
            'viewport',
            'content-width',
        ]
        for audit_id in mobile_audits:
            if audit_id in audits:
                audit = audits[audit_id]
                seo_audits['mobile'][audit_id] = {
                    'score': audit.get('score'),
                    'title': audit.get('title'),
                }

        # Performance (affects SEO)
        perf_audits = [
            'speed-index',
            'interactive',
            'total-blocking-time',
        ]
        for audit_id in perf_audits:
            if audit_id in audits:
                audit = audits[audit_id]
                seo_audits['performance'][audit_id] = {
                    'score': audit.get('score'),
                    'numericValue': audit.get('numericValue'),
                    'displayValue': audit.get('displayValue'),
                }

        return seo_audits

    def get_remaining_quota(self) -> int:
        """Get remaining API requests for today."""
        return self.rate_limiter.get_remaining_quota()

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get API usage statistics."""
        return self.rate_limiter.get_usage_stats()


# Convenience function for simple usage
def analyze_url(url: str, strategy: str = 'mobile') -> Dict[str, Any]:
    """
    Convenience function to analyze a URL.

    Args:
        url: The URL to analyze.
        strategy: 'mobile' or 'desktop'.

    Returns:
        Dict with analysis results.
    """
    client = GooglePageSpeedClient()
    strat = Strategy.MOBILE if strategy == 'mobile' else Strategy.DESKTOP
    result = client.analyze_url(url, strat)
    return result.to_dict()


if __name__ == '__main__':
    # Quick test
    import sys

    if len(sys.argv) < 2:
        print("Usage: python pagespeed_client.py <url>")
        print("Example: python pagespeed_client.py https://pixlab.pl")
        sys.exit(1)

    test_url = sys.argv[1]
    print(f"Analyzing: {test_url}")
    print("-" * 60)

    client = GooglePageSpeedClient()
    print(f"API Key: {'Set' if client.api_key else 'Not set (using public API)'}")
    print(f"Remaining quota: {client.get_remaining_quota()}")
    print("-" * 60)

    try:
        result = client.analyze_url(test_url)
        print(f"URL: {result.url}")
        print(f"Final URL: {result.final_url}")
        print(f"Strategy: {result.strategy}")
        print(f"Analyzed at: {result.analyzed_at}")
        print()
        print("Scores:")
        print(f"  Performance: {result.scores.performance}")
        print(f"  Accessibility: {result.scores.accessibility}")
        print(f"  Best Practices: {result.scores.best_practices}")
        print(f"  SEO: {result.scores.seo}")
        print()
        print("Core Web Vitals:")
        print(f"  LCP: {result.core_web_vitals.lcp_ms}ms")
        print(f"  FCP: {result.core_web_vitals.fcp_ms}ms")
        print(f"  CLS: {result.core_web_vitals.cls}")
        print(f"  TTFB: {result.core_web_vitals.ttfb_ms}ms")
        print()
        print(f"Lighthouse version: {result.lighthouse_version}")
        print(f"Fetch time: {result.fetch_time_ms}ms")
        print()
        print(f"Remaining quota: {client.get_remaining_quota()}")
    except QuotaExceededError as e:
        print(f"ERROR: Quota exceeded - {e}")
        sys.exit(1)
    except PageSpeedAPIError as e:
        print(f"ERROR: API error - {e}")
        sys.exit(1)