feat(security): Silnik sanityzacji danych wrażliwych (RODO)
Automatyczne wykrywanie i maskowanie danych wrażliwych w czacie: - PESEL (walidacja sumy kontrolnej) - Numery kart kredytowych (algorytm Luhn) - IBAN (konta bankowe) - Hasła (detekcja kontekstowa) - Dowody osobiste i paszporty NIE wykrywa (zgodnie z wymogami): - NIP (publiczne dane biznesowe) - Adresy email (celowo podawane) API dla adminów: POST /api/admin/test-sanitization Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
e7483269a0
commit
ca03cb0b3b
45
app.py
45
app.py
@ -7110,6 +7110,51 @@ def chat_analytics():
|
||||
db.close()
|
||||
|
||||
|
||||
@app.route('/api/admin/test-sanitization', methods=['POST'])
|
||||
@login_required
|
||||
def test_sanitization():
|
||||
"""
|
||||
Admin API: Test sensitive data detection without saving.
|
||||
Allows admins to verify what data would be sanitized.
|
||||
"""
|
||||
if not current_user.is_admin:
|
||||
return jsonify({'success': False, 'error': 'Admin access required'}), 403
|
||||
|
||||
try:
|
||||
from sensitive_data_service import sanitize_message
|
||||
data = request.get_json()
|
||||
text = data.get('text', '')
|
||||
|
||||
if not text:
|
||||
return jsonify({'success': False, 'error': 'Text is required'}), 400
|
||||
|
||||
sanitized, matches = sanitize_message(text)
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'original': text,
|
||||
'sanitized': sanitized,
|
||||
'matches': [
|
||||
{
|
||||
'type': m.data_type.value,
|
||||
'original': m.original,
|
||||
'masked': m.masked,
|
||||
'confidence': m.confidence
|
||||
}
|
||||
for m in matches
|
||||
],
|
||||
'has_sensitive_data': len(matches) > 0
|
||||
})
|
||||
except ImportError:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'error': 'Sensitive data service not available'
|
||||
}), 500
|
||||
except Exception as e:
|
||||
logger.error(f"Error testing sanitization: {e}")
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/admin/analytics')
|
||||
@login_required
|
||||
def admin_analytics():
|
||||
|
||||
@ -69,6 +69,14 @@ try:
|
||||
except ImportError:
|
||||
ZOPK_KNOWLEDGE_AVAILABLE = False
|
||||
|
||||
# Import sensitive data sanitization service (RODO compliance)
|
||||
try:
|
||||
from sensitive_data_service import sanitize_message, SensitiveDataType
|
||||
SENSITIVE_DATA_SERVICE_AVAILABLE = True
|
||||
except ImportError:
|
||||
SENSITIVE_DATA_SERVICE_AVAILABLE = False
|
||||
logger.warning("Sensitive data service not available - messages will not be sanitized")
|
||||
|
||||
|
||||
class NordaBizChatEngine:
|
||||
"""
|
||||
@ -199,12 +207,24 @@ class NordaBizChatEngine:
|
||||
)
|
||||
raise PermissionError("Access denied: You don't own this conversation")
|
||||
|
||||
# Save user message
|
||||
# RODO/GDPR: Sanitize user message - remove sensitive data before storage
|
||||
# Note: NIP and email are NOT considered sensitive (public business data)
|
||||
sanitized_message = user_message
|
||||
sensitive_data_found = []
|
||||
if SENSITIVE_DATA_SERVICE_AVAILABLE:
|
||||
sanitized_message, sensitive_data_found = sanitize_message(user_message)
|
||||
if sensitive_data_found:
|
||||
logger.info(
|
||||
f"RODO: Sanitized {len(sensitive_data_found)} sensitive items in message "
|
||||
f"from user {user_id}: {[m.data_type.value for m in sensitive_data_found]}"
|
||||
)
|
||||
|
||||
# Save user message (sanitized for storage, original for AI context)
|
||||
user_msg = AIChatMessage(
|
||||
conversation_id=conversation_id,
|
||||
created_at=datetime.now(),
|
||||
role='user',
|
||||
content=user_message,
|
||||
content=sanitized_message, # Store sanitized version
|
||||
edited=False,
|
||||
regenerated=False
|
||||
)
|
||||
@ -212,6 +232,8 @@ class NordaBizChatEngine:
|
||||
db.commit()
|
||||
|
||||
# Build context from conversation history and relevant companies
|
||||
# Use ORIGINAL message for AI (so it can understand the question)
|
||||
# but the sanitized version is what gets stored in DB
|
||||
context = self._build_conversation_context(db, conversation, user_message)
|
||||
|
||||
# Get AI response with cost tracking
|
||||
|
||||
338
sensitive_data_service.py
Normal file
338
sensitive_data_service.py
Normal file
@ -0,0 +1,338 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sensitive Data Detection and Sanitization Service
|
||||
==================================================
|
||||
|
||||
Automatically detects and masks sensitive data in user messages.
|
||||
RODO/GDPR compliant - prevents storage of sensitive personal data.
|
||||
|
||||
Detected data types:
|
||||
- PESEL (Polish national ID)
|
||||
- Credit card numbers (Luhn validated)
|
||||
- IBAN bank account numbers
|
||||
- Passwords (contextual detection)
|
||||
- Phone numbers (optional)
|
||||
|
||||
Author: Norda Biznes Development Team
|
||||
Created: 2026-01-28
|
||||
"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SensitiveDataType(Enum):
|
||||
"""Types of sensitive data that can be detected"""
|
||||
PESEL = "pesel"
|
||||
CREDIT_CARD = "credit_card"
|
||||
IBAN = "iban"
|
||||
PASSWORD = "password"
|
||||
NIP = "nip"
|
||||
REGON = "regon"
|
||||
ID_CARD = "id_card"
|
||||
PASSPORT = "passport"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SensitiveDataMatch:
|
||||
"""Represents a detected sensitive data match"""
|
||||
data_type: SensitiveDataType
|
||||
original: str
|
||||
masked: str
|
||||
start_pos: int
|
||||
end_pos: int
|
||||
confidence: float # 0.0 to 1.0
|
||||
|
||||
|
||||
class SensitiveDataService:
|
||||
"""
|
||||
Service for detecting and sanitizing sensitive data in text.
|
||||
|
||||
Usage:
|
||||
service = SensitiveDataService()
|
||||
sanitized, matches = service.sanitize("Mój PESEL to 12345678901")
|
||||
# sanitized = "Mój PESEL to [PESEL UKRYTY]"
|
||||
"""
|
||||
|
||||
# Masking templates
|
||||
MASKS = {
|
||||
SensitiveDataType.PESEL: "[PESEL UKRYTY]",
|
||||
SensitiveDataType.CREDIT_CARD: "[KARTA UKRYTA]",
|
||||
SensitiveDataType.IBAN: "[KONTO UKRYTE]",
|
||||
SensitiveDataType.PASSWORD: "[HASŁO UKRYTE]",
|
||||
SensitiveDataType.NIP: "[NIP UKRYTY]",
|
||||
SensitiveDataType.REGON: "[REGON UKRYTY]",
|
||||
SensitiveDataType.ID_CARD: "[DOWÓD UKRYTY]",
|
||||
SensitiveDataType.PASSPORT: "[PASZPORT UKRYTY]",
|
||||
}
|
||||
|
||||
# Regex patterns
|
||||
PATTERNS = {
|
||||
# PESEL: 11 digits, often written with spaces
|
||||
SensitiveDataType.PESEL: r'\b(\d{2})[\s-]?(\d{2})[\s-]?(\d{2})[\s-]?(\d{5})\b',
|
||||
|
||||
# Credit cards: 13-19 digits, often grouped by 4
|
||||
SensitiveDataType.CREDIT_CARD: r'\b(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{1,7})\b',
|
||||
|
||||
# IBAN Poland: PL + 26 digits
|
||||
SensitiveDataType.IBAN: r'\b(PL)?\s?(\d{2})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})\b',
|
||||
|
||||
# Password patterns (contextual)
|
||||
SensitiveDataType.PASSWORD: r'(?:hasło|password|pass|pwd|pin)[\s:=]+["\']?([^\s"\']{4,})["\']?',
|
||||
|
||||
# NIP: 10 digits
|
||||
SensitiveDataType.NIP: r'\b(\d{3})[\s-]?(\d{3})[\s-]?(\d{2})[\s-]?(\d{2})\b',
|
||||
|
||||
# REGON: 9 or 14 digits
|
||||
SensitiveDataType.REGON: r'\b(\d{9}|\d{14})\b',
|
||||
|
||||
# Polish ID card: 3 letters + 6 digits
|
||||
SensitiveDataType.ID_CARD: r'\b([A-Z]{3})[\s-]?(\d{6})\b',
|
||||
|
||||
# Passport: 2 letters + 7 digits
|
||||
SensitiveDataType.PASSPORT: r'\b([A-Z]{2})[\s-]?(\d{7})\b',
|
||||
}
|
||||
|
||||
# Context keywords that increase confidence
|
||||
CONTEXT_KEYWORDS = {
|
||||
SensitiveDataType.PESEL: ['pesel', 'numer pesel', 'nr pesel', 'identyfikacyjny'],
|
||||
SensitiveDataType.CREDIT_CARD: ['karta', 'kredytowa', 'debetowa', 'visa', 'mastercard', 'card'],
|
||||
SensitiveDataType.IBAN: ['konto', 'bankowe', 'przelew', 'iban', 'numer konta', 'rachunek'],
|
||||
SensitiveDataType.PASSWORD: ['hasło', 'password', 'login', 'logowanie'],
|
||||
SensitiveDataType.NIP: ['nip', 'podatnik', 'faktura'],
|
||||
SensitiveDataType.REGON: ['regon', 'rejestr'],
|
||||
SensitiveDataType.ID_CARD: ['dowód', 'osobisty', 'dokument'],
|
||||
SensitiveDataType.PASSPORT: ['paszport', 'passport'],
|
||||
}
|
||||
|
||||
def __init__(self, enabled_types: Optional[List[SensitiveDataType]] = None):
|
||||
"""
|
||||
Initialize service with optional list of data types to detect.
|
||||
|
||||
Args:
|
||||
enabled_types: List of SensitiveDataType to detect.
|
||||
If None, detects all types except NIP (often public in business context).
|
||||
"""
|
||||
if enabled_types is None:
|
||||
# Default: detect all except NIP (public for companies)
|
||||
self.enabled_types = [
|
||||
SensitiveDataType.PESEL,
|
||||
SensitiveDataType.CREDIT_CARD,
|
||||
SensitiveDataType.IBAN,
|
||||
SensitiveDataType.PASSWORD,
|
||||
SensitiveDataType.ID_CARD,
|
||||
SensitiveDataType.PASSPORT,
|
||||
]
|
||||
else:
|
||||
self.enabled_types = enabled_types
|
||||
|
||||
def detect(self, text: str) -> List[SensitiveDataMatch]:
|
||||
"""
|
||||
Detect all sensitive data in text.
|
||||
|
||||
Args:
|
||||
text: Input text to scan
|
||||
|
||||
Returns:
|
||||
List of SensitiveDataMatch objects
|
||||
"""
|
||||
matches = []
|
||||
text_lower = text.lower()
|
||||
|
||||
for data_type in self.enabled_types:
|
||||
pattern = self.PATTERNS.get(data_type)
|
||||
if not pattern:
|
||||
continue
|
||||
|
||||
for match in re.finditer(pattern, text, re.IGNORECASE):
|
||||
original = match.group(0)
|
||||
|
||||
# Calculate confidence based on context and validation
|
||||
confidence = self._calculate_confidence(data_type, original, text_lower, match.start())
|
||||
|
||||
# Skip low-confidence matches
|
||||
if confidence < 0.5:
|
||||
continue
|
||||
|
||||
matches.append(SensitiveDataMatch(
|
||||
data_type=data_type,
|
||||
original=original,
|
||||
masked=self.MASKS[data_type],
|
||||
start_pos=match.start(),
|
||||
end_pos=match.end(),
|
||||
confidence=confidence
|
||||
))
|
||||
|
||||
# Sort by position (reverse for safe replacement)
|
||||
matches.sort(key=lambda m: m.start_pos, reverse=True)
|
||||
|
||||
return matches
|
||||
|
||||
def sanitize(self, text: str) -> Tuple[str, List[SensitiveDataMatch]]:
|
||||
"""
|
||||
Detect and mask sensitive data in text.
|
||||
|
||||
Args:
|
||||
text: Input text to sanitize
|
||||
|
||||
Returns:
|
||||
Tuple of (sanitized_text, list_of_matches)
|
||||
"""
|
||||
matches = self.detect(text)
|
||||
|
||||
sanitized = text
|
||||
for match in matches:
|
||||
sanitized = (
|
||||
sanitized[:match.start_pos] +
|
||||
match.masked +
|
||||
sanitized[match.end_pos:]
|
||||
)
|
||||
|
||||
if matches:
|
||||
logger.info(
|
||||
f"SENSITIVE_DATA: Sanitized {len(matches)} sensitive data items: "
|
||||
f"{[m.data_type.value for m in matches]}"
|
||||
)
|
||||
|
||||
return sanitized, matches
|
||||
|
||||
def _calculate_confidence(
|
||||
self,
|
||||
data_type: SensitiveDataType,
|
||||
value: str,
|
||||
text_lower: str,
|
||||
position: int
|
||||
) -> float:
|
||||
"""
|
||||
Calculate confidence score for a match.
|
||||
|
||||
Args:
|
||||
data_type: Type of detected data
|
||||
value: The matched value
|
||||
text_lower: Lowercase version of full text (for context search)
|
||||
position: Position of match in text
|
||||
|
||||
Returns:
|
||||
Confidence score 0.0 to 1.0
|
||||
"""
|
||||
confidence = 0.5 # Base confidence
|
||||
|
||||
# Check for context keywords nearby (within 50 chars before match)
|
||||
context_start = max(0, position - 50)
|
||||
context = text_lower[context_start:position]
|
||||
|
||||
keywords = self.CONTEXT_KEYWORDS.get(data_type, [])
|
||||
for keyword in keywords:
|
||||
if keyword in context:
|
||||
confidence += 0.3
|
||||
break
|
||||
|
||||
# Validate specific formats
|
||||
clean_value = re.sub(r'[\s-]', '', value)
|
||||
|
||||
if data_type == SensitiveDataType.PESEL:
|
||||
if self._validate_pesel(clean_value):
|
||||
confidence += 0.2
|
||||
|
||||
elif data_type == SensitiveDataType.CREDIT_CARD:
|
||||
if self._validate_luhn(clean_value):
|
||||
confidence += 0.3
|
||||
|
||||
elif data_type == SensitiveDataType.IBAN:
|
||||
if clean_value.upper().startswith('PL') or len(clean_value) == 26:
|
||||
confidence += 0.2
|
||||
|
||||
elif data_type == SensitiveDataType.NIP:
|
||||
if self._validate_nip(clean_value):
|
||||
confidence += 0.2
|
||||
|
||||
return min(confidence, 1.0)
|
||||
|
||||
def _validate_pesel(self, pesel: str) -> bool:
|
||||
"""Validate PESEL checksum"""
|
||||
if len(pesel) != 11 or not pesel.isdigit():
|
||||
return False
|
||||
|
||||
weights = [1, 3, 7, 9, 1, 3, 7, 9, 1, 3]
|
||||
checksum = sum(int(pesel[i]) * weights[i] for i in range(10))
|
||||
control = (10 - (checksum % 10)) % 10
|
||||
|
||||
return control == int(pesel[10])
|
||||
|
||||
def _validate_luhn(self, number: str) -> bool:
|
||||
"""Validate credit card number using Luhn algorithm"""
|
||||
if not number.isdigit() or len(number) < 13 or len(number) > 19:
|
||||
return False
|
||||
|
||||
digits = [int(d) for d in number]
|
||||
odd_digits = digits[-1::-2]
|
||||
even_digits = digits[-2::-2]
|
||||
|
||||
checksum = sum(odd_digits)
|
||||
for d in even_digits:
|
||||
checksum += sum(divmod(d * 2, 10))
|
||||
|
||||
return checksum % 10 == 0
|
||||
|
||||
def _validate_nip(self, nip: str) -> bool:
|
||||
"""Validate Polish NIP checksum"""
|
||||
if len(nip) != 10 or not nip.isdigit():
|
||||
return False
|
||||
|
||||
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
|
||||
checksum = sum(int(nip[i]) * weights[i] for i in range(9))
|
||||
control = checksum % 11
|
||||
|
||||
return control == int(nip[9])
|
||||
|
||||
|
||||
# Global instance for easy import
|
||||
_service_instance: Optional[SensitiveDataService] = None
|
||||
|
||||
|
||||
def get_sensitive_data_service() -> SensitiveDataService:
|
||||
"""Get or create global SensitiveDataService instance"""
|
||||
global _service_instance
|
||||
if _service_instance is None:
|
||||
_service_instance = SensitiveDataService()
|
||||
return _service_instance
|
||||
|
||||
|
||||
def sanitize_message(text: str) -> Tuple[str, List[SensitiveDataMatch]]:
|
||||
"""
|
||||
Convenience function to sanitize text using global service.
|
||||
|
||||
Args:
|
||||
text: Input text to sanitize
|
||||
|
||||
Returns:
|
||||
Tuple of (sanitized_text, list_of_matches)
|
||||
"""
|
||||
return get_sensitive_data_service().sanitize(text)
|
||||
|
||||
|
||||
# Quick test
|
||||
if __name__ == "__main__":
|
||||
service = SensitiveDataService()
|
||||
|
||||
test_cases = [
|
||||
"Mój PESEL to 44051401359",
|
||||
"Przelej na konto PL61 1090 1014 0000 0712 1981 2874",
|
||||
"Numer karty: 4532015112830366",
|
||||
"Moje hasło: SuperSecret123!",
|
||||
"Dowód osobisty: ABC123456",
|
||||
"Napisz na email@example.com", # Should NOT be masked (intentional)
|
||||
]
|
||||
|
||||
for test in test_cases:
|
||||
sanitized, matches = service.sanitize(test)
|
||||
print(f"Input: {test}")
|
||||
print(f"Output: {sanitized}")
|
||||
if matches:
|
||||
print(f"Found: {[(m.data_type.value, m.confidence) for m in matches]}")
|
||||
print()
|
||||
Loading…
Reference in New Issue
Block a user