nordabiz/blueprints/board/routes.py
Maciej Pienczyn 9f22f27738
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: fee analysis with parent/child brands on skladki page
- Shows expected fee per company (200 zł for 1 brand, 300 zł for 2+)
- Child companies shown with striped "nie dotyczy" tiles
- Rate change month displayed (e.g., "I-III: 200 zł, od IV: 300 zł")
- Expandable brand list under parent company name
- Children grouped after their parent in the table

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 15:15:10 +02:00

956 lines
35 KiB
Python

"""
Board Routes (Rada Izby)
========================
Routes for board meeting management, document handling, and PDF generation.
Endpoints - Meetings:
- GET /rada/ - List all meetings + board members
- GET /rada/posiedzenia - Redirect to /rada/
- GET/POST /rada/posiedzenia/nowe - Create new meeting (office_manager+)
- GET/POST /rada/posiedzenia/<id>/edytuj - Edit meeting (office_manager+)
- GET /rada/posiedzenia/<id> - View meeting details
- POST /rada/posiedzenia/<id>/publikuj-program - Publish agenda (office_manager+)
- POST /rada/posiedzenia/<id>/publikuj-protokol - Publish protocol (office_manager+)
- GET /rada/posiedzenia/<id>/pdf-program - Download agenda PDF
- GET /rada/posiedzenia/<id>/pdf-protokol - Download protocol PDF
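Endpoints - Documents:
- POST /rada/posiedzenia/<id>/dokumenty/dodaj - Upload document (office_manager+)
- GET /rada/dokumenty/<id>/pobierz - Download document (rada_member+)
- GET /rada/dokumenty/<id>/otworz - Open document inline in the browser (rada_member+)
- POST /rada/dokumenty/<id>/usun - Soft delete document (office_manager+)
Endpoints - Admissions and fees:
- GET /rada/posiedzenia/<id>/przyjecia - Companies admitted at a meeting (office_manager+)
- GET /rada/skladki - Read-only membership fee overview (rada_member+)
"""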
import os
from decimal import Decimal
from datetime import datetime
from flask import (
render_template, request, redirect, url_for, flash,
current_app, Response, send_file
)
from flask_login import login_required, current_user
from sqlalchemy import desc
from . import bp
from database import SessionLocal, BoardMeeting, BoardDocument, SystemRole, User, Company, MembershipFee, MembershipFeeConfig
from utils.decorators import rada_member_required, office_manager_required
from utils.helpers import sanitize_html
from services.document_upload_service import DocumentUploadService
from datetime import date, time
try:
import weasyprint
HAS_WEASYPRINT = True
except (ImportError, OSError):
HAS_WEASYPRINT = False
# =============================================================================
# MEETING ROUTES
# =============================================================================
@bp.route('/')
@login_required
@rada_member_required
def index():
    """Render the board landing page: all meetings plus the active board members."""
    session = SessionLocal()
    try:
        # Newest first: by year, then by meeting number within the year.
        newest_first = (desc(BoardMeeting.year), desc(BoardMeeting.meeting_number))
        all_meetings = session.query(BoardMeeting).order_by(*newest_first).all()

        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        active_members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        context = {
            'meetings': all_meetings,
            'board_members': active_members,
            # Controls whether management actions are offered by the template.
            'can_manage': current_user.has_role(SystemRole.OFFICE_MANAGER),
        }
        return render_template('board/meetings_list.html', **context)
    finally:
        session.close()
@bp.route('/posiedzenia')
@login_required
@rada_member_required
def meetings_list():
    """Legacy URL kept for backwards compatibility; forwards to the board index."""
    index_url = url_for('board.index')
    return redirect(index_url)
@bp.route('/posiedzenia/nowe', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_create():
    """Create a board meeting.

    POST submissions are delegated to ``_handle_meeting_form``. GET renders the
    empty form pre-filled with defaults: the next meeting number for the
    current year, a default location and start time, and a default
    chairperson and secretary looked up by e-mail.
    """
    session = SessionLocal()
    try:
        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=None)

        this_year = datetime.now().year
        latest = (
            session.query(BoardMeeting)
            .filter(BoardMeeting.year == this_year)
            .order_by(desc(BoardMeeting.meeting_number))
            .first()
        )
        # Continue this year's numbering, or start at 1 if there is no meeting yet.
        if latest:
            suggested_number = latest.meeting_number + 1
        else:
            suggested_number = 1

        # Members offered on the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        # Default chairperson and secretary are identified by hard-coded e-mail lookups.
        chairperson = session.query(User).filter(User.email == 'leszek@rotor.pl').first()
        secretary = session.query(User).filter(User.email.like('%kloska%')).first()

        # Candidates for the secretary dropdown.
        staff = (
            session.query(User)
            .filter(User.role.in_(['OFFICE_MANAGER', 'ADMIN']), User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        defaults = {
            'meeting_number': suggested_number,
            'year': this_year,
            'location': 'Siedziba Izby',
            'start_time': '16:00',
            'chairperson_id': chairperson.id if chairperson else None,
            'secretary_id': secretary.id if secretary else None,
        }
        return render_template(
            'board/meeting_form.html',
            meeting=None,
            form_data=defaults,
            board_members=members,
            staff_users=staff,
            is_edit=False,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/edytuj', methods=['GET', 'POST'])
@login_required
@office_manager_required
def meeting_edit(meeting_id):
    """Edit an existing board meeting.

    Unknown ids flash an error and redirect to the board index. POST is
    delegated to ``_handle_meeting_form``; GET renders the form populated
    from the stored meeting.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if request.method == 'POST':
            return _handle_meeting_form(session, meeting=meeting)

        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )
        # Candidates for the secretary dropdown.
        staff = (
            session.query(User)
            .filter(User.role.in_(['OFFICE_MANAGER', 'ADMIN']), User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        def _fmt_time(value):
            # Times are shown as HH:MM; a missing time becomes an empty field.
            return value.strftime('%H:%M') if value else ''

        current_values = {
            'meeting_number': meeting.meeting_number,
            'year': meeting.year,
            'meeting_date': meeting.meeting_date.isoformat() if meeting.meeting_date else '',
            'start_time': _fmt_time(meeting.start_time),
            'end_time': _fmt_time(meeting.end_time),
            'location': meeting.location,
            'chairperson_id': meeting.chairperson_id,
            'secretary_id': meeting.secretary_id,
            'guests': meeting.guests,
            # Empty containers instead of None so the template can iterate safely.
            'agenda_items': meeting.agenda_items or [],
            'attendance': meeting.attendance or {},
            'proceedings': meeting.proceedings or [],
        }
        return render_template(
            'board/meeting_form.html',
            meeting=meeting,
            form_data=current_values,
            board_members=members,
            staff_users=staff,
            is_edit=True,
        )
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>')
@login_required
@rada_member_required
def meeting_view(meeting_id):
    """Show one meeting with its attendance roster and active documents.

    Unknown ids flash an error and redirect to the board index.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # Active board members, used to display attendance.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        # Only documents that have not been soft-deleted, grouped by type.
        attached = (
            session.query(BoardDocument)
            .filter(
                BoardDocument.meeting_id == meeting_id,
                BoardDocument.is_active == True,  # noqa: E712
            )
            .order_by(BoardDocument.document_type, BoardDocument.title)
            .all()
        )

        context = {
            'meeting': meeting,
            'board_members': members,
            'documents': attached,
            'can_manage': current_user.has_role(SystemRole.OFFICE_MANAGER),
        }
        return render_template('board/meeting_view.html', **context)
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-program', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_agenda(meeting_id):
    """Publish the agenda of a draft meeting.

    Only meetings still in the draft status can be published, and both a
    meeting date and agenda items must be present. Any exception rolls the
    session back, is logged, and results in a redirect to the meeting view.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if meeting.status != BoardMeeting.STATUS_DRAFT:
            flash('Program został już opublikowany.', 'warning')
            return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

        # Required fields, checked in order; the first missing one sends the
        # user back to the edit form.
        required = (
            (meeting.meeting_date, 'Data posiedzenia jest wymagana przed publikacją.'),
            (meeting.agenda_items, 'Program posiedzenia jest wymagany przed publikacją.'),
        )
        for value, message in required:
            if not value:
                flash(message, 'error')
                return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

        meeting.status = BoardMeeting.STATUS_AGENDA_PUBLISHED
        meeting.agenda_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        flash(f'Program posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting agenda published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting agenda: {exc}")
        flash('Błąd podczas publikowania programu.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/publikuj-protokol', methods=['POST'])
@login_required
@office_manager_required
def meeting_publish_protocol(meeting_id):
    """Publish the protocol of a meeting and start the admission workflow.

    Attendance and proceedings must be recorded first. After the status
    change is committed, ``run_admission_workflow`` is started on a daemon
    thread; its failures are logged and do not affect the response.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        if meeting.status == BoardMeeting.STATUS_PROTOCOL_PUBLISHED:
            flash('Protokół został już opublikowany.', 'warning')
            return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

        # Required protocol content, checked in order.
        required = (
            (meeting.attendance, 'Lista obecności jest wymagana przed publikacją protokołu.'),
            (meeting.proceedings, 'Przebieg posiedzenia jest wymagany przed publikacją protokołu.'),
        )
        for value, message in required:
            if not value:
                flash(message, 'error')
                return redirect(url_for('board.meeting_edit', meeting_id=meeting_id))

        meeting.status = BoardMeeting.STATUS_PROTOCOL_PUBLISHED
        meeting.protocol_published_at = datetime.now()
        meeting.updated_by = current_user.id
        meeting.updated_at = datetime.now()
        session.commit()

        # Capture the plain id now: the worker thread runs outside this
        # request and must not touch `current_user`.
        acting_user_id = current_user.id
        import threading

        def _admission_worker():
            try:
                from services.admission_workflow import run_admission_workflow
                run_admission_workflow(meeting_id, acting_user_id)
            except Exception as worker_exc:
                import logging
                logging.getLogger(__name__).error(f"Admission workflow failed for meeting {meeting_id}: {worker_exc}")

        worker = threading.Thread(target=_admission_worker, daemon=True)
        worker.start()

        flash(f'Protokół z posiedzenia {meeting.meeting_identifier} został opublikowany.', 'success')
        current_app.logger.info(
            f"Meeting protocol published: {meeting.meeting_identifier} by user {current_user.id}"
        )
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to publish meeting protocol: {exc}")
        flash('Błąd podczas publikowania protokołu.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
# =============================================================================
# ADMISSION DASHBOARD
# =============================================================================
@bp.route('/posiedzenia/<int:meeting_id>/przyjecia')
@login_required
@office_manager_required
def meeting_admissions(meeting_id):
    """Dashboard of companies admitted at a given board meeting.

    Also passes the most recently executed admission workflow log entry for
    the meeting (or None) to the template.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter_by(id=meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        from database import Company, AdmissionWorkflowLog

        admitted_companies = (
            session.query(Company)
            .filter(Company.admitted_at_meeting_id == meeting_id)
            .order_by(Company.name)
            .all()
        )
        # Latest workflow run only.
        latest_log = (
            session.query(AdmissionWorkflowLog)
            .filter_by(meeting_id=meeting_id)
            .order_by(AdmissionWorkflowLog.executed_at.desc())
            .first()
        )

        return render_template(
            'board/meeting_admissions.html',
            meeting=meeting,
            admitted=admitted_companies,
            workflow_log=latest_log,
        )
    finally:
        session.close()
# =============================================================================
# MEETING PDF GENERATION
# =============================================================================
def _generate_meeting_pdf(meeting_id, pdf_type):
    """Render a meeting as a PDF attachment.

    Args:
        meeting_id: id of the ``BoardMeeting`` to render.
        pdf_type: ``'agenda'`` or ``'protocol'``; any value other than
            ``'agenda'`` gets the protocol filename.

    Returns:
        A PDF ``Response`` on success, otherwise a redirect with a flashed
        error (weasyprint unavailable, meeting not found, rendering failed).
    """
    if not HAS_WEASYPRINT:
        flash('Generowanie PDF nie jest dostępne (brak biblioteki weasyprint).', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        # Active board members, passed to the template for the attendance list.
        members = (
            session.query(User)
            .filter(User.is_rada_member == True, User.is_active == True)  # noqa: E712
            .order_by(User.name)
            .all()
        )

        markup = render_template(
            'board/meeting_pdf.html',
            meeting=meeting,
            board_members=members,
            pdf_type=pdf_type,
        )
        document_bytes = weasyprint.HTML(string=markup).write_pdf()

        # '/' in the identifier is replaced so it cannot appear in the filename.
        safe_identifier = meeting.meeting_identifier.replace('/', '_')
        filename = (
            f"Program_Posiedzenia_{safe_identifier}.pdf"
            if pdf_type == 'agenda'
            else f"Protokol_Posiedzenia_{safe_identifier}.pdf"
        )

        current_app.logger.info(
            f"Meeting PDF generated: {pdf_type} for {meeting.meeting_identifier} by user {current_user.id}"
        )
        disposition = {'Content-Disposition': f'attachment; filename="{filename}"'}
        return Response(document_bytes, mimetype='application/pdf', headers=disposition)
    except Exception as exc:
        current_app.logger.error(f"Failed to generate meeting PDF: {exc}")
        flash('Błąd podczas generowania PDF.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
@bp.route('/posiedzenia/<int:meeting_id>/pdf-program')
@login_required
@rada_member_required
def meeting_pdf_agenda(meeting_id):
    """Serve the agenda PDF of the given meeting."""
    pdf_response = _generate_meeting_pdf(meeting_id, 'agenda')
    return pdf_response
@bp.route('/posiedzenia/<int:meeting_id>/pdf-protokol')
@login_required
@rada_member_required
def meeting_pdf_protocol(meeting_id):
    """Serve the protocol PDF of the given meeting."""
    pdf_response = _generate_meeting_pdf(meeting_id, 'protocol')
    return pdf_response
# =============================================================================
# DOCUMENT ROUTES
# =============================================================================
@bp.route('/posiedzenia/<int:meeting_id>/dokumenty/dodaj', methods=['POST'])
@login_required
@office_manager_required
def document_upload(meeting_id):
    """Attach an uploaded file to a board meeting.

    The file is validated and stored through ``DocumentUploadService``, then
    a ``BoardDocument`` row is created. Any exception rolls the session back
    and redirects to the meeting view with a flashed error.
    """
    session = SessionLocal()
    try:
        meeting = session.query(BoardMeeting).filter(BoardMeeting.id == meeting_id).first()
        if not meeting:
            flash('Posiedzenie nie zostało znalezione.', 'error')
            return redirect(url_for('board.index'))

        upload = request.files.get('document')
        if not upload:
            flash('Nie wybrano pliku.', 'error')
            return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

        accepted, rejection_reason = DocumentUploadService.validate_file(upload)
        if not accepted:
            flash(rejection_reason, 'error')
            return redirect(url_for('board.meeting_view', meeting_id=meeting_id))

        # NOTE(review): the file is stored before the DB commit; if the commit
        # fails the stored file is not removed here.
        stored_filename, file_path, file_size, mime_type = DocumentUploadService.save_file(upload)

        # The title falls back to the uploaded filename when left blank.
        title = request.form.get('title', '').strip()
        if not title:
            title = upload.filename
        document_type = request.form.get('document_type', 'other')
        description = request.form.get('description', '').strip() or None

        # Lower-cased text after the last dot, or '' when the name has no dot.
        if '.' in upload.filename:
            extension = upload.filename.rsplit('.', 1)[-1].lower()
        else:
            extension = ''

        record = BoardDocument(
            title=title,
            description=description,
            document_type=document_type,
            meeting_id=meeting_id,
            meeting_date=meeting.meeting_date,
            meeting_number=meeting.meeting_number,
            original_filename=upload.filename,
            stored_filename=stored_filename,
            file_extension=extension,
            file_size=file_size,
            mime_type=mime_type,
            uploaded_by=current_user.id,
        )
        session.add(record)
        session.commit()

        current_app.logger.info(
            f"Board document uploaded: '{title}' for meeting {meeting.meeting_identifier} "
            f"by user {current_user.id}"
        )
        flash(f'Dokument „{title}" został dodany.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to upload board document: {exc}")
        flash('Błąd podczas dodawania dokumentu.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=meeting_id))
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/pobierz')
@login_required
@rada_member_required
def document_download(doc_id):
    """Send an active board document to the client as an attachment.

    Soft-deleted or unknown documents redirect to the board index; a missing
    file on disk is logged and redirects to the owning meeting.
    """
    session = SessionLocal()
    try:
        document = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id, BoardDocument.is_active == True)  # noqa: E712
            .first()
        )
        if not document:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        file_path = DocumentUploadService.get_file_path(
            document.stored_filename, document.uploaded_at
        )
        if os.path.exists(file_path):
            return send_file(
                file_path,
                mimetype=document.mime_type,
                as_attachment=True,
                # Offer the file under the name it was uploaded with.
                download_name=document.original_filename,
            )

        current_app.logger.error(f"Board document file not found: {file_path}")
        flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=document.meeting_id))
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/otworz')
@login_required
@rada_member_required
def document_view(doc_id):
    """Serve an active board document inline (not as an attachment).

    Intended for opening PDFs in the browser, but the stored MIME type is
    used as-is, whatever it is. Error handling mirrors the download route.
    """
    session = SessionLocal()
    try:
        document = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id, BoardDocument.is_active == True)  # noqa: E712
            .first()
        )
        if not document:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        file_path = DocumentUploadService.get_file_path(
            document.stored_filename, document.uploaded_at
        )
        if os.path.exists(file_path):
            return send_file(
                file_path,
                mimetype=document.mime_type,
                as_attachment=False,
                download_name=document.original_filename,
            )

        current_app.logger.error(f"Board document file not found: {file_path}")
        flash('Plik dokumentu nie został znaleziony na serwerze.', 'error')
        return redirect(url_for('board.meeting_view', meeting_id=document.meeting_id))
    finally:
        session.close()
@bp.route('/dokumenty/<int:doc_id>/usun', methods=['POST'])
@login_required
@office_manager_required
def document_delete(doc_id):
    """Soft-delete a board document by clearing its ``is_active`` flag.

    The stored file is left on disk. Any exception rolls the session back
    and redirects to the board index with a flashed error.
    """
    session = SessionLocal()
    try:
        document = (
            session.query(BoardDocument)
            .filter(BoardDocument.id == doc_id, BoardDocument.is_active == True)  # noqa: E712
            .first()
        )
        if not document:
            flash('Dokument nie został znaleziony.', 'error')
            return redirect(url_for('board.index'))

        # Read what the log, flash and redirect need before committing.
        owning_meeting_id = document.meeting_id
        removed_title = document.title

        document.is_active = False
        document.updated_by = current_user.id
        document.updated_at = datetime.now()
        session.commit()

        current_app.logger.info(
            f"Board document soft-deleted: '{removed_title}' (id={doc_id}) by user {current_user.id}"
        )
        flash(f'Dokument „{removed_title}" został usunięty.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=owning_meeting_id))
    except Exception as exc:
        session.rollback()
        current_app.logger.error(f"Failed to delete board document: {exc}")
        flash('Błąd podczas usuwania dokumentu.', 'error')
        return redirect(url_for('board.index'))
    finally:
        session.close()
# =============================================================================
# MEETING FORM HANDLER
# =============================================================================
# Number of present members at which quorum is recorded as confirmed
# (the original inline comment: "Majority of 16").
_QUORUM_THRESHOLD = 9

# Form-field prefix carrying one attendance status per board member.
_ATTENDANCE_PREFIX = 'attendance_status_'


def _parse_json_list(raw):
    """Decode *raw* as JSON and return it only when it is a list, else ``[]``.

    The form submits agenda items and proceedings as JSON arrays. Malformed
    JSON and valid JSON of another type (number, string, object, null) are
    both treated as "nothing submitted" so callers can always iterate.
    """
    import json
    try:
        decoded = json.loads(raw)
    except (ValueError, TypeError):  # JSONDecodeError is a ValueError
        return []
    return decoded if isinstance(decoded, list) else []


def _parse_form_datetime(raw, fmt, error_message, errors):
    """Parse *raw* with ``datetime.strptime``; record *error_message* on failure.

    Returns the parsed ``datetime``, or None when *raw* is empty or invalid.
    """
    if not raw:
        return None
    try:
        return datetime.strptime(raw, fmt)
    except ValueError:
        errors.append(error_message)
        return None


def _handle_meeting_form(db, meeting=None):
    """Handle meeting form submission (create or update).

    Args:
        db: open database session; committed on success, rolled back on error.
        meeting: the ``BoardMeeting`` being edited, or None to create one.

    Returns:
        A redirect to the meeting view on success, or back to the submitting
        URL with flashed messages on validation or persistence errors.
    """
    meeting_number = request.form.get('meeting_number', type=int)
    year = request.form.get('year', type=int)
    location = request.form.get('location', '').strip()
    chairperson_id = request.form.get('chairperson_id', type=int)
    secretary_id = request.form.get('secretary_id', type=int)
    guests = request.form.get('guests', '').strip()

    # NOTE(review): agenda items are stored unsanitized, unlike proceedings
    # below; confirm the templates escape them.
    agenda_items = _parse_json_list(request.form.get('agenda_items', '[]'))

    # Attendance arrives as one `attendance_status_<user_id>` field per member
    # (present / absent / unknown) plus an optional `initials_<user_id>`.
    attendance = {}
    for key, status in request.form.items():
        if not key.startswith(_ATTENDANCE_PREFIX):
            continue
        member_id = key[len(_ATTENDANCE_PREFIX):]
        attendance[member_id] = {
            'status': status,
            'present': status == 'present',  # kept for backward compatibility
            'initials': request.form.get(f'initials_{member_id}', ''),
        }

    # Sanitize text fields in proceedings to prevent stored XSS.
    proceedings = _parse_json_list(request.form.get('proceedings', '[]'))
    for entry in proceedings:
        if not isinstance(entry, dict):
            continue
        for field in ('discussion', 'discussed', 'title'):
            if isinstance(entry.get(field), str):
                entry[field] = sanitize_html(entry[field])

    errors = []
    if not meeting_number:
        errors.append('Numer posiedzenia jest wymagany.')
    if not year:
        errors.append('Rok jest wymagany.')

    parsed_date = _parse_form_datetime(
        request.form.get('meeting_date', ''), '%Y-%m-%d',
        'Nieprawidłowy format daty.', errors,
    )
    parsed_start = _parse_form_datetime(
        request.form.get('start_time', ''), '%H:%M',
        'Nieprawidłowy format godziny rozpoczęcia.', errors,
    )
    parsed_end = _parse_form_datetime(
        request.form.get('end_time', ''), '%H:%M',
        'Nieprawidłowy format godziny zakończenia.', errors,
    )

    if errors:
        for error in errors:
            flash(error, 'error')
        return redirect(request.url)

    # Values shared by the create and update paths; empty optional values are
    # stored as None and a blank location falls back to the default.
    fields = {
        'meeting_number': meeting_number,
        'year': year,
        'meeting_date': parsed_date.date() if parsed_date else None,
        'start_time': parsed_start.time() if parsed_start else None,
        'end_time': parsed_end.time() if parsed_end else None,
        'location': location or 'Siedziba Izby',
        'chairperson_id': chairperson_id,
        'secretary_id': secretary_id,
        'guests': guests or None,
        'agenda_items': agenda_items or None,
        'attendance': attendance or None,
        'proceedings': proceedings or None,
    }

    try:
        is_update = bool(meeting)
        if is_update:
            saved = meeting
            for attr, value in fields.items():
                setattr(saved, attr, value)
            saved.updated_by = current_user.id
            saved.updated_at = datetime.now()
        else:
            saved = BoardMeeting(
                status=BoardMeeting.STATUS_DRAFT,
                created_by=current_user.id,
                **fields,
            )

        # NOTE(review): as in the original, the quorum fields are left
        # untouched when no attendance is submitted, so an update that clears
        # attendance keeps the previous quorum values. Confirm this is intended.
        if attendance:
            saved.quorum_count = sum(1 for a in attendance.values() if a.get('present'))
            saved.quorum_confirmed = saved.quorum_count >= _QUORUM_THRESHOLD

        if not is_update:
            db.add(saved)
        db.commit()

        if is_update:
            flash(f'Posiedzenie {saved.meeting_identifier} zostało zaktualizowane.', 'success')
        else:
            flash(f'Posiedzenie {saved.meeting_identifier} zostało utworzone.', 'success')
        return redirect(url_for('board.meeting_view', meeting_id=saved.id))
    except Exception as e:
        db.rollback()
        current_app.logger.error(f"Failed to save meeting: {e}")
        flash('Błąd podczas zapisywania posiedzenia.', 'error')
        return redirect(request.url)
# =============================================================================
# MEMBERSHIP FEES - READ ONLY VIEW FOR BOARD MEMBERS
# =============================================================================
# (month number, Polish month name) pairs, January through December.
MONTHS_PL_BOARD = list(enumerate(
    (
        'Styczeń', 'Luty', 'Marzec', 'Kwiecień',
        'Maj', 'Czerwiec', 'Lipiec', 'Sierpień',
        'Wrzesień', 'Październik', 'Listopad', 'Grudzień',
    ),
    start=1,
))
# Expected monthly fee (zł): 1 brand = reduced rate, 2+ brands = standard rate.
_FEE_SINGLE_BRAND = 200
_FEE_MULTI_BRAND = 300


def _expected_fee_schedule(child_brands, year):
    """Compute the expected monthly fee of a parent/standalone company.

    A child brand counts from the calendar month of its ``created_at``
    onwards. The comparison uses (year, month) pairs so the day and
    time-of-day of ``created_at`` cannot push the brand into the following
    month, and so naive/aware datetimes and arbitrary ``year`` values
    cannot raise.

    Returns:
        ``(expected_fees, rate_change_month)``: a dict mapping month 1-12 to
        the expected fee, and the first month of *year* billed at the
        multi-brand rate (None if there is none).
    """
    expected_fees = {}
    rate_change_month = None
    for month in range(1, 13):
        active_children = sum(
            1 for child in child_brands
            if child.created_at
            and (child.created_at.year, child.created_at.month) <= (year, month)
        )
        total_brands = 1 + active_children
        if total_brands >= 2:
            expected_fees[month] = _FEE_MULTI_BRAND
            if rate_change_month is None:
                rate_change_month = month
        else:
            expected_fees[month] = _FEE_SINGLE_BRAND
    return expected_fees, rate_change_month


def _matches_status_filter(months, status_filter):
    """Tell whether a company's fee records for the year match *status_filter*.

    ``months`` maps month number to a fee record or None. Filters:
    ``'paid'`` - has records and all are paid; ``'partial'`` - some paid and
    some pending/overdue, or any record marked partial; ``'none'`` - has
    records but none paid or partial. Any other value matches everything.
    """
    statuses = [fee.status for fee in months.values() if fee]
    if status_filter == 'paid':
        return bool(statuses) and all(s == 'paid' for s in statuses)
    if status_filter == 'partial':
        has_paid = 'paid' in statuses
        has_open = any(s in ('pending', 'overdue') for s in statuses)
        return (has_paid and has_open) or 'partial' in statuses
    if status_filter == 'none':
        return bool(statuses) and not any(s in ('paid', 'partial') for s in statuses)
    return True


@bp.route('/skladki')
@login_required
@rada_member_required
def board_fees():
    """Read-only view of membership fees for board members.

    Query parameters:
        year: fee year to display (defaults to the current year).
        status: optional filter - ``paid``, ``partial`` or ``none``.

    Parent/standalone companies are listed first (those with fee data before
    those without), each followed by its child brands. Child rows that cannot
    be placed under a listed parent are appended at the end instead of being
    dropped.
    """
    db = SessionLocal()
    try:
        year = request.args.get('year', datetime.now().year, type=int)
        status_filter = request.args.get('status', '')

        # One query serves both the table rows and the parent/child analysis.
        companies = db.query(Company).filter(
            Company.status == 'active'
        ).order_by(Company.name).all()

        fee_query = db.query(MembershipFee).filter(MembershipFee.fee_year == year)
        fees = {(f.company_id, f.fee_month): f for f in fee_query.all()}

        children_by_parent = {}  # parent_id -> [child Company, ...]
        child_ids = set()        # companies whose fee is covered by a parent
        for company in companies:
            if company.parent_company_id:
                children_by_parent.setdefault(company.parent_company_id, []).append(company)
            if company.fee_included_in_parent or company.parent_company_id:
                child_ids.add(company.id)

        rows = []
        for company in companies:
            is_child = company.id in child_ids
            row = {
                'company': company, 'months': {}, 'monthly_rate': 0,
                'has_data': False, 'is_child': is_child,
                'child_brands': [], 'child_count': 0,
                'expected_fees': {}, 'rate_change_month': None,
                'parent_months': {},
            }
            for m in range(1, 13):
                fee = fees.get((company.id, m))
                row['months'][m] = fee
                if fee and fee.amount:
                    row['has_data'] = True
                    # The first non-zero amount of the year is shown as the rate.
                    if not row['monthly_rate']:
                        row['monthly_rate'] = int(fee.amount)

            if not is_child:
                child_brands = children_by_parent.get(company.id, [])
                row['child_brands'] = child_brands
                row['child_count'] = len(child_brands)
                row['expected_fees'], row['rate_change_month'] = (
                    _expected_fee_schedule(child_brands, year)
                )
            elif company.parent_company_id:
                # Child: copy the parent's months for visual reference.
                for m in range(1, 13):
                    row['parent_months'][m] = fees.get((company.parent_company_id, m))
            rows.append(row)

        # Parents/standalone: with data first, then without, each group by name.
        top_level = sorted(
            (row for row in rows if not row['is_child']),
            key=lambda row: (0 if row['has_data'] else 1, row['company'].name),
        )
        child_rows_by_parent = {}
        for row in rows:
            if row['is_child'] and row['company'].parent_company_id:
                child_rows_by_parent.setdefault(
                    row['company'].parent_company_id, []
                ).append(row)

        def by_name(row):
            return row['company'].name

        companies_fees = []
        for row in top_level:
            companies_fees.append(row)
            companies_fees.extend(
                sorted(child_rows_by_parent.get(row['company'].id, []), key=by_name)
            )

        # Children without a listed top-level parent (no parent id, inactive
        # parent, or a parent that is itself a child) go at the end.
        placed = {id(row) for row in companies_fees}
        companies_fees.extend(
            sorted((row for row in rows if id(row) not in placed), key=by_name)
        )

        if status_filter:
            companies_fees = [
                row for row in companies_fees
                if _matches_status_filter(row['months'], status_filter)
            ]

        # Summary figures cover all fee records of the year, not the filtered rows.
        all_fees = list(fees.values())
        paid_count = sum(1 for f in all_fees if f.status == 'paid')
        # Totals are always floats; a missing amount counts as 0.
        total_due = sum((float(f.amount or 0) for f in all_fees), 0.0)
        total_paid = sum((float(f.amount_paid or 0) for f in all_fees), 0.0)

        return render_template(
            'board/fees_readonly.html',
            companies_fees=companies_fees,
            year=year,
            status_filter=status_filter,
            total_companies=len(companies),
            paid_count=paid_count,
            pending_count=len(all_fees) - paid_count,
            total_due=total_due,
            total_paid=total_paid,
            years=list(range(2022, datetime.now().year + 2)),
            months=MONTHS_PL_BOARD,
        )
    finally:
        db.close()