refactor: Migrate security routes to blueprints

- Create new blueprints/admin/routes_security.py
- Move 5 security routes: admin_security, acknowledge_security_alert,
  resolve_security_alert, unlock_account, api_geoip_stats
- Update templates to use full blueprint names
- Add endpoint aliases for backward compatibility

Phase 6.2d - Security routes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-01-31 09:39:12 +01:00
parent 470e2a8bb7
commit 82162874b8
7 changed files with 347 additions and 22 deletions

30
app.py
View File

@ -10374,9 +10374,9 @@ def internal_error(error):
# ADMIN - SECURITY DASHBOARD
# ============================================================
@app.route('/admin/security')
@login_required
def admin_security():
# @app.route('/admin/security') # MOVED TO admin.admin_security
# @login_required
def _old_admin_security():
"""Security dashboard - audit logs, alerts, GeoIP stats"""
if not current_user.is_admin:
flash('Brak uprawnień.', 'error')
@ -10499,9 +10499,9 @@ def admin_security():
db.close()
@app.route('/admin/security/alert/<int:alert_id>/acknowledge', methods=['POST'])
@login_required
def acknowledge_security_alert(alert_id):
# @app.route('/admin/security/alert/<int:alert_id>/acknowledge', methods=['POST']) # MOVED TO admin.acknowledge_security_alert
# @login_required
def _old_acknowledge_security_alert(alert_id):
"""Acknowledge a security alert"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
@ -10527,9 +10527,9 @@ def acknowledge_security_alert(alert_id):
db.close()
@app.route('/admin/security/alert/<int:alert_id>/resolve', methods=['POST'])
@login_required
def resolve_security_alert(alert_id):
# @app.route('/admin/security/alert/<int:alert_id>/resolve', methods=['POST']) # MOVED TO admin.resolve_security_alert
# @login_required
def _old_resolve_security_alert(alert_id):
"""Resolve a security alert"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
@ -10560,9 +10560,9 @@ def resolve_security_alert(alert_id):
db.close()
@app.route('/admin/security/unlock-account/<int:user_id>', methods=['POST'])
@login_required
def unlock_account(user_id):
# @app.route('/admin/security/unlock-account/<int:user_id>', methods=['POST']) # MOVED TO admin.unlock_account
# @login_required
def _old_unlock_account(user_id):
"""Unlock a locked user account"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
@ -10587,9 +10587,9 @@ def unlock_account(user_id):
db.close()
@app.route('/api/admin/security/geoip-stats')
@login_required
def api_geoip_stats():
# @app.route('/api/admin/security/geoip-stats') # MOVED TO admin.api_geoip_stats
# @login_required
def _old_api_geoip_stats():
"""API endpoint for GeoIP stats auto-refresh"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403

View File

@ -240,6 +240,12 @@ def register_blueprints(app):
'digital_maturity_dashboard': 'admin.digital_maturity_dashboard',
'admin_krs_audit': 'admin.admin_krs_audit',
'admin_it_audit': 'admin.admin_it_audit',
# Security (Phase 6.2d)
'admin_security': 'admin.admin_security',
'acknowledge_security_alert': 'admin.acknowledge_security_alert',
'resolve_security_alert': 'admin.resolve_security_alert',
'unlock_account': 'admin.unlock_account',
'api_geoip_stats': 'admin.api_geoip_stats',
})
logger.info("Created admin endpoint aliases")
except ImportError as e:

View File

@ -13,3 +13,4 @@ from . import routes # noqa: E402, F401
from . import routes_audits # noqa: E402, F401
from . import routes_status # noqa: E402, F401
from . import routes_social # noqa: E402, F401
from . import routes_security # noqa: E402, F401

View File

@ -0,0 +1,318 @@
"""
Admin Security Routes
======================
Security dashboard, alerts, and account management for admin panel.
"""
import logging
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, User, AuditLog, SecurityAlert
logger = logging.getLogger(__name__)
# Check if security service is available
try:
from security_service import log_audit, _get_geoip_enabled
SECURITY_SERVICE_AVAILABLE = True
except ImportError:
SECURITY_SERVICE_AVAILABLE = False
def log_audit(*args, **kwargs):
pass
def _get_geoip_enabled():
return False
# ============================================================
# SECURITY DASHBOARD
# ============================================================
@bp.route('/security')
@login_required
def admin_security():
"""Security dashboard - audit logs, alerts, GeoIP stats"""
if not current_user.is_admin:
flash('Brak uprawnień.', 'error')
return redirect(url_for('dashboard'))
db = SessionLocal()
try:
from sqlalchemy import func, desc
# Get recent audit logs
audit_logs = db.query(AuditLog).order_by(
desc(AuditLog.created_at)
).limit(50).all()
# Get security alerts
alerts = db.query(SecurityAlert).order_by(
desc(SecurityAlert.created_at)
).limit(50).all()
# Alert stats
new_alerts_count = db.query(SecurityAlert).filter(
SecurityAlert.status == 'new'
).count()
# Recent locked accounts
locked_accounts = db.query(User).filter(
User.locked_until > datetime.now()
).all()
# Users with 2FA enabled
users_with_2fa = db.query(User).filter(
User.totp_enabled == True
).count()
total_admins = db.query(User).filter(
User.is_admin == True
).count()
# Alert type breakdown
alert_breakdown = db.query(
SecurityAlert.alert_type,
func.count(SecurityAlert.id).label('count')
).group_by(SecurityAlert.alert_type).all()
stats = {
'new_alerts': new_alerts_count,
'locked_accounts': len(locked_accounts),
'users_with_2fa': users_with_2fa,
'total_admins': total_admins,
'alert_breakdown': {a.alert_type: a.count for a in alert_breakdown}
}
# GeoIP stats
geoip_enabled = _get_geoip_enabled()
geoip_stats = {'today': 0, 'this_month': 0, 'this_year': 0, 'total': 0, 'by_country': []}
if geoip_enabled:
today = datetime.now().date()
first_of_month = today.replace(day=1)
first_of_year = today.replace(month=1, day=1)
# Count geo_blocked alerts
geoip_stats['today'] = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) == today
).count()
geoip_stats['this_month'] = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) >= first_of_month
).count()
geoip_stats['this_year'] = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) >= first_of_year
).count()
geoip_stats['total'] = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked'
).count()
# Country breakdown (from details JSON)
country_flags = {
'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
}
geo_alerts = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked'
).all()
country_counts = {}
for alert in geo_alerts:
if alert.details and 'country' in alert.details:
country = alert.details['country']
if country:
country_counts[country] = country_counts.get(country, 0) + 1
# Sort by count descending
sorted_countries = sorted(country_counts.items(), key=lambda x: x[1], reverse=True)
for code, count in sorted_countries:
flag, name = country_flags.get(code, ('🏴', code))
geoip_stats['by_country'].append({
'code': code, 'flag': flag, 'name': name, 'count': count
})
return render_template(
'admin/security_dashboard.html',
audit_logs=audit_logs,
alerts=alerts,
locked_accounts=locked_accounts,
stats=stats,
geoip_enabled=geoip_enabled,
geoip_stats=geoip_stats,
generated_at=datetime.now()
)
finally:
db.close()
@bp.route('/security/alert/<int:alert_id>/acknowledge', methods=['POST'])
@login_required
def acknowledge_security_alert(alert_id):
"""Acknowledge a security alert"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
db = SessionLocal()
try:
alert = db.query(SecurityAlert).get(alert_id)
if not alert:
return jsonify({'success': False, 'error': 'Alert not found'}), 404
alert.status = 'acknowledged'
alert.acknowledged_by = current_user.id
alert.acknowledged_at = datetime.now()
# Log audit
if SECURITY_SERVICE_AVAILABLE:
log_audit(db, 'alert.acknowledge', 'security_alert', alert_id,
details={'alert_type': alert.alert_type})
db.commit()
return jsonify({'success': True})
finally:
db.close()
@bp.route('/security/alert/<int:alert_id>/resolve', methods=['POST'])
@login_required
def resolve_security_alert(alert_id):
"""Resolve a security alert"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
note = request.form.get('note', '')
db = SessionLocal()
try:
alert = db.query(SecurityAlert).get(alert_id)
if not alert:
return jsonify({'success': False, 'error': 'Alert not found'}), 404
alert.status = 'resolved'
alert.resolution_note = note
if not alert.acknowledged_by:
alert.acknowledged_by = current_user.id
alert.acknowledged_at = datetime.now()
# Log audit
if SECURITY_SERVICE_AVAILABLE:
log_audit(db, 'alert.resolve', 'security_alert', alert_id,
details={'alert_type': alert.alert_type, 'note': note})
db.commit()
flash('Alert został rozwiązany.', 'success')
return redirect(url_for('admin.admin_security'))
finally:
db.close()
@bp.route('/security/unlock-account/<int:user_id>', methods=['POST'])
@login_required
def unlock_account(user_id):
"""Unlock a locked user account"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
db = SessionLocal()
try:
user = db.query(User).get(user_id)
if not user:
return jsonify({'success': False, 'error': 'User not found'}), 404
user.locked_until = None
user.failed_login_attempts = 0
# Log audit
if SECURITY_SERVICE_AVAILABLE:
log_audit(db, 'user.unlock', 'user', user_id, user.email)
db.commit()
flash(f'Konto {user.email} zostało odblokowane.', 'success')
return redirect(url_for('admin.admin_security'))
finally:
db.close()
@bp.route('/security/geoip-stats')
@login_required
def api_geoip_stats():
"""API endpoint for GeoIP stats auto-refresh"""
if not current_user.is_admin:
return jsonify({'success': False, 'error': 'Not authorized'}), 403
from sqlalchemy import func
db = SessionLocal()
try:
now = datetime.now()
geoip_enabled = _get_geoip_enabled()
if not geoip_enabled:
return jsonify({
'enabled': False,
'timestamp': now.isoformat()
})
today = now.date()
first_of_month = today.replace(day=1)
first_of_year = today.replace(month=1, day=1)
stats = {
'enabled': True,
'timestamp': now.isoformat(),
'today': db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) == today
).count(),
'this_month': db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) >= first_of_month
).count(),
'this_year': db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked',
func.date(SecurityAlert.created_at) >= first_of_year
).count(),
'total': db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked'
).count()
}
# Country breakdown
country_flags = {
'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
}
geo_alerts = db.query(SecurityAlert).filter(
SecurityAlert.alert_type == 'geo_blocked'
).all()
country_counts = {}
for alert in geo_alerts:
if alert.details and 'country' in alert.details:
country = alert.details['country']
if country:
country_counts[country] = country_counts.get(country, 0) + 1
by_country = []
for code, count in sorted(country_counts.items(), key=lambda x: x[1], reverse=True):
flag, name = country_flags.get(code, ('🏴', code))
by_country.append({'code': code, 'flag': flag, 'name': name, 'count': count})
stats['by_country'] = by_country
return jsonify(stats)
finally:
db.close()

View File

@ -583,17 +583,17 @@
<td>
{% if alert.status == 'new' %}
<div class="action-buttons">
<form method="POST" action="{{ url_for('acknowledge_security_alert', alert_id=alert.id) }}" style="display:inline;">
<form method="POST" action="{{ url_for('admin.acknowledge_security_alert', alert_id=alert.id) }}" style="display:inline;">
{{ csrf_token() }}
<button type="submit" class="btn-sm btn-secondary">Potwierdź</button>
</form>
<form method="POST" action="{{ url_for('resolve_security_alert', alert_id=alert.id) }}" style="display:inline;">
<form method="POST" action="{{ url_for('admin.resolve_security_alert', alert_id=alert.id) }}" style="display:inline;">
{{ csrf_token() }}
<button type="submit" class="btn-sm btn-primary">Rozwiąż</button>
</form>
</div>
{% elif alert.status == 'acknowledged' %}
<form method="POST" action="{{ url_for('resolve_security_alert', alert_id=alert.id) }}" style="display:inline;">
<form method="POST" action="{{ url_for('admin.resolve_security_alert', alert_id=alert.id) }}" style="display:inline;">
{{ csrf_token() }}
<button type="submit" class="btn-sm btn-primary">Rozwiąż</button>
</form>
@ -679,7 +679,7 @@
<td>{{ user.failed_login_attempts }}</td>
<td><span class="timestamp">{{ user.locked_until.strftime('%Y-%m-%d %H:%M') }}</span></td>
<td>
<form method="POST" action="{{ url_for('unlock_account', user_id=user.id) }}" style="display:inline;">
<form method="POST" action="{{ url_for('admin.unlock_account', user_id=user.id) }}" style="display:inline;">
{{ csrf_token() }}
<button type="submit" class="btn-sm btn-danger" onclick="return confirm('Odblokować konto {{ user.email }}?')">
Odblokuj

View File

@ -361,7 +361,7 @@
{% endblock %}
{% block content %}
<a href="{{ url_for('admin_security') }}" class="back-link">← Powrót do panelu bezpieczeństwa</a>
<a href="{{ url_for('admin.admin_security') }}" class="back-link">← Powrót do panelu bezpieczeństwa</a>
<div class="status-header">
<div>
@ -603,7 +603,7 @@
<div class="quick-actions">
<a href="{{ url_for('health_full') }}" class="quick-action-btn" target="_blank">🔍 Pełny health check</a>
<a href="{{ url_for('admin_security') }}" class="quick-action-btn">🛡️ Bezpieczeństwo</a>
<a href="{{ url_for('admin.admin_security') }}" class="quick-action-btn">🛡️ Bezpieczeństwo</a>
</div>
<div class="quick-actions">

View File

@ -1256,7 +1256,7 @@
</svg>
ZOP Kaszubia
</a>
<a href="{{ url_for('admin_security') }}">
<a href="{{ url_for('admin.admin_security') }}">
<svg fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m5.618-4.016A11.955 11.955 0 0112 2.944a11.955 11.955 0 01-8.618 3.04A12.02 12.02 0 003 9c0 5.591 3.824 10.29 9 11.622 5.176-1.332 9-6.03 9-11.622 0-1.042-.133-2.052-.382-3.016z"/>
</svg>