auto-claude: subtask-6-3 - Add /it-audit/save POST route for saving form data

Implemented POST endpoint at /it-audit/save that:
- Accepts JSON or form data with IT audit fields
- Validates company access (admin for any, users for their own)
- Parses boolean, array, and string fields with proper type handling
- Uses ITAuditService to save audit with scoring calculation
- Returns JSON with audit scores and redirect URL
- Includes rate limiting (30 per hour)
- Handles errors with rollback and logging

Added helper function _parse_it_audit_form_data() to properly parse:
- Boolean fields (checkboxes)
- Array fields (multi-select)
- String fields
- JSON fields (zabbix_integration)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-01-09 09:11:24 +01:00
parent 10ae221c26
commit b6a3411e43

230
app.py
View File

@ -5327,6 +5327,236 @@ def it_audit_form():
db.close()
@app.route('/it-audit/save', methods=['POST'])
@login_required
@limiter.limit("30 per hour")
def it_audit_save():
"""
Save IT audit form data with automatic scoring.
This endpoint saves IT infrastructure audit data from the form,
calculates security, collaboration, and completeness scores,
and stores the audit in the database.
Request JSON body:
- company_id: Company ID (integer, required)
- All audit fields from the 9-section form
Returns:
- Success: Audit results with scores and redirect URL
- Error: Error message with status code
Access:
- Members can save audits for their own company
- Admins can save audits for any company
Rate limited to 30 requests per hour per user.
"""
from database import ITAudit, Company
from it_audit_service import ITAuditService
# Parse request data (supports both JSON and form data)
if request.is_json:
data = request.get_json()
else:
data = request.form.to_dict(flat=True)
if not data:
return jsonify({
'success': False,
'error': 'Brak danych w żądaniu.'
}), 400
# Get company_id
company_id = data.get('company_id')
if company_id:
try:
company_id = int(company_id)
except (ValueError, TypeError):
return jsonify({
'success': False,
'error': 'Nieprawidłowy identyfikator firmy.'
}), 400
else:
# Use current user's company if not specified
if current_user.company_id:
company_id = current_user.company_id
else:
return jsonify({
'success': False,
'error': 'Podaj company_id firmy do audytu.'
}), 400
db = SessionLocal()
try:
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona lub nieaktywna.'
}), 404
# Access control: admin can save for any company, users only their own
if not current_user.is_admin and current_user.company_id != company.id:
return jsonify({
'success': False,
'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
}), 403
# Parse form data into audit_data dictionary
audit_data = _parse_it_audit_form_data(data)
audit_data['audited_by'] = current_user.id
audit_data['audit_source'] = 'form'
# Save audit using service
service = ITAuditService(db)
audit = service.save_audit(company_id, audit_data)
logger.info(
f"IT audit saved by {current_user.email} for company {company.name}: "
f"overall={audit.overall_score}, security={audit.security_score}, "
f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
)
# Return success response
return jsonify({
'success': True,
'message': 'Audyt IT został zapisany pomyślnie.',
'company_id': company.id,
'company_name': company.name,
'company_slug': company.slug,
'audit': {
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
},
'redirect_url': url_for('company_detail', slug=company.slug)
}), 200
except Exception as e:
db.rollback()
logger.error(f"Error saving IT audit for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas zapisywania audytu: {str(e)}'
}), 500
finally:
db.close()
def _parse_it_audit_form_data(data: dict) -> dict:
"""
Parse form data into audit_data dictionary.
Handles:
- Boolean fields (checkboxes)
- Array fields (multi-select)
- String and numeric fields
Args:
data: Raw form data dictionary
Returns:
Parsed audit_data dictionary with proper types
"""
# Boolean fields (checkboxes - present means True)
boolean_fields = [
'has_it_manager', 'it_outsourced',
'has_azure_ad', 'has_m365', 'has_google_workspace',
'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
'has_proxmox_pbs', 'has_dr_plan',
'has_local_ad', 'has_ad_azure_sync',
'open_to_shared_licensing', 'open_to_backup_replication',
'open_to_teams_federation', 'open_to_shared_monitoring',
'open_to_collective_purchasing', 'open_to_knowledge_sharing',
]
# Array fields (multi-select - may come as comma-separated or multiple values)
array_fields = [
'm365_plans', 'teams_usage', 'server_types', 'server_os',
'desktop_os', 'mfa_scope', 'backup_targets',
]
# String fields
string_fields = [
'it_provider_name', 'it_contact_name', 'it_contact_email',
'azure_tenant_name', 'azure_user_count',
'server_count', 'virtualization_platform', 'network_firewall_brand',
'employee_count', 'computer_count', 'mdm_solution',
'antivirus_solution', 'edr_solution', 'vpn_solution',
'backup_solution', 'backup_frequency',
'monitoring_solution', 'ad_domain_name',
'ticketing_system', 'erp_system', 'crm_system', 'document_management',
]
audit_data = {}
# Parse boolean fields
for field in boolean_fields:
value = data.get(field)
if value is None:
audit_data[field] = False
elif isinstance(value, bool):
audit_data[field] = value
elif isinstance(value, str):
audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
else:
audit_data[field] = bool(value)
# Parse array fields
for field in array_fields:
value = data.get(field)
if value is None:
audit_data[field] = []
elif isinstance(value, list):
audit_data[field] = value
elif isinstance(value, str):
# Handle comma-separated values
audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
else:
audit_data[field] = [value]
# Parse string fields
for field in string_fields:
value = data.get(field)
if value is not None and isinstance(value, str):
audit_data[field] = value.strip() if value.strip() else None
else:
audit_data[field] = None
# Parse zabbix_integration as JSON if present
zabbix_integration = data.get('zabbix_integration')
if zabbix_integration:
if isinstance(zabbix_integration, dict):
audit_data['zabbix_integration'] = zabbix_integration
elif isinstance(zabbix_integration, str):
try:
audit_data['zabbix_integration'] = json.loads(zabbix_integration)
except json.JSONDecodeError:
audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
else:
audit_data['zabbix_integration'] = None
else:
# Check for zabbix_hostname field as alternative
zabbix_hostname = data.get('zabbix_hostname')
if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
else:
audit_data['zabbix_integration'] = None
return audit_data
# ============================================================
# ERROR HANDLERS
# ============================================================