""" IT Audit Routes - IT Audit blueprint Migrated from app.py as part of the blueprint refactoring. Contains IT infrastructure audit routes for companies. """ import csv import json import logging from io import StringIO from flask import abort, flash, jsonify, redirect, render_template, request, Response, url_for from flask_login import current_user, login_required from utils.decorators import is_audit_owner from database import SessionLocal, Company from . import bp logger = logging.getLogger(__name__) # Import limiter from app - will be initialized when app starts from flask import current_app def get_limiter(): """Get rate limiter from current app.""" return current_app.extensions.get('limiter') # ============================================================ # IT AUDIT FORM ROUTES # ============================================================ @bp.route('/it-audit/form') @login_required def it_audit_form(): """ IT Audit form for data collection. Displays a 9-section form for collecting IT infrastructure data: - IT Contact - Cloud & Identity - Server Infrastructure - Endpoints - Security - Backup & DR - Monitoring - Business Apps - Collaboration Query parameters: company_id (int, optional): Company ID to audit. If not provided, defaults to current user's company. Access control: - Admin users can access form for any company - Regular users can only access form for their own company Returns: Rendered it_audit_form.html template with company and audit data """ if not is_audit_owner(): abort(404) db = SessionLocal() try: from database import ITAudit # Get company_id from query params or use current user's company company_id = request.args.get('company_id', type=int) if not company_id: # If no company_id provided, use current user's company if current_user.company_id: company_id = current_user.company_id elif current_user.can_access_admin_panel(): # Admin without specific company_id should redirect to admin dashboard flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info') return redirect(url_for('admin_it_audit')) else: flash('Nie jesteś przypisany do żadnej firmy.', 'error') return redirect(url_for('dashboard')) # Find company company = db.query(Company).filter( Company.id == company_id, Company.status == 'active' ).first() if not company: flash('Firma nie została znaleziona.', 'error') return redirect(url_for('dashboard')) # Access control: users with company edit rights can access if not current_user.can_edit_company(company.id): flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error') return redirect(url_for('dashboard')) # Get latest audit for this company (for pre-filling the form) audit = db.query(ITAudit).filter( ITAudit.company_id == company.id ).order_by( ITAudit.audit_date.desc() ).first() logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}") return render_template('it_audit_form.html', company=company, audit=audit ) finally: db.close() @bp.route('/it-audit/save', methods=['POST']) @login_required def it_audit_save(): """ Save IT audit form data with automatic scoring. This endpoint saves IT infrastructure audit data from the form, calculates security, collaboration, and completeness scores, and stores the audit in the database. Request JSON body: - company_id: Company ID (integer, required) - All audit fields from the 9-section form Returns: - Success: Audit results with scores and redirect URL - Error: Error message with status code Access: - Members can save audits for their own company - Admins can save audits for any company Rate limited to 30 requests per hour per user. """ if not is_audit_owner(): abort(404) # Apply rate limiting manually since decorator doesn't work with blueprint limiter = get_limiter() if limiter: try: limiter.check() except Exception: pass # Allow request if limiter fails from database import ITAudit from it_audit_service import ITAuditService # Parse request data (supports both JSON and form data) if request.is_json: data = request.get_json() else: data = request.form.to_dict(flat=True) if not data: return jsonify({ 'success': False, 'error': 'Brak danych w żądaniu.' }), 400 # Get company_id company_id = data.get('company_id') if company_id: try: company_id = int(company_id) except (ValueError, TypeError): return jsonify({ 'success': False, 'error': 'Nieprawidłowy identyfikator firmy.' }), 400 else: # Use current user's company if not specified if current_user.company_id: company_id = current_user.company_id else: return jsonify({ 'success': False, 'error': 'Podaj company_id firmy do audytu.' }), 400 db = SessionLocal() try: # Find company company = db.query(Company).filter( Company.id == company_id, Company.status == 'active' ).first() if not company: return jsonify({ 'success': False, 'error': 'Firma nie znaleziona lub nieaktywna.' }), 404 # Access control: users with company edit rights can save if not current_user.can_edit_company(company.id): return jsonify({ 'success': False, 'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.' }), 403 # Parse form data into audit_data dictionary audit_data = _parse_it_audit_form_data(data) audit_data['audited_by'] = current_user.id audit_data['audit_source'] = 'form' # Save audit using service service = ITAuditService(db) audit = service.save_audit(company_id, audit_data) # Check if this is a partial submission (completeness < 100) is_partial = audit.completeness_score < 100 if audit.completeness_score else True # Count previous audits for this company (to indicate if history exists) audit_history_count = db.query(ITAudit).filter( ITAudit.company_id == company_id ).count() logger.info( f"IT audit saved by {current_user.email} for company {company.name}: " f"overall={audit.overall_score}, security={audit.security_score}, " f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}" f"{' (partial)' if is_partial else ''}" ) # Build appropriate success message if is_partial: if audit.completeness_score < 30: message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.' elif audit.completeness_score < 70: message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.' else: message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).' else: message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.' # Return success response with detailed information return jsonify({ 'success': True, 'message': message, 'company_id': company.id, 'company_name': company.name, 'company_slug': company.slug, 'audit': { 'id': audit.id, 'audit_date': audit.audit_date.isoformat() if audit.audit_date else None, 'overall_score': audit.overall_score, 'security_score': audit.security_score, 'collaboration_score': audit.collaboration_score, 'completeness_score': audit.completeness_score, 'maturity_level': audit.maturity_level, 'is_partial': is_partial, }, 'history_count': audit_history_count, 'redirect_url': url_for('company_detail_by_slug', slug=company.slug) }), 200 except Exception as e: db.rollback() logger.error(f"Error saving IT audit for company {company_id}: {e}") return jsonify({ 'success': False, 'error': f'Błąd podczas zapisywania audytu: {str(e)}' }), 500 finally: db.close() def _parse_it_audit_form_data(data: dict) -> dict: """ Parse form data into audit_data dictionary. Handles: - Boolean fields (checkboxes) - Array fields (multi-select) - String and numeric fields Args: data: Raw form data dictionary Returns: Parsed audit_data dictionary with proper types """ # Boolean fields (checkboxes - present means True) boolean_fields = [ 'has_it_manager', 'it_outsourced', 'has_azure_ad', 'has_m365', 'has_google_workspace', 'has_mdm', 'has_edr', 'has_vpn', 'has_mfa', 'has_proxmox_pbs', 'has_dr_plan', 'has_local_ad', 'has_ad_azure_sync', 'open_to_shared_licensing', 'open_to_backup_replication', 'open_to_teams_federation', 'open_to_shared_monitoring', 'open_to_collective_purchasing', 'open_to_knowledge_sharing', ] # Array fields (multi-select - may come as comma-separated or multiple values) array_fields = [ 'm365_plans', 'teams_usage', 'server_types', 'server_os', 'desktop_os', 'mfa_scope', 'backup_targets', ] # String fields string_fields = [ 'it_provider_name', 'it_contact_name', 'it_contact_email', 'azure_tenant_name', 'azure_user_count', 'server_count', 'virtualization_platform', 'network_firewall_brand', 'employee_count', 'computer_count', 'mdm_solution', 'antivirus_solution', 'edr_solution', 'vpn_solution', 'backup_solution', 'backup_frequency', 'monitoring_solution', 'ad_domain_name', 'ticketing_system', 'erp_system', 'crm_system', 'document_management', ] audit_data = {} # Parse boolean fields for field in boolean_fields: value = data.get(field) if value is None: audit_data[field] = False elif isinstance(value, bool): audit_data[field] = value elif isinstance(value, str): audit_data[field] = value.lower() in ('true', '1', 'on', 'yes') else: audit_data[field] = bool(value) # Parse array fields for field in array_fields: value = data.get(field) if value is None: audit_data[field] = [] elif isinstance(value, list): audit_data[field] = value elif isinstance(value, str): # Handle comma-separated values audit_data[field] = [v.strip() for v in value.split(',') if v.strip()] else: audit_data[field] = [value] # Parse string fields for field in string_fields: value = data.get(field) if value is not None and isinstance(value, str): audit_data[field] = value.strip() if value.strip() else None else: audit_data[field] = None # Parse zabbix_integration as JSON if present zabbix_integration = data.get('zabbix_integration') if zabbix_integration: if isinstance(zabbix_integration, dict): audit_data['zabbix_integration'] = zabbix_integration elif isinstance(zabbix_integration, str): try: audit_data['zabbix_integration'] = json.loads(zabbix_integration) except json.JSONDecodeError: audit_data['zabbix_integration'] = {'hostname': zabbix_integration} else: audit_data['zabbix_integration'] = None else: # Check for zabbix_hostname field as alternative zabbix_hostname = data.get('zabbix_hostname') if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip(): audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()} else: audit_data['zabbix_integration'] = None return audit_data