""" Admin Announcements Routes =========================== Announcements management (ogłoszenia) for admin panel. """ import logging import re from datetime import datetime from flask import render_template, request, redirect, url_for, flash, jsonify from flask_login import login_required, current_user from . import bp from database import SessionLocal, Announcement, SystemRole, User from utils.decorators import role_required from utils.helpers import sanitize_html logger = logging.getLogger(__name__) def _broadcast_announcement(db, announcement): """Broadcast push (+ optional e-mail) do aktywnych użytkowników. Każdy user jest filtrowany po własnych flagach: - notify_push_announcements (default TRUE — push) - notify_email_announcements (default FALSE — e-mail, opt-in) Autor (publishing admin) nie dostaje powiadomienia o sobie. """ push_sent = 0 try: from blueprints.push.push_service import send_push from email_service import send_email, _email_v3_wrap except ImportError: return 0 url_path = f'/aktualnosci/{announcement.id}' if hasattr(announcement, 'slug') else f'/aktualnosci/{announcement.id}' title = announcement.title or 'Nowa aktualność' excerpt = (announcement.excerpt or '')[:160] if hasattr(announcement, 'excerpt') else '' try: users = db.query(User).filter(User.is_active == True).all() except Exception as e: logger.warning(f"broadcast: fetch users failed: {e}") return 0 publisher_id = getattr(current_user, 'id', None) for u in users: if publisher_id and u.id == publisher_id: continue # Push try: if getattr(u, 'notify_push_announcements', True) is not False: send_push( user_id=u.id, title=f'Nowa aktualność Izby', body=title[:120] if title else 'Opublikowano nową aktualność', url=url_path, tag=f'announcement-{announcement.id}', ) push_sent += 1 except Exception as e: logger.debug(f"broadcast push err user={u.id}: {e}") # Email (opt-in) try: if u.email and getattr(u, 'notify_email_announcements', False) is True: subject = f"Aktualność Izby: {title[:60]}" body_text = f"Nowa aktualność na portalu:\n\n{title}\n\n{excerpt}\n\nZobacz: https://nordabiznes.pl{url_path}" content = ( f'

Cześć {u.name or u.email}!

' f'

Biuro Izby opublikowało nową aktualność:

' f'

{title}

' f'

{excerpt}

' f'

Zobacz aktualność

' ) body_html = _email_v3_wrap('Nowa aktualność Izby', 'Norda Biznes Partner', content) send_email( to=[u.email], subject=subject, body_text=body_text, body_html=body_html, email_type='announcement_broadcast', user_id=u.id, recipient_name=u.name, notification_type='announcements', ) except Exception as e: logger.debug(f"broadcast email err user={u.id}: {e}") return push_sent def generate_slug(title): """ Generate URL-friendly slug from title. Uses unidecode for proper Polish character handling. """ try: from unidecode import unidecode text = unidecode(title.lower()) except ImportError: # Fallback without unidecode text = title.lower() replacements = { 'ą': 'a', 'ć': 'c', 'ę': 'e', 'ł': 'l', 'ń': 'n', 'ó': 'o', 'ś': 's', 'ź': 'z', 'ż': 'z' } for pl, en in replacements.items(): text = text.replace(pl, en) # Remove special characters, replace spaces with hyphens text = re.sub(r'[^\w\s-]', '', text) text = re.sub(r'[-\s]+', '-', text).strip('-') return text[:200] # Limit slug length # ============================================================ # ANNOUNCEMENTS MANAGEMENT # ============================================================ @bp.route('/announcements') @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements(): """Admin panel - lista ogłoszeń""" db = SessionLocal() try: # Filters status_filter = request.args.get('status', 'all') category_filter = request.args.get('category', 'all') query = db.query(Announcement) if status_filter != 'all': query = query.filter(Announcement.status == status_filter) if category_filter != 'all': from sqlalchemy.dialects.postgresql import array as pg_array query = query.filter(Announcement.categories.op('@>')(pg_array([category_filter]))) # Sort: pinned first, then by created_at desc query = query.order_by( Announcement.is_pinned.desc(), Announcement.created_at.desc() ) announcements = query.all() return render_template('admin/announcements.html', announcements=announcements, now=datetime.now(), status_filter=status_filter, category_filter=category_filter, categories=Announcement.CATEGORIES, category_labels=Announcement.CATEGORY_LABELS, statuses=Announcement.STATUSES, status_labels=Announcement.STATUS_LABELS) finally: db.close() @bp.route('/announcements/new', methods=['GET', 'POST']) @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements_new(): """Admin panel - nowe ogłoszenie""" if request.method == 'POST': db = SessionLocal() try: title = request.form.get('title', '').strip() excerpt = request.form.get('excerpt', '').strip() content = sanitize_html(request.form.get('content', '').strip()) categories = request.form.getlist('categories') if not categories: categories = ['internal'] # Default category category = categories[0] # Backwards compatibility image_url = request.form.get('image_url', '').strip() or None external_link = request.form.get('external_link', '').strip() or None is_featured = 'is_featured' in request.form is_pinned = 'is_pinned' in request.form # Handle expires_at expires_at_str = request.form.get('expires_at', '').strip() expires_at = None if expires_at_str: try: expires_at = datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M') except ValueError: pass # Generate unique slug base_slug = generate_slug(title) slug = base_slug counter = 1 while db.query(Announcement).filter(Announcement.slug == slug).first(): slug = f"{base_slug}-{counter}" counter += 1 # Determine status based on button clicked action = request.form.get('action', 'draft') status = 'published' if action == 'publish' else 'draft' published_at = datetime.now() if status == 'published' else None announcement = Announcement( title=title, slug=slug, excerpt=excerpt or None, content=content, category=category, categories=categories, image_url=image_url, external_link=external_link, status=status, published_at=published_at, expires_at=expires_at, is_featured=is_featured, is_pinned=is_pinned, created_by=current_user.id ) db.add(announcement) db.commit() flash(f'Ogłoszenie zostało {"opublikowane" if status == "published" else "zapisane jako szkic"}.', 'success') return redirect(url_for('admin.admin_announcements')) except Exception as e: db.rollback() logger.error(f"Error creating announcement: {e}") flash(f'Błąd podczas tworzenia ogłoszenia: {e}', 'error') finally: db.close() # GET request - show form return render_template('admin/announcements_form.html', announcement=None, categories=Announcement.CATEGORIES, category_labels=Announcement.CATEGORY_LABELS) @bp.route('/announcements//edit', methods=['GET', 'POST']) @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements_edit(id): """Admin panel - edycja ogłoszenia""" db = SessionLocal() try: announcement = db.query(Announcement).filter(Announcement.id == id).first() if not announcement: flash('Nie znaleziono ogłoszenia.', 'error') return redirect(url_for('admin.admin_announcements')) if request.method == 'POST': announcement.title = request.form.get('title', '').strip() announcement.excerpt = request.form.get('excerpt', '').strip() or None announcement.content = sanitize_html(request.form.get('content', '').strip()) categories = request.form.getlist('categories') if not categories: categories = ['internal'] # Default category announcement.categories = categories announcement.category = categories[0] # Backwards compatibility announcement.image_url = request.form.get('image_url', '').strip() or None announcement.external_link = request.form.get('external_link', '').strip() or None announcement.is_featured = 'is_featured' in request.form announcement.is_pinned = 'is_pinned' in request.form # Handle expires_at expires_at_str = request.form.get('expires_at', '').strip() if expires_at_str: try: announcement.expires_at = datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M') except ValueError: pass else: announcement.expires_at = None # Regenerate slug if title changed significantly new_slug = generate_slug(announcement.title) if new_slug != announcement.slug.split('-')[0]: # Check if base changed base_slug = new_slug slug = base_slug counter = 1 while db.query(Announcement).filter( Announcement.slug == slug, Announcement.id != id ).first(): slug = f"{base_slug}-{counter}" counter += 1 announcement.slug = slug # Handle status change action = request.form.get('action', 'save') if action == 'publish' and announcement.status != 'published': announcement.status = 'published' announcement.published_at = datetime.now() elif action == 'archive': announcement.status = 'archived' elif action == 'draft': announcement.status = 'draft' announcement.updated_at = datetime.now() db.commit() flash('Zmiany zostały zapisane.', 'success') return redirect(url_for('admin.admin_announcements')) # GET request - show form return render_template('admin/announcements_form.html', announcement=announcement, categories=Announcement.CATEGORIES, category_labels=Announcement.CATEGORY_LABELS) except Exception as e: db.rollback() logger.error(f"Error editing announcement {id}: {e}") flash(f'Błąd: {e}', 'error') return redirect(url_for('admin.admin_announcements')) finally: db.close() @bp.route('/announcements//publish', methods=['POST']) @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements_publish(id): """Publikacja ogłoszenia""" db = SessionLocal() try: announcement = db.query(Announcement).filter(Announcement.id == id).first() if not announcement: return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404 announcement.status = 'published' if not announcement.published_at: announcement.published_at = datetime.now() announcement.updated_at = datetime.now() db.commit() # Notify all users about new announcement try: from utils.notifications import notify_all_users_announcement notify_count = notify_all_users_announcement( announcement_id=announcement.id, title=announcement.title, category=announcement.category ) logger.info(f"Sent {notify_count} in-app notifications for announcement: {announcement.title}") # Broadcast push/e-mail do aktywnych użytkowników (opt-in per flaga) push_count = _broadcast_announcement(db, announcement) logger.info(f"Sent {push_count} push notifications for announcement: {announcement.title}") return jsonify({'success': True, 'message': f'Ogłoszenie zostało opublikowane. Wysłano {notify_count} powiadomień w portalu, {push_count} na urządzenia.'}) except ImportError: return jsonify({'success': True, 'message': 'Ogłoszenie zostało opublikowane.'}) except Exception as e: db.rollback() logger.error(f"Error publishing announcement {id}: {e}") return jsonify({'success': False, 'error': str(e)}), 500 finally: db.close() @bp.route('/announcements//archive', methods=['POST']) @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements_archive(id): """Archiwizacja ogłoszenia""" db = SessionLocal() try: announcement = db.query(Announcement).filter(Announcement.id == id).first() if not announcement: return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404 announcement.status = 'archived' announcement.updated_at = datetime.now() db.commit() return jsonify({'success': True, 'message': 'Ogłoszenie zostało zarchiwizowane'}) except Exception as e: db.rollback() logger.error(f"Error archiving announcement {id}: {e}") return jsonify({'success': False, 'error': str(e)}), 500 finally: db.close() @bp.route('/announcements//delete', methods=['POST']) @login_required @role_required(SystemRole.OFFICE_MANAGER) def admin_announcements_delete(id): """Usunięcie ogłoszenia""" db = SessionLocal() try: announcement = db.query(Announcement).filter(Announcement.id == id).first() if not announcement: return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404 db.delete(announcement) db.commit() return jsonify({'success': True, 'message': 'Ogłoszenie zostało usunięte'}) except Exception as e: db.rollback() logger.error(f"Error deleting announcement {id}: {e}") return jsonify({'success': False, 'error': str(e)}), 500 finally: db.close()