@app.route('/api/admin/test-sanitization', methods=['POST'])
@login_required
def test_sanitization():
    """
    Admin API: Test sensitive data detection without saving.

    Allows admins to verify what data would be sanitized. The response
    deliberately echoes the original text and matched values so an admin
    can inspect exactly what the sanitizer would mask (admin-only route).

    Request JSON body: {"text": "<message to test>"}
    Returns JSON: original text, sanitized text, per-match details
    (type, original, masked, confidence) and a has_sensitive_data flag.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Admin access required'}), 403

    try:
        from sensitive_data_service import sanitize_message

        # silent=True: a missing/invalid JSON body yields None instead of
        # raising, and the `or {}` fallback turns that into a clean 400
        # below rather than an AttributeError -> 500 on data.get().
        data = request.get_json(silent=True) or {}
        text = data.get('text', '')

        if not text:
            return jsonify({'success': False, 'error': 'Text is required'}), 400

        sanitized, matches = sanitize_message(text)

        return jsonify({
            'success': True,
            'original': text,
            'sanitized': sanitized,
            'matches': [
                {
                    'type': m.data_type.value,
                    'original': m.original,
                    'masked': m.masked,
                    'confidence': m.confidence
                }
                for m in matches
            ],
            'has_sensitive_data': len(matches) > 0
        })
    except ImportError:
        # Service module not deployed alongside the app.
        return jsonify({
            'success': False,
            'error': 'Sensitive data service not available'
        }), 500
    except Exception as e:
        logger.error(f"Error testing sanitization: {e}")
        # NOTE(review): str(e) is exposed to the caller; acceptable only
        # because this route is admin-gated above.
        return jsonify({'success': False, 'error': str(e)}), 500
#!/usr/bin/env python3
"""
Sensitive Data Detection and Sanitization Service
==================================================

Automatically detects and masks sensitive data in user messages.
RODO/GDPR compliant - prevents storage of sensitive personal data.

Detected data types (default configuration):
- PESEL (Polish national ID, checksum validated)
- Credit card numbers (Luhn validated)
- IBAN bank account numbers
- Passwords (contextual detection)
- Polish ID cards and passports

NIP and REGON detectors exist but are disabled by default (public
business identifiers). Email addresses are intentionally not detected.

Author: Norda Biznes Development Team
Created: 2026-01-28
"""

import re
import logging
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)


class SensitiveDataType(Enum):
    """Types of sensitive data that can be detected"""
    PESEL = "pesel"
    CREDIT_CARD = "credit_card"
    IBAN = "iban"
    PASSWORD = "password"
    NIP = "nip"
    REGON = "regon"
    ID_CARD = "id_card"
    PASSPORT = "passport"


@dataclass
class SensitiveDataMatch:
    """Represents a detected sensitive data match"""
    data_type: SensitiveDataType  # which detector produced the match
    original: str                 # matched text as it appeared in the input
    masked: str                   # placeholder that replaces the match
    start_pos: int                # start offset in the ORIGINAL text
    end_pos: int                  # end offset (exclusive) in the ORIGINAL text
    confidence: float             # 0.0 to 1.0


class SensitiveDataService:
    """
    Service for detecting and sanitizing sensitive data in text.

    Usage:
        service = SensitiveDataService()
        sanitized, matches = service.sanitize("Mój PESEL to 12345678901")
        # sanitized = "Mój PESEL to [PESEL UKRYTY]"
    """

    # Masking templates (runtime strings - Polish placeholders shown to users)
    MASKS = {
        SensitiveDataType.PESEL: "[PESEL UKRYTY]",
        SensitiveDataType.CREDIT_CARD: "[KARTA UKRYTA]",
        SensitiveDataType.IBAN: "[KONTO UKRYTE]",
        SensitiveDataType.PASSWORD: "[HASŁO UKRYTE]",
        SensitiveDataType.NIP: "[NIP UKRYTY]",
        SensitiveDataType.REGON: "[REGON UKRYTY]",
        SensitiveDataType.ID_CARD: "[DOWÓD UKRYTY]",
        SensitiveDataType.PASSPORT: "[PASZPORT UKRYTY]",
    }

    # Regex patterns
    PATTERNS = {
        # PESEL: 11 digits, often written with spaces
        SensitiveDataType.PESEL: r'\b(\d{2})[\s-]?(\d{2})[\s-]?(\d{2})[\s-]?(\d{5})\b',

        # Credit cards: 13-19 digits, often grouped by 4
        SensitiveDataType.CREDIT_CARD: r'\b(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{1,7})\b',

        # IBAN Poland: PL + 26 digits
        SensitiveDataType.IBAN: r'\b(PL)?\s?(\d{2})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})\b',

        # Password patterns (contextual)
        SensitiveDataType.PASSWORD: r'(?:hasło|password|pass|pwd|pin)[\s:=]+["\']?([^\s"\']{4,})["\']?',

        # NIP: 10 digits
        SensitiveDataType.NIP: r'\b(\d{3})[\s-]?(\d{3})[\s-]?(\d{2})[\s-]?(\d{2})\b',

        # REGON: 9 or 14 digits
        SensitiveDataType.REGON: r'\b(\d{9}|\d{14})\b',

        # Polish ID card: 3 letters + 6 digits
        SensitiveDataType.ID_CARD: r'\b([A-Z]{3})[\s-]?(\d{6})\b',

        # Passport: 2 letters + 7 digits
        SensitiveDataType.PASSPORT: r'\b([A-Z]{2})[\s-]?(\d{7})\b',
    }

    # Context keywords that increase confidence
    CONTEXT_KEYWORDS = {
        SensitiveDataType.PESEL: ['pesel', 'numer pesel', 'nr pesel', 'identyfikacyjny'],
        SensitiveDataType.CREDIT_CARD: ['karta', 'kredytowa', 'debetowa', 'visa', 'mastercard', 'card'],
        SensitiveDataType.IBAN: ['konto', 'bankowe', 'przelew', 'iban', 'numer konta', 'rachunek'],
        SensitiveDataType.PASSWORD: ['hasło', 'password', 'login', 'logowanie'],
        SensitiveDataType.NIP: ['nip', 'podatnik', 'faktura'],
        SensitiveDataType.REGON: ['regon', 'rejestr'],
        SensitiveDataType.ID_CARD: ['dowód', 'osobisty', 'dokument'],
        SensitiveDataType.PASSPORT: ['paszport', 'passport'],
    }

    def __init__(self, enabled_types: Optional[List[SensitiveDataType]] = None):
        """
        Initialize service with optional list of data types to detect.

        Args:
            enabled_types: List of SensitiveDataType to detect.
                           If None, detects all types except NIP and REGON
                           (often public in business context).
        """
        if enabled_types is None:
            # Default: detect all except NIP/REGON (public for companies)
            self.enabled_types = [
                SensitiveDataType.PESEL,
                SensitiveDataType.CREDIT_CARD,
                SensitiveDataType.IBAN,
                SensitiveDataType.PASSWORD,
                SensitiveDataType.ID_CARD,
                SensitiveDataType.PASSPORT,
            ]
        else:
            self.enabled_types = enabled_types

    def detect(self, text: str) -> List[SensitiveDataMatch]:
        """
        Detect all sensitive data in text.

        Overlapping candidate matches are resolved before returning:
        different patterns can hit the same span (e.g. the credit-card
        pattern matches four digit groups INSIDE a full IBAN, and such a
        fragment can even pass the Luhn check). If overlaps were kept,
        sanitize() would splice replacements with stale offsets and
        corrupt the surrounding text. Only the best match per region
        (highest confidence, then longest span) is kept.

        Args:
            text: Input text to scan

        Returns:
            List of non-overlapping SensitiveDataMatch objects, sorted by
            position in reverse order (safe for in-place replacement).
        """
        candidates: List[SensitiveDataMatch] = []
        text_lower = text.lower()

        for data_type in self.enabled_types:
            pattern = self.PATTERNS.get(data_type)
            if not pattern:
                continue

            for match in re.finditer(pattern, text, re.IGNORECASE):
                original = match.group(0)

                # Calculate confidence based on context and validation
                confidence = self._calculate_confidence(data_type, original, text_lower, match.start())

                # Skip low-confidence matches
                if confidence < 0.5:
                    continue

                candidates.append(SensitiveDataMatch(
                    data_type=data_type,
                    original=original,
                    masked=self.MASKS[data_type],
                    start_pos=match.start(),
                    end_pos=match.end(),
                    confidence=confidence
                ))

        matches = self._resolve_overlaps(candidates)

        # Sort by position (reverse for safe replacement)
        matches.sort(key=lambda m: m.start_pos, reverse=True)

        return matches

    def _resolve_overlaps(
        self, candidates: List[SensitiveDataMatch]
    ) -> List[SensitiveDataMatch]:
        """
        Drop candidates that overlap an already-accepted, better match.

        Preference order: higher confidence, then longer span, then
        earlier start (deterministic tie-breaking).
        """
        ordered = sorted(
            candidates,
            key=lambda m: (-m.confidence, -(m.end_pos - m.start_pos), m.start_pos),
        )
        accepted: List[SensitiveDataMatch] = []
        for cand in ordered:
            disjoint = all(
                cand.end_pos <= kept.start_pos or cand.start_pos >= kept.end_pos
                for kept in accepted
            )
            if disjoint:
                accepted.append(cand)
        return accepted

    def sanitize(self, text: str) -> Tuple[str, List[SensitiveDataMatch]]:
        """
        Detect and mask sensitive data in text.

        Args:
            text: Input text to sanitize

        Returns:
            Tuple of (sanitized_text, list_of_matches)
        """
        matches = self.detect(text)

        # Matches are non-overlapping and sorted by start_pos descending,
        # so earlier offsets remain valid while we splice right-to-left.
        sanitized = text
        for match in matches:
            sanitized = (
                sanitized[:match.start_pos] +
                match.masked +
                sanitized[match.end_pos:]
            )

        if matches:
            logger.info(
                f"SENSITIVE_DATA: Sanitized {len(matches)} sensitive data items: "
                f"{[m.data_type.value for m in matches]}"
            )

        return sanitized, matches

    def _calculate_confidence(
        self,
        data_type: SensitiveDataType,
        value: str,
        text_lower: str,
        position: int
    ) -> float:
        """
        Calculate confidence score for a match.

        Args:
            data_type: Type of detected data
            value: The matched value
            text_lower: Lowercase version of full text (for context search)
            position: Position of match in text

        Returns:
            Confidence score 0.0 to 1.0
        """
        confidence = 0.5  # Base confidence

        # Check for context keywords nearby (within 50 chars before match)
        context_start = max(0, position - 50)
        context = text_lower[context_start:position]

        keywords = self.CONTEXT_KEYWORDS.get(data_type, [])
        for keyword in keywords:
            if keyword in context:
                confidence += 0.3
                break

        # Validate specific formats (separators stripped first)
        clean_value = re.sub(r'[\s-]', '', value)

        if data_type == SensitiveDataType.PESEL:
            if self._validate_pesel(clean_value):
                confidence += 0.2

        elif data_type == SensitiveDataType.CREDIT_CARD:
            if self._validate_luhn(clean_value):
                confidence += 0.3

        elif data_type == SensitiveDataType.IBAN:
            if clean_value.upper().startswith('PL') or len(clean_value) == 26:
                confidence += 0.2

        elif data_type == SensitiveDataType.NIP:
            if self._validate_nip(clean_value):
                confidence += 0.2

        return min(confidence, 1.0)

    def _validate_pesel(self, pesel: str) -> bool:
        """Validate PESEL checksum (weighted sum of first 10 digits)."""
        if len(pesel) != 11 or not pesel.isdigit():
            return False

        weights = [1, 3, 7, 9, 1, 3, 7, 9, 1, 3]
        checksum = sum(int(pesel[i]) * weights[i] for i in range(10))
        control = (10 - (checksum % 10)) % 10

        return control == int(pesel[10])

    def _validate_luhn(self, number: str) -> bool:
        """Validate credit card number using Luhn algorithm."""
        if not number.isdigit() or len(number) < 13 or len(number) > 19:
            return False

        digits = [int(d) for d in number]
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]

        checksum = sum(odd_digits)
        for d in even_digits:
            # sum(divmod(d*2, 10)) == digit-sum of the doubled value
            checksum += sum(divmod(d * 2, 10))

        return checksum % 10 == 0

    def _validate_nip(self, nip: str) -> bool:
        """Validate Polish NIP checksum (mod-11 weighted sum)."""
        if len(nip) != 10 or not nip.isdigit():
            return False

        weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
        checksum = sum(int(nip[i]) * weights[i] for i in range(9))
        control = checksum % 11

        return control == int(nip[9])


# Global instance for easy import
_service_instance: Optional[SensitiveDataService] = None


def get_sensitive_data_service() -> SensitiveDataService:
    """Get or create global SensitiveDataService instance"""
    global _service_instance
    if _service_instance is None:
        _service_instance = SensitiveDataService()
    return _service_instance


def sanitize_message(text: str) -> Tuple[str, List[SensitiveDataMatch]]:
    """
    Convenience function to sanitize text using global service.

    Args:
        text: Input text to sanitize

    Returns:
        Tuple of (sanitized_text, list_of_matches)
    """
    return get_sensitive_data_service().sanitize(text)


# Quick test
if __name__ == "__main__":
    service = SensitiveDataService()

    test_cases = [
        "Mój PESEL to 44051401359",
        "Przelej na konto PL61 1090 1014 0000 0712 1981 2874",
        "Numer karty: 4532015112830366",
        "Moje hasło: SuperSecret123!",
        "Dowód osobisty: ABC123456",
        "Napisz na email@example.com",  # Should NOT be masked (intentional)
    ]

    for test in test_cases:
        sanitized, matches = service.sanitize(test)
        print(f"Input: {test}")
        print(f"Output: {sanitized}")
        if matches:
            print(f"Found: {[(m.data_type.value, m.confidence) for m in matches]}")
        print()