"""File upload service for message attachments.""" import os import uuid from datetime import datetime from werkzeug.utils import secure_filename # Allowed file types and their limits IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'} DOCUMENT_EXTENSIONS = {'pdf', 'docx', 'xlsx'} ALLOWED_EXTENSIONS = IMAGE_EXTENSIONS | DOCUMENT_EXTENSIONS MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image MAX_DOCUMENT_SIZE = 10 * 1024 * 1024 # 10MB per document MAX_TOTAL_SIZE = 15 * 1024 * 1024 # 15MB total per message MAX_FILES_PER_MESSAGE = 3 # Magic bytes signatures for validation FILE_SIGNATURES = { 'jpg': [b'\xff\xd8\xff'], 'jpeg': [b'\xff\xd8\xff'], 'png': [b'\x89PNG\r\n\x1a\n'], 'gif': [b'GIF87a', b'GIF89a'], 'pdf': [b'%PDF'], 'docx': [b'PK\x03\x04'], # ZIP-based format 'xlsx': [b'PK\x03\x04'], # ZIP-based format } MIME_TYPES = { 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'pdf': 'application/pdf', 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', } UPLOAD_BASE = 'static/uploads/messages' class MessageUploadService: """Handles file uploads for message attachments.""" def __init__(self, app_root: str): self.app_root = app_root def validate_files(self, files: list) -> tuple: """Validate list of uploaded files. Returns (valid_files, errors).""" errors = [] valid = [] if len(files) > MAX_FILES_PER_MESSAGE: errors.append(f'Maksymalnie {MAX_FILES_PER_MESSAGE} pliki na wiadomość.') return [], errors total_size = 0 for f in files: if not f or not f.filename: continue filename = secure_filename(f.filename) ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else '' if ext not in ALLOWED_EXTENSIONS: errors.append(f'Niedozwolony typ pliku: {filename}') continue # Read file content for size + magic bytes check content = f.read() f.seek(0) size = len(content) max_size = MAX_IMAGE_SIZE if ext in IMAGE_EXTENSIONS else MAX_DOCUMENT_SIZE if size > max_size: limit_mb = max_size // (1024 * 1024) errors.append(f'Plik {filename} przekracza limit {limit_mb}MB.') continue total_size += size # Magic bytes validation if ext in FILE_SIGNATURES: valid_sig = False for sig in FILE_SIGNATURES[ext]: if content[:len(sig)] == sig: valid_sig = True break if not valid_sig: errors.append(f'Plik {filename} ma nieprawidłowy format.') continue valid.append((f, filename, ext, size, content)) if total_size > MAX_TOTAL_SIZE: errors.append(f'Łączny rozmiar plików przekracza {MAX_TOTAL_SIZE // (1024*1024)}MB.') return [], errors return valid, errors def save_file(self, content: bytes, ext: str) -> tuple: """Save file to disk. Returns (stored_filename, relative_path).""" now = datetime.now() subdir = os.path.join(UPLOAD_BASE, str(now.year), f'{now.month:02d}') full_dir = os.path.join(self.app_root, subdir) os.makedirs(full_dir, exist_ok=True) stored_filename = f'{uuid.uuid4().hex}.{ext}' full_path = os.path.join(full_dir, stored_filename) with open(full_path, 'wb') as out: out.write(content) relative_path = os.path.join(subdir, stored_filename) return stored_filename, relative_path def get_mime_type(self, ext: str) -> str: """Get MIME type for extension.""" return MIME_TYPES.get(ext, 'application/octet-stream') def is_image(self, ext: str) -> bool: """Check if extension is an image type.""" return ext in IMAGE_EXTENSIONS def delete_file(self, relative_path: str): """Delete a file from disk.""" full_path = os.path.join(self.app_root, relative_path) if os.path.exists(full_path): os.remove(full_path)