""" Analytics Helpers ================= Functions for tracking page views, API usage, and user analytics. """ import logging import uuid from datetime import date from flask import request, session from flask_login import current_user from sqlalchemy import func, extract from user_agents import parse as parse_user_agent from database import ( SessionLocal, UserSession, PageView, AIAPICostLog, Company ) logger = logging.getLogger(__name__) # Global variable to store current page_view_id for templates _current_page_view_id = {} def get_or_create_analytics_session(): """ Get existing analytics session or create new one. Returns: The database session ID (integer) or None on error. """ analytics_session_id = session.get('analytics_session_id') if not analytics_session_id: analytics_session_id = str(uuid.uuid4()) session['analytics_session_id'] = analytics_session_id # DEBUG: log every mobile request cookies ua_str = request.headers.get('User-Agent', '').lower() if 'mobile' in ua_str or 'android' in ua_str: try: with open('/tmp/pwa_debug.log', 'a') as _f: import json as _json _f.write(_json.dumps({ 'path': request.path, 'pwa_mode': request.cookies.get('pwa_mode'), 'pwa_display': request.cookies.get('pwa_display'), 'cookie_names': list(request.cookies.keys()), 'session_key': analytics_session_id[:8], }) + '\n') except Exception as _e: logger.error(f"PWA debug write failed: {_e}") db = SessionLocal() try: user_session = db.query(UserSession).filter_by( session_id=analytics_session_id ).first() if not user_session: # Parse user agent ua_string = request.headers.get('User-Agent', '') try: ua = parse_user_agent(ua_string) device_type = 'mobile' if ua.is_mobile else ( 'tablet' if ua.is_tablet else 'desktop' ) browser = ua.browser.family browser_version = ua.browser.version_string os_name = ua.os.family os_version = ua.os.version_string ua_lower = ua_string.lower() is_bot = ua.is_bot or any(p in ua_lower for p in ['curl/', 'python-requests', 'axios/', 'wget/', 'scrapy', 'werkzeug', 'leakix', 'nuclei', 'masscan', 'zgrab', 'httpx', 'googleassociationservice', 'censysinspect', 'paloaltonetworks', 'cortex', 'netcraft', 'fasthttp', 'cms-checker', 'wp-safe-scanner', 'notebooklm', 'ruby/', 'skypeuri', 'com.apple.webkit', 'networkingextension']) # Flag empty or bare Mozilla/5.0 user agents as bots if not ua_string.strip() or ua_string.strip() == 'Mozilla/5.0': is_bot = True except Exception: device_type = 'desktop' browser = 'Unknown' browser_version = '' os_name = 'Unknown' os_version = '' is_bot = False user_session = UserSession( session_id=analytics_session_id, user_id=current_user.id if current_user.is_authenticated else None, ip_address=request.remote_addr, user_agent=ua_string[:2000] if ua_string else None, device_type=device_type, browser=browser[:50] if browser else None, browser_version=browser_version[:20] if browser_version else None, os=os_name[:50] if os_name else None, os_version=os_version[:20] if os_version else None, is_bot=is_bot ) # PWA detection from cookie is_pwa = request.cookies.get('pwa_mode') == '1' user_session.is_pwa = is_pwa db.add(user_session) db.commit() db.refresh(user_session) else: # Update last activity from datetime import datetime user_session.last_activity_at = datetime.now() if current_user.is_authenticated and not user_session.user_id: user_session.user_id = current_user.id # PWA detection from cookie (set by JS in standalone mode) pwa_cookie = request.cookies.get('pwa_mode') == '1' pwa_display = request.cookies.get('pwa_display', '') # DEBUG: log all cookies for mobile sessions to /tmp/pwa_debug.log if user_session.device_type == 'mobile': try: with open('/tmp/pwa_debug.log', 'a') as _f: import json as _json _f.write(_json.dumps({ 'session_id': user_session.id, 'path': request.path, 'pwa_mode': request.cookies.get('pwa_mode'), 'pwa_display': request.cookies.get('pwa_display'), 'all_cookies': list(request.cookies.keys()), }) + '\n') except Exception: pass if not user_session.is_pwa and pwa_cookie: user_session.is_pwa = True db.commit() return user_session.id except Exception as e: logger.error(f"Analytics session error: {e}") db.rollback() return None finally: db.close() def track_page_view_for_request(): """ Track page view for current request. Called from before_request middleware. Returns: page_view_id or None """ try: session_db_id = get_or_create_analytics_session() if not session_db_id: return None db = SessionLocal() try: page_view = PageView( session_id=session_db_id, user_id=current_user.id if current_user.is_authenticated else None, url=request.url[:2000] if request.url else '', path=request.path[:500] if request.path else '/', referrer=request.referrer[:2000] if request.referrer else None ) # Extract company_id from path if on company page if request.path.startswith('/company/'): try: slug = request.path.split('/')[2].split('?')[0] company = db.query(Company).filter_by(slug=slug).first() if company: page_view.company_id = company.id except Exception: pass db.add(page_view) # Update session page count user_session = db.query(UserSession).filter_by(id=session_db_id).first() if user_session: user_session.page_views_count = (user_session.page_views_count or 0) + 1 db.commit() return page_view.id except Exception as e: logger.error(f"Page view tracking error: {e}") db.rollback() return None finally: db.close() except Exception as e: logger.error(f"Page view tracking outer error: {e}") return None def get_current_page_view_id(): """Get page_view_id for current request.""" return _current_page_view_id.get(id(request), '') def set_current_page_view_id(page_view_id): """Set page_view_id for current request.""" _current_page_view_id[id(request)] = page_view_id def cleanup_page_view_id(): """Clean up page_view_id from global dict after request.""" _current_page_view_id.pop(id(request), None) def get_free_tier_usage(): """ Get today's Gemini API usage for free tier tracking. Returns: Dict with requests_today and tokens_today """ db = SessionLocal() try: today = date.today() result = db.query( func.count(AIAPICostLog.id).label('requests'), func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens') ).filter( func.date(AIAPICostLog.timestamp) == today, AIAPICostLog.api_provider == 'gemini' ).first() return { 'requests_today': result.requests or 0, 'tokens_today': int(result.tokens or 0) } except Exception as e: logger.warning(f"Failed to get free tier usage: {e}") return {'requests_today': 0, 'tokens_today': 0} finally: db.close() def get_brave_api_usage(): """ Get Brave Search API usage for current month. Brave free tier: 2000 requests/month Returns: Dict with usage stats and limits """ db = SessionLocal() try: today = date.today() current_month = today.month current_year = today.year # Monthly usage monthly_result = db.query( func.count(AIAPICostLog.id).label('requests') ).filter( extract('month', AIAPICostLog.timestamp) == current_month, extract('year', AIAPICostLog.timestamp) == current_year, AIAPICostLog.api_provider == 'brave' ).first() # Today's usage daily_result = db.query( func.count(AIAPICostLog.id).label('requests') ).filter( func.date(AIAPICostLog.timestamp) == today, AIAPICostLog.api_provider == 'brave' ).first() monthly_used = monthly_result.requests or 0 daily_used = daily_result.requests or 0 monthly_limit = 2000 # Brave free tier return { 'requests_today': daily_used, 'requests_this_month': monthly_used, 'monthly_limit': monthly_limit, 'remaining': max(0, monthly_limit - monthly_used), 'usage_percent': round((monthly_used / monthly_limit) * 100, 1) if monthly_limit > 0 else 0, 'tier': 'free', 'is_limit_reached': monthly_used >= monthly_limit } except Exception as e: logger.warning(f"Failed to get Brave API usage: {e}") return { 'requests_today': 0, 'requests_this_month': 0, 'monthly_limit': 2000, 'remaining': 2000, 'usage_percent': 0, 'tier': 'free', 'is_limit_reached': False } finally: db.close() def log_brave_api_call(user_id=None, feature='news_search', company_name=None): """ Log a Brave API call for usage tracking. Args: user_id: User who triggered the call (optional) feature: Feature name (news_search, etc.) company_name: Company being searched (for reference) """ db = SessionLocal() try: log_entry = AIAPICostLog( api_provider='brave', model_name='search_api', feature=feature, user_id=user_id, input_tokens=0, output_tokens=0, total_tokens=0 ) db.add(log_entry) db.commit() logger.debug(f"Logged Brave API call: {feature} for {company_name}") except Exception as e: logger.error(f"Failed to log Brave API call: {e}") db.rollback() finally: db.close()