nordabiz/blueprints/board/routes.py
Maciej Pienczyn 52eb79f87d
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: convert DOCX to PDF on-the-fly for inline viewing
DOCX/DOC documents are now converted to PDF using LibreOffice headless
when the user clicks "Otwórz". The converted PDF is cached next to the
original file so subsequent views are instant.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 12:46:41 +01:00

788 lines
28 KiB
Python

"""
Board Routes (Rada Izby)
========================
Routes for board meeting management, document handling, and PDF generation.
Endpoints - Meetings:
- GET /rada/ - List all meetings + board members
- GET /rada/posiedzenia - Redirect to /rada/
- GET/POST /rada/posiedzenia/nowe - Create new meeting (office_manager+)
- GET/POST /rada/posiedzenia/<id>/edytuj - Edit meeting (office_manager+)
- GET /rada/posiedzenia/<id> - View meeting details
- POST /rada/posiedzenia/<id>/publikuj-program - Publish agenda (office_manager+)
- POST /rada/posiedzenia/<id>/publikuj-protokol - Publish protocol (office_manager+)
- GET /rada/posiedzenia/<id>/pdf-program - Download agenda PDF
- GET /rada/posiedzenia/<id>/pdf-protokol - Download protocol PDF
Endpoints - Documents:
- POST /rada/posiedzenia/<id>/dokumenty/dodaj - Upload document (office_manager+)
- GET /rada/dokumenty/<id>/pobierz - Download document (rada_member+)
- GET /rada/dokumenty/<id>/otworz - Open document inline; DOCX/DOC converted to PDF (rada_member+)
- POST /rada/dokumenty/<id>/usun - Soft delete document (office_manager+)
"""
import os
import subprocess
import tempfile
from datetime import datetime
from flask import (
render_template, request, redirect, url_for, flash,
current_app, Response, send_file
)
from flask_login import login_required, current_user
from sqlalchemy import desc
from . import bp
from database import SessionLocal, BoardMeeting, BoardDocument, SystemRole, User
from utils.decorators import rada_member_required, office_manager_required
from utils.helpers import sanitize_html
from services.document_upload_service import DocumentUploadService
from datetime import date, time
try:
import weasyprint
HAS_WEASYPRINT = True
except (ImportError, OSError):
HAS_WEASYPRINT = False
# =============================================================================
# MEETING ROUTES
# =============================================================================
@bp.route('/')
@login_required
@rada_member_required
def index():
    """Render the board landing page.

    Lists every board meeting (newest year and number first) together with
    the users flagged as active board members. ``can_manage`` tells the
    template whether the current user holds the OFFICE_MANAGER role.
    """
    session = SessionLocal()
    try:
        newest_first = (desc(BoardMeeting.year), desc(BoardMeeting.meeting_number))
        all_meetings = session.query(BoardMeeting).order_by(*newest_first).all()

        # Active members of the board, alphabetically by name.
        active_members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        return render_template(
            'board/meetings_list.html',
            meetings=all_meetings,
            board_members=active_members,
            can_manage=current_user.has_role(SystemRole.OFFICE_MANAGER),
        )
    finally:
        session.close()
@bp.route('/posiedzenia')
@login_required
@rada_member_required
def meetings_list():
    """Legacy URL kept for backwards compatibility; forwards to the board index."""
    board_home = url_for('board.index')
    return redirect(board_home)
@bp.route('/posiedzenia/nowe', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_create():
    """Create a new board meeting.

    POST is delegated to ``_handle_meeting_form``. GET renders an empty form
    pre-filled with defaults: the next meeting number within the current
    year, a default location and start time, and a default chairperson and
    secretary looked up by e-mail.
    """
    session = SessionLocal()
    try:
        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=None)

        # Propose the next sequential number within the current year.
        this_year = datetime.now().year
        latest = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.year == this_year)
            .order_by(desc(BoardMeeting.meeting_number))
            .first()
        )
        if latest:
            proposed_number = latest.meeting_number + 1
        else:
            proposed_number = 1

        # Members shown on the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        # Default chairperson and secretary are matched by e-mail address.
        chair = session.query(User).filter(User.email == 'leszek@rotor.pl').first()
        secretary = session.query(User).filter(User.email.like('%kloska%')).first()

        # Candidates for the secretary dropdown: office managers and admins.
        staff = (
            session.query(User)
            .filter(
                User.role.in_(['OFFICE_MANAGER', 'ADMIN']),
                User.is_active == True,
            )
            .order_by(User.name)
            .all()
        )

        defaults = {
            'meeting_number': proposed_number,
            'year': this_year,
            'location': 'Siedziba Izby',
            'start_time': '16:00',
            'chairperson_id': chair.id if chair else None,
            'secretary_id': secretary.id if secretary else None,
        }
        return render_template(
            'board/meeting_form.html',
            meeting=None,
            form_data=defaults,
            board_members=members,
            staff_users=staff,
            is_edit=False,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/edytuj', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_edit(meeting_id):
    """Edit an existing board meeting.

    An unknown ``meeting_id`` flashes an error and redirects to the board
    index. POST is delegated to ``_handle_meeting_form``; GET renders the
    form populated from the stored meeting.
    """
    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=meeting)

        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )
        # Candidates for the secretary dropdown: office managers and admins.
        staff = (
            session.query(User)
            .filter(
                User.role.in_(['OFFICE_MANAGER', 'ADMIN']),
                User.is_active == True,
            )
            .order_by(User.name)
            .all()
        )

        def _hhmm(moment):
            # Times are shown as HH:MM; a missing time becomes an empty field.
            return moment.strftime('%H:%M') if moment else ''

        stored_values = {
            'meeting_number': meeting.meeting_number,
            'year': meeting.year,
            'meeting_date': meeting.meeting_date.isoformat() if meeting.meeting_date else '',
            'start_time': _hhmm(meeting.start_time),
            'end_time': _hhmm(meeting.end_time),
            'location': meeting.location,
            'chairperson_id': meeting.chairperson_id,
            'secretary_id': meeting.secretary_id,
            'guests': meeting.guests,
            'agenda_items': meeting.agenda_items or [],
            'attendance': meeting.attendance or {},
            'proceedings': meeting.proceedings or [],
        }
        return render_template(
            'board/meeting_form.html',
            meeting=meeting,
            form_data=stored_values,
            board_members=members,
            staff_users=staff,
            is_edit=True,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>')
@login_required
@rada_member_required
def meeting_view(meeting_id):
    """Show one board meeting with its attendance roster and active documents.

    An unknown ``meeting_id`` flashes an error and redirects to the board index.
    """
    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # Roster used to render the attendance section.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        # Only documents that have not been soft-deleted, grouped by type.
        attached = (
            session.query(BoardDocument)
            .filter(
                BoardDocument.meeting_id == meeting_id,
                BoardDocument.is_active == True,
            )
            .order_by(BoardDocument.document_type, BoardDocument.title)
            .all()
        )

        return render_template(
            'board/meeting_view.html',
            meeting=meeting,
            board_members=members,
            documents=attached,
            can_manage=current_user.has_role(SystemRole.OFFICE_MANAGER),
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-program', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_agenda(meeting_id):
    """Publish the agenda of a draft meeting.

    Only meetings in draft status can be published, and both a meeting date
    and agenda items must be set. Every outcome ends in a redirect with a
    flashed message; database errors are rolled back and logged.
    """
    session = SessionLocal()

    def _go(endpoint):
        # All meeting-scoped redirects in this view carry the same id.
        return redirect(url_for(endpoint, meeting_id=meeting_id))

    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if meeting.status != BoardMeeting.STATUS_DRAFT:
            flash('Program został już opublikowany.', 'warning')
            return _go('board.meeting_view')

        # Preconditions: send the user back to the edit form to fill the gaps.
        if not meeting.meeting_date:
            flash('Data posiedzenia jest wymagana przed publikacją.', 'error')
            return _go('board.meeting_edit')
        if not meeting.agenda_items:
            flash('Program posiedzenia jest wymagany przed publikacją.', 'error')
            return _go('board.meeting_edit')

        meeting.status = BoardMeeting.STATUS_AGENDA_PUBLISHED
        meeting.agenda_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Program posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting agenda published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return _go('board.meeting_view')
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting agenda: {exc}")
        flash('Błąd podczas publikowania programu.', 'error')
        return _go('board.meeting_view')
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-protokol', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_protocol(meeting_id):
    """Publish the protocol (minutes) of a meeting.

    Refuses when the protocol is already published, and requires both an
    attendance list and proceedings. Every outcome ends in a redirect with a
    flashed message; database errors are rolled back and logged.
    """
    session = SessionLocal()

    def _go(endpoint):
        # All meeting-scoped redirects in this view carry the same id.
        return redirect(url_for(endpoint, meeting_id=meeting_id))

    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if meeting.status == BoardMeeting.STATUS_PROTOCOL_PUBLISHED:
            flash('Protokół został już opublikowany.', 'warning')
            return _go('board.meeting_view')

        # Preconditions: send the user back to the edit form to fill the gaps.
        if not meeting.attendance:
            flash('Lista obecności jest wymagana przed publikacją protokołu.', 'error')
            return _go('board.meeting_edit')
        if not meeting.proceedings:
            flash('Przebieg posiedzenia jest wymagany przed publikacją protokołu.', 'error')
            return _go('board.meeting_edit')

        meeting.status = BoardMeeting.STATUS_PROTOCOL_PUBLISHED
        meeting.protocol_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Protokół z posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting protocol published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return _go('board.meeting_view')
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting protocol: {exc}")
        flash('Błąd podczas publikowania protokołu.', 'error')
        return _go('board.meeting_view')
    finally:
        session.close()
# =============================================================================
# MEETING PDF GENERATION
# =============================================================================
def _generate_meeting_pdf(meeting_id, pdf_type):
    """Render a meeting as a PDF download.

    Args:
        meeting_id: Primary key of the meeting to render.
        pdf_type: ``'agenda'`` selects the agenda file name; any other value
            (callers pass ``'protocol'``) selects the protocol file name. The
            value is also handed to the template.

    Returns:
        A PDF attachment response, or a redirect with a flashed error when
        weasyprint is unavailable, the meeting is missing, or rendering fails.
    """
    if not HAS_WEASYPRINT:
        flash('Generowanie PDF nie jest dostępne (brak biblioteki weasyprint).', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # The roster is passed to the template for the attendance section.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        markup = render_template(
            'board/meeting_pdf.html',
            meeting=meeting,
            board_members=members,
            pdf_type=pdf_type,
        )
        document = weasyprint.HTML(string=markup)
        pdf_bytes = document.write_pdf()

        # The identifier contains '/', which is replaced for the file name.
        safe_identifier = meeting.meeting_identifier.replace('/', '_')
        stem = 'Program_Posiedzenia' if pdf_type == 'agenda' else 'Protokol_Posiedzenia'
        filename = f"{stem}_{safe_identifier}.pdf"

        current_app.logger.info(
            f"Meeting PDF generated: {pdf_type} for {meeting.meeting_identifier} by user {current_user.id}"
        )
        disposition = {'Content-Disposition': f'attachment; filename="{filename}"'}
        return Response(pdf_bytes, mimetype='application/pdf', headers=disposition)
    except Exception as exc:
        current_app.logger.error(f"Failed to generate meeting PDF: {exc}")
        flash('Błąd podczas generowania PDF.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/pdf-program')
@login_required
@rada_member_required
def meeting_pdf_agenda(meeting_id):
    """Serve the agenda of the given meeting as a PDF download."""
    agenda_response = _generate_meeting_pdf(meeting_id, 'agenda')
    return agenda_response
@bp.route('/posiedzenia/<int:meeting_id>/pdf-protokol')
@login_required
@rada_member_required
def meeting_pdf_protocol(meeting_id):
    """Serve the protocol of the given meeting as a PDF download."""
    protocol_response = _generate_meeting_pdf(meeting_id, 'protocol')
    return protocol_response
# =============================================================================
# DOCUMENT ROUTES
# =============================================================================
@bp.route('/posiedzenia/<int:meeting_id>/dokumenty/dodaj', methods=['POST'])
@login_required
@office_manager_required
def document_upload(meeting_id):
    """Attach an uploaded file to a board meeting.

    The file is validated and stored through ``DocumentUploadService``, then
    a ``BoardDocument`` row is created for it. The title falls back to the
    uploaded file name when the form leaves it blank. Every outcome ends in
    a redirect with a flashed message.
    """
    session = SessionLocal()

    def _to_meeting():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    try:
        meeting = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.id == meeting_id)
            .first()
        )
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        upload = request.files.get('document')
        if not upload:
            flash('Nie wybrano pliku.', 'error')
            return _to_meeting()

        accepted, rejection_reason = DocumentUploadService.validate_file(upload)
        if not accepted:
            flash(rejection_reason, 'error')
            return _to_meeting()

        # Persist the file before creating the row that points at it.
        stored_filename, file_path, file_size, mime_type = DocumentUploadService.save_file(upload)

        title = request.form.get('title', '').strip() or upload.filename
        document_type = request.form.get('document_type', 'other')
        description = request.form.get('description', '').strip() or None

        # Lower-cased text after the last dot; empty when the name has no dot.
        _, dot, suffix = upload.filename.rpartition('.')
        extension = suffix.lower() if dot else ''

        record = BoardDocument(
            title=title,
            description=description,
            document_type=document_type,
            meeting_id=meeting_id,
            meeting_date=meeting.meeting_date,
            meeting_number=meeting.meeting_number,
            original_filename=upload.filename,
            stored_filename=stored_filename,
            file_extension=extension,
            file_size=file_size,
            mime_type=mime_type,
            uploaded_by=current_user.id,
        )
        session.add(record)
        session.commit()

        current_app.logger.info(
            f"Board document uploaded: '{title}' for meeting {meeting.meeting_identifier} "
            f"by user {current_user.id}"
        )
        flash(f'Dokument „{title}" został dodany.', 'success')
        return _to_meeting()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to upload board document: {exc}")
        flash('Błąd podczas dodawania dokumentu.', 'error')
        return _to_meeting()
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/pobierz')
@login_required
@rada_member_required
def document_download(doc_id):
    """Send a stored board document to the client as an attachment.

    Soft-deleted or unknown documents redirect to the board index; a row
    whose file is missing on disk is logged and redirects to its meeting.
    """
    session = SessionLocal()
    try:
        active_docs = session.query(BoardDocument).filter(
            BoardDocument.id == doc_id,
            BoardDocument.is_active == True,
        )
        doc = active_docs.first()
        if not doc:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        file_path = DocumentUploadService.get_file_path(doc.stored_filename, doc.uploaded_at)
        if os.path.exists(file_path):
            return send_file(
                file_path,
                mimetype=doc.mime_type,
                as_attachment=True,
                download_name=doc.original_filename,
            )

        # The database row exists but its file is gone from storage.
        current_app.logger.error(f"Board document file not found: {file_path}")
        flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=doc.meeting_id))
    finally:
        session.close()
def _convert_to_cached_pdf(source_path, pdf_path, timeout=30):
    """Convert an office document to PDF with headless LibreOffice.

    The conversion writes into a scratch directory created next to
    ``pdf_path`` and the result is moved into place with ``os.replace`` only
    after it succeeded, so a failed, interrupted or concurrent conversion
    never leaves a partial file under the cache name.

    Args:
        source_path: Path of the DOC/DOCX file to convert.
        pdf_path: Destination path of the cached PDF.
        timeout: Seconds to wait for LibreOffice before giving up.

    Returns:
        ``None`` on success, otherwise the user-facing error message to flash.
    """
    conversion_failed = 'Nie udało się przekonwertować dokumentu do PDF.'
    cache_dir = os.path.dirname(pdf_path) or '.'
    try:
        with tempfile.TemporaryDirectory(dir=cache_dir) as scratch_dir:
            result = subprocess.run(
                ['libreoffice', '--headless', '--convert-to', 'pdf',
                 '--outdir', scratch_dir, source_path],
                capture_output=True, timeout=timeout
            )
            if result.returncode != 0:
                # stderr is not guaranteed to be valid UTF-8; never let logging raise.
                stderr_text = result.stderr.decode(errors='replace')
                current_app.logger.error(f"DOCX to PDF conversion failed: {stderr_text}")
                return conversion_failed

            # LibreOffice names the output after the input's base name.
            source_stem = os.path.splitext(os.path.basename(source_path))[0]
            produced = os.path.join(scratch_dir, source_stem + '.pdf')
            if not os.path.exists(produced):
                # Exit code 0 does not guarantee that a file was written.
                current_app.logger.error(
                    f"DOCX to PDF conversion produced no output for: {source_path}"
                )
                return conversion_failed

            os.replace(produced, pdf_path)
    except subprocess.TimeoutExpired:
        current_app.logger.error(f"DOCX to PDF conversion timed out: {source_path}")
        return 'Konwersja dokumentu trwała za długo.'
    except OSError as exc:
        # Covers a missing libreoffice binary and filesystem errors.
        current_app.logger.error(f"DOCX to PDF conversion could not run: {exc}")
        return conversion_failed
    return None


@bp.route('/dokumenty/<int:doc_id>/otworz')
@login_required
@rada_member_required
def document_view(doc_id):
    """Open a document inline in the browser.

    DOC/DOCX files are converted to PDF on first view and the PDF is cached
    next to the original file; later views reuse the cached PDF. All other
    file types are sent as-is with their stored MIME type.

    Soft-deleted or unknown documents redirect to the board index; a missing
    file or a failed conversion flashes an error and redirects to the
    document's meeting.
    """
    db = SessionLocal()
    try:
        doc = db.query(BoardDocument).filter(
            BoardDocument.id == doc_id,
            BoardDocument.is_active == True
        ).first()
        if not doc:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        file_path = DocumentUploadService.get_file_path(
            doc.stored_filename, doc.uploaded_at
        )
        if not os.path.exists(file_path):
            current_app.logger.error(f"Board document file not found: {file_path}")
            flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
            return redirect(url_for('board.meeting_view', meeting_id=doc.meeting_id))

        # Tolerate a NULL or differently-cased extension on the row.
        extension = (doc.file_extension or '').lower()
        if extension in ('docx', 'doc'):
            pdf_path = os.path.splitext(file_path)[0] + '.pdf'
            # Convert only if the cached PDF does not exist yet.
            if not os.path.exists(pdf_path):
                error_message = _convert_to_cached_pdf(file_path, pdf_path)
                if error_message:
                    flash(error_message, 'error')
                    return redirect(url_for('board.meeting_view', meeting_id=doc.meeting_id))

            pdf_name = doc.original_filename.rsplit('.', 1)[0] + '.pdf'
            return send_file(
                pdf_path,
                mimetype='application/pdf',
                as_attachment=False,
                download_name=pdf_name
            )

        return send_file(
            file_path,
            mimetype=doc.mime_type,
            as_attachment=False,
            download_name=doc.original_filename
        )
    finally:
        db.close()
@bp.route('/dokumenty/<int:doc_id>/usun', methods=['POST'])
@login_required
@office_manager_required
def document_delete(doc_id):
    """Soft-delete a board document.

    The row is flagged inactive and stamped with the acting user; nothing is
    removed from disk here. Unknown or already inactive documents, and
    database errors, redirect to the board index with a flashed error.
    """
    session = SessionLocal()
    try:
        active_docs = session.query(BoardDocument).filter(
            BoardDocument.id == doc_id,
            BoardDocument.is_active == True,
        )
        doc = active_docs.first()
        if not doc:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Capture what the messages need before the commit.
        parent_meeting_id, removed_title = doc.meeting_id, doc.title

        doc.is_active = False
        doc.updated_by = current_user.id
        doc.updated_at = datetime.now()
        session.commit()

        current_app.logger.info(
            f"Board document soft-deleted: '{removed_title}' (id={doc_id}) by user {current_user.id}"
        )
        flash(f'Dokument „{removed_title}" został usunięty.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=parent_meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to delete board document: {exc}")
        flash('Błąd podczas usuwania dokumentu.', 'error')
        return redirect(url_for('board.index'))
    finally:
        session.close()
# =============================================================================
# MEETING FORM HANDLER
# =============================================================================
# Minimum number of members marked present for a quorum (majority of 16).
_QUORUM_MIN_PRESENT = 9


def _parse_json_list(raw):
    """Decode a JSON form field that is expected to hold a list.

    Malformed JSON and any value that is not a list (``null``, a number, a
    string, an object) yield an empty list, so callers can always iterate
    over the result.
    """
    import json
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []
    return parsed if isinstance(parsed, list) else []


def _collect_attendance(form):
    """Build the attendance mapping from ``attendance_status_<user_id>`` fields."""
    prefix = 'attendance_status_'
    attendance = {}
    for key, status in form.items():
        if not key.startswith(prefix):
            continue
        user_id = key[len(prefix):]
        attendance[user_id] = {
            'status': status,  # present, absent, or unknown
            'present': status == 'present',  # kept for backward compatibility
            'initials': form.get(f'initials_{user_id}', ''),
        }
    return attendance


def _sanitize_proceedings(proceedings):
    """Sanitize free-text fields of each proceedings entry in place (stored XSS)."""
    for entry in proceedings:
        if not isinstance(entry, dict):
            continue
        for field in ('discussion', 'discussed', 'title'):
            if isinstance(entry.get(field), str):
                entry[field] = sanitize_html(entry[field])


def _parse_form_moment(raw, fmt, error_message, errors):
    """Parse ``raw`` with ``fmt``; on failure record ``error_message`` in ``errors``.

    Returns the parsed ``datetime``, or ``None`` when ``raw`` is empty or invalid.
    """
    if not raw:
        return None
    try:
        return datetime.strptime(raw, fmt)
    except ValueError:
        errors.append(error_message)
        return None


def _handle_meeting_form(db, meeting=None):
    """Handle meeting form submission (create or update).

    Args:
        db: Open database session; this function commits or rolls back but
            does not close it.
        meeting: Existing ``BoardMeeting`` to update, or ``None`` to create a
            new one in draft status.

    Returns:
        A redirect to the meeting view on success, or back to the submitted
        URL (with flashed errors) on validation or database failure.
    """
    form = request.form
    meeting_number = form.get('meeting_number', type=int)
    year = form.get('year', type=int)
    location = form.get('location', '').strip()
    chairperson_id = form.get('chairperson_id', type=int)
    secretary_id = form.get('secretary_id', type=int)
    guests = form.get('guests', '').strip()

    # JSON payloads are untrusted: anything that is not a list is treated as
    # empty instead of crashing the request or being persisted as-is.
    # NOTE(review): agenda_items are stored unsanitized, unlike proceedings —
    # confirm the templates escape them.
    agenda_items = _parse_json_list(form.get('agenda_items', '[]'))
    proceedings = _parse_json_list(form.get('proceedings', '[]'))
    _sanitize_proceedings(proceedings)

    attendance = _collect_attendance(form)

    # Validate
    errors = []
    if not meeting_number:
        errors.append('Numer posiedzenia jest wymagany.')
    if not year:
        errors.append('Rok jest wymagany.')

    parsed_date = _parse_form_moment(
        form.get('meeting_date', ''), '%Y-%m-%d',
        'Nieprawidłowy format daty.', errors
    )
    parsed_start = _parse_form_moment(
        form.get('start_time', ''), '%H:%M',
        'Nieprawidłowy format godziny rozpoczęcia.', errors
    )
    parsed_end = _parse_form_moment(
        form.get('end_time', ''), '%H:%M',
        'Nieprawidłowy format godziny zakończenia.', errors
    )

    if errors:
        for error in errors:
            flash(error, 'error')
        return redirect(request.url)

    # Column values shared by the create and update paths; empty collections
    # and blank text are stored as NULL.
    values = {
        'meeting_number': meeting_number,
        'year': year,
        'meeting_date': parsed_date.date() if parsed_date else None,
        'start_time': parsed_start.time() if parsed_start else None,
        'end_time': parsed_end.time() if parsed_end else None,
        'location': location or 'Siedziba Izby',
        'chairperson_id': chairperson_id,
        'secretary_id': secretary_id,
        'guests': guests or None,
        'agenda_items': agenda_items or None,
        'attendance': attendance or None,
        'proceedings': proceedings or None,
    }

    try:
        if meeting:
            for column, value in values.items():
                setattr(meeting, column, value)
            meeting.updated_by = current_user.id
            meeting.updated_at = datetime.now()
            target = meeting
        else:
            target = BoardMeeting(
                status=BoardMeeting.STATUS_DRAFT,
                created_by=current_user.id,
                **values
            )
            db.add(target)

        # NOTE(review): quorum fields are left untouched when no attendance
        # was submitted, so an update can keep a previously stored count.
        if attendance:
            target.quorum_count = sum(1 for a in attendance.values() if a.get('present'))
            target.quorum_confirmed = target.quorum_count >= _QUORUM_MIN_PRESENT

        db.commit()

        if meeting:
            flash(f'Posiedzenie {target.meeting_identifier} zostało zaktualizowane.', 'success')
        else:
            flash(f'Posiedzenie {target.meeting_identifier} zostało utworzone.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=target.id))
    except Exception as e:
        db.rollback()
        current_app.logger.error(f"Failed to save meeting: {e}")
        flash('Błąd podczas zapisywania posiedzenia.', 'error')
        return redirect(request.url)