nordabiz/security_service.py
Maciej Pienczyn 0dba52e9c4 feat: Add security features - 2FA, audit log, alerting
Security enhancements:
- Two-Factor Authentication (TOTP) for all users
  - Enable/disable 2FA in settings
  - Backup codes for recovery
  - Login flow with 2FA verification
- Audit log for admin actions
  - Track all sensitive operations
  - IP address and user agent logging
- Security alerts system
  - Alert types: brute_force, honeypot_hit, account_locked, geo_blocked
  - Email notifications for high/critical alerts
  - Dashboard for alert management
- Admin security dashboard (/admin/security)
  - View/acknowledge/resolve alerts
  - Unlock locked accounts
  - 2FA status overview

New files:
- security_service.py: Security utilities
- templates/auth/verify_2fa.html
- templates/auth/2fa_settings.html
- templates/auth/2fa_setup.html
- templates/auth/2fa_backup_codes.html
- templates/admin/security_dashboard.html

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 21:23:27 +01:00

456 lines
13 KiB
Python

"""
Norda Biznes - Security Service
================================
Security utilities for NordaBiz platform:
- Audit logging (admin action tracking)
- Security alerting (email notifications)
- GeoIP blocking (Poland only)
- 2FA (TOTP) helpers
Author: Norda Biznes Development Team
Created: 2026-01-14
"""
import os
import logging
import secrets
from datetime import datetime
from functools import wraps
from flask import request, abort, flash, redirect, url_for
from flask_login import current_user
logger = logging.getLogger(__name__)
# ============================================================
# AUDIT LOG
# ============================================================
def log_audit(db, action: str, entity_type: str, entity_id: int = None,
              entity_name: str = None, details: dict = None, user=None):
    """
    Log an administrative action to the audit log.

    The entry is only added to the session; committing is left to the
    caller so the audit row shares the caller's transaction.

    Args:
        db: Database session
        action: Action identifier (e.g., 'news.approve', 'company.edit')
        entity_type: Type of entity ('news', 'company', 'user', 'event')
        entity_id: ID of the affected entity
        entity_name: Human-readable name of the entity
        details: Additional details (old_value, new_value, reason, etc.)
        user: User performing the action (defaults to current_user)

    Example:
        log_audit(db, 'news.approve', 'news', entity_id=123,
                  entity_name='Artykuł o firmie XYZ',
                  details={'previous_status': 'pending'})
    """
    from database import AuditLog

    if user is None:
        # Anonymous / unauthenticated actions are recorded as 'system'.
        user = current_user if current_user.is_authenticated else None

    # Request metadata is only recorded when `request` is truthy.
    if request:
        user_agent = request.user_agent.string[:500]  # truncated to 500 chars
        request_path = request.path
    else:
        user_agent = None
        request_path = None

    audit_entry = AuditLog(
        user_id=user.id if user else None,
        user_email=user.email if user else 'system',
        action=action,
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        details=details,
        ip_address=get_client_ip(),
        user_agent=user_agent,
        request_path=request_path
    )
    db.add(audit_entry)
    # Don't commit here - let the caller manage the transaction
    logger.info(f"AUDIT: {audit_entry.user_email} performed {action} on {entity_type}:{entity_id}")
def get_client_ip():
    """
    Return the client IP for the current request.

    The first entry of the X-Forwarded-For header wins when the header is
    present and non-empty; otherwise the request's remote address is used.

    Returns:
        IP address string, or None when `request` is falsy.
    """
    if not request:
        return None

    # NOTE(review): X-Forwarded-For can be set by the client unless the
    # reverse proxy overwrites it - confirm the proxy configuration.
    xff_header = request.headers.get('X-Forwarded-For', '')
    if not xff_header:
        return request.remote_addr

    first_hop, _, _ = xff_header.partition(',')
    return first_hop.strip()
# ============================================================
# SECURITY ALERTING
# ============================================================
# Alert thresholds, keyed by alert type
ALERT_THRESHOLDS = dict(
    brute_force=5,      # Failed logins from same IP
    honeypot_hit=3,     # Honeypot accesses from same IP
    account_locked=1,   # Always alert on account lockout
)
# Recipient of security alert emails (overridable via SECURITY_ALERT_EMAIL)
SECURITY_ALERT_EMAIL = os.environ.get('SECURITY_ALERT_EMAIL', 'admin@nordabiznes.pl')
def create_security_alert(db, alert_type: str, severity: str = 'medium',
                          ip_address: str = None, user_email: str = None,
                          details: dict = None, send_email: bool = True):
    """
    Record a security alert and, for serious ones, notify by email.

    The alert is added to the session and flushed (not committed). An email
    is attempted only when `send_email` is true and the severity is 'high'
    or 'critical'.

    Args:
        db: Database session
        alert_type: Type of alert ('brute_force', 'honeypot_hit', 'account_locked', 'geo_blocked')
        severity: Alert severity ('low', 'medium', 'high', 'critical')
        ip_address: Source IP address (falls back to the current client IP)
        user_email: Associated user email (if any)
        details: Additional context (stored as an empty dict when omitted)
        send_email: Whether to send email notification

    Returns:
        SecurityAlert object
    """
    from database import SecurityAlert

    source_ip = ip_address or get_client_ip()
    context = details or {}

    alert = SecurityAlert(
        alert_type=alert_type,
        severity=severity,
        ip_address=source_ip,
        user_email=user_email,
        details=context,
    )
    db.add(alert)
    db.flush()  # Get the ID

    logger.warning(f"SECURITY_ALERT: {alert_type} ({severity}) from {alert.ip_address}")

    # Only high/critical alerts trigger an email notification.
    should_notify = send_email and severity in ('high', 'critical')
    if should_notify:
        _send_alert_email(alert)

    return alert
def _send_alert_email(alert):
    """
    Send an email notification for a security alert.

    All alert fields are HTML-escaped before being placed in the message
    body, because the IP address, user email and details can originate from
    request data. On success the alert's `email_sent` / `email_sent_at`
    attributes are set (the caller is responsible for persisting them).

    Args:
        alert: Alert object exposing alert_type, severity, created_at,
            ip_address, user_email, details and id.

    Returns:
        The result of the email service's send call, or False when the
        service is not configured or any error occurs (errors are logged,
        never raised).
    """
    from html import escape

    try:
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured, cannot send security alert email")
            return False

        severity = alert.severity.upper()
        # NOTE(review): created_at may be unset if the model fills it in
        # lazily; fall back to the current UTC time instead of failing.
        created_at = alert.created_at or datetime.utcnow()

        subject = f"[ALERT] {severity}: {alert.alert_type} - NordaBiznes"
        body = f"""
        <h2>Security Alert - NordaBiznes</h2>
        <p><strong>Type:</strong> {escape(str(alert.alert_type))}</p>
        <p><strong>Severity:</strong> {escape(severity)}</p>
        <p><strong>Time:</strong> {created_at.strftime('%Y-%m-%d %H:%M:%S')}</p>
        <p><strong>IP Address:</strong> {escape(str(alert.ip_address or 'Unknown'))}</p>
        <p><strong>User Email:</strong> {escape(str(alert.user_email or 'N/A'))}</p>
        <p><strong>Details:</strong></p>
        <pre>{escape(str(alert.details))}</pre>
        <hr>
        <p>Review alerts at: <a href="https://nordabiznes.pl/admin/security">Security Dashboard</a></p>
        """

        success = send_email(
            to_email=SECURITY_ALERT_EMAIL,
            subject=subject,
            html_content=body
        )
        if success:
            alert.email_sent = True
            alert.email_sent_at = datetime.utcnow()
            logger.info(f"Security alert email sent for alert {alert.id}")
        return success
    except Exception as e:
        # Best effort: a failed notification must not break the caller.
        logger.error(f"Failed to send security alert email: {e}")
        return False
# ============================================================
# GEOIP BLOCKING
# ============================================================
# GeoIP configuration
GEOIP_ENABLED = os.getenv('GEOIP_ENABLED', 'false').strip().lower() == 'true'
GEOIP_DB_PATH = os.getenv('GEOIP_DB_PATH', '/var/lib/GeoIP/GeoLite2-Country.mmdb')
ALLOWED_COUNTRIES = {'PL'}  # Only Poland allowed
# Whitelisted IPs: comma-separated list; whitespace around entries is
# stripped and empty entries are dropped so "a, b," yields {'a', 'b'}.
GEOIP_WHITELIST = {
    entry.strip()
    for entry in os.getenv('GEOIP_WHITELIST', '').split(',')
    if entry.strip()
}
# GeoIP reader (lazy loaded)
_geoip_reader = None
def get_geoip_reader():
    """
    Return the shared GeoIP reader, opening it on first use.

    Returns:
        The cached reader; None when GeoIP is disabled, the geoip2 package
        is missing, the database file does not exist, or loading failed.
    """
    global _geoip_reader

    if _geoip_reader is None and GEOIP_ENABLED:
        try:
            import geoip2.database

            if not os.path.exists(GEOIP_DB_PATH):
                logger.warning(f"GeoIP database not found at {GEOIP_DB_PATH}")
            else:
                _geoip_reader = geoip2.database.Reader(GEOIP_DB_PATH)
                logger.info(f"GeoIP database loaded from {GEOIP_DB_PATH}")
        except ImportError:
            logger.warning("geoip2 package not installed, GeoIP blocking disabled")
        except Exception as e:
            logger.error(f"Failed to load GeoIP database: {e}")

    return _geoip_reader
def get_country_code(ip_address: str) -> str:
    """
    Look up the country of an IP address in the GeoIP database.

    Args:
        ip_address: Address to look up.

    Returns:
        ISO 3166-1 alpha-2 country code (e.g., 'PL', 'DE'), or None when no
        reader is available or the lookup raises for any reason.
    """
    geo_db = get_geoip_reader()
    if not geo_db:
        return None

    try:
        iso_code = geo_db.country(ip_address).country.iso_code
    except Exception:
        # Any lookup failure is reported as "unknown country".
        iso_code = None
    return iso_code
def is_ip_allowed(ip_address: str = None) -> bool:
    """
    Check if an IP address is allowed (from Poland or whitelisted).

    The check fails open: when GeoIP is disabled, no IP is available, or
    the country cannot be determined, the address is allowed.

    Args:
        ip_address: IP to check (defaults to current request IP)

    Returns:
        True if allowed, False if blocked
    """
    import ipaddress

    if not GEOIP_ENABLED:
        return True

    if ip_address is None:
        ip_address = get_client_ip()
    if not ip_address:
        return True

    # Check whitelist first
    if ip_address in GEOIP_WHITELIST:
        return True

    # Local/private IPs are always allowed. Parse the address instead of
    # matching string prefixes: only 172.16.0.0/12 is private, whereas a
    # bare '172.' prefix would also let through public 172.x.x.x addresses.
    try:
        parsed = ipaddress.ip_address(ip_address)
    except ValueError:
        parsed = None  # Not a valid IP literal; fall through to the lookup
    if parsed is not None:
        # Judge IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) by their IPv4 part.
        mapped = getattr(parsed, 'ipv4_mapped', None)
        if mapped is not None:
            parsed = mapped
        if parsed.is_private or parsed.is_loopback:
            return True

    country = get_country_code(ip_address)
    # If we can't determine country, allow (fail open)
    if country is None:
        return True
    return country in ALLOWED_COUNTRIES
def geoip_check():
    """
    Decorator factory that blocks requests from disallowed IPs with a 403.

    A blocked request is logged and a low-severity 'geo_blocked' security
    alert is recorded on a best-effort basis: failures while recording the
    alert are logged and never prevent the 403. The database session opened
    for the alert is always closed, even when recording fails.

    Usage:
        @app.route('/sensitive-endpoint')
        @geoip_check()
        def sensitive_endpoint():
            ...
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            if not is_ip_allowed():
                ip = get_client_ip()
                country = get_country_code(ip)
                logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")

                # Create alert for blocked access
                db = None
                try:
                    from database import SessionLocal
                    db = SessionLocal()
                    create_security_alert(
                        db, 'geo_blocked', 'low',
                        ip_address=ip,
                        details={'country': country, 'path': request.path}
                    )
                    db.commit()
                except Exception as e:
                    logger.error(f"Failed to create geo block alert: {e}")
                finally:
                    # Release the session on every path so a failed alert
                    # does not leak it.
                    if db is not None:
                        try:
                            db.close()
                        except Exception as e:
                            logger.error(f"Failed to create geo block alert: {e}")

                abort(403)
            return f(*args, **kwargs)
        return wrapped
    return decorator
# ============================================================
# TWO-FACTOR AUTHENTICATION (TOTP)
# ============================================================
# Issuer name passed to the provisioning URI shown in authenticator apps
TOTP_ISSUER = "NordaBiznes"
def generate_totp_secret() -> str:
    """
    Create a fresh TOTP secret via pyotp.

    Returns:
        Base32 secret string, or None when pyotp is not installed
        (the failure is logged).
    """
    try:
        import pyotp
        secret = pyotp.random_base32()
    except ImportError:
        logger.error("pyotp package not installed")
        secret = None
    return secret
def get_totp_uri(user) -> str:
    """
    Build the TOTP provisioning URI used for QR code generation.

    Args:
        user: User object; its totp_secret and email are read.

    Returns:
        otpauth:// URI for authenticator apps, or None when the user has no
        secret or pyotp is not installed.
    """
    secret = user.totp_secret
    if not secret:
        return None

    try:
        import pyotp
        return pyotp.TOTP(secret).provisioning_uri(
            name=user.email,
            issuer_name=TOTP_ISSUER,
        )
    except ImportError:
        logger.error("pyotp package not installed")
        return None
def verify_totp(user, code: str) -> bool:
    """
    Check a TOTP code against the user's secret.

    Args:
        user: User object; its totp_secret is read.
        code: 6-digit TOTP code to verify

    Returns:
        True if valid; False when the code is rejected, the user has no
        secret, or pyotp is not installed.
    """
    secret = user.totp_secret
    if not secret:
        return False

    try:
        import pyotp
        # valid_window=1: allow 1 step drift
        return pyotp.TOTP(secret).verify(code, valid_window=1)
    except ImportError:
        logger.error("pyotp package not installed")
        return False
def generate_backup_codes(count: int = 8) -> list:
    """
    Produce one-time backup codes for 2FA recovery.

    Args:
        count: Number of codes to generate.

    Returns:
        List of `count` codes, each 8 uppercase hexadecimal characters
        drawn from the `secrets` module.
    """
    # token_hex(4) yields 4 random bytes rendered as 8 hex characters.
    return [secrets.token_hex(4).upper() for _ in range(count)]
def verify_backup_code(user, code: str, db) -> bool:
    """
    Verify and consume a backup code.

    The submitted code is upper-cased and stripped before comparison. A
    missing or empty code is rejected instead of raising. Each stored code
    is compared in constant time and every stored code is visited, so the
    comparison does not stop at the first match. On a match, every stored
    occurrence of the code is removed and the session is committed.

    Args:
        user: User object with totp_backup_codes
        code: Backup code to verify
        db: Database session to update user

    Returns:
        True if valid (code is consumed), False otherwise
    """
    stored_codes = user.totp_backup_codes
    if not stored_codes or not code:
        return False

    candidate = str(code).upper().strip()
    if not candidate:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    candidate_bytes = candidate.encode('utf-8')

    matched = False
    remaining_codes = []
    for stored in stored_codes:
        if secrets.compare_digest(str(stored).encode('utf-8'), candidate_bytes):
            matched = True
        else:
            remaining_codes.append(stored)

    if not matched:
        return False

    # Remove the used code (assign a new list rather than mutating in place)
    user.totp_backup_codes = remaining_codes
    db.commit()
    logger.info(f"Backup code used for user {user.email}, {len(remaining_codes)} codes remaining")
    return True
def requires_2fa(f):
    """
    Route decorator enforcing 2FA verification for the current session.

    Unauthenticated users are redirected to the 'login' endpoint. Users with
    2FA enabled whose session lacks the '2fa_verified' flag are redirected
    to 'verify_2fa', with the requested URL remembered in the session under
    '2fa_next'. Everyone else reaches the wrapped view.

    Usage:
        @app.route('/admin/sensitive')
        @login_required
        @requires_2fa
        def admin_sensitive():
            ...
    """
    @wraps(f)
    def _guarded(*args, **kwargs):
        from flask import session

        if not current_user.is_authenticated:
            return redirect(url_for('login'))

        needs_verification = current_user.totp_enabled and not session.get('2fa_verified')
        if not needs_verification:
            return f(*args, **kwargs)

        # 2FA is enabled but not yet verified in this session.
        flash('Wymagana weryfikacja 2FA.', 'warning')
        session['2fa_next'] = request.url
        return redirect(url_for('verify_2fa'))

    return _guarded
# ============================================================
# INITIALIZATION
# ============================================================
def init_security_service():
    """
    Prepare the security service at application start-up.

    When GeoIP blocking is enabled the GeoIP reader is loaded eagerly, then
    an informational message is logged.
    """
    geoip_active = GEOIP_ENABLED
    if geoip_active:
        # Warm the lazily loaded reader so load problems are logged now.
        get_geoip_reader()

    logger.info("Security service initialized")