""" Google Search Console API Client ================================= Uses OAuth 2.0 to fetch search analytics data (clicks, impressions, CTR, positions). API docs: https://developers.google.com/webmaster-tools/v3/searchanalytics """ import logging from datetime import datetime, timedelta from typing import Dict, List, Optional import requests logger = logging.getLogger(__name__) class SearchConsoleService: """Google Search Console API client.""" BASE_URL = "https://www.googleapis.com/webmasters/v3" def __init__(self, access_token: str): self.session = requests.Session() self.session.headers.update({ 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json', }) self.session.timeout = 15 def list_sites(self) -> List[Dict]: """List verified sites in Search Console.""" try: resp = self.session.get(f"{self.BASE_URL}/sites") resp.raise_for_status() return resp.json().get('siteEntry', []) except Exception as e: logger.error(f"Search Console list_sites failed: {e}") return [] def _normalize_site_url(self, url: str) -> Optional[str]: """Try to find matching site URL in Search Console. Search Console uses exact URL format — with/without trailing slash, http/https, www/no-www. Try common variants. """ sites = self.list_sites() site_urls = [s.get('siteUrl', '') for s in sites] # Direct match if url in site_urls: return url # Try variants variants = [url] if not url.endswith('/'): variants.append(url + '/') if url.startswith('https://'): variants.append(url.replace('https://', 'http://')) if url.startswith('http://'): variants.append(url.replace('http://', 'https://')) # www variants for v in list(variants): if '://www.' in v: variants.append(v.replace('://www.', '://')) else: variants.append(v.replace('://', '://www.')) # Domain property variant (sc-domain:example.com) from urllib.parse import urlparse parsed = urlparse(url) domain = parsed.hostname or '' if domain.startswith('www.'): domain = domain[4:] if domain: variants.append(f'sc-domain:{domain}') for v in variants: if v in site_urls: return v logger.debug(f"No match for {url}. Available sites: {site_urls}") return None def get_search_analytics(self, site_url: str, days: int = 28) -> Dict: """Get search analytics for a site. Returns: Dict with keys: clicks, impressions, ctr, position, top_queries (list), top_pages (list), period_days """ normalized = self._normalize_site_url(site_url) if not normalized: logger.warning(f"Site {site_url} not found in Search Console") return {} end_date = datetime.now() - timedelta(days=3) # SC data has ~3 day delay start_date = end_date - timedelta(days=days) try: # Totals resp = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'dimensions': [], } ) resp.raise_for_status() rows = resp.json().get('rows', []) totals = rows[0] if rows else {} result = { 'clicks': totals.get('clicks', 0), 'impressions': totals.get('impressions', 0), 'ctr': round(totals.get('ctr', 0) * 100, 2), 'position': round(totals.get('position', 0), 1), 'period_days': days, } # Top queries resp_q = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'dimensions': ['query'], 'rowLimit': 10, } ) if resp_q.status_code == 200: result['top_queries'] = [ { 'query': r['keys'][0], 'clicks': r.get('clicks', 0), 'impressions': r.get('impressions', 0), 'ctr': round(r.get('ctr', 0) * 100, 2), 'position': round(r.get('position', 0), 1), } for r in resp_q.json().get('rows', []) ] # Top pages resp_p = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'dimensions': ['page'], 'rowLimit': 10, } ) if resp_p.status_code == 200: result['top_pages'] = [ {'page': r['keys'][0], 'clicks': r.get('clicks', 0), 'impressions': r.get('impressions', 0)} for r in resp_p.json().get('rows', []) ] return result except Exception as e: logger.error(f"Search Console analytics failed for {site_url}: {e}") return {} def get_device_breakdown(self, site_url: str, days: int = 28) -> Dict: """Get clicks/impressions breakdown by device type. Returns: Dict like {'desktop': {'clicks': N, 'impressions': N}, 'mobile': {...}, 'tablet': {...}} """ normalized = self._normalize_site_url(site_url) if not normalized: return {} end_date = datetime.now() - timedelta(days=3) start_date = end_date - timedelta(days=days) try: resp = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'dimensions': ['device'], } ) if resp.status_code != 200: return {} result = {} for row in resp.json().get('rows', []): device = row['keys'][0].lower() result[device] = { 'clicks': row.get('clicks', 0), 'impressions': row.get('impressions', 0), 'ctr': round(row.get('ctr', 0) * 100, 2), 'position': round(row.get('position', 0), 1), } return result except Exception as e: logger.warning(f"Device breakdown failed for {site_url}: {e}") return {} def get_country_breakdown(self, site_url: str, days: int = 28) -> List[Dict]: """Get top countries by clicks/impressions. Returns: List of dicts: [{'country': 'POL', 'clicks': N, 'impressions': N}, ...] """ normalized = self._normalize_site_url(site_url) if not normalized: return [] end_date = datetime.now() - timedelta(days=3) start_date = end_date - timedelta(days=days) try: resp = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'dimensions': ['country'], 'rowLimit': 10, } ) if resp.status_code != 200: return [] return [ { 'country': row['keys'][0], 'clicks': row.get('clicks', 0), 'impressions': row.get('impressions', 0), } for row in resp.json().get('rows', []) ] except Exception as e: logger.warning(f"Country breakdown failed for {site_url}: {e}") return [] def get_search_type_breakdown(self, site_url: str, days: int = 28) -> Dict: """Get breakdown by search type (web, image, video, news). Returns: Dict like {'web': {'clicks': N, 'impressions': N}, 'image': {...}, ...} """ normalized = self._normalize_site_url(site_url) if not normalized: return {} end_date = datetime.now() - timedelta(days=3) start_date = end_date - timedelta(days=days) result = {} for search_type in ['web', 'image', 'video', 'news']: try: resp = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start_date.strftime('%Y-%m-%d'), 'endDate': end_date.strftime('%Y-%m-%d'), 'searchType': search_type, 'dimensions': [], } ) if resp.status_code == 200: rows = resp.json().get('rows', []) if rows: row = rows[0] result[search_type] = { 'clicks': row.get('clicks', 0), 'impressions': row.get('impressions', 0), } except Exception: continue return result def get_trend_data(self, site_url: str, days: int = 28) -> Dict: """Compare current period vs previous period. Returns: Dict with current, previous values and % change: { 'clicks': {'current': N, 'previous': N, 'change_pct': float}, 'impressions': {'current': N, 'previous': N, 'change_pct': float}, 'ctr': {'current': float, 'previous': float, 'change_pct': float}, 'position': {'current': float, 'previous': float, 'change_pct': float}, } """ normalized = self._normalize_site_url(site_url) if not normalized: return {} end_date = datetime.now() - timedelta(days=3) current_start = end_date - timedelta(days=days) prev_end = current_start - timedelta(days=1) prev_start = prev_end - timedelta(days=days) def _get_totals(start, end): try: resp = self.session.post( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/searchAnalytics/query", json={ 'startDate': start.strftime('%Y-%m-%d'), 'endDate': end.strftime('%Y-%m-%d'), 'dimensions': [], } ) if resp.status_code == 200: rows = resp.json().get('rows', []) if rows: return rows[0] except Exception: pass return {} current = _get_totals(current_start, end_date) previous = _get_totals(prev_start, prev_end) if not current: return {} def _calc_change(curr_val, prev_val): if prev_val and prev_val != 0: return round((curr_val - prev_val) / abs(prev_val) * 100, 1) return None result = {} for key in ['clicks', 'impressions']: c = current.get(key, 0) p = previous.get(key, 0) result[key] = { 'current': c, 'previous': p, 'change_pct': _calc_change(c, p), } for key in ['ctr']: c = round(current.get(key, 0) * 100, 2) p = round(previous.get(key, 0) * 100, 2) result[key] = { 'current': c, 'previous': p, 'change_pct': _calc_change(c, p), } for key in ['position']: c = round(current.get(key, 0), 1) p = round(previous.get(key, 0), 1) # For position, lower is better, so invert the change change = _calc_change(c, p) result[key] = { 'current': c, 'previous': p, 'change_pct': -change if change is not None else None, } return result def inspect_url(self, site_url: str, page_url: str) -> Dict: """Inspect a URL's indexing status using URL Inspection API. Requires 'webmasters' scope (not readonly). Args: site_url: The site property URL (as registered in Search Console) page_url: The specific page URL to inspect Returns: Dict with: index_status, last_crawl, crawled_as, canonical_url, is_indexed """ INSPECTION_URL = "https://searchconsole.googleapis.com/v1/urlInspection/index:inspect" normalized = self._normalize_site_url(site_url) if not normalized: return {} try: resp = self.session.post( INSPECTION_URL, json={ 'inspectionUrl': page_url, 'siteUrl': normalized, } ) if resp.status_code != 200: logger.debug(f"URL Inspection returned {resp.status_code} for {page_url}") return {} data = resp.json() result_data = data.get('inspectionResult', {}) index_status = result_data.get('indexStatusResult', {}) crawl_result = index_status return { 'index_status': index_status.get('verdict', 'UNKNOWN'), 'coverage_state': index_status.get('coverageState', ''), 'robots_txt_state': index_status.get('robotsTxtState', ''), 'indexing_state': index_status.get('indexingState', ''), 'last_crawl': index_status.get('lastCrawlTime', ''), 'crawled_as': index_status.get('crawledAs', ''), 'canonical_url': index_status.get('googleCanonical', ''), 'user_canonical': index_status.get('userCanonical', ''), 'is_indexed': index_status.get('verdict') == 'PASS', } except Exception as e: logger.warning(f"URL Inspection failed for {page_url}: {e}") return {} def get_sitemaps(self, site_url: str) -> List[Dict]: """Get sitemaps status for a site. Returns: List of dicts: [{'path': str, 'lastSubmitted': str, 'isPending': bool, 'lastDownloaded': str, 'warnings': int, 'errors': int, ...}] """ normalized = self._normalize_site_url(site_url) if not normalized: return [] try: resp = self.session.get( f"{self.BASE_URL}/sites/{requests.utils.quote(normalized, safe='')}/sitemaps" ) if resp.status_code != 200: logger.debug(f"Sitemaps API returned {resp.status_code}") return [] sitemaps = [] for sm in resp.json().get('sitemap', []): sitemaps.append({ 'path': sm.get('path', ''), 'last_submitted': sm.get('lastSubmitted', ''), 'last_downloaded': sm.get('lastDownloaded', ''), 'is_pending': sm.get('isPending', False), 'warnings': sm.get('warnings', 0), 'errors': sm.get('errors', 0), 'contents': [ { 'type': c.get('type', ''), 'submitted': c.get('submitted', 0), 'indexed': c.get('indexed', 0), } for c in sm.get('contents', []) ], }) return sitemaps except Exception as e: logger.warning(f"Sitemaps fetch failed for {site_url}: {e}") return []