"""
File Upload Service
===================

Secure file upload handling for forum and classified attachments.
Supports JPG, PNG, GIF images up to 5MB.

Features:
- File type validation (magic bytes + extension)
- Size limits
- EXIF data stripping for privacy
- UUID-based filenames for security
- Date-organized storage structure

Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-01-10
"""

import os
import uuid
import logging
from datetime import datetime
from typing import Tuple, Optional

from werkzeug.datastructures import FileStorage

logger = logging.getLogger(__name__)

# Configuration
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'}
ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB
MAX_IMAGE_DIMENSIONS = (4096, 4096)  # Max 4K resolution

# Get absolute path based on this file's location
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_BASE_PATH = os.path.join(_BASE_DIR, 'static', 'uploads', 'forum')
UPLOAD_CLASSIFIEDS_PATH = os.path.join(_BASE_DIR, 'static', 'uploads', 'classifieds')

# Magic bytes for image validation
IMAGE_SIGNATURES = {
    b'\xff\xd8\xff': 'jpg',  # JPEG
    b'\x89PNG\r\n\x1a\n': 'png',  # PNG
    b'GIF87a': 'gif',  # GIF87a
    b'GIF89a': 'gif',  # GIF89a
}


class FileUploadService:
    """Secure file upload service for forum and classified attachments."""

    @staticmethod
    def validate_file(file: FileStorage) -> Tuple[bool, str]:
        """
        Validate an uploaded file.

        Checks, in order: a file was supplied, its extension is allowed, its
        size is within (0, MAX_FILE_SIZE], its leading bytes match a known
        image signature, the signature agrees with the extension, and (when
        PIL is importable) its pixel dimensions fit MAX_IMAGE_DIMENSIONS.
        The stream is rewound to the start before returning.

        Args:
            file: Werkzeug FileStorage object

        Returns:
            Tuple of (is_valid, error_message); error_message is '' on success
        """
        # A missing upload may carry filename '' or None; treat both the same.
        if not file or not file.filename:
            return False, 'Nie wybrano pliku'

        # Check extension
        filename = file.filename
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        if ext not in ALLOWED_EXTENSIONS:
            return False, f'Niedozwolony format pliku. Dozwolone: {", ".join(sorted(ALLOWED_EXTENSIONS))}'

        # Check file size by seeking to the end of the stream
        file.seek(0, 2)
        size = file.tell()
        file.seek(0)

        if size > MAX_FILE_SIZE:
            return False, f'Plik jest za duży (max {MAX_FILE_SIZE // 1024 // 1024}MB)'

        if size == 0:
            return False, 'Plik jest pusty'

        # Verify magic bytes (actual file type)
        header = file.read(16)
        file.seek(0)

        detected_type = next(
            (file_type for signature, file_type in IMAGE_SIGNATURES.items()
             if header.startswith(signature)),
            None,
        )
        if not detected_type:
            return False, 'Plik nie jest prawidłowym obrazem'

        # Normalise both sides to 'jpeg' so .jpg and .jpeg are interchangeable,
        # then require the extension to match the detected content.
        if ext == 'jpg':
            ext = 'jpeg'
        if detected_type == 'jpg':
            detected_type = 'jpeg'

        if detected_type != ext:
            return False, f'Rozszerzenie pliku ({ext}) nie odpowiada zawartości ({detected_type})'

        # Validate image dimensions using PIL (if available)
        try:
            from PIL import Image
            img = Image.open(file)
            width, height = img.size
            file.seek(0)

            if width > MAX_IMAGE_DIMENSIONS[0] or height > MAX_IMAGE_DIMENSIONS[1]:
                return False, f'Obraz jest za duży (max {MAX_IMAGE_DIMENSIONS[0]}x{MAX_IMAGE_DIMENSIONS[1]}px)'
        except ImportError:
            # PIL not available, skip dimension check
            logger.warning("PIL not available, skipping image dimension validation")
        except Exception as e:
            # Anything PIL cannot parse is rejected as an unreadable image.
            file.seek(0)
            return False, f'Nie można odczytać obrazu: {str(e)}'

        return True, ''

    @staticmethod
    def generate_stored_filename(original_filename: str) -> str:
        """
        Generate secure UUID-based filename preserving extension.

        The extension is lower-cased and 'jpeg' is normalised to 'jpg'.
        A missing filename, a filename without a dot, or an extension that
        is not purely alphanumeric (for example one containing a path
        separator) yields the extension 'bin'.

        Args:
            original_filename: Original filename from upload

        Returns:
            UUID-based filename with the (sanitised) original extension
        """
        ext = 'bin'
        if original_filename and '.' in original_filename:
            candidate = original_filename.rsplit('.', 1)[-1].lower()
            # Only plain alphanumeric extensions are safe to embed in a path.
            if candidate.isalnum():
                ext = candidate

        if ext == 'jpeg':
            ext = 'jpg'  # Normalize to jpg
        return f"{uuid.uuid4()}.{ext}"

    @staticmethod
    def get_upload_path(attachment_type: str) -> str:
        """
        Get upload directory path with date-based organization.

        The directory is created if it does not exist yet.

        Args:
            attachment_type: 'classified', 'topic', or anything else
                (treated as a reply)

        Returns:
            Full path to upload directory for the current year and month
        """
        now = datetime.now()
        year = str(now.year)
        month = f"{now.month:02d}"

        if attachment_type == 'classified':
            path = os.path.join(UPLOAD_CLASSIFIEDS_PATH, year, month)
        else:
            subdir = 'topics' if attachment_type == 'topic' else 'replies'
            path = os.path.join(UPLOAD_BASE_PATH, subdir, year, month)

        os.makedirs(path, exist_ok=True)
        return path

    @staticmethod
    def _strip_metadata(img):
        """
        Return a copy of ``img`` that carries pixel data but no metadata.

        Modes RGBA, LA and P are kept as they are; every other mode is
        converted to RGB. For palette images the palette and the
        transparency index are copied too, because a new 'P' image would
        otherwise map the copied indices onto a different palette.
        """
        from PIL import Image

        if img.mode not in ('RGBA', 'LA', 'P') and img.mode != 'RGB':
            img = img.convert('RGB')

        # A brand-new image has an empty info dict, so EXIF and other
        # ancillary data are not carried over.
        # NOTE(review): EXIF orientation is dropped with the rest of the
        # metadata, so rotated photos may display sideways - consider
        # ImageOps.exif_transpose before stripping.
        clean_img = Image.new(img.mode, img.size)
        clean_img.putdata(list(img.getdata()))

        if img.mode == 'P':
            palette = img.getpalette()
            if palette:
                clean_img.putpalette(palette)
            if 'transparency' in img.info:
                clean_img.info['transparency'] = img.info['transparency']

        return clean_img

    @staticmethod
    def _write_sanitized(file: FileStorage, file_path: str, ext: str) -> None:
        """
        Write ``file`` to ``file_path`` with metadata removed.

        Animated GIFs are written unmodified so the animation survives.
        Raises ImportError when PIL is not installed.
        """
        from PIL import Image

        img = Image.open(file)

        if ext == 'gif' and getattr(img, 'is_animated', False):
            # Save animated GIF without modification
            file.seek(0)
            file.save(file_path)
            return

        clean_img = FileUploadService._strip_metadata(img)

        # Save with optimization
        save_kwargs = {'optimize': True}
        if ext in ('jpg', 'jpeg'):
            save_kwargs['quality'] = 85
        elif ext == 'png':
            save_kwargs['compress_level'] = 6

        clean_img.save(file_path, **save_kwargs)

    @staticmethod
    def save_file(file: FileStorage, attachment_type: str) -> Tuple[str, str, int, str]:
        """
        Save file securely with EXIF stripping.

        When PIL is unavailable the upload is stored as-is and a warning is
        logged. If writing fails, any partially written file is removed and
        the exception is re-raised.

        Args:
            file: Werkzeug FileStorage object
            attachment_type: 'classified', 'topic', or anything else
                (treated as a reply)

        Returns:
            Tuple of (stored_filename, relative_path, file_size, mime_type);
            relative_path is relative to the 'static' directory
        """
        stored_filename = FileUploadService.generate_stored_filename(file.filename)
        upload_dir = FileUploadService.get_upload_path(attachment_type)
        file_path = os.path.join(upload_dir, stored_filename)

        # Determine mime type
        ext = stored_filename.rsplit('.', 1)[-1].lower()
        mime_types = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif'
        }
        mime_type = mime_types.get(ext, 'application/octet-stream')

        try:
            try:
                FileUploadService._write_sanitized(file, file_path, ext)
                sanitized = True
            except ImportError:
                # PIL not available, save without processing
                logger.warning("PIL not available, saving file without EXIF stripping")
                file.seek(0)
                file.save(file_path)
                sanitized = False
        except Exception:
            # Do not leave a truncated attachment behind in the upload tree.
            try:
                os.remove(file_path)
            except OSError:
                pass  # nothing was written, or it is already gone
            raise

        file_size = os.path.getsize(file_path)
        relative_path = os.path.relpath(file_path, os.path.join(_BASE_DIR, 'static'))

        if sanitized:
            logger.info("Saved forum attachment: %s (%s bytes)", stored_filename, file_size)
        return stored_filename, relative_path, file_size, mime_type

    @staticmethod
    def _remove_file(path: str, stored_filename: str) -> bool:
        """Remove ``path`` and log the outcome; return True on success."""
        try:
            os.remove(path)
        except OSError as e:
            logger.error("Failed to delete %s: %s", stored_filename, e)
            return False
        logger.info("Deleted forum attachment: %s", stored_filename)
        return True

    @staticmethod
    def delete_file(stored_filename: str, attachment_type: str, created_at: Optional[datetime] = None) -> bool:
        """
        Delete file from storage.

        If ``created_at`` is given, the year/month directory derived from it
        is tried first; otherwise, or when the file is not there, every date
        directory for the attachment type is searched.

        Args:
            stored_filename: UUID-based filename
            attachment_type: 'classified', 'topic', or anything else
                (treated as a reply)
            created_at: Creation timestamp to determine path

        Returns:
            True if deleted, False otherwise
        """
        # Stored names are bare file names; refuse anything with a path
        # component so a bad value cannot address files outside the tree.
        if not stored_filename or os.path.basename(stored_filename) != stored_filename:
            logger.warning("Attachment not found for deletion: %s", stored_filename)
            return False

        if attachment_type == 'classified':
            search_path = UPLOAD_CLASSIFIEDS_PATH
        else:
            subdir = 'topics' if attachment_type == 'topic' else 'replies'
            search_path = os.path.join(UPLOAD_BASE_PATH, subdir)

        if created_at:
            # Try exact path first
            path = os.path.join(
                search_path,
                str(created_at.year),
                f"{created_at.month:02d}",
                stored_filename,
            )
            if os.path.exists(path):
                return FileUploadService._remove_file(path, stored_filename)

        # Search in all date directories
        for root, _dirs, files in os.walk(search_path):
            if stored_filename in files:
                return FileUploadService._remove_file(
                    os.path.join(root, stored_filename), stored_filename
                )

        logger.warning("Attachment not found for deletion: %s", stored_filename)
        return False

    @staticmethod
    def get_file_url(stored_filename: str, attachment_type: str, created_at: datetime) -> str:
        """
        Get URL for serving the file.

        Args:
            stored_filename: UUID-based filename
            attachment_type: 'classified', 'topic', or anything else
                (treated as a reply)
            created_at: Creation timestamp; its year and month select the
                directory

        Returns:
            URL path to the file under /static/uploads/
        """
        dated = f"{created_at.year}/{created_at.month:02d}/{stored_filename}"
        if attachment_type == 'classified':
            return f"/static/uploads/classifieds/{dated}"

        subdir = 'topics' if attachment_type == 'topic' else 'replies'
        return f"/static/uploads/forum/{subdir}/{dated}"