#!/usr/bin/env python3
"""
Competitor Monitor Cron Job
===========================

Periodically takes snapshots of competitors for all companies with
Google Place IDs. Designed to run weekly via cron.

Usage:
    python competitor_monitor_cron.py
    python competitor_monitor_cron.py --company-id 26
    python competitor_monitor_cron.py --discover  # Auto-discover competitors first

Cron entry (weekly, Sunday 3 AM):
    0 3 * * 0 cd /var/www/nordabiznes && /var/www/nordabiznes/venv/bin/python3 scripts/competitor_monitor_cron.py >> /var/log/nordabiznes/competitor_monitor.log 2>&1

Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-02-06
"""

import os
import sys
import argparse
import logging
from datetime import datetime
from pathlib import Path

# Load .env file from project root (best-effort: skip silently when
# python-dotenv is not installed, e.g. in a bare cron environment).
try:
    from dotenv import load_dotenv

    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    env_path = project_root / '.env'
    if env_path.exists():
        load_dotenv(env_path)
except ImportError:
    pass

# Add parent directory to path so sibling project modules import cleanly
# when the script is invoked from cron with an arbitrary CWD.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from database import Company, CompanyCompetitor, CompanyWebsiteAnalysis
from competitor_monitoring_service import CompetitorMonitoringService

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Default DSN is a placeholder; real credentials come from the environment
# (typically via the .env file loaded above).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)


def run_snapshots(session, company_ids=None):
    """Take snapshots for competitors.

    Args:
        session: SQLAlchemy session bound to the application database.
        company_ids: Optional list of company IDs to restrict processing to.
            When None, every company that has at least one tracked
            competitor is processed.

    Returns:
        dict with aggregate counters: 'companies_processed',
        'total_snapshots', 'total_changes', 'errors'.
    """
    service = CompetitorMonitoringService(session)

    if company_ids:
        companies = session.query(Company).filter(Company.id.in_(company_ids)).all()
    else:
        # Only companies that already have competitors tracked.
        tracked_ids = [
            row[0]
            for row in session.query(CompanyCompetitor.company_id).distinct().all()
        ]
        companies = session.query(Company).filter(Company.id.in_(tracked_ids)).all()

    total_results = {
        'companies_processed': 0,
        'total_snapshots': 0,
        'total_changes': 0,
        'errors': 0,
    }

    for company in companies:
        logger.info(f"Processing competitors for: {company.name} (ID: {company.id})")
        try:
            results = service.take_all_snapshots(company.id)
            total_results['companies_processed'] += 1
            total_results['total_snapshots'] += results['success']
            total_results['total_changes'] += results['changes_detected']
            total_results['errors'] += results['failed']
            logger.info(
                f"  Snapshots: {results['success']}/{results['total']}, "
                f"Changes: {results['changes_detected']}"
            )
        except Exception as e:
            logger.error(f"  Error: {e}")
            # Roll back so a failed transaction does not leave the session
            # in an aborted state and poison the remaining companies.
            session.rollback()
            total_results['errors'] += 1

    return total_results


def run_discovery(session, company_ids=None, max_per_company=5):
    """Discover competitors for companies.

    Args:
        session: SQLAlchemy session bound to the application database.
        company_ids: Optional list of company IDs to restrict discovery to.
            When None, discovery targets companies that have a Google Place
            ID but no tracked competitors yet.
        max_per_company: Cap on competitors to request per company.

    Returns:
        int: total number of newly saved competitors across all companies.
    """
    service = CompetitorMonitoringService(session)

    if company_ids:
        companies = session.query(Company).filter(Company.id.in_(company_ids)).all()
    else:
        # Companies with Google Place IDs but no competitors
        companies_with_place = session.query(Company).join(
            CompanyWebsiteAnalysis,
            Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            CompanyWebsiteAnalysis.google_place_id.isnot(None)
        ).all()

        # Filter out those that already have competitors (one count query
        # per candidate; acceptable at weekly-cron scale).
        companies = []
        for company in companies_with_place:
            existing = session.query(CompanyCompetitor).filter(
                CompanyCompetitor.company_id == company.id
            ).count()
            if existing == 0:
                companies.append(company)

    total_discovered = 0

    for company in companies:
        logger.info(f"Discovering competitors for: {company.name} (ID: {company.id})")
        try:
            competitors = service.discover_competitors(company.id, max_results=max_per_company)
            saved = service.save_competitors(company.id, competitors)
            total_discovered += saved
            logger.info(f"  Found {len(competitors)} competitors, saved {saved} new")
        except Exception as e:
            logger.error(f"  Discovery error: {e}")
            # Keep the session usable for the next company (see run_snapshots).
            session.rollback()

    return total_discovered


def main():
    """Parse CLI arguments, run optional discovery, then take snapshots."""
    parser = argparse.ArgumentParser(description='Competitor Monitor Cron Job')
    parser.add_argument('--company-id', type=int, help='Process single company')
    parser.add_argument('--discover', action='store_true', help='Discover competitors first')
    parser.add_argument('--max-competitors', type=int, default=5, help='Max competitors per company')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    engine = create_engine(DATABASE_URL)
    Session = sessionmaker(bind=engine)
    session = Session()

    # A single --company-id narrows both discovery and snapshots.
    company_ids = [args.company_id] if args.company_id else None

    try:
        logger.info("=" * 60)
        logger.info("COMPETITOR MONITOR CRON JOB")
        logger.info(f"Started at: {datetime.now()}")
        logger.info("=" * 60)

        if args.discover:
            logger.info("\n--- COMPETITOR DISCOVERY ---")
            discovered = run_discovery(session, company_ids, args.max_competitors)
            logger.info(f"Total new competitors discovered: {discovered}")

        logger.info("\n--- COMPETITOR SNAPSHOTS ---")
        results = run_snapshots(session, company_ids)

        logger.info("\n" + "=" * 60)
        logger.info("SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Companies processed: {results['companies_processed']}")
        logger.info(f"Snapshots taken: {results['total_snapshots']}")
        logger.info(f"Changes detected: {results['total_changes']}")
        logger.info(f"Errors: {results['errors']}")
        logger.info("=" * 60)
    finally:
        # Always release the DB connection, even if processing raised.
        session.close()


if __name__ == '__main__':
    main()