nordabiz/blueprints/board/routes.py
Maciej Pienczyn 0e96dd80fa
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
simplify: show Open button only for PDF, remove DOCX conversion
PDF files: "Otwórz" (inline in browser) + "Pobierz" (download)
DOCX files: "Pobierz" only (browsers can't display DOCX inline)

Removes LibreOffice on-the-fly conversion - simpler and more reliable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 12:53:03 +01:00

758 lines
27 KiB
Python

"""
Board Routes (Rada Izby)
========================
Routes for board meeting management, document handling, and PDF generation.
Endpoints - Meetings:
- GET /rada/ - List all meetings + board members
- GET /rada/posiedzenia - Redirect to /rada/
- GET/POST /rada/posiedzenia/nowe - Create new meeting (office_manager+)
- GET/POST /rada/posiedzenia/<id>/edytuj - Edit meeting (office_manager+)
- GET /rada/posiedzenia/<id> - View meeting details
- POST /rada/posiedzenia/<id>/publikuj-program - Publish agenda (office_manager+)
- POST /rada/posiedzenia/<id>/publikuj-protokol - Publish protocol (office_manager+)
- GET /rada/posiedzenia/<id>/pdf-program - Download agenda PDF
- GET /rada/posiedzenia/<id>/pdf-protokol - Download protocol PDF
Endpoints - Documents:
- POST /rada/posiedzenia/<id>/dokumenty/dodaj - Upload document (office_manager+)
- GET /rada/dokumenty/<id>/pobierz - Download document (rada_member+)
- GET /rada/dokumenty/<id>/otworz - Open PDF document inline (rada_member+)
- POST /rada/dokumenty/<id>/usun - Soft delete document (office_manager+)
"""
import os
from datetime import datetime
from flask import (
render_template, request, redirect, url_for, flash,
current_app, Response, send_file
)
from flask_login import login_required, current_user
from sqlalchemy import desc
from . import bp
from database import SessionLocal, BoardMeeting, BoardDocument, SystemRole, User
from utils.decorators import rada_member_required, office_manager_required
from utils.helpers import sanitize_html
from services.document_upload_service import DocumentUploadService
from datetime import date, time
try:
import weasyprint
HAS_WEASYPRINT = True
except (ImportError, OSError):
HAS_WEASYPRINT = False
# =============================================================================
# MEETING ROUTES
# =============================================================================
@bp.route('/')
@login_required
@rada_member_required
def index():
    """Render the main board page.

    Shows every board meeting (highest year first, then highest meeting
    number) alongside the active board members sorted by name. The template
    also receives ``can_manage``, which reflects whether the current user
    holds the office-manager role.
    """
    session = SessionLocal()
    try:
        newest_first = (
            desc(BoardMeeting.year),
            desc(BoardMeeting.meeting_number),
        )
        context = {
            'meetings': session.query(BoardMeeting).order_by(*newest_first).all(),
            # Only active users flagged as board members are listed.
            'board_members': (
                session.query(User)
                .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
                .order_by(User.name)
                .all()
            ),
            'can_manage': current_user.has_role(SystemRole.OFFICE_MANAGER),
        }
        return render_template('board/meetings_list.html', **context)
    finally:
        session.close()
@bp.route('/posiedzenia')
@login_required
@rada_member_required
def meetings_list():
    """Backwards-compatible URL that forwards to the main board page."""
    main_page = url_for('board.index')
    return redirect(main_page)
@bp.route('/posiedzenia/nowe', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_create():
    """Create a new board meeting.

    A POST is handed over to ``_handle_meeting_form``. A GET renders the
    empty meeting form pre-filled with defaults: the next meeting number for
    the current year, a default location and start time, and the default
    chairperson and secretary when matching users exist.
    """
    session = SessionLocal()
    try:
        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=None)

        # GET: the next number continues this year's sequence, starting at 1.
        this_year = datetime.now().year
        latest = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.year == this_year)
            .order_by(desc(BoardMeeting.meeting_number))
            .first()
        )
        if latest:
            upcoming_number = latest.meeting_number + 1
        else:
            upcoming_number = 1

        # Active board members populate the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        # Default chairperson and secretary are looked up by fixed e-mail
        # patterns; either may be missing.
        chair = session.query(User).filter(User.email == 'leszek@rotor.pl').first()
        secretary = session.query(User).filter(User.email.like('%kloska%')).first()

        # Staff (office manager and admin roles) fill the secretary dropdown.
        office_staff = (
            session.query(User)
            .filter(
                User.role.in_(['OFFICE_MANAGER', 'ADMIN']),
                User.is_active == True,  # noqa: E712
            )
            .order_by(User.name)
            .all()
        )

        defaults = {
            'meeting_number': upcoming_number,
            'year': this_year,
            'location': 'Siedziba Izby',
            'start_time': '16:00',
            'chairperson_id': chair.id if chair else None,
            'secretary_id': secretary.id if secretary else None,
        }
        return render_template(
            'board/meeting_form.html',
            meeting=None,
            form_data=defaults,
            board_members=members,
            staff_users=office_staff,
            is_edit=False,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/edytuj', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_edit(meeting_id):
    """Edit an existing board meeting.

    Redirects to the board index with an error flash when the meeting does
    not exist. A POST is delegated to ``_handle_meeting_form``; a GET renders
    the form populated from the stored meeting.
    """

    def as_hhmm(value):
        # Times are shown as HH:MM; a missing time becomes an empty field.
        return value.strftime('%H:%M') if value else ''

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=meeting)

        # GET: gather the dropdown/attendance data and the current values.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )
        office_staff = (
            session.query(User)
            .filter(
                User.role.in_(['OFFICE_MANAGER', 'ADMIN']),
                User.is_active == True,  # noqa: E712
            )
            .order_by(User.name)
            .all()
        )

        current_values = {
            'meeting_number': meeting.meeting_number,
            'year': meeting.year,
            'meeting_date': meeting.meeting_date.isoformat() if meeting.meeting_date else '',
            'start_time': as_hhmm(meeting.start_time),
            'end_time': as_hhmm(meeting.end_time),
            'location': meeting.location,
            'chairperson_id': meeting.chairperson_id,
            'secretary_id': meeting.secretary_id,
            'guests': meeting.guests,
            'agenda_items': meeting.agenda_items or [],
            'attendance': meeting.attendance or {},
            'proceedings': meeting.proceedings or [],
        }
        return render_template(
            'board/meeting_form.html',
            meeting=meeting,
            form_data=current_values,
            board_members=members,
            staff_users=office_staff,
            is_edit=True,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>')
@login_required
@rada_member_required
def meeting_view(meeting_id):
    """Show the details page of a single board meeting.

    Redirects to the board index with an error flash when the meeting does
    not exist. Otherwise renders the meeting together with the active board
    members and the meeting's active documents (ordered by type, then title).
    """
    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # Board members are needed to display the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        # Soft-deleted documents (is_active False) are excluded.
        attached = (
            session.query(BoardDocument)
            .filter(
                BoardDocument.meeting_id == meeting_id,
                BoardDocument.is_active == True,  # noqa: E712
            )
            .order_by(BoardDocument.document_type, BoardDocument.title)
            .all()
        )

        may_manage = current_user.has_role(SystemRole.OFFICE_MANAGER)
        return render_template(
            'board/meeting_view.html',
            meeting=meeting,
            board_members=members,
            documents=attached,
            can_manage=may_manage,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-program', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_agenda(meeting_id):
    """Publish the agenda of a draft meeting.

    The meeting must exist, still be in draft status, and have both a meeting
    date and agenda items. On success the status and audit fields are updated
    and committed. Any exception rolls the session back and redirects to the
    meeting view with an error flash.
    """

    def to_view():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    def to_edit():
        return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # Anything other than a draft has already had its agenda published.
        if meeting.status != BoardMeeting.STATUS_DRAFT:
            flash('Program został już opublikowany.', 'warning')
            return to_view()

        # Required fields: send the user back to the edit form to fill them.
        if not meeting.meeting_date:
            flash('Data posiedzenia jest wymagana przed publikacją.', 'error')
            return to_edit()
        if not meeting.agenda_items:
            flash('Program posiedzenia jest wymagany przed publikacją.', 'error')
            return to_edit()

        meeting.status = BoardMeeting.STATUS_AGENDA_PUBLISHED
        meeting.agenda_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Program posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting agenda published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return to_view()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting agenda: {exc}")
        flash('Błąd podczas publikowania programu.', 'error')
        return to_view()
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-protokol', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_protocol(meeting_id):
    """Publish the protocol (minutes) of a meeting.

    The meeting must exist, must not already have a published protocol, and
    must have both attendance and proceedings recorded. On success the status
    and audit fields are updated and committed. Any exception rolls the
    session back and redirects to the meeting view with an error flash.
    """

    def to_view():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    def to_edit():
        return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if meeting.status == BoardMeeting.STATUS_PROTOCOL_PUBLISHED:
            flash('Protokół został już opublikowany.', 'warning')
            return to_view()

        # Required fields: send the user back to the edit form to fill them.
        if not meeting.attendance:
            flash('Lista obecności jest wymagana przed publikacją protokołu.', 'error')
            return to_edit()
        if not meeting.proceedings:
            flash('Przebieg posiedzenia jest wymagany przed publikacją protokołu.', 'error')
            return to_edit()

        meeting.status = BoardMeeting.STATUS_PROTOCOL_PUBLISHED
        meeting.protocol_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Protokół z posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting protocol published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return to_view()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting protocol: {exc}")
        flash('Błąd podczas publikowania protokołu.', 'error')
        return to_view()
    finally:
        session.close()
# =============================================================================
# MEETING PDF GENERATION
# =============================================================================
def _generate_meeting_pdf(meeting_id, pdf_type):
    """Build the agenda or protocol PDF for a meeting and return it as a download.

    Args:
        meeting_id: Primary key of the meeting to render.
        pdf_type: ``'agenda'`` selects the agenda file name; any other value
            is treated as the protocol. The value is also passed to the
            template.

    Returns:
        A PDF attachment response on success. When weasyprint is unavailable,
        the meeting is missing, or rendering fails, a redirect with an error
        flash is returned instead.
    """
    if not HAS_WEASYPRINT:
        flash('Generowanie PDF nie jest dostępne (brak biblioteki weasyprint).', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # The template receives the board members for the attendance section.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        rendered = render_template(
            'board/meeting_pdf.html',
            meeting=meeting,
            board_members=members,
            pdf_type=pdf_type,
        )
        payload = weasyprint.HTML(string=rendered).write_pdf()

        # Slashes in the identifier are replaced so it is usable as a file name.
        prefix = 'Program_Posiedzenia' if pdf_type == 'agenda' else 'Protokol_Posiedzenia'
        safe_identifier = meeting.meeting_identifier.replace('/', '_')
        filename = f"{prefix}_{safe_identifier}.pdf"

        current_app.logger.info(
            f"Meeting PDF generated: {pdf_type} for {meeting.meeting_identifier} by user {current_user.id}"
        )
        disposition = f'attachment; filename="{filename}"'
        return Response(
            payload,
            mimetype='application/pdf',
            headers={'Content-Disposition': disposition},
        )
    except Exception as exc:
        current_app.logger.error(f"Failed to generate meeting PDF: {exc}")
        flash('Błąd podczas generowania PDF.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/pdf-program')
@login_required
@rada_member_required
def meeting_pdf_agenda(meeting_id):
    """Serve the meeting agenda as a PDF via ``_generate_meeting_pdf``."""
    response = _generate_meeting_pdf(meeting_id, 'agenda')
    return response
@bp.route('/posiedzenia/<int:meeting_id>/pdf-protokol')
@login_required
@rada_member_required
def meeting_pdf_protocol(meeting_id):
    """Serve the meeting protocol as a PDF via ``_generate_meeting_pdf``."""
    response = _generate_meeting_pdf(meeting_id, 'protocol')
    return response
# =============================================================================
# DOCUMENT ROUTES
# =============================================================================
@bp.route('/posiedzenia/<int:meeting_id>/dokumenty/dodaj', methods=['POST'])
@login_required
@office_manager_required
def document_upload(meeting_id):
    """Attach an uploaded file to a board meeting.

    The file comes from the ``document`` form field and is validated and
    stored through ``DocumentUploadService``. A ``BoardDocument`` row is then
    created with the form's title (falling back to the uploaded file name),
    document type (default ``'other'``) and optional description. Any
    exception rolls the session back and redirects to the meeting view with
    an error flash.
    """

    def to_view():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        upload = request.files.get('document')
        if not upload:
            flash('Nie wybrano pliku.', 'error')
            return to_view()

        accepted, rejection_reason = DocumentUploadService.validate_file(upload)
        if not accepted:
            flash(rejection_reason, 'error')
            return to_view()

        # save_file also returns the on-disk path, which is not needed here.
        stored_name, _stored_path, size, mime = DocumentUploadService.save_file(upload)

        # An empty title falls back to the uploaded file name.
        title = request.form.get('title', '').strip() or upload.filename
        kind = request.form.get('document_type', 'other')
        description = request.form.get('description', '').strip() or None

        # Extension is whatever follows the last dot, lower-cased; '' if no dot.
        _stem, dot, suffix = upload.filename.rpartition('.')
        extension = suffix.lower() if dot else ''

        record = BoardDocument(
            title=title,
            description=description,
            document_type=kind,
            meeting_id=meeting_id,
            meeting_date=meeting.meeting_date,
            meeting_number=meeting.meeting_number,
            original_filename=upload.filename,
            stored_filename=stored_name,
            file_extension=extension,
            file_size=size,
            mime_type=mime,
            uploaded_by=current_user.id,
        )
        session.add(record)
        session.commit()

        current_app.logger.info(
            f"Board document uploaded: '{title}' for meeting {meeting.meeting_identifier} "
            f"by user {current_user.id}"
        )
        flash(f'Dokument „{title}" został dodany.', 'success')
        return to_view()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to upload board document: {exc}")
        flash('Błąd podczas dodawania dokumentu.', 'error')
        return to_view()
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/pobierz')
@login_required
@rada_member_required
def document_download(doc_id):
    """Send a board document to the client as an attachment.

    Only active (not soft-deleted) documents are served. A missing database
    row redirects to the board index; a missing file on disk is logged and
    redirects to the owning meeting's page.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(BoardDocument)
            .filter(
                BoardDocument.id == doc_id,
                BoardDocument.is_active == True,  # noqa: E712
            )
            .first()
        )
        if not record:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        disk_path = DocumentUploadService.get_file_path(
            record.stored_filename, record.uploaded_at
        )
        if os.path.exists(disk_path):
            return send_file(
                disk_path,
                mimetype=record.mime_type,
                as_attachment=True,
                download_name=record.original_filename,
            )

        # The database row exists but the stored file is gone.
        current_app.logger.error(f"Board document file not found: {disk_path}")
        flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=record.meeting_id))
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/otworz')
@login_required
@rada_member_required
def document_view(doc_id):
    """Open a PDF document inline in the browser.

    Only PDF documents are rendered inline. A request for any other document
    type is redirected to the download route, so uploaded content is never
    displayed in the site's origin with an arbitrary stored MIME type.

    Args:
        doc_id: Primary key of the ``BoardDocument`` to open.

    Returns:
        The PDF as an inline response, or a redirect when the document is
        missing, is not a PDF, or its file is absent from disk.
    """
    db = SessionLocal()
    try:
        doc = db.query(BoardDocument).filter(
            BoardDocument.id == doc_id,
            BoardDocument.is_active == True  # noqa: E712
        ).first()
        if not doc:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Accept either marker of a PDF: the stored extension or MIME type.
        is_pdf = (
            (doc.file_extension or '').lower() == 'pdf'
            or doc.mime_type == 'application/pdf'
        )
        if not is_pdf:
            # Browsers cannot display other formats inline; offer the download.
            return redirect(url_for('board.document_download', doc_id=doc_id))

        file_path = DocumentUploadService.get_file_path(
            doc.stored_filename, doc.uploaded_at
        )
        if not os.path.exists(file_path):
            current_app.logger.error(f"Board document file not found: {file_path}")
            flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
            return redirect(url_for('board.meeting_view', meeting_id=doc.meeting_id))

        # Force the PDF content type instead of trusting the stored value.
        return send_file(
            file_path,
            mimetype='application/pdf',
            as_attachment=False,
            download_name=doc.original_filename
        )
    finally:
        db.close()
@bp.route('/dokumenty/<int:doc_id>/usun', methods=['POST'])
@login_required
@office_manager_required
def document_delete(doc_id):
    """Soft-delete a board document.

    The row is marked inactive and its audit fields are updated; the stored
    file is left on disk. A missing (or already inactive) document redirects
    to the board index. Any exception rolls the session back and redirects to
    the board index with an error flash.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(BoardDocument)
            .filter(
                BoardDocument.id == doc_id,
                BoardDocument.is_active == True,  # noqa: E712
            )
            .first()
        )
        if not record:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Capture what the messages and redirect need before committing.
        owning_meeting_id, removed_title = record.meeting_id, record.title

        record.is_active = False
        record.updated_by = current_user.id
        record.updated_at = datetime.now()
        session.commit()

        current_app.logger.info(
            f"Board document soft-deleted: '{removed_title}' (id={doc_id}) by user {current_user.id}"
        )
        flash(f'Dokument „{removed_title}" został usunięty.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=owning_meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to delete board document: {exc}")
        flash('Błąd podczas usuwania dokumentu.', 'error')
        return redirect(url_for('board.index'))
    finally:
        session.close()
# =============================================================================
# MEETING FORM HANDLER
# =============================================================================
def _meeting_form_json_list(raw):
    """Parse ``raw`` as JSON and return it only if it is a list, else ``[]``."""
    import json
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return []
    # Valid JSON may still be null, a number, a string or an object; none of
    # those can be iterated as a list of items.
    return parsed if isinstance(parsed, list) else []


def _meeting_form_attendance(form):
    """Collect ``attendance_status_<user_id>`` fields into an attendance dict."""
    prefix = 'attendance_status_'
    attendance = {}
    for key, status in form.items():
        if not key.startswith(prefix):
            continue
        # Strip only the leading prefix; str.replace would remove every match.
        user_id = key[len(prefix):]
        attendance[user_id] = {
            'status': status,  # present, absent, or unknown
            'present': status == 'present',  # Keep for backward compatibility
            'initials': form.get(f'initials_{user_id}', ''),
        }
    return attendance


def _meeting_form_strptime(raw, fmt, error_message, errors):
    """Parse a form value with ``strptime``; record ``error_message`` on failure.

    Returns ``None`` for an empty or malformed value.
    """
    if not raw:
        return None
    try:
        return datetime.strptime(raw, fmt)
    except ValueError:
        errors.append(error_message)
        return None


def _meeting_form_quorum(attendance, threshold=9):
    """Return ``(present_count, quorum_reached)`` for an attendance dict.

    The default threshold of 9 is the majority of a 16-member board.
    """
    present = sum(1 for entry in attendance.values() if entry.get('present'))
    return present, present >= threshold


def _handle_meeting_form(db, meeting=None):
    """Handle meeting form submission (create or update).

    Reads the submitted fields from ``request.form``, validates them and
    either updates ``meeting`` in place or creates a new ``BoardMeeting`` in
    draft status.

    Args:
        db: Open database session; the caller remains responsible for
            closing it.
        meeting: Existing ``BoardMeeting`` to update, or ``None`` to create
            a new one.

    Returns:
        A redirect to the meeting view on success, or back to the request URL
        with flashed error messages on validation or database failure.
    """
    form = request.form

    meeting_number = form.get('meeting_number', type=int)
    year = form.get('year', type=int)
    location = form.get('location', '').strip()
    chairperson_id = form.get('chairperson_id', type=int)
    secretary_id = form.get('secretary_id', type=int)
    guests = form.get('guests', '').strip()

    # Agenda and proceedings arrive as JSON; anything that is not a JSON list
    # is treated as empty so later iteration cannot fail.
    agenda_items = _meeting_form_json_list(form.get('agenda_items', '[]'))
    proceedings = _meeting_form_json_list(form.get('proceedings', '[]'))
    attendance = _meeting_form_attendance(form)

    # Sanitize text fields in proceedings to prevent stored XSS
    for proc in proceedings:
        if isinstance(proc, dict):
            for field in ('discussion', 'discussed', 'title'):
                if field in proc and isinstance(proc[field], str):
                    proc[field] = sanitize_html(proc[field])

    # Validate
    errors = []
    if not meeting_number:
        errors.append('Numer posiedzenia jest wymagany.')
    if not year:
        errors.append('Rok jest wymagany.')

    parsed_date = _meeting_form_strptime(
        form.get('meeting_date', ''), '%Y-%m-%d',
        'Nieprawidłowy format daty.', errors
    )
    parsed_start = _meeting_form_strptime(
        form.get('start_time', ''), '%H:%M',
        'Nieprawidłowy format godziny rozpoczęcia.', errors
    )
    parsed_end = _meeting_form_strptime(
        form.get('end_time', ''), '%H:%M',
        'Nieprawidłowy format godziny zakończenia.', errors
    )

    if errors:
        for error in errors:
            flash(error, 'error')
        return redirect(request.url)

    # Values shared by the update and create paths; empty collections and
    # strings are stored as NULL.
    values = {
        'meeting_number': meeting_number,
        'year': year,
        'meeting_date': parsed_date.date() if parsed_date is not None else None,
        'start_time': parsed_start.time() if parsed_start is not None else None,
        'end_time': parsed_end.time() if parsed_end is not None else None,
        'location': location or 'Siedziba Izby',
        'chairperson_id': chairperson_id,
        'secretary_id': secretary_id,
        'guests': guests or None,
        'agenda_items': agenda_items or None,
        'attendance': attendance or None,
        'proceedings': proceedings or None,
    }

    try:
        if meeting:
            # Update existing
            for attribute, value in values.items():
                setattr(meeting, attribute, value)
            meeting.updated_by = current_user.id
            meeting.updated_at = datetime.now()

            # Always recompute the quorum so that clearing the attendance
            # does not leave a stale count behind.
            meeting.quorum_count, meeting.quorum_confirmed = _meeting_form_quorum(attendance)

            db.commit()
            flash(f'Posiedzenie {meeting.meeting_identifier} zostało zaktualizowane.', 'success')
            return redirect(url_for('board.meeting_view', meeting_id=meeting.id))

        # Create new
        new_meeting = BoardMeeting(
            status=BoardMeeting.STATUS_DRAFT,
            created_by=current_user.id,
            **values
        )
        if attendance:
            new_meeting.quorum_count, new_meeting.quorum_confirmed = _meeting_form_quorum(attendance)

        db.add(new_meeting)
        db.commit()
        flash(f'Posiedzenie {new_meeting.meeting_identifier} zostało utworzone.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=new_meeting.id))

    except Exception as e:
        db.rollback()
        # exc_info keeps the traceback, which the message alone does not show.
        current_app.logger.error(f"Failed to save meeting: {e}", exc_info=True)
        flash('Błąd podczas zapisywania posiedzenia.', 'error')
        return redirect(request.url)