feat(backend): Add enhanced audit models and scraper improvements

- database.py: GBPReview, CompanyCitation, CompanyCompetitor, CompetitorSnapshot, AuditReport models
- gbp_audit_service.py: Enhanced review analysis, NAP consistency, keyword analysis
- scripts/seo_audit.py: Core Web Vitals, heading/image/link analysis, SSL, analytics detection
- scripts/social_media_audit.py: Profile enrichment, content types, posting frequency

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Maciej Pienczyn 2026-02-07 12:00:42 +01:00
parent 387bd2f616
commit 42ddeabf2a
4 changed files with 1236 additions and 9 deletions

database.py

@@ -1133,6 +1133,26 @@ class CompanyWebsiteAnalysis(Base):
seo_health_score = Column(Integer) # On-page SEO health score 0-100
seo_issues = Column(JSONB) # List of SEO issues found with severity levels
# === LOCAL SEO ===
local_seo_score = Column(Integer) # 0-100
has_local_business_schema = Column(Boolean)
local_business_schema_fields = Column(JSONB)
nap_on_website = Column(JSONB) # {"name": "...", "address": "...", "phone": "..."}
has_google_maps_embed = Column(Boolean)
has_local_keywords = Column(Boolean)
local_keywords_found = Column(JSONB)
# === CITATIONS ===
citations_found = Column(JSONB)
citations_count = Column(Integer, default=0)
# === CONTENT FRESHNESS ===
content_freshness_score = Column(Integer) # 0-100
last_content_update = Column(DateTime)
# === SCORE HISTORY ===
score_history = Column(JSONB) # [{"date": "2026-02-01", "score": 72}]
# === DOMAIN ===
domain_registered_at = Column(Date)
domain_expires_at = Column(Date)
@@ -1154,6 +1174,29 @@ class CompanyWebsiteAnalysis(Base):
company = relationship('Company', back_populates='website_analyses')
class CompanyCitation(Base):
"""Tracks company presence in local business directories"""
__tablename__ = 'company_citations'
id = Column(Integer, primary_key=True)
company_id = Column(Integer, ForeignKey('companies.id', ondelete='CASCADE'), nullable=False, index=True)
directory_name = Column(String(100), nullable=False)
directory_url = Column(String(500))
listing_url = Column(String(500))
status = Column(String(20), default='unknown') # found, not_found, incorrect, error, unknown
nap_accurate = Column(Boolean)
details = Column(JSONB)
checked_at = Column(DateTime, default=datetime.now)
created_at = Column(DateTime, default=datetime.now)
# Relationship
company = relationship('Company', backref='citations')
__table_args__ = (
UniqueConstraint('company_id', 'directory_name', name='uq_company_directory'),
)
class CompanyQualityTracking(Base):
"""Quality tracking for company data - verification counter and quality score"""
__tablename__ = 'company_quality_tracking'
@@ -2297,6 +2340,26 @@ class CompanySocialMedia(Base):
page_name = Column(String(255))
followers_count = Column(Integer)
# Profile completeness indicators
has_profile_photo = Column(Boolean)
has_cover_photo = Column(Boolean)
has_bio = Column(Boolean)
profile_description = Column(Text)
# Activity metrics
posts_count_30d = Column(Integer)
posts_count_365d = Column(Integer)
last_post_date = Column(DateTime)
# Scoring & analytics
posting_frequency_score = Column(Integer) # 0-10
engagement_rate = Column(Numeric(5, 2)) # percent
content_types = Column(JSONB) # {"photos": 12, "videos": 3, "text": 5}
profile_completeness_score = Column(Integer) # 0-100
# Historical tracking
followers_history = Column(JSONB) # [{"date": "2026-02-01", "count": 150}, ...]
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
@@ -2434,6 +2497,38 @@ class GBPAudit(Base):
google_place_id = Column(String(100))
google_maps_url = Column(String(500))
# Review management
reviews_with_response = Column(Integer, default=0)
reviews_without_response = Column(Integer, default=0)
review_response_rate = Column(Numeric(5, 2))
avg_review_response_days = Column(Numeric(5, 1))
review_sentiment = Column(JSONB)
reviews_30d = Column(Integer, default=0)
review_keywords = Column(JSONB)
# Content & activity
has_posts = Column(Boolean)
posts_count_30d = Column(Integer)
has_products = Column(Boolean)
has_qa = Column(Boolean)
qa_count = Column(Integer)
# Enhanced attributes
attributes = Column(JSONB)
special_hours = Column(JSONB)
has_special_hours = Column(Boolean)
# NAP consistency
nap_consistent = Column(Boolean)
nap_issues = Column(JSONB)
# Keywords
description_keywords = Column(JSONB)
keyword_density_score = Column(Integer)
# Photo analysis
photo_categories = Column(JSONB)
# Audit metadata
audit_source = Column(String(50), default='manual') # manual, automated, api
audit_version = Column(String(20), default='1.0')
@@ -2464,6 +2559,123 @@ class GBPAudit(Base):
return 'poor'
class GBPReview(Base):
"""Individual Google Business Profile reviews for tracking and analysis"""
__tablename__ = 'gbp_reviews'
id = Column(Integer, primary_key=True)
company_id = Column(Integer, ForeignKey('companies.id', ondelete='CASCADE'), nullable=False, index=True)
google_review_id = Column(String(255))
author_name = Column(String(255))
rating = Column(Integer, nullable=False)
text = Column(Text)
publish_time = Column(DateTime)
has_owner_response = Column(Boolean, default=False)
owner_response_text = Column(Text)
owner_response_time = Column(DateTime)
sentiment = Column(String(20)) # positive, neutral, negative
keywords = Column(JSONB)
created_at = Column(DateTime, default=datetime.now)
# Relationship
company = relationship('Company', backref='gbp_reviews')
__table_args__ = (
UniqueConstraint('company_id', 'google_review_id', name='uq_company_google_review'),
)
# ============================================================
# COMPETITOR MONITORING
# ============================================================
class CompanyCompetitor(Base):
"""Tracked competitors for a company via Google Places"""
__tablename__ = 'company_competitors'
id = Column(Integer, primary_key=True)
company_id = Column(Integer, ForeignKey('companies.id', ondelete='CASCADE'), nullable=False, index=True)
competitor_place_id = Column(String(255), nullable=False)
competitor_name = Column(String(255))
competitor_address = Column(String(500))
competitor_rating = Column(Numeric(2, 1))
competitor_review_count = Column(Integer)
competitor_category = Column(String(255))
competitor_website = Column(String(500))
added_by = Column(String(20), default='auto') # auto, manual
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# Relationships
company = relationship('Company', backref='competitors')
snapshots = relationship('CompetitorSnapshot', backref='competitor', cascade='all, delete-orphan')
__table_args__ = (
UniqueConstraint('company_id', 'competitor_place_id', name='uq_company_competitor'),
)
class CompetitorSnapshot(Base):
"""Periodic snapshot of competitor's Google Business Profile"""
__tablename__ = 'competitor_snapshots'
id = Column(Integer, primary_key=True)
competitor_id = Column(Integer, ForeignKey('company_competitors.id', ondelete='CASCADE'), nullable=False, index=True)
snapshot_date = Column(Date, nullable=False)
rating = Column(Numeric(2, 1))
review_count = Column(Integer)
photo_count = Column(Integer)
posts_count = Column(Integer)
has_website = Column(Boolean)
has_description = Column(Boolean)
data = Column(JSONB) # full snapshot
changes = Column(JSONB) # delta vs previous
created_at = Column(DateTime, default=datetime.now)
__table_args__ = (
UniqueConstraint('competitor_id', 'snapshot_date', name='uq_competitor_snapshot_date'),
)
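# A minimal sketch of how the `changes` delta could be populated; the helper
# name and dict shape below are editor assumptions, not part of the model
# definitions above.
def snapshot_delta(prev: dict, curr: dict) -> dict:
    """Return {field: {'old': ..., 'new': ...}} for tracked fields that changed."""
    tracked = ('rating', 'review_count', 'photo_count', 'posts_count',
               'has_website', 'has_description')
    return {f: {'old': prev.get(f), 'new': curr.get(f)}
            for f in tracked if prev.get(f) != curr.get(f)}
# snapshot_delta({'rating': 4.5, 'review_count': 10}, {'rating': 4.6, 'review_count': 12})
# -> {'rating': {'old': 4.5, 'new': 4.6}, 'review_count': {'old': 10, 'new': 12}}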
# ============================================================
# UNIFIED AUDIT REPORTS
# ============================================================
class AuditReport(Base):
"""Unified audit report combining Social, GBP, and SEO data"""
__tablename__ = 'audit_reports'
id = Column(Integer, primary_key=True)
company_id = Column(Integer, ForeignKey('companies.id', ondelete='CASCADE'), nullable=False, index=True)
report_type = Column(String(20), default='full') # full, social, gbp, seo
period_start = Column(Date)
period_end = Column(Date)
# Overall scores
overall_score = Column(Integer)
social_score = Column(Integer)
gbp_score = Column(Integer)
seo_score = Column(Integer)
# Report sections
sections = Column(JSONB)
# Pre-rendered report data
data = Column(JSONB)
# Metadata
custom_message = Column(Text)
generated_by = Column(String(50), default='system')
generated_at = Column(DateTime, default=datetime.now)
status = Column(String(20), default='draft')
created_at = Column(DateTime, default=datetime.now)
# Relationship
company = relationship('Company', backref='audit_reports')
# ============================================================
# IT INFRASTRUCTURE AUDIT
# ============================================================

gbp_audit_service.py

@@ -23,9 +23,14 @@ from typing import Dict, List, Optional, Any
from sqlalchemy.orm import Session
from database import Company, GBPAudit, GBPReview, CompanyWebsiteAnalysis, SessionLocal
import gemini_service
try:
from google_places_service import GooglePlacesService
except ImportError:
GooglePlacesService = None
# Configure logging
logger = logging.getLogger(__name__)
@@ -986,6 +991,284 @@ class GBPAuditService:
return 'low'
# === Enhanced Analysis Methods ===
def analyze_reviews(self, company_id: int, place_data: Dict = None) -> Dict[str, Any]:
"""
Analyze reviews for a company using Google Places data.
Returns dict with:
- reviews_with_response, reviews_without_response
- review_response_rate
- review_sentiment (positive/neutral/negative counts)
- review_keywords (top words from reviews)
- reviews_30d (recent review count)
"""
result = {
'reviews_with_response': 0,
'reviews_without_response': 0,
'review_response_rate': 0.0,
'avg_review_response_days': None,
'review_sentiment': {'positive': 0, 'neutral': 0, 'negative': 0},
'reviews_30d': 0,
'review_keywords': [],
}
if not place_data or 'reviews' not in place_data:
return result
reviews = place_data.get('reviews', [])
if not reviews:
return result
# Analyze each review
keywords_count = {}
for review in reviews:
rating = review.get('rating', 0)
# Sentiment based on rating
if rating >= 4:
result['review_sentiment']['positive'] += 1
elif rating == 3:
result['review_sentiment']['neutral'] += 1
else:
result['review_sentiment']['negative'] += 1
# Extract keywords from review text
text = review.get('text', {})
review_text = text.get('text', '') if isinstance(text, dict) else str(text)
if review_text:
# Simple keyword extraction - split and count common words
words = review_text.lower().split()
stop_words = {'i', 'w', 'na', 'do', 'z', 'się', 'jest', 'nie', 'to', 'że',
'o', 'jak', 'za', 'od', 'po', 'ale', 'co', 'tak', 'a', 'te',
'ze', 'dla', 'ten', 'ta', 'już', 'czy', 'tego', 'tej'}
for word in words:
word = word.strip('.,!?;:"()[]')
if len(word) >= 4 and word not in stop_words:
keywords_count[word] = keywords_count.get(word, 0) + 1
# Top 10 keywords
sorted_keywords = sorted(keywords_count.items(), key=lambda x: x[1], reverse=True)
result['review_keywords'] = [k for k, v in sorted_keywords[:10]]
total = len(reviews)
# Places API (New) does not expose owner replies on the review object, and
# authorAttribution identifies the reviewer, not the owner, so counting it
# would mark every review as answered. Fall back to a reply-like field if the
# payload carries one (the field names here are an assumption).
result['reviews_with_response'] = sum(1 for r in reviews if r.get('ownerResponse') or r.get('reply'))
result['reviews_without_response'] = total - result['reviews_with_response']
result['review_response_rate'] = round(result['reviews_with_response'] / total * 100, 1) if total > 0 else 0.0
return result
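# Usage sketch (hypothetical payload shaped like a Places API (New) review
# list; the service instance is assumed to be constructed as elsewhere in
# this module):
#   place_data = {'reviews': [
#       {'rating': 5, 'text': {'text': 'Świetna obsługa, szybka realizacja'}},
#       {'rating': 2, 'text': {'text': 'Długie oczekiwanie na termin'}},
#   ]}
#   stats = service.analyze_reviews(company_id=1, place_data=place_data)
#   # stats['review_sentiment'] == {'positive': 1, 'neutral': 0, 'negative': 1}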
def check_nap_consistency(self, company: Company,
website_analysis: 'CompanyWebsiteAnalysis' = None) -> Dict[str, Any]:
"""
Check NAP (Name/Address/Phone) consistency between GBP and website.
Returns dict with:
- nap_consistent: bool
- nap_issues: list of inconsistencies
"""
result = {
'nap_consistent': True,
'nap_issues': [],
}
if not website_analysis:
return result
# Compare name
gbp_name = website_analysis.google_name
website_name = company.name  # the company record stands in for the website-side value
if gbp_name and website_name:
if gbp_name.lower().strip() != website_name.lower().strip():
result['nap_consistent'] = False
result['nap_issues'].append({
'field': 'name',
'gbp': gbp_name,
'website': website_name,
'severity': 'low'
})
# Compare phone
gbp_phone = website_analysis.google_phone
company_phone = company.phone
if gbp_phone and company_phone:
# Normalize phone numbers for comparison
gbp_clean = ''.join(c for c in gbp_phone if c.isdigit())
company_clean = ''.join(c for c in company_phone if c.isdigit())
# Compare the last 9 digits (ignoring country code) when both numbers are long enough
phones_differ = (gbp_clean[-9:] != company_clean[-9:]) if len(gbp_clean) >= 9 and len(company_clean) >= 9 else (gbp_clean != company_clean)
if phones_differ:
result['nap_consistent'] = False
result['nap_issues'].append({
'field': 'phone',
'gbp': gbp_phone,
'website': company_phone,
'severity': 'medium'
})
# Compare address
gbp_address = website_analysis.google_address
company_address = f"{company.address_street or ''}, {company.address_city or ''}"
if gbp_address and company.address_city:
city_lower = company.address_city.lower()
if city_lower not in gbp_address.lower():
result['nap_consistent'] = False
result['nap_issues'].append({
'field': 'address',
'gbp': gbp_address,
'website': company_address.strip(', '),
'severity': 'high'
})
return result
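# Shape of a reported inconsistency (illustrative values only):
#   {'nap_consistent': False,
#    'nap_issues': [{'field': 'phone', 'gbp': '+48 58 123 45 67',
#                    'website': '601 234 567', 'severity': 'medium'}]}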
def analyze_photo_categories(self, photos_data: List[Dict] = None) -> Dict[str, int]:
"""Categorize photos based on available metadata."""
categories = {
'total': 0,
'owner': 0,
'user': 0,
}
if not photos_data:
return categories
categories['total'] = len(photos_data)
for photo in photos_data:
attributions = photo.get('authorAttributions', [])
is_owner = any('owner' in a.get('displayName', '').lower() or
'właściciel' in a.get('displayName', '').lower()
for a in attributions)
if is_owner:
categories['owner'] += 1
else:
categories['user'] += 1
return categories
def check_description_keywords(self, company: Company) -> Dict[str, Any]:
"""Check if business description contains relevant keywords."""
result = {
'description_keywords': [],
'keyword_density_score': 0,
}
desc = company.description_full or company.description_short or ''
if not desc:
return result
desc_lower = desc.lower()
# Check for city name
city = (company.address_city or '').lower()
category_name = company.category.name.lower() if company.category else ''
found_keywords = []
# Check city name in description
if city and city in desc_lower:
found_keywords.append(city)
# Check category-related terms
if category_name and category_name in desc_lower:
found_keywords.append(category_name)
# General business keywords
business_keywords = ['usługi', 'produkty', 'oferta', 'doświadczenie',
'profesjonalny', 'kontakt', 'zespół', 'specjalizacja']
for kw in business_keywords:
if kw in desc_lower:
found_keywords.append(kw)
result['description_keywords'] = found_keywords
# Score: 0-100 based on keyword presence
max_keywords = 5 # ideal number of keywords
score = min(len(found_keywords) / max_keywords * 100, 100)
result['keyword_density_score'] = int(score)
return result
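# Worked example: three keywords found -> int(3 / 5 * 100) == 60.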
def save_enhanced_audit(self, result: 'AuditResult', enhanced_data: Dict,
source: str = 'manual') -> 'GBPAudit':
"""Save audit with enhanced data (reviews, NAP, keywords, photos)."""
# First save the standard audit
audit = self.save_audit(result, source)
# Then update with enhanced data
if enhanced_data.get('reviews'):
reviews = enhanced_data['reviews']
audit.reviews_with_response = reviews.get('reviews_with_response', 0)
audit.reviews_without_response = reviews.get('reviews_without_response', 0)
audit.review_response_rate = reviews.get('review_response_rate', 0.0)
audit.avg_review_response_days = reviews.get('avg_review_response_days')
audit.review_sentiment = reviews.get('review_sentiment')
audit.reviews_30d = reviews.get('reviews_30d', 0)
audit.review_keywords = reviews.get('review_keywords')
if enhanced_data.get('nap'):
nap = enhanced_data['nap']
audit.nap_consistent = nap.get('nap_consistent', True)
audit.nap_issues = nap.get('nap_issues')
if enhanced_data.get('keywords'):
keywords = enhanced_data['keywords']
audit.description_keywords = keywords.get('description_keywords')
audit.keyword_density_score = keywords.get('keyword_density_score')
if enhanced_data.get('photo_categories'):
audit.photo_categories = enhanced_data['photo_categories']
if enhanced_data.get('attributes'):
audit.attributes = enhanced_data['attributes']
if enhanced_data.get('hours'):
hours = enhanced_data['hours']
audit.has_special_hours = hours.get('has_special_hours', False)
audit.special_hours = hours.get('special_hours')
self.db.commit()
self.db.refresh(audit)
return audit
def save_reviews(self, company_id: int, reviews_data: List[Dict]) -> int:
"""Save individual reviews to gbp_reviews table. Returns count saved."""
saved = 0
for review in reviews_data:
review_id = review.get('name', '') or f"r_{review.get('author', 'anon')}_{review.get('time', '')}"
existing = self.db.query(GBPReview).filter(
GBPReview.company_id == company_id,
GBPReview.google_review_id == review_id
).first()
if not existing:
gbp_review = GBPReview(
company_id=company_id,
google_review_id=review_id,
author_name=review.get('author', 'Anonim'),
rating=review.get('rating', 0),
text=review.get('text', ''),
# 'time' may arrive as a Unix epoch; convert it so the DateTime column accepts it
publish_time=datetime.fromtimestamp(review['time']) if isinstance(review.get('time'), (int, float)) else review.get('time'),
sentiment=self._classify_sentiment(review.get('rating', 0)),
)
self.db.add(gbp_review)
saved += 1
if saved:
self.db.commit()
return saved
@staticmethod
def _classify_sentiment(rating: int) -> str:
"""Classify review sentiment based on rating."""
if rating >= 4:
return 'positive'
elif rating == 3:
return 'neutral'
else:
return 'negative'
# === AI-Powered Recommendations ===
def generate_ai_recommendations(

scripts/seo_audit.py

@@ -30,6 +30,7 @@ Date: 2026-01-08
import os
import sys
import re
import json
import argparse
import logging
@@ -38,6 +39,7 @@ from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Tuple
import requests
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
@@ -90,6 +92,364 @@ USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTM
SEO_AUDIT_VERSION = '1.0.0'
class LocalSEOAnalyzer:
"""Analyzes Local SEO factors for business websites."""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def analyze(self, html_content: str, url: str, company_data: Dict = None) -> Dict[str, Any]:
"""Run all local SEO checks on HTML content."""
result = {
'local_seo_score': 0,
'has_local_business_schema': False,
'local_business_schema_fields': {},
'nap_on_website': {},
'has_google_maps_embed': False,
'has_local_keywords': False,
'local_keywords_found': [],
}
soup = BeautifulSoup(html_content, 'html.parser')
# Check LocalBusiness schema
schema_result = self._check_local_business_schema(html_content)
result.update(schema_result)
# Extract NAP from website
nap = self._extract_nap(soup, html_content)
result['nap_on_website'] = nap
# Check Google Maps embed
result['has_google_maps_embed'] = self._check_google_maps(html_content)
# Check local keywords
city = (company_data or {}).get('address_city') or 'Wejherowo'  # also covers an explicit None
keywords = self._find_local_keywords(soup, html_content, city)
result['has_local_keywords'] = len(keywords) > 0
result['local_keywords_found'] = keywords[:20]
# Calculate local SEO score
result['local_seo_score'] = self._calculate_local_score(result)
return result
def _check_local_business_schema(self, html: str) -> Dict[str, Any]:
"""Check for Schema.org LocalBusiness structured data."""
result = {
'has_local_business_schema': False,
'local_business_schema_fields': {},
}
# Find JSON-LD blocks
ld_pattern = re.compile(r'<script[^>]*type=["\']application/ld\+json["\'][^>]*>(.*?)</script>', re.DOTALL | re.IGNORECASE)
matches = ld_pattern.findall(html)
local_types = ['LocalBusiness', 'Organization', 'Store', 'Restaurant',
'ProfessionalService', 'AutoRepair', 'HealthAndBeautyBusiness',
'LodgingBusiness', 'FoodEstablishment', 'FinancialService']
for match in matches:
try:
data = json.loads(match.strip())
items = [data] if isinstance(data, dict) else data if isinstance(data, list) else []
for item in items:
item_type = item.get('@type', '')
if isinstance(item_type, list):
item_type = item_type[0] if item_type else ''
if item_type in local_types:
result['has_local_business_schema'] = True
# Check which fields are present
important_fields = ['name', 'address', 'telephone', 'email',
'url', 'openingHours', 'openingHoursSpecification',
'geo', 'image', 'description', 'priceRange',
'areaServed', 'aggregateRating']
for field in important_fields:
result['local_business_schema_fields'][field] = field in item and bool(item[field])
break
except (json.JSONDecodeError, TypeError):
continue
return result
def _extract_nap(self, soup, html: str) -> Dict[str, Any]:
"""Extract Name, Address, Phone from website HTML."""
nap = {'name': None, 'address': None, 'phone': None}
text = soup.get_text(separator=' ')
# Phone patterns (Polish format)
phone_patterns = [
r'(?:tel\.?|telefon|phone|zadzwoń)[:\s]*([+]?\d[\d\s\-]{7,15})',
r'(?:href="tel:)([+]?\d[\d\-]{7,15})"',
r'(\+48[\s\-]?\d{3}[\s\-]?\d{3}[\s\-]?\d{3})',
r'(\d{2}[\s\-]\d{3}[\s\-]\d{2}[\s\-]\d{2})',
]
for pattern in phone_patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
phone = re.sub(r'[\s\-]', '', match.group(1))
if len(phone) >= 9:
nap['phone'] = match.group(1).strip()
break
# Address patterns (Polish)
address_patterns = [
r'(?:ul\.?|ulica)\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144\s]+\s+\d+[a-zA-Z]?(?:/\d+)?(?:,?\s+\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+)?',
r'\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+',
]
for pattern in address_patterns:
match = re.search(pattern, text)
if match:
nap['address'] = match.group(0).strip()[:200]
break
# Business name from structured data or og:site_name
og_site = soup.find('meta', property='og:site_name')
if og_site and og_site.get('content'):
nap['name'] = og_site['content'].strip()[:200]
return nap
def _check_google_maps(self, html: str) -> bool:
"""Check if page has embedded Google Maps."""
maps_patterns = [
r'maps\.googleapis\.com',
r'maps\.google\.com/maps',
r'google\.com/maps/embed',
r'<iframe[^>]*google[^>]*maps[^>]*>',
]
return any(re.search(p, html, re.IGNORECASE) for p in maps_patterns)
def _find_local_keywords(self, soup, html: str, city: str) -> List[str]:
"""Find local keywords in page content (service + city patterns)."""
keywords_found = []
text = soup.get_text(separator=' ').lower()
# Common service keywords for Polish businesses
service_keywords = [
'hydraulik', 'elektryk', 'mechanik', 'fryzjer', 'dentysta',
'prawnik', 'adwokat', 'księgowy', 'architekt', 'fotograf',
'restauracja', 'hotel', 'sklep', 'serwis', 'naprawa',
'instalacje', 'remonty', 'transport', 'catering',
'szkolenia', 'kursy', 'gabinet', 'klinika', 'studio',
]
city_lower = city.lower() if city else 'wejherowo'
nearby_cities = ['wejherowo', 'rumia', 'reda', 'gdynia', 'gdańsk', 'sopot', 'puck', 'luzino']
# make sure the company's own city is checked even when it is not in the default list
if city_lower not in nearby_cities: nearby_cities.append(city_lower)
for keyword in service_keywords:
for c in nearby_cities:
phrase = f'{keyword} {c}'
if phrase in text:
keywords_found.append(phrase)
# Also check meta title and description
title = ((soup.title.string or '') if soup.title else '').lower()  # .string can be None for an empty <title>
meta_desc = ''
desc_tag = soup.find('meta', {'name': 'description'})
if desc_tag:
meta_desc = (desc_tag.get('content', '') or '').lower()
if city_lower in title:
keywords_found.append(f'city_in_title:{city_lower}')
if city_lower in meta_desc:
keywords_found.append(f'city_in_description:{city_lower}')
return list(set(keywords_found))
def _calculate_local_score(self, data: Dict) -> int:
"""Calculate Local SEO score 0-100."""
score = 0
if data.get('has_local_business_schema'):
score += 25
# Bonus for complete schema
fields = data.get('local_business_schema_fields', {})
filled = sum(1 for v in fields.values() if v)
total = len(fields)
if total > 0:
score += int(10 * (filled / total))
nap = data.get('nap_on_website', {})
if nap.get('name'): score += 10
if nap.get('address'): score += 10
if nap.get('phone'): score += 10
if data.get('has_google_maps_embed'): score += 15
if data.get('has_local_keywords'): score += 15
# Bonus for multiple local keywords
kw_count = len(data.get('local_keywords_found', []))
if kw_count >= 5: score += 5
return min(score, 100)
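# Self-contained usage sketch (hypothetical markup; the score follows the
# weights above: phone 10 + maps embed 15 + local keywords 15 = 40):
#   html = ('<html><head><title>Hydraulik Wejherowo</title></head>'
#           '<body>tel: 58 123 45 67'
#           '<iframe src="https://google.com/maps/embed?pb=x"></iframe></body></html>')
#   analyzer = LocalSEOAnalyzer()
#   report = analyzer.analyze(html, 'https://example.pl', {'address_city': 'Wejherowo'})
#   # report['local_seo_score'] == 40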
class CitationChecker:
"""Checks company presence in Polish local business directories."""
# Polish business directories to check
DIRECTORIES = [
{'name': 'panoramafirm.pl', 'url': 'https://panoramafirm.pl', 'search_domain': 'panoramafirm.pl'},
{'name': 'pkt.pl', 'url': 'https://pkt.pl', 'search_domain': 'pkt.pl'},
{'name': 'aleo.com', 'url': 'https://aleo.com', 'search_domain': 'aleo.com'},
{'name': 'firmy.net', 'url': 'https://firmy.net', 'search_domain': 'firmy.net'},
{'name': 'zumi.pl', 'url': 'https://zumi.pl', 'search_domain': 'zumi.pl'},
{'name': 'gowork.pl', 'url': 'https://gowork.pl', 'search_domain': 'gowork.pl'},
{'name': 'oferteo.pl', 'url': 'https://oferteo.pl', 'search_domain': 'oferteo.pl'},
{'name': 'google.com/maps', 'url': 'https://google.com/maps', 'search_domain': 'google.com/maps'},
{'name': 'facebook.com', 'url': 'https://facebook.com', 'search_domain': 'facebook.com'},
{'name': 'yelp.com', 'url': 'https://yelp.com', 'search_domain': 'yelp.com'},
]
def __init__(self):
self.brave_api_key = os.getenv('BRAVE_API_KEY')
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def check_citations(self, company_name: str, city: str = 'Wejherowo') -> List[Dict[str, Any]]:
"""Check if company is listed in directories."""
results = []
if not self.brave_api_key:
logger.warning("BRAVE_API_KEY not set, citation check skipped")
return results
for directory in self.DIRECTORIES:
try:
citation = self._check_single_directory(company_name, city, directory)
results.append(citation)
# Rate limit
time_module.sleep(0.5)
except Exception as e:
logger.warning(f"Citation check failed for {directory['name']}: {e}")
results.append({
'directory_name': directory['name'],
'directory_url': directory['url'],
'status': 'error',
'listing_url': None,
})
return results
def _check_single_directory(self, company_name: str, city: str, directory: Dict) -> Dict:
"""Check one directory using Brave Search."""
query = f'"{company_name}" site:{directory["search_domain"]}'
try:
resp = self.session.get(
'https://api.search.brave.com/res/v1/web/search',
params={'q': query, 'count': 3},
headers={'X-Subscription-Token': self.brave_api_key},
timeout=10
)
resp.raise_for_status()
data = resp.json()
results = data.get('web', {}).get('results', [])
if results:
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': results[0].get('url'),
'status': 'found',
}
else:
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': None,
'status': 'not_found',
}
except Exception as e:
logger.debug(f"Brave search for {directory['name']}: {e}")
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': None,
'status': 'error',
}
class ContentFreshnessChecker:
"""Checks content freshness of a website."""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def check_freshness(self, url: str, html_content: str = None) -> Dict[str, Any]:
"""Check content freshness indicators."""
result = {
'last_content_update': None,
'content_freshness_score': 0,
}
# Check Last-Modified header
try:
resp = self.session.head(url, timeout=10, allow_redirects=True)
last_modified = resp.headers.get('Last-Modified')
if last_modified:
from email.utils import parsedate_to_datetime
try:
# Last-Modified parses to a timezone-aware datetime; drop tzinfo so it
# compares cleanly with the naive datetimes used below
result['last_content_update'] = parsedate_to_datetime(last_modified).replace(tzinfo=None)
except Exception:
pass
except Exception:
pass
# Check dates in HTML content
if html_content:
soup = BeautifulSoup(html_content, 'html.parser')
# Look for date patterns in the page
date_patterns = [
r'20\d{2}[-./]\d{1,2}[-./]\d{1,2}',
r'\d{1,2}[-./]\d{1,2}[-./]20\d{2}',
]
text = soup.get_text()
latest_date = None
for pattern in date_patterns:
matches = re.findall(pattern, text)
for m in matches:
try:
# Try parsing various formats
for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%Y.%m.%d', '%d.%m.%Y', '%d-%m-%Y', '%d/%m/%Y']:
try:
d = datetime.strptime(m, fmt)
if d.year >= 2020 and d <= datetime.now():
if latest_date is None or d > latest_date:
latest_date = d
break
except ValueError:
continue
except Exception:
continue
if latest_date and (result['last_content_update'] is None or latest_date > result['last_content_update']):
result['last_content_update'] = latest_date
# Calculate freshness score
if result['last_content_update']:
days_old = (datetime.now() - result['last_content_update']).days
if days_old <= 30:
result['content_freshness_score'] = 100
elif days_old <= 90:
result['content_freshness_score'] = 80
elif days_old <= 180:
result['content_freshness_score'] = 60
elif days_old <= 365:
result['content_freshness_score'] = 40
else:
result['content_freshness_score'] = 20
else:
result['content_freshness_score'] = 10 # Unknown = low score
return result
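# Scoring tiers at a glance: updated 10 days ago -> 100, 45 days -> 80,
# 120 days -> 60, 300 days -> 40, older than a year -> 20, no date found -> 10.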
class SEOAuditor:
"""
Main SEO auditor class that coordinates website SEO auditing.
@@ -112,6 +472,9 @@ class SEOAuditor:
self.pagespeed_client = GooglePageSpeedClient()
self.onpage_analyzer = OnPageSEOAnalyzer()
self.technical_checker = TechnicalSEOChecker()
self.local_seo_analyzer = LocalSEOAnalyzer()
self.citation_checker = CitationChecker()
self.freshness_checker = ContentFreshnessChecker()
# HTTP session for fetching pages
self.session = requests.Session()
@@ -324,6 +687,38 @@ class SEOAuditor:
result['errors'].append(f'PageSpeed unexpected error: {str(e)[:100]}')
logger.error(f" PageSpeed unexpected error: {e}")
# 6. Local SEO analysis
if html_content:
try:
logger.info(" Running Local SEO analysis...")
local_seo = self.local_seo_analyzer.analyze(html_content, final_url, company)
result['local_seo'] = local_seo
logger.info(f" Local SEO score: {local_seo.get('local_seo_score', 0)}")
except Exception as e:
result['errors'].append(f'Local SEO analysis failed: {str(e)[:100]}')
logger.error(f" Local SEO error: {e}")
# 7. Citation check
try:
city = company.get('address_city', 'Wejherowo')
logger.info(f" Checking citations for '{company['name']}' in {city}...")
citations = self.citation_checker.check_citations(company['name'], city)
result['citations'] = citations
found_count = sum(1 for c in citations if c.get('status') == 'found')
logger.info(f" Citations found: {found_count}/{len(citations)}")
except Exception as e:
result['errors'].append(f'Citation check failed: {str(e)[:100]}')
logger.error(f" Citation check error: {e}")
# 8. Content freshness
try:
logger.info(" Checking content freshness...")
freshness = self.freshness_checker.check_freshness(final_url, html_content)
result['freshness'] = freshness
logger.info(f" Freshness score: {freshness.get('content_freshness_score', 0)}")
except Exception as e:
result['errors'].append(f'Freshness check failed: {str(e)[:100]}')
# 9. Calculate overall SEO score
result['scores']['overall_seo'] = self._calculate_overall_score(result)
@@ -545,7 +940,17 @@ class SEOAuditor:
-- SEO Audit metadata
seo_audit_version, seo_audited_at, seo_audit_errors,
seo_overall_score, seo_health_score, seo_issues,
-- Local SEO
local_seo_score, has_local_business_schema, local_business_schema_fields,
nap_on_website, has_google_maps_embed, has_local_keywords, local_keywords_found,
-- Citations
citations_found, citations_count,
-- Content freshness
content_freshness_score, last_content_update
) VALUES (
:company_id, :analyzed_at, :website_url, :final_url,
:http_status_code, :load_time_ms,
@@ -574,7 +979,14 @@ class SEOAuditor:
:word_count_homepage,
:seo_audit_version, :seo_audited_at, :seo_audit_errors,
:seo_overall_score, :seo_health_score, :seo_issues,
:local_seo_score, :has_local_business_schema, :local_business_schema_fields,
:nap_on_website, :has_google_maps_embed, :has_local_keywords, :local_keywords_found,
:citations_found, :citations_count,
:content_freshness_score, :last_content_update
)
ON CONFLICT (company_id) DO UPDATE SET
analyzed_at = EXCLUDED.analyzed_at,
@@ -635,7 +1047,21 @@ class SEOAuditor:
seo_audit_errors = EXCLUDED.seo_audit_errors,
seo_overall_score = EXCLUDED.seo_overall_score,
seo_health_score = EXCLUDED.seo_health_score,
seo_issues = EXCLUDED.seo_issues,
local_seo_score = EXCLUDED.local_seo_score,
has_local_business_schema = EXCLUDED.has_local_business_schema,
local_business_schema_fields = EXCLUDED.local_business_schema_fields,
nap_on_website = EXCLUDED.nap_on_website,
has_google_maps_embed = EXCLUDED.has_google_maps_embed,
has_local_keywords = EXCLUDED.has_local_keywords,
local_keywords_found = EXCLUDED.local_keywords_found,
citations_found = EXCLUDED.citations_found,
citations_count = EXCLUDED.citations_count,
content_freshness_score = EXCLUDED.content_freshness_score,
last_content_update = EXCLUDED.last_content_update
""")
# Build issues list from errors
@@ -720,8 +1146,49 @@ class SEOAuditor:
'seo_overall_score': result.get('scores', {}).get('overall_seo'),
'seo_health_score': self._calculate_onpage_score(onpage) if onpage else None,
'seo_issues': json.dumps(issues) if issues else None,
# Local SEO
'local_seo_score': (result.get('local_seo') or {}).get('local_seo_score'),
'has_local_business_schema': (result.get('local_seo') or {}).get('has_local_business_schema', False),
'local_business_schema_fields': json.dumps((result.get('local_seo') or {}).get('local_business_schema_fields', {})),
'nap_on_website': json.dumps((result.get('local_seo') or {}).get('nap_on_website', {})),
'has_google_maps_embed': (result.get('local_seo') or {}).get('has_google_maps_embed', False),
'has_local_keywords': (result.get('local_seo') or {}).get('has_local_keywords', False),
'local_keywords_found': json.dumps((result.get('local_seo') or {}).get('local_keywords_found', [])),
# Citations
'citations_found': json.dumps(result.get('citations', [])),
'citations_count': sum(1 for c in result.get('citations', []) if c.get('status') == 'found'),
# Freshness
'content_freshness_score': (result.get('freshness') or {}).get('content_freshness_score'),
'last_content_update': (result.get('freshness') or {}).get('last_content_update'),
})
# Save individual citations
for citation in result.get('citations', []):
if citation.get('directory_name'):
citation_upsert = text("""
INSERT INTO company_citations (
company_id, directory_name, directory_url, listing_url,
status, checked_at
) VALUES (
:company_id, :directory_name, :directory_url, :listing_url,
:status, NOW()
)
ON CONFLICT (company_id, directory_name) DO UPDATE SET
listing_url = EXCLUDED.listing_url,
status = EXCLUDED.status,
checked_at = NOW()
""")
session.execute(citation_upsert, {
'company_id': company_id,
'directory_name': citation['directory_name'],
'directory_url': citation.get('directory_url'),
'listing_url': citation.get('listing_url'),
'status': citation.get('status', 'unknown'),
})
session.commit()
logger.info(f" Saved SEO audit for company {company_id}")
return True

scripts/social_media_audit.py

@@ -894,6 +894,225 @@ class BraveSearcher:
return None
class SocialProfileEnricher:
"""Enriches social media profiles with additional data from public APIs and scraping."""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def enrich_profile(self, platform: str, url: str) -> Dict[str, Any]:
"""Fetch additional data for a social media profile."""
enrichers = {
'facebook': self._enrich_facebook,
'instagram': self._enrich_instagram,
'youtube': self._enrich_youtube,
'linkedin': self._enrich_linkedin,
'tiktok': self._enrich_tiktok,
'twitter': self._enrich_twitter,
}
enricher = enrichers.get(platform)
if enricher:
try:
return enricher(url)
except Exception as e:
logger.warning(f"Failed to enrich {platform} profile {url}: {e}")
return {}
return {}
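# Usage sketch (illustrative output; the actual fields depend on what the
# public page exposes):
#   enricher = SocialProfileEnricher()
#   enricher.enrich_profile('instagram', 'https://instagram.com/firma')
#   # -> {'followers_count': 1200, 'posts_count_365d': 67, 'has_bio': True, ...}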
def _enrich_facebook(self, url: str) -> Dict[str, Any]:
"""Enrich Facebook page data from public page HTML."""
result = {}
try:
resp = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
if resp.status_code == 200:
html = resp.text
# Extract page name from og:title
og_match = re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html)
if og_match:
result['page_name'] = og_match.group(1)
# Check for profile photo via og:image
og_img = re.search(r'<meta\s+property="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
# Description from og:description
og_desc = re.search(r'<meta\s+property="og:description"\s+content="([^"]+)"', html)
if og_desc:
result['profile_description'] = og_desc.group(1)[:500]
result['has_bio'] = True
else:
result['has_bio'] = False
except Exception as e:
logger.debug(f"Facebook enrichment failed: {e}")
return result
def _enrich_instagram(self, url: str) -> Dict[str, Any]:
"""Enrich Instagram profile data."""
result = {}
try:
# Try og:description which often contains "X Followers, Y Following, Z Posts"
resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
if resp.status_code == 200:
html = resp.text
# og:description format: "123 Followers, 45 Following, 67 Posts - See Instagram photos..."
og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
if og_desc:
desc = og_desc.group(1)
# Extract followers
followers_match = re.search(r'([\d,\.]+[KMkm]?)\s+Followers', desc)
if followers_match:
result['followers_count'] = self._parse_count(followers_match.group(1))
# Extract posts count
posts_match = re.search(r'([\d,\.]+[KMkm]?)\s+Posts', desc)
if posts_match:
result['posts_count_365d'] = self._parse_count(posts_match.group(1))
# Bio is after the dash
bio_match = re.search(r'Posts\s*[-\u2013\u2014]\s*(.+)', desc)
if bio_match:
bio_text = bio_match.group(1).strip()
if bio_text and not bio_text.startswith('See Instagram'):
result['profile_description'] = bio_text[:500]
result['has_bio'] = True
# Profile photo from og:image
og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
except Exception as e:
logger.debug(f"Instagram enrichment failed: {e}")
return result
def _enrich_youtube(self, url: str) -> Dict[str, Any]:
"""Enrich YouTube channel data."""
result = {}
try:
resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
if resp.status_code == 200:
html = resp.text
# Subscriber count from meta or JSON
subs_match = re.search(r'"subscriberCountText":\s*\{"simpleText":\s*"([^"]+)"\}', html)
if subs_match:
result['followers_count'] = self._parse_count(subs_match.group(1).split(' ')[0])
# Video count
videos_match = re.search(r'"videosCountText":\s*\{"runs":\s*\[\{"text":\s*"([^"]+)"\}', html)
if videos_match:
result['posts_count_365d'] = self._parse_count(videos_match.group(1))
# Channel description
desc_match = re.search(r'"description":\s*"([^"]*(?:\\.[^"]*)*)"', html)
if desc_match:
desc = desc_match.group(1).replace('\\n', ' ').strip()
if desc and len(desc) > 5:
result['profile_description'] = desc[:500]
result['has_bio'] = True
# Avatar from og:image
og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
# Channel name
name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
if name_match:
result['page_name'] = name_match.group(1)
except Exception as e:
logger.debug(f"YouTube enrichment failed: {e}")
return result
def _enrich_linkedin(self, url: str) -> Dict[str, Any]:
"""Enrich LinkedIn company page data."""
result = {}
try:
resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
if resp.status_code == 200:
html = resp.text
og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
if og_desc:
desc = og_desc.group(1).strip()
# LinkedIn descriptions often have follower count
followers_match = re.search(r'([\d,\.]+)\s+followers', desc, re.IGNORECASE)
if followers_match:
result['followers_count'] = self._parse_count(followers_match.group(1))
result['profile_description'] = desc[:500]
result['has_bio'] = True
og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
if name_match:
result['page_name'] = name_match.group(1)
except Exception as e:
logger.debug(f"LinkedIn enrichment failed: {e}")
return result
def _enrich_tiktok(self, url: str) -> Dict[str, Any]:
"""Enrich TikTok profile data."""
result = {}
try:
resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
if resp.status_code == 200:
html = resp.text
# TikTok embeds profile data in JSON
followers_match = re.search(r'"followerCount":\s*(\d+)', html)
if followers_match:
result['followers_count'] = int(followers_match.group(1))
videos_match = re.search(r'"videoCount":\s*(\d+)', html)
if videos_match:
result['posts_count_365d'] = int(videos_match.group(1))
desc_match = re.search(r'"signature":\s*"([^"]*)"', html)
if desc_match and desc_match.group(1).strip():
result['profile_description'] = desc_match.group(1)[:500]
result['has_bio'] = True
og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
name_match = re.search(r'"nickname":\s*"([^"]+)"', html)
if name_match:
result['page_name'] = name_match.group(1)
except Exception as e:
logger.debug(f"TikTok enrichment failed: {e}")
return result
def _enrich_twitter(self, url: str) -> Dict[str, Any]:
"""Enrich Twitter/X profile data using og tags from public page."""
result = {}
try:
resp = self.session.get(url, timeout=REQUEST_TIMEOUT)
if resp.status_code == 200:
html = resp.text
og_desc = re.search(r'<meta\s+(?:property|name)="og:description"\s+content="([^"]+)"', html)
if og_desc:
result['profile_description'] = og_desc.group(1)[:500]
result['has_bio'] = True
og_img = re.search(r'<meta\s+(?:property|name)="og:image"\s+content="([^"]+)"', html)
result['has_profile_photo'] = bool(og_img)
name_match = re.search(r'<meta\s+(?:property|name)="og:title"\s+content="([^"]+)"', html)
if name_match:
result['page_name'] = name_match.group(1)
except Exception as e:
logger.debug(f"Twitter enrichment failed: {e}")
return result
@staticmethod
def _parse_count(text: str) -> Optional[int]:
"""Parse follower/subscriber count strings like '1.2K', '3,456', '2.1M'."""
if not text:
return None
text = text.strip().replace(',', '').replace(' ', '')
try:
multipliers = {'k': 1000, 'm': 1000000, 'b': 1000000000}
last_char = text[-1].lower()
if last_char in multipliers:
return int(float(text[:-1]) * multipliers[last_char])
return int(float(text))
except (ValueError, IndexError):
return None
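# Worked examples:
#   _parse_count('1.2K') == 1200
#   _parse_count('3,456') == 3456
#   _parse_count('2.1M') == 2100000
#   _parse_count('n/a') is None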
def calculate_profile_completeness(profile_data: Dict[str, Any]) -> int:
"""Calculate profile completeness score 0-100 for a social media profile."""
score = 0
if profile_data.get('url'): score += 20 # Profile exists
if profile_data.get('has_bio'): score += 15 # Bio filled
if profile_data.get('has_profile_photo'): score += 15 # Avatar
if profile_data.get('has_cover_photo'): score += 10 # Cover photo
if (profile_data.get('followers_count') or 0) > 10: score += 10 # Has followers
if (profile_data.get('posts_count_30d') or 0) > 0: score += 15 # Active in last 30d
if (profile_data.get('engagement_rate') or 0) > 1: score += 15 # Good engagement
return min(score, 100)
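# Worked example: a profile with a URL, bio, avatar, and 150 followers but no
# recent posts scores 20 + 15 + 15 + 10 == 60:
#   calculate_profile_completeness({'url': 'https://facebook.com/firma',
#       'has_bio': True, 'has_profile_photo': True, 'followers_count': 150})  # -> 60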
class SocialMediaAuditor:
"""Main auditor class that coordinates website and social media auditing."""
@@ -902,6 +1121,7 @@ class SocialMediaAuditor:
self.Session = sessionmaker(bind=self.engine)
self.website_auditor = WebsiteAuditor()
self.brave_searcher = BraveSearcher()
self.profile_enricher = SocialProfileEnricher()
# Initialize Google Places searcher if API key is available
google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
@@ -1018,6 +1238,20 @@ class SocialMediaAuditor:
result['social_media'] = website_social
logger.info(f"Total social media profiles found: {len(website_social)} - {list(website_social.keys())}")
# 5. Enrich social media profiles with additional data
enriched_profiles = {}
for platform, url in website_social.items():
logger.info(f"Enriching {platform} profile: {url}")
enrichment = self.profile_enricher.enrich_profile(platform, url)
enriched_profiles[platform] = {
'url': url,
**enrichment,
}
# Calculate completeness score
enriched_profiles[platform]['profile_completeness_score'] = calculate_profile_completeness(enriched_profiles[platform])
result['enriched_profiles'] = enriched_profiles
# 4. Google reviews search - prefer Google Places API if available
try:
if self.google_places_searcher:
@@ -1131,21 +1365,42 @@ class SocialMediaAuditor:
'audit_version': '1.0',
})
# Save social media with enriched data
for platform, url in result.get('social_media', {}).items():
# Normalize URL to prevent www vs non-www duplicates
normalized_url = normalize_social_url(url, platform)
# Get enrichment data if available
enriched = result.get('enriched_profiles', {}).get(platform, {})
upsert_social = text("""
INSERT INTO company_social_media (
company_id, platform, url, verified_at, source, is_valid,
page_name, followers_count,
has_profile_photo, has_cover_photo, has_bio, profile_description,
posts_count_30d, posts_count_365d, last_post_date,
profile_completeness_score, updated_at
) VALUES (
:company_id, :platform, :url, :verified_at, :source, :is_valid,
:page_name, :followers_count,
:has_profile_photo, :has_cover_photo, :has_bio, :profile_description,
:posts_count_30d, :posts_count_365d, :last_post_date,
:profile_completeness_score, NOW()
)
ON CONFLICT (company_id, platform, url) DO UPDATE SET
verified_at = EXCLUDED.verified_at,
source = EXCLUDED.source,
is_valid = EXCLUDED.is_valid,
page_name = COALESCE(EXCLUDED.page_name, company_social_media.page_name),
followers_count = COALESCE(EXCLUDED.followers_count, company_social_media.followers_count),
has_profile_photo = COALESCE(EXCLUDED.has_profile_photo, company_social_media.has_profile_photo),
has_cover_photo = COALESCE(EXCLUDED.has_cover_photo, company_social_media.has_cover_photo),
has_bio = COALESCE(EXCLUDED.has_bio, company_social_media.has_bio),
profile_description = COALESCE(EXCLUDED.profile_description, company_social_media.profile_description),
posts_count_30d = COALESCE(EXCLUDED.posts_count_30d, company_social_media.posts_count_30d),
posts_count_365d = COALESCE(EXCLUDED.posts_count_365d, company_social_media.posts_count_365d),
last_post_date = COALESCE(EXCLUDED.last_post_date, company_social_media.last_post_date),
profile_completeness_score = COALESCE(EXCLUDED.profile_completeness_score, company_social_media.profile_completeness_score),
updated_at = NOW()
""")
session.execute(upsert_social, {
@@ -1155,6 +1410,16 @@ class SocialMediaAuditor:
'verified_at': result['audit_date'],
'source': 'website_scrape',
'is_valid': True,
'page_name': enriched.get('page_name'),
'followers_count': enriched.get('followers_count'),
'has_profile_photo': enriched.get('has_profile_photo'),
'has_cover_photo': enriched.get('has_cover_photo'),
'has_bio': enriched.get('has_bio'),
'profile_description': enriched.get('profile_description'),
'posts_count_30d': enriched.get('posts_count_30d'),
'posts_count_365d': enriched.get('posts_count_365d'),
'last_post_date': enriched.get('last_post_date'),
'profile_completeness_score': enriched.get('profile_completeness_score'),
})
session.commit()