Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- google_places_service.py: Google Places API integration - competitor_monitoring_service.py: Competitor tracking service - scripts/competitor_monitor_cron.py, scripts/generate_audit_report.py - blueprints/admin/routes_competitors.py, templates/admin/competitor_dashboard.html Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
384 lines
15 KiB
Python
384 lines
15 KiB
Python
"""
|
|
Competitor Monitoring Service for NordaBiz
|
|
==========================================
|
|
|
|
Discovers and monitors competitors via Google Places API.
|
|
Tracks changes in ratings, reviews, and profile activity.
|
|
|
|
Author: NordaBiz Development Team
|
|
Created: 2026-02-06
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
from datetime import datetime, date, timedelta
|
|
from typing import Optional, Dict, List, Any
|
|
from decimal import Decimal
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import (
|
|
Company, CompanyCompetitor, CompetitorSnapshot,
|
|
CompanyWebsiteAnalysis, SessionLocal
|
|
)
|
|
|
|
try:
|
|
from google_places_service import GooglePlacesService
|
|
except ImportError:
|
|
GooglePlacesService = None
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CompetitorMonitoringService:
    """Discover and monitor competitors of a company via Google Places.

    All database access goes through the SQLAlchemy session handed to the
    constructor. The Google Places client is created lazily on first use,
    so the service can be instantiated even when the API is unavailable.
    """

    # Search radius passed to ``search_nearby``.
    # NOTE(review): presumably metres (Google Places convention) - confirm
    # against GooglePlacesService.search_nearby.
    SEARCH_RADIUS = 5000.0

    # City appended to the company name when the company has no city set.
    DEFAULT_CITY = 'Wejherowo'

    def __init__(self, db: 'Session'):
        """Store the session; the Places client is created on demand."""
        self.db = db
        self._places_service = None

    @property
    def places_service(self) -> Optional['GooglePlacesService']:
        """Return the Google Places client, creating it on first access.

        Returns ``None`` when the ``google_places_service`` module could not
        be imported or when constructing the client raises ``ValueError``
        (logged as a missing API key).
        """
        if self._places_service is None and GooglePlacesService:
            try:
                self._places_service = GooglePlacesService()
            except ValueError:
                logger.warning("Google Places API key not configured")
        return self._places_service

    # ------------------------------------------------------------------
    # Pure helpers (no database or network access)
    # ------------------------------------------------------------------

    @staticmethod
    def _place_id_of(place: Dict[str, Any]) -> str:
        """Return the place's ID with any ``places/`` prefix removed ('' if absent)."""
        return (place.get('id') or '').replace('places/', '')

    @staticmethod
    def _display_text(display_name: Any) -> str:
        """Return the text of a ``displayName`` value (dict with 'text', or a plain value)."""
        if isinstance(display_name, dict):
            return display_name.get('text', '')
        return str(display_name)

    @staticmethod
    def _as_float(value: Any) -> Optional[float]:
        """Convert a rating to ``float``; falsy values (None, 0) count as unrated."""
        return float(value) if value else None

    @staticmethod
    def _competitor_from_place(place: Dict[str, Any]) -> Dict[str, Any]:
        """Map one Places search result onto the ``competitor_*`` dict used for saving."""
        return {
            'competitor_place_id': CompetitorMonitoringService._place_id_of(place),
            'competitor_name': CompetitorMonitoringService._display_text(
                place.get('displayName', {})
            ),
            'competitor_address': place.get('formattedAddress', ''),
            'competitor_rating': place.get('rating'),
            'competitor_review_count': place.get('userRatingCount'),
            'competitor_category': place.get('primaryType', ''),
            'competitor_website': place.get('websiteUri', ''),
        }

    @staticmethod
    def _build_snapshot_data(place_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract the fields stored in a snapshot from a place-details response."""
        # ``photos`` may be missing or None; both count as zero photos.
        photos = place_data.get('photos') or []
        return {
            'name': CompetitorMonitoringService._display_text(
                place_data.get('displayName', {})
            ),
            'address': place_data.get('formattedAddress', ''),
            'rating': place_data.get('rating'),
            'review_count': place_data.get('userRatingCount', 0),
            'photo_count': len(photos),
            'business_status': place_data.get('businessStatus', ''),
            'types': place_data.get('types', []),
            'website': place_data.get('websiteUri', ''),
        }

    @staticmethod
    def _detect_changes(current: Dict[str, Any], previous: Dict[str, Any]) -> Dict[str, Any]:
        """Compare two snapshot payloads and describe what changed.

        Returns a dict keyed by the changed field ('rating', 'review_count',
        'photo_count'), each mapping to ``{'old': ..., 'new': ...}``;
        'review_count' additionally carries a numeric 'delta'. An empty dict
        means nothing changed.
        """
        changes: Dict[str, Any] = {}

        if current.get('rating') != previous.get('rating'):
            changes['rating'] = {
                'old': previous.get('rating'),
                'new': current.get('rating'),
            }

        old_reviews = previous.get('review_count', 0)
        new_reviews = current.get('review_count', 0)
        if new_reviews != old_reviews:
            changes['review_count'] = {
                'old': old_reviews,
                'new': new_reviews,
                # ``or 0`` keeps the delta numeric when a stored count is None.
                'delta': (new_reviews or 0) - (old_reviews or 0),
            }

        old_photos = previous.get('photo_count', 0)
        new_photos = current.get('photo_count', 0)
        if new_photos != old_photos:
            changes['photo_count'] = {'old': old_photos, 'new': new_photos}

        return changes

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------

    def discover_competitors(self, company_id: int, max_results: int = 10) -> List[Dict[str, Any]]:
        """Find nearby businesses of the same primary type as the company.

        The company's Google Place ID is taken from its website analysis
        row; when that is missing the company is looked up by name and city.
        The place's location then seeds a nearby search.

        Args:
            company_id: ID of the company to find competitors for.
            max_results: Maximum number of competitors to return.

        Returns:
            A list of ``competitor_*`` dicts (not yet persisted). Empty when
            the Places API is unavailable, the company cannot be located, or
            no location data is returned.

        Raises:
            ValueError: If no company with ``company_id`` exists.
        """
        places = self.places_service
        if not places:
            logger.error("Google Places API not available for competitor discovery")
            return []

        company = self.db.query(Company).filter(Company.id == company_id).first()
        if not company:
            raise ValueError(f"Company {company_id} not found")

        analysis = self.db.query(CompanyWebsiteAnalysis).filter(
            CompanyWebsiteAnalysis.company_id == company_id
        ).first()

        place_id = analysis.google_place_id if analysis else None
        if not place_id:
            logger.warning("Company %s has no Google Place ID, searching by name", company_id)
            found = places.search_place(
                f"{company.name} {company.address_city or self.DEFAULT_CITY}"
            )
            if not found:
                return []
            place_id = self._place_id_of(found)
            if not place_id:
                # A result without an ID cannot be resolved to a location.
                return []

        place_data = places.get_place_details(place_id)
        if not place_data:
            return []

        location = place_data.get('location') or {}
        latitude = location.get('latitude')
        longitude = location.get('longitude')
        # Compare against None: 0.0 is a valid coordinate.
        if latitude is None or longitude is None:
            logger.error("No location data for company %s", company_id)
            return []

        primary_type = place_data.get('primaryType')
        nearby = places.search_nearby(
            latitude=latitude,
            longitude=longitude,
            radius=self.SEARCH_RADIUS,
            included_types=[primary_type] if primary_type else None,
            max_results=max_results + 1,  # +1 because our own business may appear
        )

        competitors: List[Dict[str, Any]] = []
        for place in nearby or []:
            if len(competitors) >= max_results:
                break
            candidate = self._competitor_from_place(place)
            candidate_id = candidate['competitor_place_id']
            # Skip our own business and results that cannot be tracked by ID.
            if not candidate_id or candidate_id == place_id:
                continue
            competitors.append(candidate)

        return competitors

    def save_competitors(self, company_id: int, competitors: List[Dict]) -> int:
        """Persist discovered competitors for a company.

        Existing rows (matched on company and place ID) get their name,
        rating and review count refreshed; unknown competitors are inserted
        with ``added_by='auto'``. Everything is committed in one transaction.

        Args:
            company_id: Owner of the competitor rows.
            competitors: Dicts as produced by ``discover_competitors``; each
                must contain 'competitor_place_id'.

        Returns:
            The number of newly inserted competitors (updates are not counted).
        """
        saved = 0
        try:
            for comp_data in competitors:
                existing = self.db.query(CompanyCompetitor).filter(
                    CompanyCompetitor.company_id == company_id,
                    CompanyCompetitor.competitor_place_id == comp_data['competitor_place_id'],
                ).first()

                if existing:
                    # An empty name never overwrites a stored one.
                    existing.competitor_name = (
                        comp_data.get('competitor_name') or existing.competitor_name
                    )
                    # Use ``is not None`` so a legitimate 0 is not discarded.
                    rating = comp_data.get('competitor_rating')
                    if rating is not None:
                        existing.competitor_rating = rating
                    review_count = comp_data.get('competitor_review_count')
                    if review_count is not None:
                        existing.competitor_review_count = review_count
                    existing.updated_at = datetime.now()
                else:
                    self.db.add(CompanyCompetitor(
                        company_id=company_id,
                        competitor_place_id=comp_data['competitor_place_id'],
                        competitor_name=comp_data.get('competitor_name'),
                        competitor_address=comp_data.get('competitor_address'),
                        competitor_rating=comp_data.get('competitor_rating'),
                        competitor_review_count=comp_data.get('competitor_review_count'),
                        competitor_category=comp_data.get('competitor_category'),
                        competitor_website=comp_data.get('competitor_website'),
                        added_by='auto',
                    ))
                    saved += 1

            self.db.commit()
        except Exception:
            # Leave the session usable for the caller, then propagate.
            self.db.rollback()
            raise
        return saved

    # ------------------------------------------------------------------
    # Snapshots
    # ------------------------------------------------------------------

    def take_snapshot(self, competitor_id: int) -> Optional['CompetitorSnapshot']:
        """Record today's state of one competitor and diff it with the last snapshot.

        Args:
            competitor_id: ID of the ``CompanyCompetitor`` row.

        Returns:
            The snapshot for today (an already stored one is returned as-is),
            or ``None`` when the competitor is missing or inactive, the
            Places API is unavailable, or no place data could be fetched.
        """
        competitor = self.db.query(CompanyCompetitor).filter(
            CompanyCompetitor.id == competitor_id
        ).first()
        if not competitor or not competitor.is_active:
            return None

        places = self.places_service
        if not places:
            logger.error("Google Places API not available for snapshots")
            return None

        today = date.today()

        # Look for today's snapshot BEFORE calling Google, so a repeated run
        # on the same day does not spend an API request.
        existing = self.db.query(CompetitorSnapshot).filter(
            CompetitorSnapshot.competitor_id == competitor_id,
            CompetitorSnapshot.snapshot_date == today,
        ).first()
        if existing:
            logger.info("Snapshot already exists for competitor %s on %s", competitor_id, today)
            return existing

        place_data = places.get_place_details(competitor.competitor_place_id)
        if not place_data:
            logger.warning("Could not fetch data for competitor %s", competitor.competitor_name)
            return None

        snapshot_data = self._build_snapshot_data(place_data)

        # Most recent earlier snapshot, used as the baseline for the diff.
        previous = self.db.query(CompetitorSnapshot).filter(
            CompetitorSnapshot.competitor_id == competitor_id
        ).order_by(CompetitorSnapshot.snapshot_date.desc()).first()

        changes = self._detect_changes(snapshot_data, previous.data or {}) if previous else {}

        snapshot = CompetitorSnapshot(
            competitor_id=competitor_id,
            snapshot_date=today,
            rating=snapshot_data.get('rating'),
            review_count=snapshot_data.get('review_count'),
            photo_count=snapshot_data.get('photo_count'),
            has_website=bool(snapshot_data.get('website')),
            has_description=bool(place_data.get('editorialSummary')),
            data=snapshot_data,
            changes=changes if changes else None,
        )
        self.db.add(snapshot)

        # Keep the competitor row in sync with the latest observed values.
        competitor.competitor_rating = snapshot_data.get('rating')
        competitor.competitor_review_count = snapshot_data.get('review_count')
        competitor.updated_at = datetime.now()

        self.db.commit()
        self.db.refresh(snapshot)

        if changes:
            logger.info(
                "Changes detected for %s: %s", competitor.competitor_name, list(changes)
            )

        return snapshot

    def take_all_snapshots(self, company_id: int) -> Dict[str, Any]:
        """Take snapshots for every active competitor of a company.

        A failure for one competitor is logged and counted; it does not stop
        the remaining competitors from being processed.

        Returns:
            Counters: 'total', 'success', 'failed' and 'changes_detected'.
        """
        competitors = self.db.query(CompanyCompetitor).filter(
            CompanyCompetitor.company_id == company_id,
            CompanyCompetitor.is_active == True  # noqa: E712 - SQL expression
        ).all()

        # Capture plain IDs up front so the loop (and its error logging)
        # never has to touch ORM objects after a failed transaction.
        competitor_ids = [competitor.id for competitor in competitors]

        results = {
            'total': len(competitor_ids),
            'success': 0,
            'failed': 0,
            'changes_detected': 0,
        }

        for competitor_id in competitor_ids:
            try:
                snapshot = self.take_snapshot(competitor_id)
            except Exception as e:
                # Without a rollback a failed flush/commit leaves the session
                # unusable and every following competitor would fail too.
                self.db.rollback()
                logger.error(f"Snapshot failed for competitor {competitor_id}: {e}")
                results['failed'] += 1
                continue

            if snapshot:
                results['success'] += 1
                if snapshot.changes:
                    results['changes_detected'] += 1
            else:
                results['failed'] += 1

        return results

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_changes_report(self, company_id: int, days: int = 30) -> List[Dict[str, Any]]:
        """Summarise recorded changes of each active competitor over the last ``days`` days.

        Returns:
            One entry per active competitor with its current rating and
            review count, the number of snapshots in the window, and the
            dated list of snapshots that recorded changes (oldest first).
        """
        since = date.today() - timedelta(days=days)

        competitors = self.db.query(CompanyCompetitor).filter(
            CompanyCompetitor.company_id == company_id,
            CompanyCompetitor.is_active == True  # noqa: E712 - SQL expression
        ).all()

        report = []
        for competitor in competitors:
            snapshots = self.db.query(CompetitorSnapshot).filter(
                CompetitorSnapshot.competitor_id == competitor.id,
                CompetitorSnapshot.snapshot_date >= since,
            ).order_by(CompetitorSnapshot.snapshot_date.asc()).all()

            changes = [
                {'date': snap.snapshot_date.isoformat(), 'changes': snap.changes}
                for snap in snapshots
                if snap.changes
            ]

            report.append({
                'competitor_id': competitor.id,
                'competitor_name': competitor.competitor_name,
                'current_rating': self._as_float(competitor.competitor_rating),
                'current_review_count': competitor.competitor_review_count,
                'snapshots_count': len(snapshots),
                'changes': changes,
            })

        return report

    def get_comparison(self, company_id: int) -> Dict[str, Any]:
        """Compare a company's rating and review count with its active competitors.

        Returns:
            An empty dict when the company does not exist; otherwise the
            company's figures, the competitors' figures, competitor averages
            and the company's rank (1 = best) by rating and by review count.
        """
        company = self.db.query(Company).filter(Company.id == company_id).first()
        if not company:
            return {}

        analysis = self.db.query(CompanyWebsiteAnalysis).filter(
            CompanyWebsiteAnalysis.company_id == company_id
        ).first()

        company_data = {
            'name': company.name,
            'rating': self._as_float(analysis.google_rating) if analysis else None,
            # NULL counts are reported as 0, matching how competitors are handled.
            'review_count': (analysis.google_reviews_count or 0) if analysis else 0,
            'photo_count': (analysis.google_photos_count or 0) if analysis else 0,
        }

        competitors = self.db.query(CompanyCompetitor).filter(
            CompanyCompetitor.company_id == company_id,
            CompanyCompetitor.is_active == True  # noqa: E712 - SQL expression
        ).all()

        competitor_data = [
            {
                'name': comp.competitor_name,
                'rating': self._as_float(comp.competitor_rating),
                'review_count': comp.competitor_review_count or 0,
            }
            for comp in competitors
        ]

        # Unrated competitors are left out of the rating statistics.
        ratings = [c['rating'] for c in competitor_data if c['rating']]
        review_counts = [c['review_count'] for c in competitor_data]

        return {
            'company': company_data,
            'competitors': competitor_data,
            'avg_competitor_rating': round(sum(ratings) / len(ratings), 1) if ratings else None,
            'avg_competitor_reviews': round(sum(review_counts) / len(review_counts)) if review_counts else 0,
            'company_rank_by_rating': self._calculate_rank(company_data.get('rating'), ratings),
            'company_rank_by_reviews': self._calculate_rank(company_data.get('review_count'), review_counts),
        }

    @staticmethod
    def _calculate_rank(value, others: List) -> Optional[int]:
        """Return the 1-based rank of ``value`` among ``others`` (1 = highest).

        Ties share the better rank; ``None`` entries in ``others`` are
        ignored. Returns ``None`` when ``value`` is None or ``others`` is empty.
        """
        if value is None or not others:
            return None
        # Rank = 1 + number of competitors that are strictly better.
        return 1 + sum(1 for other in others if other is not None and other > value)
|