nordabiz/blueprints/it_audit/routes.py
Maciej Pienczyn 7f77d7ebcd
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat(security): Restrict audit access to single designated user
Audits (SEO, IT, GBP, Social Media) are now visible only to the
designated audit owner (maciej.pienczyn@inpi.pl). All other users,
including admins, see 404 for audit routes and no audit links in
navigation. KRS Audit and Digital Maturity remain unchanged.

Adds /admin/access-overview panel showing the access matrix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 12:31:10 +01:00

377 lines
13 KiB
Python

"""
IT Audit Routes - IT Audit blueprint
Migrated from app.py as part of the blueprint refactoring.
Contains IT infrastructure audit routes for companies.
"""
import csv
import json
import logging
from io import StringIO
from flask import abort, flash, jsonify, redirect, render_template, request, Response, url_for
from flask_login import current_user, login_required
from utils.decorators import is_audit_owner
from database import SessionLocal, Company
from . import bp
logger = logging.getLogger(__name__)
# Import limiter from app - will be initialized when app starts
from flask import current_app
def get_limiter():
"""Get rate limiter from current app."""
return current_app.extensions.get('limiter')
# ============================================================
# IT AUDIT FORM ROUTES
# ============================================================
@bp.route('/it-audit/form')
@login_required
def it_audit_form():
"""
IT Audit form for data collection.
Displays a 9-section form for collecting IT infrastructure data:
- IT Contact
- Cloud & Identity
- Server Infrastructure
- Endpoints
- Security
- Backup & DR
- Monitoring
- Business Apps
- Collaboration
Query parameters:
company_id (int, optional): Company ID to audit. If not provided,
defaults to current user's company.
Access control:
- Admin users can access form for any company
- Regular users can only access form for their own company
Returns:
Rendered it_audit_form.html template with company and audit data
"""
if not is_audit_owner():
abort(404)
db = SessionLocal()
try:
from database import ITAudit
# Get company_id from query params or use current user's company
company_id = request.args.get('company_id', type=int)
if not company_id:
# If no company_id provided, use current user's company
if current_user.company_id:
company_id = current_user.company_id
elif current_user.can_access_admin_panel():
# Admin without specific company_id should redirect to admin dashboard
flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info')
return redirect(url_for('admin_it_audit'))
else:
flash('Nie jesteś przypisany do żadnej firmy.', 'error')
return redirect(url_for('dashboard'))
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
flash('Firma nie została znaleziona.', 'error')
return redirect(url_for('dashboard'))
# Access control: users with company edit rights can access
if not current_user.can_edit_company(company.id):
flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error')
return redirect(url_for('dashboard'))
# Get latest audit for this company (for pre-filling the form)
audit = db.query(ITAudit).filter(
ITAudit.company_id == company.id
).order_by(
ITAudit.audit_date.desc()
).first()
logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}")
return render_template('it_audit_form.html',
company=company,
audit=audit
)
finally:
db.close()
@bp.route('/it-audit/save', methods=['POST'])
@login_required
def it_audit_save():
"""
Save IT audit form data with automatic scoring.
This endpoint saves IT infrastructure audit data from the form,
calculates security, collaboration, and completeness scores,
and stores the audit in the database.
Request JSON body:
- company_id: Company ID (integer, required)
- All audit fields from the 9-section form
Returns:
- Success: Audit results with scores and redirect URL
- Error: Error message with status code
Access:
- Members can save audits for their own company
- Admins can save audits for any company
Rate limited to 30 requests per hour per user.
"""
if not is_audit_owner():
abort(404)
# Apply rate limiting manually since decorator doesn't work with blueprint
limiter = get_limiter()
if limiter:
try:
limiter.check()
except Exception:
pass # Allow request if limiter fails
from database import ITAudit
from it_audit_service import ITAuditService
# Parse request data (supports both JSON and form data)
if request.is_json:
data = request.get_json()
else:
data = request.form.to_dict(flat=True)
if not data:
return jsonify({
'success': False,
'error': 'Brak danych w żądaniu.'
}), 400
# Get company_id
company_id = data.get('company_id')
if company_id:
try:
company_id = int(company_id)
except (ValueError, TypeError):
return jsonify({
'success': False,
'error': 'Nieprawidłowy identyfikator firmy.'
}), 400
else:
# Use current user's company if not specified
if current_user.company_id:
company_id = current_user.company_id
else:
return jsonify({
'success': False,
'error': 'Podaj company_id firmy do audytu.'
}), 400
db = SessionLocal()
try:
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona lub nieaktywna.'
}), 404
# Access control: users with company edit rights can save
if not current_user.can_edit_company(company.id):
return jsonify({
'success': False,
'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
}), 403
# Parse form data into audit_data dictionary
audit_data = _parse_it_audit_form_data(data)
audit_data['audited_by'] = current_user.id
audit_data['audit_source'] = 'form'
# Save audit using service
service = ITAuditService(db)
audit = service.save_audit(company_id, audit_data)
# Check if this is a partial submission (completeness < 100)
is_partial = audit.completeness_score < 100 if audit.completeness_score else True
# Count previous audits for this company (to indicate if history exists)
audit_history_count = db.query(ITAudit).filter(
ITAudit.company_id == company_id
).count()
logger.info(
f"IT audit saved by {current_user.email} for company {company.name}: "
f"overall={audit.overall_score}, security={audit.security_score}, "
f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
f"{' (partial)' if is_partial else ''}"
)
# Build appropriate success message
if is_partial:
if audit.completeness_score < 30:
message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.'
elif audit.completeness_score < 70:
message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.'
else:
message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).'
else:
message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.'
# Return success response with detailed information
return jsonify({
'success': True,
'message': message,
'company_id': company.id,
'company_name': company.name,
'company_slug': company.slug,
'audit': {
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
'is_partial': is_partial,
},
'history_count': audit_history_count,
'redirect_url': url_for('company_detail_by_slug', slug=company.slug)
}), 200
except Exception as e:
db.rollback()
logger.error(f"Error saving IT audit for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas zapisywania audytu: {str(e)}'
}), 500
finally:
db.close()
def _parse_it_audit_form_data(data: dict) -> dict:
"""
Parse form data into audit_data dictionary.
Handles:
- Boolean fields (checkboxes)
- Array fields (multi-select)
- String and numeric fields
Args:
data: Raw form data dictionary
Returns:
Parsed audit_data dictionary with proper types
"""
# Boolean fields (checkboxes - present means True)
boolean_fields = [
'has_it_manager', 'it_outsourced',
'has_azure_ad', 'has_m365', 'has_google_workspace',
'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
'has_proxmox_pbs', 'has_dr_plan',
'has_local_ad', 'has_ad_azure_sync',
'open_to_shared_licensing', 'open_to_backup_replication',
'open_to_teams_federation', 'open_to_shared_monitoring',
'open_to_collective_purchasing', 'open_to_knowledge_sharing',
]
# Array fields (multi-select - may come as comma-separated or multiple values)
array_fields = [
'm365_plans', 'teams_usage', 'server_types', 'server_os',
'desktop_os', 'mfa_scope', 'backup_targets',
]
# String fields
string_fields = [
'it_provider_name', 'it_contact_name', 'it_contact_email',
'azure_tenant_name', 'azure_user_count',
'server_count', 'virtualization_platform', 'network_firewall_brand',
'employee_count', 'computer_count', 'mdm_solution',
'antivirus_solution', 'edr_solution', 'vpn_solution',
'backup_solution', 'backup_frequency',
'monitoring_solution', 'ad_domain_name',
'ticketing_system', 'erp_system', 'crm_system', 'document_management',
]
audit_data = {}
# Parse boolean fields
for field in boolean_fields:
value = data.get(field)
if value is None:
audit_data[field] = False
elif isinstance(value, bool):
audit_data[field] = value
elif isinstance(value, str):
audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
else:
audit_data[field] = bool(value)
# Parse array fields
for field in array_fields:
value = data.get(field)
if value is None:
audit_data[field] = []
elif isinstance(value, list):
audit_data[field] = value
elif isinstance(value, str):
# Handle comma-separated values
audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
else:
audit_data[field] = [value]
# Parse string fields
for field in string_fields:
value = data.get(field)
if value is not None and isinstance(value, str):
audit_data[field] = value.strip() if value.strip() else None
else:
audit_data[field] = None
# Parse zabbix_integration as JSON if present
zabbix_integration = data.get('zabbix_integration')
if zabbix_integration:
if isinstance(zabbix_integration, dict):
audit_data['zabbix_integration'] = zabbix_integration
elif isinstance(zabbix_integration, str):
try:
audit_data['zabbix_integration'] = json.loads(zabbix_integration)
except json.JSONDecodeError:
audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
else:
audit_data['zabbix_integration'] = None
else:
# Check for zabbix_hostname field as alternative
zabbix_hostname = data.get('zabbix_hostname')
if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
else:
audit_data['zabbix_integration'] = None
return audit_data