Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Add GBP Performance API integration for visibility metrics (Maps/Search impressions, call/website clicks, direction requests, search keywords). Extend Search Console with URL Inspection, Sitemaps, device/country/type breakdowns, and period-over-period trend comparison. Change OAuth scope from webmasters.readonly to webmasters for URL Inspection support. Migration 064 adds 24 new columns to company_website_analysis. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
461 lines
16 KiB
Python
461 lines
16 KiB
Python
"""
Google Search Console API Client
=================================

Uses OAuth 2.0 to fetch search analytics data (clicks, impressions, CTR, positions).

API docs: https://developers.google.com/webmaster-tools/v3/searchanalytics
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
# Module-level logger, named after this module, used by SearchConsoleService.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SearchConsoleService:
    """Google Search Console API client.

    Holds a ``requests.Session`` that carries an OAuth 2.0 bearer token and
    exposes helpers for search analytics, URL inspection and sitemaps.
    HTTP/network failures are logged (or skipped) and reported to the caller
    as an empty ``dict``/``list`` rather than raised.
    """

    BASE_URL = "https://www.googleapis.com/webmasters/v3"
    INSPECTION_URL = "https://searchconsole.googleapis.com/v1/urlInspection/index:inspect"

    # Seconds allowed per HTTP request.
    REQUEST_TIMEOUT = 15

    # SC data has ~3 day delay, so reporting windows end this many days ago.
    DATA_DELAY_DAYS = 3

    def __init__(self, access_token: str):
        """Create a client authenticated with ``access_token``.

        Args:
            access_token: OAuth 2.0 access token, sent as a Bearer credential.
        """
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
        })
        # requests does not read a ``timeout`` attribute from the Session, so
        # this assignment alone never limited anything. It is retained only
        # for code that may read it; the effective timeout is passed
        # explicitly on every request below.
        self.session.timeout = self.REQUEST_TIMEOUT

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _site_endpoint(self, site: str, resource: str) -> str:
        """Return the API URL of ``resource`` under the given site property.

        The site identifier is fully percent-encoded (``safe=''``) because it
        is itself a URL or an ``sc-domain:`` string embedded in the path.
        """
        from urllib.parse import quote

        return f"{self.BASE_URL}/sites/{quote(site, safe='')}/{resource}"

    def _reporting_window(self, days: int):
        """Return ``(start, end)`` datetimes for a window of ``days``.

        ``end`` is now minus ``DATA_DELAY_DAYS``; ``start`` is ``days``
        before ``end``.
        """
        end = datetime.now() - timedelta(days=self.DATA_DELAY_DAYS)
        return end - timedelta(days=days), end

    def _query_analytics(self, site: str, start: datetime, end: datetime,
                         dimensions: Optional[List[str]] = None, **extra):
        """POST a searchAnalytics query and return the raw response.

        Args:
            site: Site property exactly as registered in Search Console.
            start: First day of the queried range.
            end: Last day of the queried range.
            dimensions: Grouping dimensions; an empty list yields totals.
            **extra: Additional request-body fields (``rowLimit``,
                ``searchType``, ...).
        """
        body = {
            'startDate': start.strftime('%Y-%m-%d'),
            'endDate': end.strftime('%Y-%m-%d'),
            'dimensions': dimensions or [],
        }
        body.update(extra)
        return self.session.post(
            self._site_endpoint(site, 'searchAnalytics/query'),
            json=body,
            timeout=self.REQUEST_TIMEOUT,
        )

    @staticmethod
    def _row_metrics(row: Dict, detailed: bool = False) -> Dict:
        """Extract metrics from one analytics row.

        Always returns ``clicks`` and ``impressions``. With ``detailed`` it
        also returns ``ctr`` (multiplied by 100, 2 decimals) and ``position``
        (1 decimal). Missing values default to 0.
        """
        metrics = {
            'clicks': row.get('clicks', 0),
            'impressions': row.get('impressions', 0),
        }
        if detailed:
            metrics['ctr'] = round(row.get('ctr', 0) * 100, 2)
            metrics['position'] = round(row.get('position', 0), 1)
        return metrics

    @staticmethod
    def _pct_change(curr_val, prev_val) -> Optional[float]:
        """Return the percent change from ``prev_val`` to ``curr_val``.

        Returns ``None`` when the previous value is missing or zero, since
        the change is undefined in that case.
        """
        if prev_val:
            return round((curr_val - prev_val) / abs(prev_val) * 100, 1)
        return None

    @staticmethod
    def _site_url_variants(url: str) -> List[str]:
        """Return candidate Search Console identifiers for ``url``.

        Order: the URL itself, then the variants this client has always
        tried (trailing slash, scheme swap, www toggle), then the
        ``sc-domain:`` property, and finally every remaining combination of
        scheme, www prefix and trailing slash. The extra combinations come
        last so that inputs which already resolved keep resolving to the
        same property. Duplicates are removed, keeping the first occurrence.
        """
        from urllib.parse import urlparse

        def swap_scheme(u: str) -> Optional[str]:
            if u.startswith('https://'):
                return 'http://' + u[len('https://'):]
            if u.startswith('http://'):
                return 'https://' + u[len('http://'):]
            return None

        def toggle_www(u: str) -> str:
            if '://www.' in u:
                return u.replace('://www.', '://', 1)
            return u.replace('://', '://www.', 1)

        # Long-standing candidates, in their historical priority order.
        primary = [url]
        if not url.endswith('/'):
            primary.append(url + '/')
        swapped = swap_scheme(url)
        if swapped:
            primary.append(swapped)
        primary.extend([toggle_www(v) for v in primary])

        # Domain property variant (sc-domain:example.com)
        domain = urlparse(url).hostname or ''
        if domain.startswith('www.'):
            domain = domain[4:]
        domain_property = [f'sc-domain:{domain}'] if domain else []

        # Full scheme x www x trailing-slash product. This covers pairs the
        # list above misses, e.g. https://example.com -> http://example.com/.
        slashed = url if url.endswith('/') else url + '/'
        combos = [url, slashed]
        combos.extend([s for s in (swap_scheme(c) for c in list(combos)) if s])
        combos.extend([toggle_www(c) for c in list(combos)])

        return list(dict.fromkeys(primary + domain_property + combos))

    def _fetch_totals(self, site: str, start: datetime, end: datetime) -> Dict:
        """Return the aggregated totals row for a date range.

        Returns ``{}`` when the request fails, is not a 200, or has no rows.
        """
        try:
            resp = self._query_analytics(site, start, end)
            if resp.status_code == 200:
                rows = resp.json().get('rows', [])
                if rows:
                    return rows[0]
        except Exception as e:
            # Best-effort: callers treat a missing period as "no data".
            logger.debug(f"Totals query failed for {site}: {e}")
        return {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def list_sites(self) -> List[Dict]:
        """List verified sites in Search Console.

        Returns:
            The ``siteEntry`` list from the API response, or ``[]`` when the
            request fails.
        """
        try:
            resp = self.session.get(
                f"{self.BASE_URL}/sites",
                timeout=self.REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            return resp.json().get('siteEntry', [])
        except Exception as e:
            logger.error(f"Search Console list_sites failed: {e}")
            return []

    def _normalize_site_url(self, url: str) -> Optional[str]:
        """Try to find matching site URL in Search Console.

        Search Console uses exact URL format — with/without trailing slash,
        http/https, www/no-www — or an ``sc-domain:`` property. The
        candidates from ``_site_url_variants`` are checked in order against
        the sites returned by ``list_sites``.

        Returns:
            The first matching registered identifier, or ``None``.
        """
        site_urls = [s.get('siteUrl', '') for s in self.list_sites()]
        registered = set(site_urls)

        for candidate in self._site_url_variants(url):
            if candidate in registered:
                return candidate

        logger.debug(f"No match for {url}. Available sites: {site_urls}")
        return None

    def get_search_analytics(self, site_url: str, days: int = 28) -> Dict:
        """Get search analytics for a site.

        Args:
            site_url: Site URL; resolved to a registered property first.
            days: Length of the reporting window.

        Returns:
            Dict with keys: clicks, impressions, ctr, position, period_days,
            plus top_queries (list) and top_pages (list) when their
            respective requests return 200. ``{}`` when the site is not
            found or the totals request fails.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            logger.warning(f"Site {site_url} not found in Search Console")
            return {}

        start_date, end_date = self._reporting_window(days)

        try:
            # Totals
            resp = self._query_analytics(normalized, start_date, end_date)
            resp.raise_for_status()
            rows = resp.json().get('rows', [])
            totals = rows[0] if rows else {}

            result = {
                **self._row_metrics(totals, detailed=True),
                'period_days': days,
            }

            # Top queries
            resp_q = self._query_analytics(
                normalized, start_date, end_date, ['query'], rowLimit=10,
            )
            if resp_q.status_code == 200:
                result['top_queries'] = [
                    {'query': r['keys'][0], **self._row_metrics(r, detailed=True)}
                    for r in resp_q.json().get('rows', [])
                ]

            # Top pages
            resp_p = self._query_analytics(
                normalized, start_date, end_date, ['page'], rowLimit=10,
            )
            if resp_p.status_code == 200:
                result['top_pages'] = [
                    {'page': r['keys'][0], **self._row_metrics(r)}
                    for r in resp_p.json().get('rows', [])
                ]

            return result

        except Exception as e:
            logger.error(f"Search Console analytics failed for {site_url}: {e}")
            return {}

    def get_device_breakdown(self, site_url: str, days: int = 28) -> Dict:
        """Get clicks/impressions breakdown by device type.

        Returns:
            Dict keyed by lower-cased device name, e.g.
            {'desktop': {'clicks': N, 'impressions': N, 'ctr': F,
            'position': F}, 'mobile': {...}, 'tablet': {...}}.
            ``{}`` when the site is not found or the request fails.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return {}

        start_date, end_date = self._reporting_window(days)

        try:
            resp = self._query_analytics(
                normalized, start_date, end_date, ['device'],
            )
            if resp.status_code != 200:
                return {}

            return {
                row['keys'][0].lower(): self._row_metrics(row, detailed=True)
                for row in resp.json().get('rows', [])
            }

        except Exception as e:
            logger.warning(f"Device breakdown failed for {site_url}: {e}")
            return {}

    def get_country_breakdown(self, site_url: str, days: int = 28) -> List[Dict]:
        """Get top countries by clicks/impressions.

        Returns:
            List of dicts: [{'country': 'POL', 'clicks': N, 'impressions': N}, ...]
            (at most 10 rows are requested). ``[]`` when the site is not
            found or the request fails.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return []

        start_date, end_date = self._reporting_window(days)

        try:
            resp = self._query_analytics(
                normalized, start_date, end_date, ['country'], rowLimit=10,
            )
            if resp.status_code != 200:
                return []

            return [
                {'country': row['keys'][0], **self._row_metrics(row)}
                for row in resp.json().get('rows', [])
            ]

        except Exception as e:
            logger.warning(f"Country breakdown failed for {site_url}: {e}")
            return []

    def get_search_type_breakdown(self, site_url: str, days: int = 28) -> Dict:
        """Get breakdown by search type (web, image, video, news).

        One request is made per search type; a type is omitted from the
        result when its request fails or returns no rows.

        Returns:
            Dict like {'web': {'clicks': N, 'impressions': N}, 'image': {...}, ...}
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return {}

        start_date, end_date = self._reporting_window(days)
        result = {}

        for search_type in ['web', 'image', 'video', 'news']:
            try:
                resp = self._query_analytics(
                    normalized, start_date, end_date, searchType=search_type,
                )
                if resp.status_code != 200:
                    continue
                rows = resp.json().get('rows', [])
                if rows:
                    result[search_type] = self._row_metrics(rows[0])
            except Exception as e:
                # Best-effort per type: record why it is missing, then go on.
                logger.debug(
                    f"Search type '{search_type}' breakdown failed for {site_url}: {e}"
                )
                continue

        return result

    def get_trend_data(self, site_url: str, days: int = 28) -> Dict:
        """Compare current period vs previous period.

        The previous period has the same length and ends the day before the
        current period starts.

        Returns:
            Dict with current, previous values and % change:
            {
                'clicks': {'current': N, 'previous': N, 'change_pct': float},
                'impressions': {'current': N, 'previous': N, 'change_pct': float},
                'ctr': {'current': float, 'previous': float, 'change_pct': float},
                'position': {'current': float, 'previous': float, 'change_pct': float},
            }
            ``change_pct`` is ``None`` when the previous value is zero or
            missing. ``{}`` when the site is not found or the current period
            has no data.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return {}

        current_start, end_date = self._reporting_window(days)
        prev_end = current_start - timedelta(days=1)
        prev_start = prev_end - timedelta(days=days)

        current = self._fetch_totals(normalized, current_start, end_date)
        if not current:
            # Nothing to compare against; skip the second request.
            return {}
        previous = self._fetch_totals(normalized, prev_start, prev_end)

        result = {}

        for key in ('clicks', 'impressions'):
            c = current.get(key, 0)
            p = previous.get(key, 0)
            result[key] = {
                'current': c,
                'previous': p,
                'change_pct': self._pct_change(c, p),
            }

        ctr_now = round(current.get('ctr', 0) * 100, 2)
        ctr_before = round(previous.get('ctr', 0) * 100, 2)
        result['ctr'] = {
            'current': ctr_now,
            'previous': ctr_before,
            'change_pct': self._pct_change(ctr_now, ctr_before),
        }

        pos_now = round(current.get('position', 0), 1)
        pos_before = round(previous.get('position', 0), 1)
        # For position, lower is better, so invert the change
        pos_change = self._pct_change(pos_now, pos_before)
        result['position'] = {
            'current': pos_now,
            'previous': pos_before,
            'change_pct': -pos_change if pos_change is not None else None,
        }

        return result

    def inspect_url(self, site_url: str, page_url: str) -> Dict:
        """Inspect a URL's indexing status using URL Inspection API.

        Requires 'webmasters' scope (not readonly).

        Args:
            site_url: The site property URL (as registered in Search Console)
            page_url: The specific page URL to inspect

        Returns:
            Dict with: index_status, coverage_state, robots_txt_state,
            indexing_state, last_crawl, crawled_as, canonical_url,
            user_canonical, is_indexed. ``is_indexed`` is True only when the
            verdict is 'PASS'. ``{}`` when the site is not found, the
            response is not a 200, or the request fails.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return {}

        try:
            resp = self.session.post(
                self.INSPECTION_URL,
                json={
                    'inspectionUrl': page_url,
                    'siteUrl': normalized,
                },
                timeout=self.REQUEST_TIMEOUT,
            )

            if resp.status_code != 200:
                logger.debug(f"URL Inspection returned {resp.status_code} for {page_url}")
                return {}

            inspection = resp.json().get('inspectionResult', {})
            index_status = inspection.get('indexStatusResult', {})

            return {
                'index_status': index_status.get('verdict', 'UNKNOWN'),
                'coverage_state': index_status.get('coverageState', ''),
                'robots_txt_state': index_status.get('robotsTxtState', ''),
                'indexing_state': index_status.get('indexingState', ''),
                'last_crawl': index_status.get('lastCrawlTime', ''),
                'crawled_as': index_status.get('crawledAs', ''),
                'canonical_url': index_status.get('googleCanonical', ''),
                'user_canonical': index_status.get('userCanonical', ''),
                'is_indexed': index_status.get('verdict') == 'PASS',
            }

        except Exception as e:
            logger.warning(f"URL Inspection failed for {page_url}: {e}")
            return {}

    def get_sitemaps(self, site_url: str) -> List[Dict]:
        """Get sitemaps status for a site.

        Returns:
            List of dicts: [{'path': str, 'last_submitted': str,
            'last_downloaded': str, 'is_pending': bool, 'warnings': ...,
            'errors': ..., 'contents': [{'type': str, 'submitted': ...,
            'indexed': ...}, ...]}]. Counts are passed through as returned
            by the API. ``[]`` when the site is not found, the response is
            not a 200, or the request fails.
        """
        normalized = self._normalize_site_url(site_url)
        if not normalized:
            return []

        try:
            resp = self.session.get(
                self._site_endpoint(normalized, 'sitemaps'),
                timeout=self.REQUEST_TIMEOUT,
            )

            if resp.status_code != 200:
                logger.debug(f"Sitemaps API returned {resp.status_code}")
                return []

            return [
                {
                    'path': sm.get('path', ''),
                    'last_submitted': sm.get('lastSubmitted', ''),
                    'last_downloaded': sm.get('lastDownloaded', ''),
                    'is_pending': sm.get('isPending', False),
                    'warnings': sm.get('warnings', 0),
                    'errors': sm.get('errors', 0),
                    'contents': [
                        {
                            'type': c.get('type', ''),
                            'submitted': c.get('submitted', 0),
                            'indexed': c.get('indexed', 0),
                        }
                        for c in sm.get('contents', [])
                    ],
                }
                for sm in resp.json().get('sitemap', [])
            ]

        except Exception as e:
            logger.warning(f"Sitemaps fetch failed for {site_url}: {e}")
            return []
|