nordabiz/blueprints/admin/routes_membership.py
Maciej Pienczyn 86090a7d33
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat(membership): PDF generation for declarations via WeasyPrint
- New route: /admin/membership/<id>/print → generates PDF
- Professional A4 layout with chamber header, data sections, signatures
- Button changed from window.print() to PDF link (opens in new tab)
- No browser headers/footers — clean PDF output
- Signature lines for applicant and reviewer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 12:13:21 +02:00

1025 lines
37 KiB
Python

"""
Admin Routes - Membership Applications
=======================================
Admin panel for managing membership applications and company data requests.
"""
import re
import logging
from datetime import datetime, date
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from sqlalchemy.orm.attributes import flag_modified
from . import bp
from database import (
SessionLocal, MembershipApplication, CompanyDataRequest,
Company, Category, User, UserNotification, Person, CompanyPerson, CompanyPKD,
CompanyFinancialReport, SystemRole, UserCompany
)
from krs_api_service import get_company_from_krs
from utils.decorators import role_required
logger = logging.getLogger(__name__)
def _parse_krs_date(date_str):
"""Parse KRS date format 'DD.MM.YYYY' to Python date. Returns None on failure."""
if not date_str:
return None
# Extract first date-like pattern (handles "25.10.2011 R. - ASESOR..." format)
import re as _re
match = _re.search(r'(\d{2})\.(\d{2})\.(\d{4})', date_str)
if match:
try:
return date(int(match.group(3)), int(match.group(2)), int(match.group(1)))
except ValueError:
return None
return None
def _generate_slug(name):
"""Generate URL-safe slug from company name."""
slug = re.sub(r'[^\w\s-]', '', name.lower())
slug = re.sub(r'[\s_]+', '-', slug)
slug = re.sub(r'-+', '-', slug).strip('-')
return slug
def _generate_member_number(db):
"""Generate next member number."""
last = db.query(MembershipApplication).filter(
MembershipApplication.member_number.isnot(None)
).order_by(MembershipApplication.id.desc()).first()
if last and last.member_number:
try:
num = int(last.member_number.replace('NB-', ''))
return f"NB-{num + 1:04d}"
except ValueError:
pass
# Count existing companies as starting point
count = db.query(Company).filter(Company.status == 'active').count()
return f"NB-{count + 1:04d}"
def _enrich_company_from_krs(company, db):
"""
Fetch and import KRS data for a company.
Updates company with krs_raw_data, imports people and PKD codes.
"""
if not company.krs:
return False
try:
krs_data = get_company_from_krs(company.krs)
if not krs_data:
logger.warning(f"KRS data not found for company {company.id} (KRS: {company.krs})")
return False
data_dict = krs_data.to_dict()
# Update company with KRS data
company.krs_raw_data = data_dict
company.krs_fetched_at = datetime.now()
company.data_source = 'KRS API'
company.last_verified_at = datetime.now()
# Additional fields from KRS
if data_dict.get('forma_prawna'):
company.legal_form = data_dict['forma_prawna']
if data_dict.get('nazwa'):
company.legal_name = data_dict['nazwa']
if data_dict.get('sposob_reprezentacji'):
company.krs_representation = data_dict['sposob_reprezentacji']
# REGON
regon = data_dict.get('regon', '')
if regon:
# KRS returns 14-digit REGON, strip trailing zeros for 9-digit form
company.regon = regon.rstrip('0') if len(regon) > 9 else regon
# NIP (if missing)
if not company.nip and data_dict.get('nip'):
company.nip = data_dict['nip']
# Address
adres = data_dict.get('adres', {})
if adres:
ulica = adres.get('ulica', '')
nr_domu = adres.get('nr_domu', '')
nr_lokalu = adres.get('nr_lokalu', '')
addr_street = ulica
if nr_domu:
addr_street += f' {nr_domu}'
if nr_lokalu:
addr_street += f'/{nr_lokalu}'
if addr_street and not company.address_street:
company.address_street = addr_street
miasto = adres.get('miejscowosc') or adres.get('poczta')
if miasto and not company.address_city:
company.address_city = miasto.title()
if adres.get('kod_pocztowy') and not company.address_postal:
company.address_postal = adres['kod_pocztowy']
# Contact from KRS (www, email)
kontakt = data_dict.get('kontakt_krs', {})
if kontakt.get('www') and not company.website:
www = kontakt['www'].strip()
if not www.lower().startswith('http'):
www = 'https://' + www.lower()
company.website = www
if kontakt.get('email') and not company.email:
company.email = kontakt['email'].lower().strip()
# Capital
kapital = data_dict.get('kapital', {})
if kapital.get('zakladowy'):
company.capital_amount = kapital['zakladowy']
if kapital.get('waluta'):
company.capital_currency = kapital['waluta']
# Representation rules
if data_dict.get('sposob_reprezentacji'):
company.krs_representation_rules = data_dict['sposob_reprezentacji']
# Registration date
daty = data_dict.get('daty', {})
if daty.get('rejestracji'):
parsed = _parse_krs_date(daty['rejestracji'])
if parsed:
company.krs_registration_date = parsed
# Company agreement (umowa spółki)
umowa = data_dict.get('umowa_spolki', {})
if umowa.get('data_umowy'):
parsed = _parse_krs_date(umowa['data_umowy'])
if parsed:
company.krs_company_agreement_date = parsed
if umowa.get('czas_trwania'):
company.krs_duration = umowa['czas_trwania']
# OPP status
if 'czy_opp' in data_dict:
company.is_opp = bool(data_dict['czy_opp'])
# Last audit timestamp
company.krs_last_audit_at = datetime.now()
# Primary PKD on Company model
pkd_list = data_dict.get('przedmiot_dzialalnosci', [])
primary_pkd = next((p for p in pkd_list if p.get('glowna')), pkd_list[0] if pkd_list else None)
if primary_pkd and not company.pkd_code:
company.pkd_code = primary_pkd.get('kod', '')
company.pkd_description = primary_pkd.get('opis', '')
# Import board members (zarząd)
with db.no_autoflush:
for osoba in data_dict.get('zarzad', []):
nazwisko = osoba.get('nazwisko', '')
imiona = osoba.get('imiona', osoba.get('imie', ''))
funkcja = osoba.get('funkcja', 'CZŁONEK ZARZĄDU')
if not nazwisko:
continue
person = db.query(Person).filter(
Person.nazwisko == nazwisko,
Person.imiona == imiona
).first()
if not person:
person = Person(nazwisko=nazwisko, imiona=imiona)
db.add(person)
db.flush()
existing = db.query(CompanyPerson).filter(
CompanyPerson.company_id == company.id,
CompanyPerson.person_id == person.id
).first()
if not existing:
cp = CompanyPerson(
company_id=company.id,
person_id=person.id,
role=funkcja,
role_category='zarzad',
source='ekrs.ms.gov.pl',
fetched_at=datetime.now()
)
db.add(cp)
# Import PKD codes
for pkd in data_dict.get('przedmiot_dzialalnosci', []):
kod = pkd.get('kod', '')
nazwa = pkd.get('opis', '') or pkd.get('nazwa', '')
is_primary = pkd.get('glowna', False) or pkd.get('glowny', False)
if not kod:
continue
existing = db.query(CompanyPKD).filter(
CompanyPKD.company_id == company.id,
CompanyPKD.pkd_code == kod
).first()
if not existing:
cpkd = CompanyPKD(
company_id=company.id,
pkd_code=kod,
pkd_description=nazwa,
is_primary=is_primary
)
db.add(cpkd)
# Import shareholders (wspólnicy)
for wspolnik in data_dict.get('wspolnicy', []):
nazwisko = wspolnik.get('nazwisko', '')
imiona = wspolnik.get('imiona', wspolnik.get('imie', ''))
if not nazwisko:
continue
person = db.query(Person).filter(
Person.nazwisko == nazwisko,
Person.imiona == imiona
).first()
if not person:
person = Person(nazwisko=nazwisko, imiona=imiona)
db.add(person)
db.flush()
existing = db.query(CompanyPerson).filter(
CompanyPerson.company_id == company.id,
CompanyPerson.person_id == person.id,
CompanyPerson.role_category == 'wspolnik'
).first()
if not existing:
cp = CompanyPerson(
company_id=company.id,
person_id=person.id,
role=wspolnik.get('funkcja', 'WSPÓLNIK'),
role_category='wspolnik',
shares_count=wspolnik.get('liczba_udzialow'),
shares_value=wspolnik.get('wartosc_udzialow'),
source='ekrs.ms.gov.pl',
fetched_at=datetime.now()
)
db.add(cp)
# Import financial reports (sprawozdania finansowe)
for sf in data_dict.get('sprawozdania_finansowe', []):
za_okres = sf.get('za_okres', '')
data_zlozenia = sf.get('data_zlozenia', '')
# Extract two dates from period string
# Handles: "01.01.2011 - 31.12.2011" and "OD 01.01.2013 DO 31.12.2013"
period_start = None
period_end = None
import re as _re
date_matches = _re.findall(r'(\d{2}\.\d{2}\.\d{4})', za_okres)
if len(date_matches) >= 2:
period_start = _parse_krs_date(date_matches[0])
period_end = _parse_krs_date(date_matches[1])
filed = _parse_krs_date(data_zlozenia)
if not period_start and not period_end:
continue
existing = db.query(CompanyFinancialReport).filter(
CompanyFinancialReport.company_id == company.id,
CompanyFinancialReport.period_start == period_start,
CompanyFinancialReport.period_end == period_end
).first()
if not existing:
cfr = CompanyFinancialReport(
company_id=company.id,
period_start=period_start,
period_end=period_end,
filed_at=filed,
report_type='annual',
source='ekrs'
)
db.add(cfr)
logger.info(f"Enriched company {company.id} with KRS data")
return True
except Exception as e:
logger.error(f"Error enriching company {company.id} from KRS: {e}")
return False
# ============================================================
# MEMBERSHIP APPLICATIONS
# ============================================================
@bp.route('/membership')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership():
"""Admin panel for membership applications."""
db = SessionLocal()
try:
# Get filter parameters
status_filter = request.args.get('status', 'all')
search_query = request.args.get('q', '').strip()
# Base query
query = db.query(MembershipApplication)
# Apply filters
if status_filter and status_filter != 'all':
query = query.filter(MembershipApplication.status == status_filter)
if search_query:
search_pattern = f'%{search_query}%'
query = query.filter(
(MembershipApplication.company_name.ilike(search_pattern)) |
(MembershipApplication.nip.ilike(search_pattern)) |
(MembershipApplication.email.ilike(search_pattern))
)
# Order by submitted date (newest first)
applications = query.order_by(
MembershipApplication.submitted_at.desc().nullslast(),
MembershipApplication.created_at.desc()
).all()
# Statistics
total = db.query(MembershipApplication).count()
submitted = db.query(MembershipApplication).filter(
MembershipApplication.status == 'submitted'
).count()
under_review = db.query(MembershipApplication).filter(
MembershipApplication.status == 'under_review'
).count()
approved = db.query(MembershipApplication).filter(
MembershipApplication.status == 'approved'
).count()
rejected = db.query(MembershipApplication).filter(
MembershipApplication.status == 'rejected'
).count()
logger.info(f"Admin {current_user.email} accessed membership applications panel")
return render_template(
'admin/membership.html',
applications=applications,
total=total,
submitted=submitted,
under_review=under_review,
approved=approved,
rejected=rejected,
current_status=status_filter,
search_query=search_query,
status_choices=MembershipApplication.STATUS_CHOICES
)
finally:
db.close()
@bp.route('/membership/<int:app_id>/print')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_print(app_id):
"""Generate PDF of membership declaration."""
from flask import Response
import weasyprint
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
flash('Nie znaleziono deklaracji.', 'error')
return redirect(url_for('admin.admin_membership'))
html_content = render_template(
'admin/membership_print.html',
app=application,
section_choices=MembershipApplication.SECTION_CHOICES,
)
pdf = weasyprint.HTML(string=html_content).write_pdf()
return Response(
pdf,
mimetype='application/pdf',
headers={
'Content-Disposition': f'inline; filename=deklaracja-{app_id}.pdf'
}
)
finally:
db.close()
@bp.route('/membership/<int:app_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_detail(app_id):
"""View membership application details."""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
flash('Nie znaleziono deklaracji.', 'error')
return redirect(url_for('admin.admin_membership'))
# Get categories for company creation
categories = db.query(Category).order_by(Category.name).all()
return render_template(
'admin/membership_detail.html',
application=application,
categories=categories,
section_choices=MembershipApplication.SECTION_CHOICES,
business_type_choices=MembershipApplication.BUSINESS_TYPE_CHOICES
)
finally:
db.close()
@bp.route('/membership/<int:app_id>/approve', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_approve(app_id):
"""Approve membership application and create company."""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
return jsonify({'success': False, 'error': 'Nie znaleziono deklaracji'}), 404
if application.status not in ['submitted', 'under_review']:
return jsonify({
'success': False,
'error': f'Nie można zatwierdzić deklaracji w statusie: {application.status}'
}), 400
data = request.get_json() or {}
# Check if company with this NIP already exists
existing = db.query(Company).filter(Company.nip == application.nip).first()
if existing:
return jsonify({
'success': False,
'error': f'Firma z NIP {application.nip} już istnieje w katalogu'
}), 400
# Generate unique slug
slug = _generate_slug(application.company_name)
base_slug = slug
counter = 1
while db.query(Company).filter(Company.slug == slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
# Format address components
street = f"{application.address_street} {application.address_number}".strip() if application.address_street else None
city = application.address_city.title() if application.address_city else None
postal = application.address_postal_code
# Build full address
address_full = None
if street or city:
parts = []
if street:
parts.append(street.title())
if postal and city:
parts.append(f"{postal} {city}")
elif city:
parts.append(city)
address_full = ", ".join(parts)
# Determine legal form from company name
legal_form = None
name_upper = application.company_name.upper()
if 'SP. Z O.O.' in name_upper or 'SPÓŁKA Z OGRANICZONĄ' in name_upper:
legal_form = 'SPÓŁKA Z OGRANICZONĄ ODPOWIEDZIALNOŚCIĄ'
elif 'S.A.' in name_upper or 'SPÓŁKA AKCYJNA' in name_upper:
legal_form = 'SPÓŁKA AKCYJNA'
elif 'SP.K.' in name_upper or 'SPÓŁKA KOMANDYTOWA' in name_upper:
legal_form = 'SPÓŁKA KOMANDYTOWA'
elif 'SP.J.' in name_upper or 'SPÓŁKA JAWNA' in name_upper:
legal_form = 'SPÓŁKA JAWNA'
# Create company
company = Company(
name=application.company_name,
slug=slug,
nip=application.nip.replace('-', '') if application.nip else None,
regon=application.regon,
krs=application.krs_number,
legal_form=legal_form,
website=application.website,
email=application.email,
phone=application.phone,
address_postal=postal,
address_city=city,
address_street=street.title() if street else None,
address_full=address_full,
description_short=application.description[:500] if application.description else None,
description_full=application.description,
business_start_date=application.founded_date,
year_established=application.founded_date.year if application.founded_date else None,
employees_count=application.employee_count,
category_id=data.get('category_id'),
status='active',
member_since=date.today(),
data_quality='enhanced'
)
db.add(company)
db.flush() # Get company ID
# Update application
application.status = 'approved'
application.reviewed_at = datetime.now()
application.reviewed_by_id = current_user.id
application.review_comment = data.get('comment', '')
application.company_id = company.id
application.member_number = _generate_member_number(db)
# Assign user to company
user = db.query(User).get(application.user_id)
if user:
user.company_id = company.id
user.is_norda_member = True
# Create user-company association (multi-company support)
is_first_company = not db.query(UserCompany).filter_by(user_id=user.id).first()
user_company = UserCompany(
user_id=user.id,
company_id=company.id,
role=user.company_role or 'MANAGER',
is_primary=is_first_company,
)
db.add(user_company)
db.commit()
logger.info(
f"Membership application {app_id} approved by {current_user.email}. "
f"Company {company.id} created. Member number: {application.member_number}"
)
# Enrich company with KRS data (if KRS number available)
krs_enriched = False
krs_message = ""
if company.krs:
logger.info(f"Fetching KRS data for company {company.id} (KRS: {company.krs})...")
krs_enriched = _enrich_company_from_krs(company, db)
if krs_enriched:
db.commit()
krs_message = " Pobrano dane z rejestru KRS."
logger.info(f"KRS data imported for company {company.id}")
else:
krs_message = " (Nie udało się pobrać danych z KRS)"
flash(f'Deklaracja zatwierdzona! Nr członkowski: {application.member_number}{krs_message}', 'success')
return jsonify({
'success': True,
'company_id': company.id,
'member_number': application.member_number,
'krs_enriched': krs_enriched
})
except Exception as e:
db.rollback()
logger.error(f"Error approving application {app_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/membership/<int:app_id>/reject', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_reject(app_id):
"""Reject membership application."""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
return jsonify({'success': False, 'error': 'Nie znaleziono deklaracji'}), 404
if application.status not in ['submitted', 'under_review']:
return jsonify({
'success': False,
'error': f'Nie można odrzucić deklaracji w statusie: {application.status}'
}), 400
data = request.get_json() or {}
comment = data.get('comment', '').strip()
if not comment:
return jsonify({
'success': False,
'error': 'Podaj powód odrzucenia'
}), 400
application.status = 'rejected'
application.reviewed_at = datetime.now()
application.reviewed_by_id = current_user.id
application.review_comment = comment
db.commit()
logger.info(f"Membership application {app_id} rejected by {current_user.email}: {comment}")
return jsonify({'success': True})
except Exception as e:
db.rollback()
logger.error(f"Error rejecting application {app_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/membership/<int:app_id>/request-changes', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_request_changes(app_id):
"""Request changes to membership application."""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
return jsonify({'success': False, 'error': 'Nie znaleziono deklaracji'}), 404
if application.status not in ['submitted', 'under_review']:
return jsonify({
'success': False,
'error': f'Nie można poprosić o poprawki dla deklaracji w statusie: {application.status}'
}), 400
data = request.get_json() or {}
comment = data.get('comment', '').strip()
if not comment:
return jsonify({
'success': False,
'error': 'Podaj co wymaga poprawienia'
}), 400
application.status = 'changes_requested'
application.reviewed_at = datetime.now()
application.reviewed_by_id = current_user.id
application.review_comment = comment
db.commit()
logger.info(f"Changes requested for application {app_id} by {current_user.email}: {comment}")
return jsonify({'success': True})
except Exception as e:
db.rollback()
logger.error(f"Error requesting changes for application {app_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/membership/<int:app_id>/start-review', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_start_review(app_id):
"""Mark application as under review."""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
return jsonify({'success': False, 'error': 'Nie znaleziono deklaracji'}), 404
if application.status != 'submitted':
return jsonify({
'success': False,
'error': 'Można rozpocząć rozpatrywanie tylko dla wysłanych deklaracji'
}), 400
application.status = 'under_review'
application.reviewed_by_id = current_user.id
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/membership/<int:app_id>/propose-changes', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_propose_changes(app_id):
"""
Propose changes from registry data for user approval.
Instead of directly updating, save proposed changes and notify user.
"""
db = SessionLocal()
try:
application = db.query(MembershipApplication).get(app_id)
if not application:
return jsonify({'success': False, 'error': 'Nie znaleziono deklaracji'}), 404
if application.status not in ['submitted', 'under_review']:
return jsonify({
'success': False,
'error': 'Można proponować zmiany tylko dla deklaracji oczekujących na rozpatrzenie'
}), 400
data = request.get_json() or {}
registry_data = data.get('registry_data', {})
comment = data.get('comment', '').strip()
# Build proposed changes with old and new values
proposed_changes = {}
field_mappings = {
'name': ('company_name', application.company_name),
'address_postal_code': ('address_postal_code', application.address_postal_code),
'address_city': ('address_city', application.address_city),
'address_street': ('address_street', application.address_street),
'address_number': ('address_number', application.address_number),
'regon': ('regon', application.regon),
'krs': ('krs_number', application.krs_number),
'founded_date': ('founded_date', str(application.founded_date) if application.founded_date else None),
}
for registry_key, (app_field, old_value) in field_mappings.items():
new_value = registry_data.get(registry_key)
if new_value and str(new_value) != str(old_value or ''):
proposed_changes[app_field] = {
'old': old_value,
'new': new_value,
'label': _get_field_label(app_field)
}
if not proposed_changes:
return jsonify({
'success': False,
'error': 'Brak zmian do zaproponowania - dane są identyczne'
}), 400
# Save proposed changes
application.proposed_changes = proposed_changes
application.proposed_changes_at = datetime.now()
application.proposed_changes_by_id = current_user.id
application.proposed_changes_comment = comment
application.status = 'pending_user_approval'
# Store registry data for reference
application.registry_data = registry_data
application.registry_source = registry_data.get('source', 'KRS')
# Add to workflow history
history = application.workflow_history or []
changes_summary = [f"{v['label']}: {v['old'] or '-'}{v['new']}" for v in proposed_changes.values()]
history.append({
'event': 'admin_proposed_changes',
'timestamp': datetime.now().isoformat(),
'user_id': current_user.id,
'user_name': current_user.name or current_user.email,
'details': {
'source': registry_data.get('source', 'KRS'),
'changes': changes_summary,
'comment': comment
}
})
application.workflow_history = list(history) # Create new list for SQLAlchemy detection
flag_modified(application, 'workflow_history')
# Create notification for user
notification = UserNotification(
user_id=application.user_id,
title='Propozycja zmian w deklaracji',
message=f'Administrator zaproponował aktualizację danych firmy "{application.company_name}" na podstawie rejestru {registry_data.get("source", "KRS")}. Przejrzyj i zaakceptuj lub odrzuć zmiany.',
notification_type='alert',
related_type='membership_application',
related_id=app_id,
action_url=f'/membership/review-changes/{app_id}'
)
db.add(notification)
db.commit()
logger.info(
f"Membership application {app_id}: changes proposed by {current_user.email}. "
f"Proposed fields: {list(proposed_changes.keys())}"
)
return jsonify({
'success': True,
'proposed_changes': proposed_changes,
'message': 'Zmiany zostały zaproponowane. Użytkownik otrzyma powiadomienie.'
})
except Exception as e:
db.rollback()
logger.error(f"Error proposing changes for application {app_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
def _get_field_label(field_name):
"""Get human-readable label for field name."""
labels = {
'company_name': 'Nazwa firmy',
'address_postal_code': 'Kod pocztowy',
'address_city': 'Miejscowość',
'address_street': 'Ulica',
'address_number': 'Numer budynku/lokalu',
'regon': 'REGON',
'krs_number': 'Numer KRS',
'founded_date': 'Data założenia',
}
return labels.get(field_name, field_name)
@bp.route('/membership/<int:app_id>/update-from-registry', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_membership_update_from_registry(app_id):
"""
[DEPRECATED - use propose-changes instead]
Direct update is now replaced by propose-changes workflow.
This endpoint is kept for backward compatibility but redirects to propose-changes.
"""
# Redirect to new workflow
data = request.get_json() or {}
return admin_membership_propose_changes.__wrapped__(app_id)
# ============================================================
# COMPANY DATA REQUESTS
# ============================================================
@bp.route('/company-requests')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_requests():
"""Admin panel for company data requests."""
db = SessionLocal()
try:
status_filter = request.args.get('status', 'pending')
query = db.query(CompanyDataRequest)
if status_filter and status_filter != 'all':
query = query.filter(CompanyDataRequest.status == status_filter)
requests_list = query.order_by(CompanyDataRequest.created_at.desc()).all()
# Statistics
pending = db.query(CompanyDataRequest).filter(
CompanyDataRequest.status == 'pending'
).count()
approved = db.query(CompanyDataRequest).filter(
CompanyDataRequest.status == 'approved'
).count()
rejected = db.query(CompanyDataRequest).filter(
CompanyDataRequest.status == 'rejected'
).count()
return render_template(
'admin/company_requests.html',
requests=requests_list,
pending=pending,
approved=approved,
rejected=rejected,
current_status=status_filter
)
finally:
db.close()
@bp.route('/company-requests/<int:req_id>/approve', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_request_approve(req_id):
"""Approve company data request and update company."""
db = SessionLocal()
try:
data_request = db.query(CompanyDataRequest).get(req_id)
if not data_request:
return jsonify({'success': False, 'error': 'Nie znaleziono zgłoszenia'}), 404
if data_request.status != 'pending':
return jsonify({
'success': False,
'error': f'Zgłoszenie już rozpatrzone: {data_request.status}'
}), 400
company = db.query(Company).get(data_request.company_id)
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
fetched = data_request.fetched_data or {}
applied_fields = []
# Update company with fetched data
if fetched.get('name') and not company.name:
company.name = fetched['name']
applied_fields.append('name')
if data_request.nip and not company.nip:
company.nip = data_request.nip
applied_fields.append('nip')
if fetched.get('regon') and not company.regon:
company.regon = fetched['regon']
applied_fields.append('regon')
if fetched.get('krs') and not company.krs:
company.krs = fetched['krs']
applied_fields.append('krs')
if fetched.get('address_postal_code') and not company.address_postal:
company.address_postal = fetched['address_postal_code']
applied_fields.append('address_postal')
if fetched.get('address_city') and not company.address_city:
company.address_city = fetched['address_city']
applied_fields.append('address_city')
if fetched.get('address_street') and not company.address_street:
street = fetched.get('address_street', '')
number = fetched.get('address_number', '')
company.address_street = f"{street} {number}".strip() if street else None
applied_fields.append('address_street')
if fetched.get('website') and not company.website:
company.website = fetched['website']
applied_fields.append('website')
if fetched.get('email') and not company.email:
company.email = fetched['email']
applied_fields.append('email')
# Update request
data_request.status = 'approved'
data_request.reviewed_at = datetime.now()
data_request.reviewed_by_id = current_user.id
data_request.applied_fields = applied_fields
db.commit()
logger.info(
f"Company data request {req_id} approved by {current_user.email}. "
f"Updated fields: {applied_fields}"
)
return jsonify({
'success': True,
'applied_fields': applied_fields
})
except Exception as e:
db.rollback()
logger.error(f"Error approving data request {req_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/company-requests/<int:req_id>/reject', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_request_reject(req_id):
"""Reject company data request."""
db = SessionLocal()
try:
data_request = db.query(CompanyDataRequest).get(req_id)
if not data_request:
return jsonify({'success': False, 'error': 'Nie znaleziono zgłoszenia'}), 404
if data_request.status != 'pending':
return jsonify({
'success': False,
'error': f'Zgłoszenie już rozpatrzone: {data_request.status}'
}), 400
data = request.get_json() or {}
comment = data.get('comment', '').strip()
data_request.status = 'rejected'
data_request.reviewed_at = datetime.now()
data_request.reviewed_by_id = current_user.id
data_request.review_comment = comment
db.commit()
logger.info(f"Company data request {req_id} rejected by {current_user.email}")
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()