nordabiz/blueprints/board/routes.py
Maciej Pienczyn d742b6676c
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: Add meeting agenda/protocol form system for Board Council
- Add BoardMeeting model with JSON fields for flexible data storage
- Add migration 049_board_meetings.sql
- Add routes for creating, editing, viewing meetings
- Add publish workflows for agenda and protocol
- Add templates: meetings_list, meeting_form (with tabs), meeting_view
- Support for: agenda items, attendance tracking, proceedings
- Pre-filled defaults for chairperson, secretary, location
- Quorum calculation (9/16 for majority)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 20:10:50 +01:00

816 lines
28 KiB
Python

"""
Board Routes (Rada Izby)
========================
Routes for board document management and meeting forms.
Endpoints - Documents:
- GET /rada/ - Document list + board members
- GET/POST /rada/upload - Upload new document (office_manager+)
- GET /rada/<id>/view - View document in browser
- GET /rada/<id>/download - Download document
- POST /rada/<id>/delete - Soft delete document (office_manager+)
Endpoints - Meetings:
- GET /rada/posiedzenia - List all meetings
- GET/POST /rada/posiedzenia/nowe - Create new meeting (office_manager+)
- GET/POST /rada/posiedzenia/<id>/edytuj - Edit meeting (office_manager+)
- GET /rada/posiedzenia/<id> - View meeting details
- POST /rada/posiedzenia/<id>/publikuj-program - Publish agenda (office_manager+)
- POST /rada/posiedzenia/<id>/publikuj-protokol - Publish protocol (office_manager+)
"""
import os
import subprocess
import tempfile
import shutil
from datetime import datetime
from flask import (
render_template, request, redirect, url_for, flash,
send_file, current_app, Response
)
from flask_login import login_required, current_user
from sqlalchemy import desc, extract
from . import bp
from database import SessionLocal, BoardDocument, BoardMeeting, SystemRole, User
from utils.decorators import rada_member_required, office_manager_required
from services.document_upload_service import DocumentUploadService
from datetime import date, time
def convert_docx_to_pdf(docx_path: str) -> str | None:
"""
Convert DOCX to PDF using LibreOffice headless.
Returns path to generated PDF or None on failure.
PDF is stored alongside the original with .pdf extension.
"""
# Generate PDF path (same location, .pdf extension)
pdf_path = docx_path.rsplit('.', 1)[0] + '.pdf'
# Check if PDF already exists (cached)
if os.path.exists(pdf_path):
return pdf_path
# Convert using LibreOffice
try:
output_dir = os.path.dirname(docx_path)
result = subprocess.run(
[
'soffice',
'--headless',
'--convert-to', 'pdf',
'--outdir', output_dir,
docx_path
],
capture_output=True,
text=True,
timeout=60 # 60 seconds timeout
)
if result.returncode == 0 and os.path.exists(pdf_path):
# Ensure www-data can read it
os.chmod(pdf_path, 0o644)
return pdf_path
else:
return None
except subprocess.TimeoutExpired:
return None
except Exception:
return None
@bp.route('/')
@login_required
@rada_member_required
def index():
    """
    Display the list of board documents together with the board members.

    Optional query-string filters:
        year  - restrict to documents whose meeting date falls in that year
        month - restrict to documents whose meeting date falls in that month
        type  - restrict to one document type (ignored unless it is one of
                BoardDocument.DOCUMENT_TYPES)
    """
    year = request.args.get('year', type=int)
    month = request.args.get('month', type=int)
    doc_type = request.args.get('type', '')

    db = SessionLocal()
    try:
        # "== True" is intentional: SQLAlchemy turns it into a SQL comparison.
        query = db.query(BoardDocument).filter(BoardDocument.is_active == True)

        if year:
            query = query.filter(extract('year', BoardDocument.meeting_date) == year)
        if month:
            query = query.filter(extract('month', BoardDocument.meeting_date) == month)
        if doc_type and doc_type in BoardDocument.DOCUMENT_TYPES:
            query = query.filter(BoardDocument.document_type == doc_type)

        # Newest meetings first.
        documents = query.order_by(desc(BoardDocument.meeting_date)).all()

        # Years for the filter dropdown. Only the distinct meeting dates are
        # fetched, instead of loading every active document as a full ORM
        # object just to read one attribute.
        date_rows = db.query(BoardDocument.meeting_date).filter(
            BoardDocument.is_active == True,
            BoardDocument.meeting_date.isnot(None)
        ).distinct().all()
        available_years = sorted(
            {meeting_date.year for (meeting_date,) in date_rows},
            reverse=True
        )

        # Active users flagged as board members, alphabetically.
        board_members = db.query(User).filter(
            User.is_rada_member == True,
            User.is_active == True
        ).order_by(User.name).all()

        # Controls whether the template offers the upload action.
        can_upload = current_user.has_role(SystemRole.OFFICE_MANAGER)

        return render_template(
            'board/index.html',
            documents=documents,
            available_years=available_years,
            document_types=BoardDocument.DOCUMENT_TYPES,
            type_labels=BoardDocument.DOCUMENT_TYPE_LABELS,
            current_year=year,
            current_month=month,
            current_type=doc_type,
            can_upload=can_upload,
            board_members=board_members
        )
    finally:
        db.close()
@bp.route('/<int:doc_id>/view')
@login_required
@rada_member_required
def view(doc_id):
    """
    Show a board document inline in the browser.

    PDF files are streamed as-is. DOCX/DOC files are first converted to PDF
    with convert_docx_to_pdf(); when that yields nothing usable the file is
    rendered as HTML through mammoth instead. Any other extension redirects
    to the download route.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id)
            .filter(BoardDocument.is_active == True)
            .first()
        )
        if not record:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Resolve the stored file on disk.
        source_path = DocumentUploadService.get_file_path(
            record.stored_filename, record.uploaded_at
        )
        if not os.path.exists(source_path):
            current_app.logger.error(f"Board document file not found: {source_path}")
            flash('Plik dokumentu nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Audit trail entry for the view.
        current_app.logger.info(
            f"Board document viewed: {record.title} (ID: {doc_id}) by user {current_user.id}"
        )

        extension = record.file_extension

        # Native PDF: serve inline.
        if extension == 'pdf':
            return send_file(
                source_path,
                as_attachment=False,
                download_name=record.original_filename,
                mimetype='application/pdf'
            )

        # Anything that is not a Word document cannot be previewed.
        if extension not in ('docx', 'doc'):
            flash('Podgląd tego typu pliku nie jest obsługiwany.', 'warning')
            return redirect(url_for('board.download', doc_id=doc_id))

        # Word document: try the LibreOffice route first.
        converted_path = convert_docx_to_pdf(source_path)
        if converted_path and os.path.exists(converted_path):
            base_name = record.original_filename.rsplit('.', 1)[0]
            return send_file(
                converted_path,
                as_attachment=False,
                download_name=base_name + '.pdf',
                mimetype='application/pdf'
            )

        # LibreOffice produced nothing: fall back to an HTML rendering.
        current_app.logger.warning(
            f"LibreOffice conversion failed for {record.title}, falling back to mammoth"
        )
        try:
            import mammoth
            with open(source_path, 'rb') as stream:
                conversion = mammoth.convert_to_html(stream)
            return render_template(
                'board/view_document.html',
                document=record,
                html_content=conversion.value,
                conversion_messages=conversion.messages
            )
        except Exception as exc:
            current_app.logger.error(f"Failed to convert DOCX: {exc}")
            flash('Błąd podczas konwersji dokumentu.', 'error')
            return redirect(url_for('board.index'))
    finally:
        session.close()
@bp.route('/upload', methods=['GET', 'POST'])
@login_required
@office_manager_required
def upload():
    """
    Upload a new board document.

    GET renders the empty upload form. POST validates the submitted fields
    and file, stores the file through DocumentUploadService, and creates the
    BoardDocument row. If the row cannot be committed, the stored file is
    deleted again.
    """
    def render_form(form_data):
        # Single place that knows the template and its fixed context.
        return render_template(
            'board/upload.html',
            document_types=BoardDocument.DOCUMENT_TYPES,
            type_labels=BoardDocument.DOCUMENT_TYPE_LABELS,
            form_data=form_data
        )

    if request.method != 'POST':
        return render_form({})

    # --- Collect submitted fields -----------------------------------------
    form = request.form
    title = form.get('title', '').strip()
    description = form.get('description', '').strip()
    document_type = form.get('document_type', 'protocol')
    meeting_date_str = form.get('meeting_date', '')
    meeting_number = form.get('meeting_number', type=int)

    # --- Validate ---------------------------------------------------------
    problems = []
    if not title:
        problems.append('Tytuł dokumentu jest wymagany.')

    meeting_date = None
    if not meeting_date_str:
        problems.append('Data posiedzenia jest wymagana.')
    else:
        try:
            meeting_date = datetime.strptime(meeting_date_str, '%Y-%m-%d').date()
        except ValueError:
            problems.append('Nieprawidłowy format daty.')

    # Unknown document types are silently filed under "other".
    if document_type not in BoardDocument.DOCUMENT_TYPES:
        document_type = 'other'

    uploaded = request.files.get('document')
    if not uploaded or uploaded.filename == '':
        problems.append('Plik dokumentu jest wymagany.')
    if uploaded and uploaded.filename:
        is_valid, error_msg = DocumentUploadService.validate_file(uploaded)
        if not is_valid:
            problems.append(error_msg)

    if problems:
        for message in problems:
            flash(message, 'error')
        # Re-render with the user's input so nothing has to be retyped.
        return render_form({
            'title': title,
            'description': description,
            'document_type': document_type,
            'meeting_date': meeting_date_str,
            'meeting_number': meeting_number
        })

    # --- Persist the file -------------------------------------------------
    try:
        stored_filename, file_path, file_size, mime_type = DocumentUploadService.save_file(uploaded)
    except Exception as exc:
        current_app.logger.error(f"Failed to save board document: {exc}")
        flash('Błąd podczas zapisywania pliku. Spróbuj ponownie.', 'error')
        return redirect(url_for('board.upload'))

    # Lower-cased extension of the uploaded name, or '' when it has none.
    original_name = uploaded.filename
    file_extension = ''
    if '.' in original_name:
        file_extension = original_name.rsplit('.', 1)[-1].lower()

    # --- Persist the database row -----------------------------------------
    record = BoardDocument(
        title=title,
        description=description or None,
        document_type=document_type,
        meeting_date=meeting_date,
        meeting_number=meeting_number or None,
        original_filename=original_name,
        stored_filename=stored_filename,
        file_extension=file_extension,
        file_size=file_size,
        mime_type=mime_type,
        uploaded_by=current_user.id
    )

    session = SessionLocal()
    try:
        session.add(record)
        session.commit()
        flash(f'Dokument "{title}" został dodany.', 'success')
        current_app.logger.info(f"Board document uploaded: {title} by user {current_user.id}")
        return redirect(url_for('board.index'))
    except Exception as exc:
        session.rollback()
        # The row was not created, so remove the file stored for it.
        DocumentUploadService.delete_file(stored_filename)
        current_app.logger.error(f"Failed to create board document record: {exc}")
        flash('Błąd podczas tworzenia rekordu. Spróbuj ponownie.', 'error')
        return redirect(url_for('board.upload'))
    finally:
        session.close()
@bp.route('/<int:doc_id>/download')
@login_required
@rada_member_required
def download(doc_id):
    """
    Send a board document to the client as an attachment.

    The file is served under its original upload name; a missing database
    row or a missing file on disk redirects to the document list with an
    error message.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id)
            .filter(BoardDocument.is_active == True)
            .first()
        )
        if not record:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Locate the stored file.
        stored_path = DocumentUploadService.get_file_path(
            record.stored_filename, record.uploaded_at
        )
        if not os.path.exists(stored_path):
            current_app.logger.error(f"Board document file not found: {stored_path}")
            flash('Plik dokumentu nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Audit trail entry for the download.
        current_app.logger.info(
            f"Board document downloaded: {record.title} (ID: {doc_id}) by user {current_user.id}"
        )

        send_options = {
            'as_attachment': True,
            'download_name': record.original_filename,
            'mimetype': record.mime_type,
        }
        return send_file(stored_path, **send_options)
    finally:
        session.close()
@bp.route('/<int:doc_id>/delete', methods=['POST'])
@login_required
@office_manager_required
def delete(doc_id):
    """
    Soft-delete a board document.

    The row is kept and only flagged inactive, with the acting user and the
    time recorded on it. Every outcome ends with a redirect to the document
    list.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id)
            .filter(BoardDocument.is_active == True)
            .first()
        )
        if not record:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Flag as inactive instead of removing the row.
        record.is_active = False
        record.updated_at = datetime.now()
        record.updated_by = current_user.id
        session.commit()

        flash(f'Dokument "{record.title}" został usunięty.', 'success')
        current_app.logger.info(
            f"Board document deleted: {record.title} (ID: {doc_id}) by user {current_user.id}"
        )
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to delete board document: {exc}")
        flash('Błąd podczas usuwania dokumentu.', 'error')
    finally:
        session.close()

    return redirect(url_for('board.index'))
# =============================================================================
# MEETING ROUTES (Agenda & Protocol Forms)
# =============================================================================
@bp.route('/posiedzenia')
@login_required
@rada_member_required
def meetings_list():
    """
    Render the list of all board meetings, newest first.

    Meetings are ordered by year and then by meeting number, both descending.
    """
    session = SessionLocal()
    try:
        newest_first = (desc(BoardMeeting.year), desc(BoardMeeting.meeting_number))
        all_meetings = session.query(BoardMeeting).order_by(*newest_first).all()

        # Tells the template whether to offer the management actions.
        user_can_manage = current_user.has_role(SystemRole.OFFICE_MANAGER)

        return render_template(
            'board/meetings_list.html',
            meetings=all_meetings,
            can_manage=user_can_manage
        )
    finally:
        session.close()
@bp.route('/posiedzenia/nowe', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_create():
    """
    Create a new board meeting.

    POST delegates to _handle_meeting_form(). GET renders the form
    pre-filled with defaults: the next meeting number for the current year,
    the default location and start time, and a default chairperson and
    secretary.

    The default chairperson / secretary lookups can be overridden through
    two optional application config keys; when they are absent, the values
    that used to be hard-coded here are used:
        BOARD_DEFAULT_CHAIRPERSON_EMAIL         exact e-mail match
        BOARD_DEFAULT_SECRETARY_EMAIL_PATTERN   SQL LIKE pattern
    """
    db = SessionLocal()
    try:
        if request.method == 'POST':
            return _handle_meeting_form(db, meeting=None)

        # Next free meeting number within the current year.
        current_year = datetime.now().year
        last_meeting = db.query(BoardMeeting).filter(
            BoardMeeting.year == current_year
        ).order_by(desc(BoardMeeting.meeting_number)).first()
        # "or 0" guards against a stored meeting without a number.
        next_number = ((last_meeting.meeting_number or 0) + 1) if last_meeting else 1

        # Active board members, listed on the attendance tab.
        board_members = db.query(User).filter(
            User.is_rada_member == True,
            User.is_active == True
        ).order_by(User.name).all()

        # Personal defaults come from configuration where available, so a
        # change of chairperson or secretary does not need a code change.
        config = current_app.config
        chairperson_email = config.get(
            'BOARD_DEFAULT_CHAIRPERSON_EMAIL', 'leszek@rotor.pl'
        )
        secretary_pattern = config.get(
            'BOARD_DEFAULT_SECRETARY_EMAIL_PATTERN', '%kloska%'
        )

        default_chairperson = db.query(User).filter(
            User.email == chairperson_email
        ).first()
        default_secretary = db.query(User).filter(
            User.email.like(secretary_pattern)
        ).first()

        return render_template(
            'board/meeting_form.html',
            meeting=None,
            form_data={
                'meeting_number': next_number,
                'year': current_year,
                'location': 'Siedziba Izby',
                'start_time': '16:00',
                'chairperson_id': default_chairperson.id if default_chairperson else None,
                'secretary_id': default_secretary.id if default_secretary else None,
            },
            board_members=board_members,
            is_edit=False
        )
    finally:
        db.close()
@bp.route('/posiedzenia/<int:meeting_id>/edytuj', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_edit(meeting_id):
    """
    Edit an existing board meeting.

    POST delegates to _handle_meeting_form() with the loaded meeting. GET
    renders the form pre-filled from the stored meeting. An unknown id
    redirects to the meetings list.
    """
    def as_clock(value):
        # Stored time -> "HH:MM" for the form input, '' when unset.
        return value.strftime('%H:%M') if value else ''

    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(
            BoardMeeting.id == meeting_id
        ).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.meetings_list'))

        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=meeting)

        # Active board members, listed on the attendance tab.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        stored_date = meeting.meeting_date
        prefill = {
            'meeting_number': meeting.meeting_number,
            'year': meeting.year,
            'meeting_date': stored_date.isoformat() if stored_date else '',
            'start_time': as_clock(meeting.start_time),
            'end_time': as_clock(meeting.end_time),
            'location': meeting.location,
            'chairperson_id': meeting.chairperson_id,
            'secretary_id': meeting.secretary_id,
            'guests': meeting.guests,
            # The JSON columns may be empty; give the template empty containers.
            'agenda_items': meeting.agenda_items or [],
            'attendance': meeting.attendance or {},
            'proceedings': meeting.proceedings or [],
        }

        return render_template(
            'board/meeting_form.html',
            meeting=meeting,
            form_data=prefill,
            board_members=members,
            is_edit=True
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>')
@login_required
@rada_member_required
def meeting_view(meeting_id):
    """
    Show the details of a single board meeting.

    An unknown id redirects to the meetings list with an error message.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(
            BoardMeeting.id == meeting_id
        ).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.meetings_list'))

        # Active board members, needed to render the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)
            .order_by(User.name)
            .all()
        )

        template_context = {
            'meeting': meeting,
            'board_members': members,
            'can_manage': current_user.has_role(SystemRole.OFFICE_MANAGER),
        }
        return render_template('board/meeting_view.html', **template_context)
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-program', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_agenda(meeting_id):
    """
    Publish the agenda of a draft meeting.

    Only meetings in draft status can be published, and only once a meeting
    date and at least one agenda item are present. On success the status
    moves to "agenda published" and the publication time is recorded.
    """
    def to_view():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    def to_edit():
        return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

    # (attribute that must be filled in, message shown when it is not)
    required_before_publish = (
        ('meeting_date', 'Data posiedzenia jest wymagana przed publikacją.'),
        ('agenda_items', 'Program posiedzenia jest wymagany przed publikacją.'),
    )

    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(
            BoardMeeting.id == meeting_id
        ).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.meetings_list'))

        if meeting.status != BoardMeeting.STATUS_DRAFT:
            flash('Program został już opublikowany.', 'warning')
            return to_view()

        for attribute, message in required_before_publish:
            if not getattr(meeting, attribute):
                flash(message, 'error')
                return to_edit()

        # All checks passed: flip the status and stamp the change.
        meeting.status = BoardMeeting.STATUS_AGENDA_PUBLISHED
        meeting.agenda_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Program posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting agenda published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        # TODO: Send email notifications to board members
        return to_view()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting agenda: {exc}")
        flash('Błąd podczas publikowania programu.', 'error')
        return to_view()
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-protokol', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_protocol(meeting_id):
    """
    Publish the protocol of a meeting.

    Refused when the protocol is already published, or when the attendance
    list or the proceedings are still empty. On success the status moves to
    "protocol published" and the publication time is recorded.
    """
    def to_view():
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    def to_edit():
        return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

    # (attribute that must be filled in, message shown when it is not)
    required_before_publish = (
        ('attendance', 'Lista obecności jest wymagana przed publikacją protokołu.'),
        ('proceedings', 'Przebieg posiedzenia jest wymagany przed publikacją protokołu.'),
    )

    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(
            BoardMeeting.id == meeting_id
        ).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.meetings_list'))

        if meeting.status == BoardMeeting.STATUS_PROTOCOL_PUBLISHED:
            flash('Protokół został już opublikowany.', 'warning')
            return to_view()

        for attribute, message in required_before_publish:
            if not getattr(meeting, attribute):
                flash(message, 'error')
                return to_edit()

        # All checks passed: flip the status and stamp the change.
        meeting.status = BoardMeeting.STATUS_PROTOCOL_PUBLISHED
        meeting.protocol_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Protokół z posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting protocol published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return to_view()
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting protocol: {exc}")
        flash('Błąd podczas publikowania protokołu.', 'error')
        return to_view()
    finally:
        session.close()
# Number of present members needed for a quorum (majority of 16).
_QUORUM_THRESHOLD = 9


def _parse_json_list(raw):
    """Decode a JSON form field; return [] unless it holds a JSON array."""
    import json

    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError subclass.
        return []
    # The field is client-supplied: reject objects, strings, numbers, null.
    return parsed if isinstance(parsed, list) else []


def _parse_attendance(form):
    """Build the attendance mapping from attendance_<id> / initials_<id> fields."""
    prefix = 'attendance_'
    attendance = {}
    for key, value in form.items():
        if not key.startswith(prefix):
            continue
        # Strip only the leading prefix, not every occurrence of it.
        user_id = key[len(prefix):]
        if not user_id:
            continue
        attendance[user_id] = {
            'present': value == 'present',
            'initials': form.get(f'initials_{user_id}', ''),
        }
    return attendance


def _parse_form_datetime(raw, fmt, error_message, errors):
    """Parse *raw* with *fmt*; on failure append *error_message* and return None."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, fmt)
    except ValueError:
        errors.append(error_message)
        return None


def _apply_quorum(meeting, attendance):
    """Store the number of present members and whether it reaches the quorum."""
    present = sum(1 for entry in attendance.values() if entry.get('present'))
    meeting.quorum_count = present
    meeting.quorum_confirmed = present >= _QUORUM_THRESHOLD


def _handle_meeting_form(db, meeting=None):
    """
    Handle a submitted meeting form, creating or updating a BoardMeeting.

    Args:
        db: Open database session; the caller is responsible for closing it.
        meeting: Existing meeting to update, or None to create a new one.

    Returns:
        A redirect response: to the meeting view on success, or back to the
        submitting URL when validation or saving fails (with flashed errors).

    Notes:
        - ``agenda_items`` and ``proceedings`` arrive as JSON text; anything
          that is not a JSON array is treated as empty.
        - On update the quorum figures are always recomputed, so clearing
          the attendance list also clears a previously stored quorum.
    """
    form = request.form

    # --- Collect submitted fields -----------------------------------------
    meeting_number = form.get('meeting_number', type=int)
    year = form.get('year', type=int)
    location = form.get('location', '').strip()
    chairperson_id = form.get('chairperson_id', type=int)
    secretary_id = form.get('secretary_id', type=int)
    guests = form.get('guests', '').strip()

    agenda_items = _parse_json_list(form.get('agenda_items', '[]'))
    attendance = _parse_attendance(form)
    proceedings = _parse_json_list(form.get('proceedings', '[]'))

    # --- Validate ---------------------------------------------------------
    errors = []
    if not meeting_number:
        errors.append('Numer posiedzenia jest wymagany.')
    if not year:
        errors.append('Rok jest wymagany.')

    parsed_date = _parse_form_datetime(
        form.get('meeting_date', ''), '%Y-%m-%d',
        'Nieprawidłowy format daty.', errors
    )
    parsed_start = _parse_form_datetime(
        form.get('start_time', ''), '%H:%M',
        'Nieprawidłowy format godziny rozpoczęcia.', errors
    )
    parsed_end = _parse_form_datetime(
        form.get('end_time', ''), '%H:%M',
        'Nieprawidłowy format godziny zakończenia.', errors
    )

    if errors:
        for error in errors:
            flash(error, 'error')
        return redirect(request.url)

    # Column values shared by the create and update paths. Empty strings and
    # empty containers are stored as NULL; location falls back to a default.
    fields = {
        'meeting_number': meeting_number,
        'year': year,
        'meeting_date': parsed_date.date() if parsed_date else None,
        'start_time': parsed_start.time() if parsed_start else None,
        'end_time': parsed_end.time() if parsed_end else None,
        'location': location or 'Siedziba Izby',
        'chairperson_id': chairperson_id,
        'secretary_id': secretary_id,
        'guests': guests or None,
        'agenda_items': agenda_items or None,
        'attendance': attendance or None,
        'proceedings': proceedings or None,
    }

    # --- Persist ----------------------------------------------------------
    try:
        if meeting:
            for name, value in fields.items():
                setattr(meeting, name, value)
            meeting.updated_by = current_user.id
            meeting.updated_at = datetime.now()
            # Always recompute so a removed attendance list does not leave a
            # stale quorum behind.
            _apply_quorum(meeting, attendance)

            db.commit()
            flash(f'Posiedzenie {meeting.meeting_identifier} zostało zaktualizowane.', 'success')
            return redirect(url_for('board.meeting_view', meeting_id=meeting.id))

        new_meeting = BoardMeeting(
            status=BoardMeeting.STATUS_DRAFT,
            created_by=current_user.id,
            **fields
        )
        # Without attendance data the model's own quorum defaults apply.
        if attendance:
            _apply_quorum(new_meeting, attendance)

        db.add(new_meeting)
        db.commit()
        flash(f'Posiedzenie {new_meeting.meeting_identifier} zostało utworzone.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=new_meeting.id))
    except Exception as e:
        db.rollback()
        current_app.logger.error(f"Failed to save meeting: {e}")
        flash('Błąd podczas zapisywania posiedzenia.', 'error')
        return redirect(request.url)