#!/usr/bin/env python3 """ Norda Biznes Partner - Flask Application ==================================== Main Flask application for Norda Biznes company directory with AI chat. Features: - User authentication with email confirmation - Company directory with advanced search - AI chat assistant powered by Google Gemini - PostgreSQL database integration - Analytics dashboard for chat insights Author: Maciej Pienczyn, InPi sp. z o.o. Created: 2025-11-23 """ import os import logging import secrets import re import json import time from pathlib import Path from datetime import datetime, timedelta, date from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session, Response, send_file, abort from flask_login import login_user, logout_user, login_required, current_user # Note: CSRFProtect, Limiter, LoginManager are imported from extensions.py (line ~250) from werkzeug.security import generate_password_hash, check_password_hash from dotenv import load_dotenv from user_agents import parse as parse_user_agent import uuid import traceback as tb_module # Load environment variables (override any existing env vars) # Try .env first, then nordabiz_config.txt for production flexibility import os if os.path.exists('.env'): load_dotenv('.env', override=True) elif os.path.exists('nordabiz_config.txt'): load_dotenv('nordabiz_config.txt', override=True) else: load_dotenv(override=True) # ============================================================ # GLOBAL CONSTANTS - MARKETING # ============================================================ # Liczba podmiotów gospodarczych (cel marketingowy Izby NORDA) # Używana we wszystkich miejscach wyświetlających liczbę firm COMPANY_COUNT_MARKETING = 150 # ============================================================ # STAGING TEST FEATURES # ============================================================ # Features currently being tested on staging environment. # Only rendered when STAGING=true in .env. Edit this dict to update. STAGING_TEST_FEATURES = { } logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Security logger for fail2ban integration # Logs to /var/log/nordabiznes/security.log in production security_logger = logging.getLogger('security') security_logger.setLevel(logging.WARNING) _security_log_path = '/var/log/nordabiznes/security.log' if os.path.exists('/var/log/nordabiznes'): _security_handler = logging.FileHandler(_security_log_path) _security_handler.setFormatter(logging.Formatter( '%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) security_logger.addHandler(_security_handler) # Import database models from database import ( init_db, SessionLocal, User, Company, Category, Service, Competency, CompanyDigitalMaturity, CompanyWebsiteAnalysis, CompanyQualityTracking, CompanyWebsiteContent, CompanyAIInsights, CompanyEvent, CompanySocialMedia, CompanyContact, AIChatConversation, AIChatMessage, AIChatFeedback, AIAPICostLog, ForumTopic, ForumReply, ForumAttachment, NordaEvent, EventAttendee, PrivateMessage, Classified, UserNotification, CompanyRecommendation, MembershipFee, MembershipFeeConfig, Person, CompanyPerson, GBPAudit, ITAudit, KRSAudit, CompanyPKD, CompanyFinancialReport, UserSession, UserBlock, PageView, UserClick, AnalyticsDaily, PopularPagesDaily, SearchQuery, ConversionEvent, JSError, PopularSearchesDaily, HourlyActivity, AuditLog, SecurityAlert, ZOPKNews, SystemRole ) from utils.decorators import role_required # Import services import gemini_service from nordabiz_chat import NordaBizChatEngine from search_service import search_companies import krs_api_service from file_upload_service import FileUploadService # Security service for audit log, alerting, GeoIP, 2FA try: from security_service import ( log_audit, create_security_alert, get_client_ip, is_ip_allowed, geoip_check, init_security_service, generate_totp_secret, get_totp_uri, verify_totp, generate_backup_codes, verify_backup_code, requires_2fa ) SECURITY_SERVICE_AVAILABLE = True except ImportError as e: SECURITY_SERVICE_AVAILABLE = False logger.warning(f"Security service not available: {e}") # SEO audit components for triggering audits via API import sys _scripts_path = os.path.join(os.path.dirname(__file__), 'scripts') if _scripts_path not in sys.path: sys.path.insert(0, _scripts_path) try: from seo_audit import SEOAuditor, SEO_AUDIT_VERSION SEO_AUDIT_AVAILABLE = True except ImportError as e: SEO_AUDIT_AVAILABLE = False logger.warning(f"SEO audit service not available: {e}") # GBP (Google Business Profile) audit service try: from gbp_audit_service import ( GBPAuditService, audit_company as gbp_audit_company, get_company_audit as gbp_get_company_audit, fetch_google_business_data as gbp_fetch_google_data ) GBP_AUDIT_AVAILABLE = True GBP_AUDIT_VERSION = '1.0' except ImportError as e: GBP_AUDIT_AVAILABLE = False GBP_AUDIT_VERSION = None logger.warning(f"GBP audit service not available: {e}") # KRS (Krajowy Rejestr Sądowy) audit service try: from krs_audit_service import parse_krs_pdf, parse_krs_pdf_full KRS_AUDIT_AVAILABLE = True KRS_AUDIT_VERSION = '1.0' except ImportError as e: KRS_AUDIT_AVAILABLE = False KRS_AUDIT_VERSION = None logger.warning(f"KRS audit service not available: {e}") # Initialize Flask app app = Flask(__name__) # Fix URL scheme behind reverse proxy (NPM → Gunicorn) from werkzeug.middleware.proxy_fix import ProxyFix app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) app.config['PREFERRED_URL_SCHEME'] = 'https' # Security: Require strong SECRET_KEY (no default value allowed) SECRET_KEY = os.getenv('SECRET_KEY') if not SECRET_KEY or len(SECRET_KEY) < 32: raise ValueError("SECRET_KEY must be set in environment variables and be at least 32 characters long") app.config['SECRET_KEY'] = SECRET_KEY app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Security configurations app.config['WTF_CSRF_ENABLED'] = True app.config['WTF_CSRF_TIME_LIMIT'] = None # No time limit for CSRF tokens app.config['SESSION_COOKIE_SECURE'] = os.getenv('FLASK_ENV') != 'development' # HTTPS only in production app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # Template filters from zoneinfo import ZoneInfo _WARSAW_TZ = ZoneInfo('Europe/Warsaw') _UTC_TZ = ZoneInfo('UTC') @app.template_filter('local_time') def local_time_filter(dt, fmt='%d.%m.%Y %H:%M'): """Convert naive UTC datetime to Europe/Warsaw and format.""" if not dt: return '' # date objects (not datetime) — format directly, no timezone conversion if not hasattr(dt, 'hour'): return dt.strftime(fmt) # time objects — format directly, no timezone conversion if not hasattr(dt, 'year'): return dt.strftime(fmt) if dt.tzinfo is None: dt = dt.replace(tzinfo=_UTC_TZ) return dt.astimezone(_WARSAW_TZ).strftime(fmt) @app.template_filter('ensure_url') def ensure_url_filter(url): """Ensure URL has http:// or https:// scheme""" if url and not url.startswith(('http://', 'https://')): return f'https://{url}' return url # Register linkify filter for messages from utils.helpers import linkify_urls app.jinja_env.filters['linkify'] = linkify_urls # Register forum markdown filter from utils.markdown import register_markdown_filter register_markdown_filter(app) # Register founding history formatter from utils.history_formatter import register_history_filter register_history_filter(app) # Initialize extensions from centralized extensions.py from extensions import csrf, limiter, login_manager csrf.init_app(app) # Initialize rate limiter with Redis storage (persistent across restarts) # Falls back to memory if Redis unavailable _redis_available = False try: import redis _redis_client = redis.Redis(host='localhost', port=6379, db=0) _redis_client.ping() _redis_available = True logger.info("Rate limiter using Redis storage") except Exception: logger.warning("Redis unavailable, rate limiter using memory storage") # Note: default_limits are set in extensions.py # Here we only configure storage if _redis_available: limiter._storage_uri = "redis://localhost:6379/0" else: limiter._storage_uri = "memory://" limiter.init_app(app) from redis_service import init_redis init_redis(app) @limiter.request_filter def is_admin_exempt(): """Exempt logged-in admins from rate limiting.""" from flask_login import current_user try: return current_user.is_authenticated and current_user.has_role(SystemRole.ADMIN) except Exception: return False # Initialize database init_db() # Initialize Login Manager (imported from extensions.py) login_manager.init_app(app) login_manager.login_view = 'login' # Will change to 'auth.login' after full migration login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.' # Initialize Gemini service try: gemini_service.init_gemini_service(model='3-flash') # Paid tier: 10K RPD, thinking mode, fallback: 2.5-flash-lite (Unlimited) → 2.5-flash (10K) logger.info("Gemini service initialized successfully") except Exception as e: logger.error(f"Failed to initialize Gemini service: {e}") # Register blueprints (Phase 1: reports, community) from blueprints import register_blueprints register_blueprints(app) logger.info("Blueprints registered") @login_manager.user_loader def load_user(user_id): """Load user from database with eager-loaded relationships""" from sqlalchemy.orm import joinedload db = SessionLocal() try: user = db.query(User).options( joinedload(User.company_associations) ).filter_by(id=int(user_id)).first() if user: # Force-load associations before detaching from session _ = user.company_associations return user finally: db.close() # ============================================================ # TEMPLATE CONTEXT PROCESSORS # ============================================================ @app.context_processor def inject_globals(): """Inject global variables into all templates""" is_staging = os.getenv('STAGING') == 'true' return { 'current_year': datetime.now().year, 'now': datetime.now(), # Must be value, not method - templates use now.strftime() 'COMPANY_COUNT': COMPANY_COUNT_MARKETING, # Liczba podmiotów (cel marketingowy) 'is_staging': is_staging, 'staging_features': STAGING_TEST_FEATURES if is_staging else {}, 'SystemRole': SystemRole, } @app.context_processor def inject_audit_access(): from utils.decorators import is_audit_owner return dict(is_audit_owner=is_audit_owner()) @app.context_processor def inject_push_visibility(): """Udostępnij szablonom informację, czy dzwonek Web Push ma być widoczny dla bieżącego użytkownika. Reguła: jeśli PUSH_USER_WHITELIST jest niepusty, to tylko wymienieni user_id widzą dzwonek. Pusty = wszyscy zalogowani. """ if not current_user.is_authenticated: return {'push_bell_visible': False} raw = os.getenv('PUSH_USER_WHITELIST', '').strip() if not raw: return {'push_bell_visible': True} try: whitelist = {int(x) for x in raw.split(',') if x.strip().isdigit()} return {'push_bell_visible': current_user.id in whitelist} except Exception: return {'push_bell_visible': False} @app.context_processor def inject_company_context(): """Inject multi-company context into all templates.""" if not current_user.is_authenticated or not current_user.company_id: return {} from database import UserCompany from helpers.company_context import get_active_company_id db = SessionLocal() try: user_companies = db.query(UserCompany).filter_by( user_id=current_user.id ).order_by(UserCompany.is_primary.desc(), UserCompany.created_at.asc()).all() # Eager-load company objects while session is open for uc in user_companies: _ = uc.company.name if uc.company else None active_cid = get_active_company_id() # Validate active_company_id is still valid for this user valid_ids = {uc.company_id for uc in user_companies} if active_cid not in valid_ids: active_cid = current_user.company_id session.pop('active_company_id', None) active_company = None for uc in user_companies: if uc.company_id == active_cid: active_company = uc.company break return { 'user_companies': user_companies, 'active_company_id': active_cid, 'active_company': active_company, 'has_multiple_companies': len(user_companies) > 1, } except Exception as e: logger.error(f"inject_company_context error for user {current_user.id}: {e}") return {} finally: db.close() @app.context_processor def inject_notifications(): """Inject unread notifications count into all templates""" if current_user.is_authenticated: db = SessionLocal() try: unread_count = db.query(UserNotification).filter( UserNotification.user_id == current_user.id, UserNotification.is_read == False ).count() return {'unread_notifications_count': unread_count} finally: db.close() return {'unread_notifications_count': 0} # ============================================================ # NOTIFICATION HELPERS # ============================================================ def create_notification(user_id, title, message, notification_type='info', related_type=None, related_id=None, action_url=None): """ Create a notification for a user. Args: user_id: ID of the user to notify title: Notification title message: Notification message/body notification_type: Type of notification (news, system, message, event, alert) related_type: Type of related entity (company_news, event, message, etc.) related_id: ID of the related entity action_url: URL to navigate when notification is clicked Returns: UserNotification object or None on error """ db = SessionLocal() try: notification = UserNotification( user_id=user_id, title=title, message=message, notification_type=notification_type, related_type=related_type, related_id=related_id, action_url=action_url ) db.add(notification) db.commit() db.refresh(notification) logger.info(f"Created notification for user {user_id}: {title}") return notification except Exception as e: logger.error(f"Error creating notification: {e}") db.rollback() return None finally: db.close() def create_news_notification(company_id, news_id, news_title): """ Create notification for company owner when their news is approved. Args: company_id: ID of the company news_id: ID of the approved news news_title: Title of the news """ db = SessionLocal() try: # Find users associated with this company users = db.query(User).filter( User.company_id == company_id, User.is_active == True ).all() for user in users: create_notification( user_id=user.id, title="Nowa aktualnosc o Twojej firmie", message=f"Aktualnosc '{news_title}' zostala zatwierdzona i jest widoczna na profilu firmy.", notification_type='news', related_type='company_news', related_id=news_id, action_url=f"/company/{company_id}" ) finally: db.close() # ============================================================ # USER ANALYTICS - TRACKING HELPERS # ============================================================ # Global variable to store current page_view_id for templates _current_page_view_id = {} def get_or_create_analytics_session(): """ Get existing analytics session or create new one. Returns the database session ID (integer). Includes GeoIP lookup and UTM parameter parsing. """ analytics_session_id = session.get('analytics_session_id') if not analytics_session_id: analytics_session_id = str(uuid.uuid4()) session['analytics_session_id'] = analytics_session_id db = SessionLocal() try: user_session = db.query(UserSession).filter_by(session_id=analytics_session_id).first() if not user_session: # Parse user agent ua_string = request.headers.get('User-Agent', '') try: ua = parse_user_agent(ua_string) device_type = 'mobile' if ua.is_mobile else ('tablet' if ua.is_tablet else 'desktop') browser = ua.browser.family browser_version = ua.browser.version_string os_name = ua.os.family os_version = ua.os.version_string ua_lower = ua_string.lower() is_bot = ua.is_bot or any(p in ua_lower for p in [ 'curl/', 'python-requests', 'axios/', 'wget/', 'scrapy', 'werkzeug', 'leakix', 'nuclei', 'masscan', 'zgrab', 'httpx', 'googleassociationservice', 'censysinspect', 'paloaltonetworks', 'cortex', 'netcraft', 'fasthttp', 'cms-checker', 'wp-safe-scanner', 'notebooklm', 'ruby/', 'skypeuri', 'com.apple.webkit', 'networkingextension', 'oai-searchbot', 'gptbot', 'chatgpt-user', ]) if not ua_string.strip() or ua_string.strip() == 'Mozilla/5.0': is_bot = True except Exception: device_type = 'desktop' browser = 'Unknown' browser_version = '' os_name = 'Unknown' os_version = '' is_bot = False # GeoIP lookup country, city, region = None, None, None ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) if ip_address: ip_address = ip_address.split(',')[0].strip() try: from security_service import get_geoip_info geo_info = get_geoip_info(ip_address) if geo_info: country = geo_info.get('country') city = geo_info.get('city') region = geo_info.get('region') except Exception as e: logger.debug(f"GeoIP lookup failed for {ip_address}: {e}") # UTM parameters (z pierwszego requestu sesji) utm_source = request.args.get('utm_source', '')[:255] or None utm_medium = request.args.get('utm_medium', '')[:255] or None utm_campaign = request.args.get('utm_campaign', '')[:255] or None utm_term = request.args.get('utm_term', '')[:255] or None utm_content = request.args.get('utm_content', '')[:255] or None user_session = UserSession( session_id=analytics_session_id, user_id=current_user.id if current_user.is_authenticated else None, ip_address=ip_address, user_agent=ua_string[:2000] if ua_string else None, device_type=device_type, browser=browser[:50] if browser else None, browser_version=browser_version[:20] if browser_version else None, os=os_name[:50] if os_name else None, os_version=os_version[:20] if os_version else None, # GeoIP country=country, city=city, region=region, # UTM utm_source=utm_source, utm_medium=utm_medium, utm_campaign=utm_campaign, utm_term=utm_term, utm_content=utm_content, is_bot=is_bot, ) # PWA detection from cookie (set by JS in standalone mode) if request.cookies.get('pwa_mode') == '1': user_session.is_pwa = True db.add(user_session) db.commit() db.refresh(user_session) else: # Update last activity AND duration user_session.last_activity_at = datetime.now() user_session.duration_seconds = int( (datetime.now() - user_session.started_at).total_seconds() ) if current_user.is_authenticated and not user_session.user_id: user_session.user_id = current_user.id # PWA cookie arrives on 2nd request (after JS sets it) if not user_session.is_pwa and request.cookies.get('pwa_mode') == '1': user_session.is_pwa = True db.commit() return user_session.id except Exception as e: logger.error(f"Analytics session error: {e}") db.rollback() return None finally: db.close() def track_conversion(event_type: str, company_id: int = None, target_type: str = None, target_value: str = None, metadata: dict = None): """ Track conversion event. Args: event_type: Type of conversion (register, login, contact_click, rsvp, message, classified) company_id: Related company ID (for contact_click) target_type: What was clicked (email, phone, website) target_value: The value (email address, phone number, etc.) metadata: Additional data as dict """ try: analytics_session_id = session.get('analytics_session_id') session_db_id = None db = SessionLocal() try: if analytics_session_id: user_session = db.query(UserSession).filter_by(session_id=analytics_session_id).first() if user_session: session_db_id = user_session.id # Określ kategorię konwersji category_map = { 'register': 'acquisition', 'login': 'activation', 'contact_click': 'engagement', 'rsvp': 'engagement', 'message': 'engagement', 'classified': 'engagement' } conversion = ConversionEvent( session_id=session_db_id, user_id=current_user.id if current_user.is_authenticated else None, event_type=event_type, event_category=category_map.get(event_type, 'other'), company_id=company_id, target_type=target_type, target_value=target_value[:500] if target_value else None, source_page=request.url[:500] if request.url else None, referrer=request.referrer[:500] if request.referrer else None, event_metadata=metadata ) db.add(conversion) db.commit() logger.info(f"Conversion tracked: {event_type} company={company_id} target={target_type}") except Exception as e: logger.error(f"Conversion tracking error: {e}") db.rollback() finally: db.close() except Exception as e: logger.error(f"Conversion tracking outer error: {e}") @app.before_request def check_geoip(): """Block requests from high-risk countries (RU, CN, KP, IR, BY, SY, VE, CU).""" # Skip static files and health checks if request.path.startswith('/static') or request.path == '/health': return if not is_ip_allowed(): ip = request.headers.get('X-Forwarded-For', request.remote_addr) if ip: ip = ip.split(',')[0].strip() from security_service import get_country_code country = get_country_code(ip) logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}") # Create alert for blocked access try: db = SessionLocal() from security_service import create_security_alert create_security_alert( db, 'geo_blocked', 'low', ip_address=ip, details={'country': country, 'path': request.path, 'user_agent': request.user_agent.string[:200]} ) db.commit() db.close() except Exception as e: logger.error(f"Failed to create geo block alert: {e}") abort(403) @app.before_request def update_last_active(): """Update last_active_at and page_views_count for engagement tracking.""" if not current_user.is_authenticated: return if request.path.startswith('/static') or request.path == '/health' or request.path == '/favicon.ico': return if request.path.startswith('/api'): return from flask import session as flask_session import time now = time.time() last_update = flask_session.get('_last_active_update', 0) if now - last_update < 60: # 1 minute throttle return flask_session['_last_active_update'] = now # Count page views in session, flush to DB periodically pv = flask_session.get('_pv_buffer', 0) flask_session['_pv_buffer'] = 0 try: db = SessionLocal() user = db.query(User).filter_by(id=current_user.id).first() if user: user.last_active_at = datetime.now() user.page_views_count = (user.page_views_count or 0) + pv db.commit() db.close() except Exception: pass @app.before_request def track_page_view(): """Track page views (excluding static files and API calls)""" # Skip static files if request.path.startswith('/static'): return # Skip API calls except selected ones if request.path.startswith('/api'): return # Skip analytics tracking endpoints if request.path in ['/api/analytics/track', '/api/analytics/heartbeat']: return # Skip health checks if request.path == '/health': return # Skip favicon if request.path == '/favicon.ico': return # Buffer page views for authenticated users (flushed in update_last_active) if current_user.is_authenticated: from flask import session as flask_session flask_session['_pv_buffer'] = flask_session.get('_pv_buffer', 0) + 1 try: session_db_id = get_or_create_analytics_session() if not session_db_id: return db = SessionLocal() try: page_view = PageView( session_id=session_db_id, user_id=current_user.id if current_user.is_authenticated else None, url=request.url[:2000] if request.url else '', path=request.path[:500] if request.path else '/', referrer=request.referrer[:2000] if request.referrer else None ) # Extract company_id from path if on company page if request.path.startswith('/company/'): try: slug = request.path.split('/')[2].split('?')[0] company = db.query(Company).filter_by(slug=slug).first() if company: page_view.company_id = company.id except Exception: pass db.add(page_view) # Update session page count user_session = db.query(UserSession).filter_by(id=session_db_id).first() if user_session: user_session.page_views_count = (user_session.page_views_count or 0) + 1 db.commit() # Store page_view_id for click tracking (in request context) _current_page_view_id[id(request)] = page_view.id except Exception as e: logger.error(f"Page view tracking error: {e}") db.rollback() finally: db.close() except Exception as e: logger.error(f"Page view tracking outer error: {e}") @app.context_processor def inject_page_view_id(): """Inject page_view_id into all templates for JS tracking""" page_view_id = _current_page_view_id.get(id(request), '') return {'page_view_id': page_view_id} @app.teardown_request def cleanup_page_view_id(exception=None): """Clean up page_view_id from global dict after request""" _current_page_view_id.pop(id(request), None) # ============================================================ # SECURITY MIDDLEWARE & HELPERS # ============================================================ @app.after_request def set_security_headers(response): """Add security headers to all responses""" response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'SAMEORIGIN' response.headers['X-XSS-Protection'] = '1; mode=block' response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains' response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' response.headers['Permissions-Policy'] = 'camera=(), microphone=(), geolocation=(self)' # Note: static file caching is handled by Nginx (30d), not Flask # Freshness signal for SEO crawlers if response.content_type and 'text/html' in response.content_type and 'Last-Modified' not in response.headers: from email.utils import formatdate from time import time response.headers['Last-Modified'] = formatdate(timeval=time(), localtime=False, usegmt=True) # Content Security Policy csp = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " "img-src 'self' data: https:; " "font-src 'self' https://cdn.jsdelivr.net; " "frame-src https://www.google.com/maps/; " "connect-src 'self'" ) response.headers['Content-Security-Policy'] = csp return response def validate_email(email): """Validate email format""" if not email or len(email) > 255: return False # RFC 5322 compliant email regex (simplified) pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def validate_password(password): """ Validate password strength Requirements: - Minimum 8 characters - At least one uppercase letter - At least one lowercase letter - At least one digit """ if not password or len(password) < 8: return False, "Hasło musi mieć minimum 8 znaków" if not re.search(r'[A-Z]', password): return False, "Hasło musi zawierać przynajmniej jedną wielką literę" if not re.search(r'[a-z]', password): return False, "Hasło musi zawierać przynajmniej jedną małą literę" if not re.search(r'\d', password): return False, "Hasło musi zawierać przynajmniej jedną cyfrę" return True, "OK" def sanitize_input(text, max_length=1000): """Sanitize user input - remove potentially dangerous characters""" if not text: return "" # Remove null bytes text = text.replace('\x00', '') # Trim to max length text = text[:max_length] # Strip whitespace text = text.strip() return text def get_free_tier_usage(): """ Get today's Gemini API usage for free tier tracking. Returns: Dict with requests_today and tokens_today """ from datetime import date from sqlalchemy import func db = SessionLocal() try: today = date.today() result = db.query( func.count(AIAPICostLog.id).label('requests'), func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens') ).filter( func.date(AIAPICostLog.timestamp) == today, AIAPICostLog.api_provider == 'gemini' ).first() return { 'requests_today': result.requests or 0, 'tokens_today': int(result.tokens or 0) } except Exception as e: logger.warning(f"Failed to get free tier usage: {e}") return {'requests_today': 0, 'tokens_today': 0} finally: db.close() def get_brave_api_usage(): """ Get Brave Search API usage for current month. Brave free tier: 2000 requests/month Returns: Dict with usage stats and limits """ from datetime import date from sqlalchemy import func, extract db = SessionLocal() try: today = date.today() current_month = today.month current_year = today.year # Monthly usage monthly_result = db.query( func.count(AIAPICostLog.id).label('requests') ).filter( extract('month', AIAPICostLog.timestamp) == current_month, extract('year', AIAPICostLog.timestamp) == current_year, AIAPICostLog.api_provider == 'brave' ).first() # Today's usage daily_result = db.query( func.count(AIAPICostLog.id).label('requests') ).filter( func.date(AIAPICostLog.timestamp) == today, AIAPICostLog.api_provider == 'brave' ).first() monthly_used = monthly_result.requests or 0 daily_used = daily_result.requests or 0 monthly_limit = 2000 # Brave free tier return { 'requests_today': daily_used, 'requests_this_month': monthly_used, 'monthly_limit': monthly_limit, 'remaining': max(0, monthly_limit - monthly_used), 'usage_percent': round((monthly_used / monthly_limit) * 100, 1) if monthly_limit > 0 else 0, 'tier': 'free', 'is_limit_reached': monthly_used >= monthly_limit } except Exception as e: logger.warning(f"Failed to get Brave API usage: {e}") return { 'requests_today': 0, 'requests_this_month': 0, 'monthly_limit': 2000, 'remaining': 2000, 'usage_percent': 0, 'tier': 'free', 'is_limit_reached': False } finally: db.close() def log_brave_api_call(user_id=None, feature='news_search', company_name=None): """ Log a Brave API call for usage tracking. Args: user_id: User who triggered the call (optional) feature: Feature name (news_search, etc.) company_name: Company being searched (for reference) """ db = SessionLocal() try: log_entry = AIAPICostLog( api_provider='brave', model_name='search_api', feature=feature, user_id=user_id, input_tokens=0, output_tokens=0, total_tokens=0 ) db.add(log_entry) db.commit() logger.debug(f"Logged Brave API call: {feature} for {company_name}") except Exception as e: logger.error(f"Failed to log Brave API call: {e}") db.rollback() finally: db.close() # ============================================================ # HEALTH CHECK # ============================================================ @app.route('/health') def health(): """Health check endpoint for monitoring""" return {'status': 'ok'}, 200 @app.route('/test-error-500') @login_required def test_error_500(): """Test endpoint to trigger 500 error for notification testing. Admin only.""" if not current_user.can_access_admin_panel(): flash('Brak uprawnień', 'error') return redirect(url_for('index')) # Intentionally raise an error to test error notification raise Exception("TEST ERROR 500 - Celowy błąd testowy do sprawdzenia powiadomień email") @app.route('/health/full') @login_required @role_required(SystemRole.ADMIN) def health_full(): """ Extended health check - verifies all critical endpoints. Returns detailed status of each endpoint. Access: /health/full """ results = [] all_ok = True # List of ALL endpoints to check (path, name) # Comprehensive list updated 2026-01-17 endpoints = [ # ========== PUBLIC PAGES ========== ('/', 'Strona główna'), ('/login', 'Logowanie'), ('/register', 'Rejestracja'), ('/release-notes', 'Historia zmian'), ('/search?q=test', 'Wyszukiwarka'), ('/aktualnosci', 'Aktualności'), ('/forum', 'Forum'), ('/kalendarz', 'Kalendarz wydarzeń'), ('/tablica', 'Tablica ogłoszeń'), ('/nowi-czlonkowie', 'Nowi członkowie'), ('/mapa-polaczen', 'Mapa połączeń'), ('/forgot-password', 'Reset hasła'), # ========== RAPORTY ========== ('/raporty/', 'Raporty'), ('/raporty/staz-czlonkostwa', 'Raport: Staż członkostwa'), ('/raporty/social-media', 'Raport: Social Media'), ('/raporty/struktura-branzowa', 'Raport: Struktura branżowa'), # ========== ZOPK PUBLIC ========== ('/zopk', 'ZOPK: Strona główna'), ('/zopk/aktualnosci', 'ZOPK: Aktualności'), # ========== CHAT ========== ('/chat', 'NordaGPT Chat'), # ========== IT AUDIT ========== ('/it-audit/form', 'IT Audit: Formularz'), # ========== PUBLIC API ========== ('/api/companies', 'API: Lista firm'), ('/api/model-info', 'API: Model info'), ('/api/gbp/audit/health', 'API: GBP health'), # ========== ADMIN: CORE ========== ('/admin/security', 'Admin: Bezpieczeństwo'), ('/admin/analytics', 'Admin: Analityka'), ('/admin/status', 'Admin: Status systemu'), ('/admin/health', 'Admin: Health dashboard'), ('/admin/ai-usage', 'Admin: AI Usage'), ('/admin/chat-analytics', 'Admin: Chat analytics'), ('/admin/users', 'Admin: Użytkownicy'), ('/admin/recommendations', 'Admin: Rekomendacje'), ('/admin/fees', 'Admin: Składki'), # ========== ADMIN: AUDITS ========== ('/admin/seo', 'Admin: SEO Audit'), ('/admin/gbp-audit', 'Admin: GBP Audit'), ('/admin/social-media', 'Admin: Social Media'), ('/admin/social-audit', 'Admin: Social Audit'), ('/admin/it-audit', 'Admin: IT Audit'), ('/admin/digital-maturity', 'Admin: Digital Maturity'), ('/admin/krs-audit', 'Admin: KRS Audit'), # ========== ADMIN: COMMUNITY ========== ('/admin/forum', 'Admin: Forum'), ('/admin/kalendarz', 'Admin: Kalendarz'), # ========== ADMIN: ZOPK ========== ('/admin/zopk', 'Admin: ZOPK Panel'), ('/admin/zopk/news', 'Admin: ZOPK News'), ('/admin/zopk/knowledge', 'Admin: ZOPK Knowledge'), ('/admin/zopk/knowledge/chunks', 'Admin: ZOPK Chunks'), ('/admin/zopk/knowledge/facts', 'Admin: ZOPK Facts'), ('/admin/zopk/knowledge/entities', 'Admin: ZOPK Entities'), ('/admin/zopk/knowledge/duplicates', 'Admin: ZOPK Duplikaty'), ('/admin/zopk/knowledge/fact-duplicates', 'Admin: ZOPK Fact Duplicates'), ('/admin/zopk/knowledge/graph', 'Admin: ZOPK Graf'), ('/admin/zopk/timeline', 'Admin: ZOPK Timeline'), # ========== ZOPK API ========== ('/api/zopk/milestones', 'API: ZOPK Milestones'), ('/api/zopk/knowledge/dashboard-stats', 'API: ZOPK Dashboard stats'), # ========== USER SETTINGS (v1.19.0) ========== ('/settings/privacy', 'Ustawienia: Prywatność'), ('/settings/blocks', 'Ustawienia: Blokady'), ('/settings/2fa', 'Ustawienia: 2FA'), # ========== WIADOMOŚCI ========== ('/wiadomosci', 'Wiadomości: Odebrane'), ('/wiadomosci/wyslane', 'Wiadomości: Wysłane'), ('/wiadomosci/nowa', 'Wiadomości: Nowa'), # ========== EDUKACJA ========== ('/edukacja', 'Edukacja: Strona główna'), # ========== ADMIN: INSIGHTS ========== ('/admin/insights', 'Admin: Insights'), ] # Dodaj losową firmę do sprawdzenia db = SessionLocal() try: random_company = db.query(Company).first() if random_company: endpoints.append((f'/company/{random_company.slug}', f'Profil: {random_company.name[:25]}')) finally: db.close() # Testuj każdy endpoint używając test client with app.test_client() as client: for path, name in endpoints: try: response = client.get(path, follow_redirects=False) status = response.status_code # 200 = OK, 302 = redirect (np. do logowania) = OK # 429 = rate limited (endpoint działa, tylko ograniczony) # 500 = błąd serwera, 404 = nie znaleziono if status in (200, 302, 304, 429): results.append({ 'endpoint': path, 'name': name, 'status': status, 'ok': True }) else: results.append({ 'endpoint': path, 'name': name, 'status': status, 'ok': False }) all_ok = False except Exception as e: results.append({ 'endpoint': path, 'name': name, 'status': 500, 'ok': False, 'error': str(e)[:100] }) all_ok = False # Podsumowanie passed = sum(1 for r in results if r['ok']) failed = len(results) - passed return { 'status': 'ok' if all_ok else 'degraded', 'summary': { 'total': len(results), 'passed': passed, 'failed': failed }, 'endpoints': results, 'timestamp': datetime.now().isoformat() }, 200 if all_ok else 503 # ============================================================ # PUBLIC ROUTES - MOVED TO blueprints/public/routes.py # ============================================================ # The routes below have been migrated to the public blueprint. # They are commented out but preserved for reference. # See: blueprints/public/routes.py # ============================================================ # RECOMMENDATIONS ADMIN ROUTES - MOVED TO: blueprints/admin/routes.py # ============================================================ # ============================================================ # USER MANAGEMENT ADMIN ROUTES # Moved to: blueprints/admin/routes.py # NOTE: AI-parse routes remain below # ============================================================ # admin_users, admin_user_add - MOVED TO: blueprints/admin/routes.py # AI-ASSISTED USER CREATION - MOVED TO blueprints/admin/routes_users_api.py # Routes: /admin/users-api/ai-parse, /admin/users-api/bulk-create # ============================================================ # USER ANALYTICS API ROUTES - MOVED TO blueprints/api/routes_analytics.py # ============================================================ # Routes: /api/analytics/track, /api/analytics/heartbeat, /api/analytics/scroll, # /api/analytics/error, /api/analytics/performance, /api/analytics/conversion # ============================================================ # RECOMMENDATIONS API ROUTES - MOVED TO blueprints/api/routes_recommendations.py # ============================================================ # Routes: /api/recommendations/, /api/recommendations/create, # /api/recommendations//edit, /api/recommendations//delete # ============================================================ # B2B CLASSIFIEDS ROUTES - MIGRATED TO blueprints/community/classifieds/ # ============================================================ # Routes: /tablica, /tablica/nowe, /tablica/, /tablica//zakoncz # ============================================================ # NEW MEMBERS ROUTE - MOVED TO blueprints/public/routes.py # ============================================================ # AUTHENTICATION ROUTES - MOVED TO blueprints/auth/routes.py # ============================================================ # The routes below have been migrated to the auth blueprint. # They are commented out but preserved for reference. # See: blueprints/auth/routes.py # ============================================================ # TWO-FACTOR AUTHENTICATION - MOVED TO blueprints/auth/routes.py # ============================================================ # MOJE KONTO - MOVED TO blueprints/auth/routes.py # ============================================================ # USER DASHBOARD - MOVED TO blueprints/public/routes.py # ============================================================ # API ROUTES - MOVED TO: blueprints/api/routes_company.py # Routes: /api/companies, /api/connections, /api/check-email, /api/verify-nip, # /api/verify-krs, /api/company//refresh-krs, /api/company//enrich-ai, # /api/model-info, /api/admin/test-sanitization # ============================================================ # ============================================================ # SEO/GBP/SOCIAL AUDIT API - MOVED TO: blueprints/api/routes_*_audit.py # ============================================================ # ============================================================ # AUDIT DASHBOARDS - MOVED TO: blueprints/audit/routes.py # ============================================================ # Validation and Company API routes moved to blueprints/api/routes_company.py # ============================================================ # MODEL COMPARISON - Porównanie modeli AI # ============================================================ # ============================================================ # SYSTEM STATUS DASHBOARD (Admin only) # MOVED TO blueprints/admin/routes_status.py # ============================================================ # ============================================================ # DEBUG PANEL (Admin only) # ============================================================ # ============================================================ # SOCIAL MEDIA AUDIT ADMIN DASHBOARD # ============================================================ # ============================================================ # IT AUDIT ADMIN DASHBOARD # ============================================================ # ============================================================ # IT AUDIT FORM - MOVED TO blueprints/it_audit/ # ============================================================ # Routes: /it-audit/form, /it-audit/save, /api/it-audit/* # ============================================================ # RAPORTY - MIGRATED TO blueprints/reports/ # ============================================================ # Routes: /raporty, /raporty/staz-czlonkostwa, /raporty/social-media, /raporty/struktura-branzowa # RELEASE NOTES - MOVED TO blueprints/admin/routes.py (admin_notify_release) # ============================================================ # ============================================================ # ZOPK PUBLIC ROUTES - MOVED TO blueprints/public/routes_zopk.py # Routes: /zopk, /zopk/projekty/, /zopk/aktualnosci # ============================================================ # ============================================================ # ZOPK ROUTES - MOVED TO BLUEPRINTS # ============================================================ # All ZOPK routes have been migrated to: # - blueprints/admin/routes_zopk_dashboard.py # - blueprints/admin/routes_zopk_news.py # - blueprints/admin/routes_zopk_knowledge.py # - blueprints/admin/routes_zopk_timeline.py # ============================================================ # Endpoint aliases for ZOPK are created in blueprints/__init__.py # ============================================================ # KRS AUDIT (Krajowy Rejestr Sądowy) # ============================================================ # ============================================================ # KRS API ROUTES - MOVED TO blueprints/admin/routes_krs_api.py # ============================================================ # Routes: /admin/krs-api/audit, /admin/krs-api/audit/batch, /admin/krs-api/pdf/ # ============================================================ # ERROR HANDLERS # ============================================================ @app.errorhandler(404) def not_found(error): return render_template('errors/404.html'), 404 from flask_wtf.csrf import CSRFError @app.errorhandler(CSRFError) def handle_csrf_error(e): flash('Sesja wygasła lub formularz został nieprawidłowo przesłany. Spróbuj ponownie.', 'warning') return redirect(request.referrer or url_for('index')) def send_registration_notification(user_info): """Send email notification when a new user registers""" try: from email_service import send_email, is_configured if not is_configured(): logger.warning("Email service not configured - skipping registration notification") return notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl') if not notify_email: return reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') is_member = "✅ TAK" if user_info.get('is_norda_member') else "❌ NIE" company_name = user_info.get('company_name', 'Brak przypisanej firmy') subject = f"👤 NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}" body_text = f"""👤 NOWA REJESTRACJA NA NORDABIZNES.PL {'='*50} 🕐 Czas: {reg_time} 👤 Imię: {user_info.get('name', 'N/A')} 📧 Email: {user_info.get('email', 'N/A')} 🏢 NIP: {user_info.get('company_nip', 'N/A')} 🏛️ Firma: {company_name} 🎫 Członek NORDA: {is_member} {'='*50} 🔗 Panel użytkowników: https://nordabiznes.pl/admin/users """ body_html = f"""

👤 Nowa rejestracja na NordaBiznes.pl

🕐 Czas:{reg_time}
👤 Imię:{user_info.get('name', 'N/A')}
📧 Email:{user_info.get('email', 'N/A')}
🏢 NIP:{user_info.get('company_nip', 'N/A')}
🏛️ Firma:{company_name}
🎫 Członek NORDA:{is_member}
""" result = send_email( to=[notify_email], subject=subject, body_text=body_text, body_html=body_html, email_type='registration_notification' ) if result: logger.info(f"Registration notification sent to {notify_email}") else: logger.error(f"Failed to send registration notification to {notify_email}") except Exception as e: logger.error(f"Failed to send registration notification: {e}") def send_error_notification(error, request_info): """Send email notification about 500 errors via Microsoft Graph""" try: from email_service import send_email, is_configured if not is_configured(): logger.warning("Email service not configured - skipping error notification") return error_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl') if not error_email: return # Build error details error_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') traceback_str = tb_module.format_exc() subject = f"🚨 NordaBiz ERROR 500: {request_info.get('path', 'Unknown')}" body_text = f"""⚠️ BŁĄD 500 NA NORDABIZNES.PL {'='*50} 🕐 Czas: {error_time} 🌐 URL: {request_info.get('url', 'N/A')} 📍 Ścieżka: {request_info.get('path', 'N/A')} 📝 Metoda: {request_info.get('method', 'N/A')} 👤 Użytkownik: {request_info.get('user', 'Anonimowy')} 🖥️ IP: {request_info.get('ip', 'N/A')} 🌍 User-Agent: {request_info.get('user_agent', 'N/A')} {'='*50} 📋 BŁĄD: {str(error)} {'='*50} 📜 TRACEBACK: {traceback_str} {'='*50} 🔧 Sprawdź logi: ssh maciejpi@57.128.200.27 "sudo journalctl -u nordabiznes --since '10 minutes ago'" """ body_html = f"""

🚨 BŁĄD 500 NA NORDABIZNES.PL

🕐 Czas:{error_time}
🌐 URL:{request_info.get('url', 'N/A')}
📍 Ścieżka:{request_info.get('path', 'N/A')}
📝 Metoda:{request_info.get('method', 'N/A')}
👤 Użytkownik:{request_info.get('user', 'Anonimowy')}
🖥️ IP:{request_info.get('ip', 'N/A')}
📋 BŁĄD:
{str(error)}
📜 TRACEBACK:
{traceback_str}
🔧 Sprawdź logi:
ssh maciejpi@57.128.200.27 "sudo journalctl -u nordabiznes --since '10 minutes ago'"
""" result = send_email( to=[error_email], subject=subject, body_text=body_text, body_html=body_html, email_type='error_notification' ) if result: logger.info(f"Error notification sent to {error_email}") else: logger.error(f"Failed to send error notification to {error_email}") except Exception as e: logger.error(f"Failed to send error notification: {e}") @app.errorhandler(500) def internal_error(error): # Collect request info for notification request_info = { 'url': request.url if request else 'N/A', 'path': request.path if request else 'N/A', 'method': request.method if request else 'N/A', 'ip': request.remote_addr if request else 'N/A', 'user_agent': request.headers.get('User-Agent', 'N/A') if request else 'N/A', 'user': current_user.email if current_user and current_user.is_authenticated else 'Anonimowy' } # Send notification in background (don't block response) try: send_error_notification(error, request_info) except Exception as e: logger.error(f"Error notification failed: {e}") return render_template('errors/500.html'), 500 # ============================================================ # ADMIN - SECURITY DASHBOARD # ============================================================ # ============================================================ # ANNOUNCEMENTS (Ogłoszenia dla członków) # ============================================================ def generate_slug(title): """ Generate URL-friendly slug from title. Uses unidecode for proper Polish character handling. """ import re try: from unidecode import unidecode text = unidecode(title.lower()) except ImportError: # Fallback without unidecode text = title.lower() replacements = { 'ą': 'a', 'ć': 'c', 'ę': 'e', 'ł': 'l', 'ń': 'n', 'ó': 'o', 'ś': 's', 'ź': 'z', 'ż': 'z' } for pl, en in replacements.items(): text = text.replace(pl, en) # Remove special characters, replace spaces with hyphens text = re.sub(r'[^\w\s-]', '', text) text = re.sub(r'[-\s]+', '-', text).strip('-') return text[:200] # Limit slug length # ============================================================ # PUBLIC ANNOUNCEMENTS - MOVED TO blueprints/public/routes_announcements.py # ============================================================ # Routes: /ogloszenia, /ogloszenia/ # ============================================================ # EXTERNAL CONTACTS - PAGE ROUTES MIGRATED TO blueprints/community/contacts/ # ============================================================ # Routes: /kontakty, /kontakty/, /kontakty/dodaj, /kontakty//edytuj, /kontakty//usun # API routes remain below for backwards compatibility # ============================================================ # CONTACTS API ROUTES - MOVED TO blueprints/api/routes_contacts.py # ============================================================ # Routes: /api/contacts/ai-parse, /api/contacts/bulk-create # Includes AI prompts for contact parsing # ============================================================ # HONEYPOT ENDPOINTS (trap for malicious bots) # ============================================================ @app.route('/wp-admin') @app.route('/wp-admin/') @app.route('/wp-login.php') @app.route('/administrator') @app.route('/phpmyadmin') @app.route('/phpmyadmin/') @app.route('/.env') @app.route('/.git/config') @app.route('/xmlrpc.php') @app.route('/config.php') @app.route('/admin.php') def honeypot_trap(path=None): """ Honeypot endpoints - log and return 404. These URLs are commonly probed by malicious bots looking for WordPress, phpMyAdmin, or exposed configuration files. """ client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) if client_ip and ',' in client_ip: client_ip = client_ip.split(',')[0].strip() security_logger.warning(f"HONEYPOT ip={client_ip} path={request.path} ua={request.user_agent.string[:100]}") # Return 404 to not reveal this is a trap return render_template('errors/404.html'), 404 # ============================================================ # MAIN # ============================================================ if __name__ == '__main__': # Port 5001 jako domyślny - macOS AirPlay zajmuje 5000 port = int(os.getenv('PORT', 5001)) debug = os.getenv('FLASK_ENV') == 'development' logger.info(f"Starting Norda Biznes Partner on port {port}") app.run(host='0.0.0.0', port=port, debug=debug)