nordabiz/blueprints/community/calendar/routes.py
Maciej Pienczyn d06507a7c3
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: company colleague picker + admin add person to paid events
- EventGuest.guest_type: 'member' (member rate) or 'external' (guest rate)
- Dropdown of company colleagues when adding member-type guest
- Manual entry option for members not on portal
- Admin payment panel: "Dodaj osobę" with "Dodaj + opłacone" shortcut
- Migration 064: guest_type column

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 11:48:46 +02:00

677 lines
25 KiB
Python

"""
Calendar Routes
===============
Public calendar and event registration endpoints.
Includes iCal subscription feed at /kalendarz/ical
"""
from datetime import date, datetime, timedelta
import calendar as cal_module
from flask import render_template, request, redirect, url_for, flash, jsonify, Response
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, NordaEvent, EventAttendee, EventGuest
# Polish month names
POLISH_MONTHS = {
1: 'Styczeń', 2: 'Luty', 3: 'Marzec', 4: 'Kwiecień',
5: 'Maj', 6: 'Czerwiec', 7: 'Lipiec', 8: 'Sierpień',
9: 'Wrzesień', 10: 'Październik', 11: 'Listopad', 12: 'Grudzień'
}
@bp.route('/', endpoint='calendar_index')
@login_required
def index():
"""Kalendarz wydarzeń Norda Biznes - widok listy lub siatki miesięcznej"""
db = SessionLocal()
try:
today = date.today()
# Parametry widoku
view_mode = request.args.get('view', 'list') # list lub grid
year = request.args.get('year', today.year, type=int)
month = request.args.get('month', today.month, type=int)
# Walidacja roku
if year < 2020 or year > 2100:
return redirect(url_for('.calendar_index'))
# Walidacja miesiąca
if month < 1:
month = 12
year -= 1
elif month > 12:
month = 1
year += 1
# Oblicz poprzedni/następny miesiąc
if month == 1:
prev_month, prev_year = 12, year - 1
else:
prev_month, prev_year = month - 1, year
if month == 12:
next_month, next_year = 1, year + 1
else:
next_month, next_year = month + 1, year
# Dane dla widoku siatki
month_days = []
events_by_day = {}
if view_mode == 'grid':
# Pobierz wydarzenia z danego miesiąca
first_day = date(year, month, 1)
last_day = date(year, month, cal_module.monthrange(year, month)[1])
all_events = db.query(NordaEvent).filter(
NordaEvent.event_date >= first_day,
NordaEvent.event_date <= last_day
).order_by(NordaEvent.event_date.asc()).all()
# Filtruj wydarzenia według uprawnień użytkownika
events = [e for e in all_events if e.can_user_view(current_user)]
# Przygotuj strukturę kalendarza (poniedziałek = 0)
cal = cal_module.Calendar(firstweekday=0)
month_days = cal.monthdayscalendar(year, month)
# Mapuj wydarzenia na dni
for event in events:
day = event.event_date.day
if day not in events_by_day:
events_by_day[day] = []
events_by_day[day].append(event)
# Dane dla widoku listy (zawsze potrzebne dla fallback)
all_upcoming = db.query(NordaEvent).filter(
NordaEvent.event_date >= today
).order_by(NordaEvent.event_date.asc()).all()
# Filtruj według uprawnień
upcoming = [e for e in all_upcoming if e.can_user_view(current_user)]
all_past = db.query(NordaEvent).filter(
NordaEvent.event_date < today
).order_by(NordaEvent.event_date.desc()).limit(10).all()
past = [e for e in all_past if e.can_user_view(current_user)][:5]
return render_template('calendar/index.html',
# Dane dla widoku listy
upcoming_events=upcoming,
past_events=past,
today=today,
# Dane dla widoku siatki
view_mode=view_mode,
year=year,
month=month,
month_name=POLISH_MONTHS.get(month, ''),
month_days=month_days,
events_by_day=events_by_day,
prev_month=prev_month,
prev_year=prev_year,
next_month=next_month,
next_year=next_year,
)
finally:
db.close()
def _enrich_event_description(db, html):
"""Enrich event description: link member names and auto-link URLs.
Processes HTML text nodes only (skips content inside <a> tags and HTML attributes).
"""
import re
from markupsafe import Markup, escape
from flask import url_for as flask_url_for
from database import User
# If plain text (no HTML block tags), convert newlines to HTML
if '<p>' not in html and '<br' not in html and '<div' not in html and '<ul' not in html:
# Escape HTML entities first for plain text
html = str(escape(html))
# Double newline = paragraph break, single = line break
paragraphs = html.split('\n\n')
html = ''.join(f'<p>{p.replace(chr(10), "<br>")}</p>' for p in paragraphs if p.strip())
# Build replacement maps — persons (all users with a name)
members = db.query(User.id, User.name).filter(
User.name.isnot(None),
).all()
members = sorted(members, key=lambda m: len(m.name), reverse=True)
# person_map: exact name → url
# person_fuzzy: list of (pattern, display_name, url) for declined Polish names
person_map = {}
person_fuzzy = []
for m in members:
if m.name in person_map:
continue
url = flask_url_for('public.user_profile', user_id=m.id)
person_map[m.name] = url
# Build fuzzy pattern for Polish name declensions: "Iwona Spaleniak" → "Iwon\w+ Spaleniak"
parts = m.name.split()
if len(parts) >= 2:
first = parts[0]
rest = ' '.join(parts[1:])
# Stem: keep at least 3 chars, cut last 1-2 chars depending on length
stem_len = max(3, len(first) - 2)
stem = re.escape(first[:stem_len])
fuzzy_pattern = r'\b' + stem + r'\w*\s+' + re.escape(rest) + r'\b'
person_fuzzy.append((fuzzy_pattern, m.name, url))
# Build replacement maps — companies
from database import Company
companies = db.query(Company.name, Company.slug).filter(
Company.slug.isnot(None),
Company.name.isnot(None),
Company.status == 'active',
).all()
companies = sorted(companies, key=lambda c: len(c.name), reverse=True)
# Skip company names that are common Polish/English words — too many false positives
_common_words = {'Portal', 'Joker', 'Wakat'}
company_map = {}
for c in companies:
if c.name in _common_words:
continue
company_map[c.name] = flask_url_for('public.company_detail_by_slug', slug=c.slug)
def enrich_text_node(text):
"""Apply person/company linking and URL linkification to a plain text fragment."""
# 1. Auto-link URLs
url_pattern = r'(https?://[^\s<>"\']+|(?<!\w)www\.[^\s<>"\']+|(?<!\w)nordabiznes\.pl[^\s<>"\']*)'
def url_replacer(m):
url = m.group(0)
href = url if url.startswith('http') else 'https://' + url
return f'<a href="{href}" target="_blank" style="color:var(--primary);font-weight:500;">{url}</a>'
text = re.sub(url_pattern, url_replacer, text)
# 2. Link person names (pill badge — green)
# First: exact matches
for name, url in person_map.items():
pattern = r'\b' + re.escape(name) + r'\b'
link = f'<a href="{url}" class="person-link" title="Zobacz profil">{name}</a>'
text = re.sub(pattern, link, text)
# Then: fuzzy matches for Polish declensions (Iwonę → Iwona, etc.)
for fuzzy_pattern, display_name, url in person_fuzzy:
def fuzzy_replacer(m, _url=url, _display=display_name):
return f'<a href="{_url}" class="person-link" title="Zobacz profil">{m.group(0)}</a>'
text = re.sub(fuzzy_pattern, fuzzy_replacer, text)
# 3. Link company names (pill badge — orange)
for name, url in company_map.items():
pattern = r'\b' + re.escape(name) + r'\b'
link = f'<a href="{url}" class="company-link" title="Zobacz firmę">{name}</a>'
text = re.sub(pattern, link, text)
return text
# Split HTML into tags and text nodes, only process text outside <a> tags
# Pattern: match HTML tags (including their content for <a>) or text between tags
result = []
pos = 0
in_a_tag = False
# Regex to find HTML tags
tag_pattern = re.compile(r'<(/?)(\w+)([^>]*)>')
for match in tag_pattern.finditer(html):
start, end = match.start(), match.end()
is_closing = match.group(1) == '/'
tag_name = match.group(2).lower()
# Process text before this tag
if start > pos:
text_chunk = html[pos:start]
if in_a_tag:
result.append(text_chunk) # Don't modify text inside <a>
else:
result.append(enrich_text_node(text_chunk))
result.append(match.group(0)) # The tag itself
pos = end
if tag_name == 'a':
in_a_tag = not is_closing
# Process remaining text after last tag
if pos < len(html):
text_chunk = html[pos:]
if in_a_tag:
result.append(text_chunk)
else:
result.append(enrich_text_node(text_chunk))
return Markup(''.join(result))
@bp.route('/<int:event_id>', endpoint='calendar_event')
@login_required
def event(event_id):
"""Szczegóły wydarzenia"""
db = SessionLocal()
try:
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
if not event:
flash('Wydarzenie nie istnieje.', 'error')
return redirect(url_for('.calendar_index'))
# Sprawdź uprawnienia dostępu
if not event.can_user_view(current_user):
flash('Nie masz uprawnień do wyświetlenia tego wydarzenia.', 'error')
return redirect(url_for('.calendar_index'))
# Sprawdź czy użytkownik jest zapisany
user_attending = db.query(EventAttendee).filter(
EventAttendee.event_id == event_id,
EventAttendee.user_id == current_user.id
).first()
# Pobierz gości bieżącego użytkownika na to wydarzenie
user_guests = db.query(EventGuest).filter(
EventGuest.event_id == event_id,
EventGuest.host_user_id == current_user.id
).order_by(EventGuest.created_at.asc()).all()
# Find speaker as user or company
speaker_user_id = None
speaker_company_slug = None
from database import User, Company
if event.speaker_name:
speaker_user = db.query(User).filter(
User.name == event.speaker_name,
).first()
if speaker_user:
speaker_user_id = speaker_user.id
else:
# Try matching as company name
speaker_company = db.query(Company).filter(
Company.name == event.speaker_name,
Company.status == 'active',
).first()
if speaker_company:
speaker_company_slug = speaker_company.slug
# Enrich description: linkify member names, companies and URLs
enriched_description = event.description or ''
if enriched_description:
enriched_description = _enrich_event_description(db, enriched_description)
return render_template('calendar/event.html',
event=event,
user_attending=user_attending,
user_guests=user_guests,
speaker_user_id=speaker_user_id,
speaker_company_slug=speaker_company_slug,
enriched_description=enriched_description,
)
finally:
db.close()
@bp.route('/<int:event_id>/rsvp', methods=['POST'], endpoint='calendar_rsvp')
@login_required
def rsvp(event_id):
"""Zapisz się / wypisz z wydarzenia"""
db = SessionLocal()
try:
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
if not event:
return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404
# Sprawdź uprawnienia dostępu
if not event.can_user_attend(current_user):
return jsonify({'success': False, 'error': 'Nie masz uprawnień do zapisania się na to wydarzenie'}), 403
# Sprawdź czy już zapisany
existing = db.query(EventAttendee).filter(
EventAttendee.event_id == event_id,
EventAttendee.user_id == current_user.id
).first()
is_ext = getattr(event, 'is_external', False) or False
msg_added = 'Oznaczono jako zainteresowany' if is_ext else 'Zapisano na wydarzenie'
msg_removed = 'Usunięto zainteresowanie' if is_ext else 'Wypisano z wydarzenia'
if existing:
db.delete(existing)
db.commit()
return jsonify({
'success': True,
'action': 'removed',
'message': msg_removed,
'attendee_count': event.attendee_count
})
else:
# Skip max_attendees check for external events
if not is_ext and event.max_attendees and event.total_attendee_count >= event.max_attendees:
return jsonify({'success': False, 'error': 'Brak wolnych miejsc'}), 400
attendee = EventAttendee(
event_id=event_id,
user_id=current_user.id,
status='confirmed'
)
# Auto-assign payment amount for paid events
if event.is_paid:
is_member = current_user.role and current_user.role.value >= 20 # MEMBER+
attendee.payment_amount = event.price_member if is_member else event.price_guest
db.add(attendee)
db.commit()
return jsonify({
'success': True,
'action': 'added',
'message': msg_added,
'attendee_count': event.attendee_count
})
finally:
db.close()
MAX_GUESTS_PER_USER = 5
@bp.route('/<int:event_id>/guests', methods=['POST'], endpoint='calendar_add_guest')
@login_required
def add_guest(event_id):
"""Dodaj osobę towarzyszącą na wydarzenie"""
db = SessionLocal()
try:
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
if not event:
return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404
if event.is_past:
return jsonify({'success': False, 'error': 'Wydarzenie już się odbyło'}), 400
if not event.can_user_attend(current_user):
return jsonify({'success': False, 'error': 'Nie masz uprawnień'}), 403
if getattr(event, 'is_external', False):
return jsonify({'success': False, 'error': 'Rejestracja gości niedostępna dla wydarzeń zewnętrznych'}), 400
# Sprawdź limit gości per użytkownik
guest_count = db.query(EventGuest).filter(
EventGuest.event_id == event_id,
EventGuest.host_user_id == current_user.id
).count()
if guest_count >= MAX_GUESTS_PER_USER:
return jsonify({'success': False, 'error': f'Maksymalnie {MAX_GUESTS_PER_USER} gości na wydarzenie'}), 400
# Sprawdź limit miejsc
if event.max_attendees and event.total_attendee_count >= event.max_attendees:
return jsonify({'success': False, 'error': 'Brak wolnych miejsc'}), 400
data = request.get_json() or {}
first_name = (data.get('first_name') or '').strip()
last_name = (data.get('last_name') or '').strip()
organization = (data.get('organization') or '').strip()
# Minimum jedno pole
if not first_name and not last_name and not organization:
return jsonify({'success': False, 'error': 'Podaj przynajmniej imię, nazwisko lub firmę'}), 400
guest_type = (data.get('guest_type') or 'external').strip()
if guest_type not in ('external', 'member'):
guest_type = 'external'
guest = EventGuest(
event_id=event_id,
host_user_id=current_user.id,
first_name=first_name or None,
last_name=last_name or None,
organization=organization or None,
guest_type=guest_type,
)
# Auto-assign payment amount based on guest type
if event.is_paid:
guest.payment_amount = event.price_member if guest_type == 'member' else event.price_guest
db.add(guest)
db.commit()
return jsonify({
'success': True,
'action': 'added',
'guest': {
'id': guest.id,
'first_name': guest.first_name,
'last_name': guest.last_name,
'organization': guest.organization,
'display_name': guest.display_name,
}
}), 201
finally:
db.close()
@bp.route('/<int:event_id>/guests/<int:guest_id>', methods=['PATCH'], endpoint='calendar_edit_guest')
@login_required
def edit_guest(event_id, guest_id):
"""Edytuj dane osoby towarzyszącej"""
db = SessionLocal()
try:
guest = db.query(EventGuest).filter(
EventGuest.id == guest_id,
EventGuest.event_id == event_id
).first()
if not guest:
return jsonify({'success': False, 'error': 'Gość nie znaleziony'}), 404
# Tylko host lub admin
from database import SystemRole
if guest.host_user_id != current_user.id and not current_user.has_role(SystemRole.OFFICE_MANAGER):
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
if event and event.is_past:
return jsonify({'success': False, 'error': 'Wydarzenie już się odbyło'}), 400
data = request.get_json() or {}
first_name = (data.get('first_name') or '').strip()
last_name = (data.get('last_name') or '').strip()
organization = (data.get('organization') or '').strip()
if not first_name and not last_name and not organization:
return jsonify({'success': False, 'error': 'Podaj przynajmniej imię, nazwisko lub firmę'}), 400
guest.first_name = first_name or None
guest.last_name = last_name or None
guest.organization = organization or None
db.commit()
return jsonify({
'success': True,
'action': 'updated',
'guest': {
'id': guest.id,
'first_name': guest.first_name,
'last_name': guest.last_name,
'organization': guest.organization,
'display_name': guest.display_name,
}
})
finally:
db.close()
@bp.route('/<int:event_id>/guests/<int:guest_id>', methods=['DELETE'], endpoint='calendar_delete_guest')
@login_required
def delete_guest(event_id, guest_id):
"""Usuń osobę towarzyszącą z wydarzenia"""
db = SessionLocal()
try:
guest = db.query(EventGuest).filter(
EventGuest.id == guest_id,
EventGuest.event_id == event_id
).first()
if not guest:
return jsonify({'success': False, 'error': 'Gość nie znaleziony'}), 404
# Tylko host lub admin
from database import SystemRole
if guest.host_user_id != current_user.id and not current_user.has_role(SystemRole.OFFICE_MANAGER):
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
db.delete(guest)
db.commit()
return jsonify({'success': True, 'action': 'removed'})
finally:
db.close()
@bp.route('/<int:event_id>/company-colleagues', methods=['GET'], endpoint='calendar_company_colleagues')
@login_required
def company_colleagues(event_id):
"""Pobierz listę kolegów z firmy do dropdownu przy dodawaniu gościa"""
if not current_user.company_id:
return jsonify([])
db = SessionLocal()
try:
from database import User
colleagues = db.query(User).filter(
User.company_id == current_user.company_id,
User.id != current_user.id,
User.is_active == True
).order_by(User.name).all()
# Check who is already registered
registered_ids = set(
a.user_id for a in db.query(EventAttendee).filter(EventAttendee.event_id == event_id).all()
)
# Check who is already added as guest
guest_names = set()
for g in db.query(EventGuest).filter(EventGuest.event_id == event_id).all():
guest_names.add(f"{g.first_name or ''} {g.last_name or ''}".strip().lower())
result = []
for c in colleagues:
name_parts = (c.name or '').split(' ', 1)
first = name_parts[0] if name_parts else ''
last = name_parts[1] if len(name_parts) > 1 else ''
already = c.id in registered_ids or c.name.lower() in guest_names if c.name else False
result.append({
'id': c.id,
'name': c.name or c.email.split('@')[0],
'first_name': first,
'last_name': last,
'already_registered': already,
})
return jsonify(result)
finally:
db.close()
@bp.route('/ical', endpoint='calendar_ical')
def ical_feed():
"""
iCal subscription feed — public endpoint (no login required).
Returns .ics file with all upcoming public/member events.
Subscribe once in Google Calendar / iOS Calendar and events sync automatically.
"""
db = SessionLocal()
try:
events = db.query(NordaEvent).filter(
NordaEvent.event_date >= date.today() - timedelta(days=30),
NordaEvent.access_level.in_(['public', 'members_only'])
).order_by(NordaEvent.event_date.asc()).all()
lines = [
'BEGIN:VCALENDAR',
'VERSION:2.0',
'PRODID:-//NordaBiznes//Kalendarz//PL',
'CALSCALE:GREGORIAN',
'METHOD:PUBLISH',
'X-WR-CALNAME:Norda Biznes - Wydarzenia',
'X-WR-TIMEZONE:Europe/Warsaw',
'REFRESH-INTERVAL;VALUE=DURATION:PT6H',
'X-PUBLISHED-TTL:PT6H',
# VTIMEZONE for Google Calendar to recognize Europe/Warsaw
'BEGIN:VTIMEZONE',
'TZID:Europe/Warsaw',
'BEGIN:STANDARD',
'DTSTART:19701025T030000',
'RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=10',
'TZOFFSETFROM:+0200',
'TZOFFSETTO:+0100',
'TZNAME:CET',
'END:STANDARD',
'BEGIN:DAYLIGHT',
'DTSTART:19700329T020000',
'RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=3',
'TZOFFSETFROM:+0100',
'TZOFFSETTO:+0200',
'TZNAME:CEST',
'END:DAYLIGHT',
'END:VTIMEZONE',
]
for event in events:
uid = f'event-{event.id}@nordabiznes.pl'
# Build DTSTART/DTEND
dtend = None
if event.time_start:
dt_start = datetime.combine(event.event_date, event.time_start)
dtstart = f'DTSTART;TZID=Europe/Warsaw:{dt_start.strftime("%Y%m%dT%H%M%S")}'
if event.time_end:
dt_end = datetime.combine(event.event_date, event.time_end)
dtend = f'DTEND;TZID=Europe/Warsaw:{dt_end.strftime("%Y%m%dT%H%M%S")}'
# No time_end → omit DTEND (RFC 5545: duration defaults to zero)
else:
# All-day event
dtstart = f'DTSTART;VALUE=DATE:{event.event_date.strftime("%Y%m%d")}'
next_day = event.event_date + timedelta(days=1)
dtend = f'DTEND;VALUE=DATE:{next_day.strftime("%Y%m%d")}'
# Clean description (remove HTML, limit length)
desc = (event.description or '').replace('\r\n', '\\n').replace('\n', '\\n').replace(',', '\\,').replace(';', '\\;')
if len(desc) > 500:
desc = desc[:497] + '...'
summary = (event.title or '').replace(',', '\\,').replace(';', '\\;')
location = (event.location or '').replace(',', '\\,').replace(';', '\\;')
created = event.created_at.strftime('%Y%m%dT%H%M%SZ') if event.created_at else datetime.now().strftime('%Y%m%dT%H%M%SZ')
lines.append('BEGIN:VEVENT')
lines.append(f'UID:{uid}')
lines.append(dtstart)
if dtend:
lines.append(dtend)
lines.append(f'SUMMARY:{summary}')
if location:
lines.append(f'LOCATION:{location}')
if desc:
lines.append(f'DESCRIPTION:{desc}')
if event.external_url:
lines.append(f'URL:{event.external_url}')
else:
lines.append(f'URL:https://nordabiznes.pl/kalendarz/{event.id}')
lines.append(f'ORGANIZER;CN={event.organizer_name or "Norda Biznes"}:mailto:{event.organizer_email or "biuro@norda-biznes.info"}')
lines.append(f'DTSTAMP:{created}')
lines.append('END:VEVENT')
lines.append('END:VCALENDAR')
ical_content = '\r\n'.join(lines)
return Response(
ical_content,
mimetype='text/calendar; charset=utf-8',
headers={
'Cache-Control': 'public, max-age=3600',
}
)
finally:
db.close()