nordabiz/blueprints/public/routes_company_edit.py
Maciej Pienczyn bd179dec97
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(company): convert uploaded logos to .webp so templates display them
Templates expect logo at {slug}.webp with SVG fallback. When users uploaded
JPG/PNG files, the logo was saved with original extension and never displayed.
Now all raster uploads are converted to .webp via Pillow; SVG stays as-is.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:28:11 +02:00

457 lines
18 KiB
Python

"""
Company Edit Routes
===================
Routes for editing company profiles by authorized users.
"""
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from blueprints.public import bp
from sqlalchemy import or_
from database import SessionLocal, Company, CompanyContact, CompanySocialMedia, CompanyWebsite, Category
from utils.helpers import sanitize_input, sanitize_html, validate_email, ensure_url
from utils.data_quality import update_company_data_quality
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
EMPLOYEE_COUNT_WHITELIST = ['1-5', '6-10', '11-25', '26-50', '51-100', '101-250', '250+', '']
VALID_SOCIAL_PLATFORMS = ['facebook', 'linkedin', 'instagram', 'youtube', 'twitter', 'tiktok']
VALID_WEBSITE_TYPES = ['website', 'store', 'booking', 'blog', 'portfolio', 'other']
EDITABLE_SOURCES = [None, 'manual_edit', 'manual']
VALID_SECTION_KEYS = [
'about', 'services', 'board', 'owner_info', 'registry', 'legal',
'contact', 'social_media', 'recommendations', 'website', 'seo_audit',
'gbp_audit', 'social_audit', 'it_audit', 'news',
# Registry sub-sections (CEIDG)
'registry.ceidg_status', 'registry.ceidg_owner', 'registry.ceidg_start_date',
'registry.ceidg_pkd', 'registry.ceidg_contacts', 'registry.ceidg_succession',
'registry.ceidg_locations',
# Registry sub-sections (KRS)
'registry.krs_legal_form', 'registry.krs_capital', 'registry.krs_identity',
'registry.krs_contacts', 'registry.krs_agreement', 'registry.krs_representation',
'registry.krs_board', 'registry.krs_partners', 'registry.krs_activities',
'registry.krs_financials', 'registry.krs_address', 'registry.krs_register',
]
@bp.route('/firma/edytuj')
@bp.route('/firma/edytuj/<int:company_id>')
@login_required
def company_edit(company_id=None):
"""Display the company profile edit form."""
target_company_id = company_id or current_user.company_id
if not target_company_id or not current_user.can_manage_company(target_company_id):
flash('Edycja profilu firmy jest dostępna tylko dla kadry zarządzającej. '
'Skontaktuj się z osobą zarządzającą Twoją firmą.', 'warning')
return redirect(url_for('public.dashboard'))
db = SessionLocal()
try:
company = db.query(Company).get(target_company_id)
if not company:
flash('Nie znaleziono firmy.', 'error')
return redirect(url_for('public.dashboard'))
categories = db.query(Category).order_by(Category.name).all()
contacts = db.query(CompanyContact).filter_by(
company_id=company.id
).order_by(
CompanyContact.contact_type,
CompanyContact.is_primary.desc()
).all()
social_media = db.query(CompanySocialMedia).filter_by(
company_id=company.id
).all()
company_websites = db.query(CompanyWebsite).filter_by(
company_id=company.id
).order_by(CompanyWebsite.is_primary.desc()).all()
permissions = {
'description': current_user.can_edit_company_field('description', company_id=company.id),
'services': current_user.can_edit_company_field('services', company_id=company.id),
'contacts': current_user.can_edit_company_field('contacts', company_id=company.id),
'social': current_user.can_edit_company_field('social', company_id=company.id),
}
editable_contacts = [c for c in contacts if c.source in EDITABLE_SOURCES]
registry_ceidg_subs = [
('registry.ceidg_status', 'Status działalności'),
('registry.ceidg_owner', 'Właściciel JDG'),
('registry.ceidg_start_date', 'Data rozpoczęcia'),
('registry.ceidg_pkd', 'Kody PKD'),
('registry.ceidg_contacts', 'Dane kontaktowe z CEIDG'),
('registry.ceidg_succession', 'Zarządca sukcesyjny'),
('registry.ceidg_locations', 'Dodatkowe miejsca działalności'),
]
registry_krs_subs = [
('registry.krs_legal_form', 'Forma prawna'),
('registry.krs_capital', 'Kapitał zakładowy'),
('registry.krs_identity', 'Identyfikatory (KRS, NIP, REGON, data)'),
('registry.krs_contacts', 'Dane kontaktowe'),
('registry.krs_agreement', 'Umowa spółki'),
('registry.krs_representation', 'Sposób reprezentacji'),
('registry.krs_board', 'Zarząd'),
('registry.krs_partners', 'Wspólnicy'),
('registry.krs_activities', 'Przedmiot działalności (PKD)'),
('registry.krs_financials', 'Sprawozdania finansowe'),
('registry.krs_address', 'Adres siedziby z KRS'),
('registry.krs_register', 'Dane rejestrowe i odpis KRS'),
]
# Choose sub-sections based on company type
if company.ceidg_id:
registry_subs = registry_ceidg_subs
elif company.krs and company.data_source == 'KRS API':
registry_subs = registry_krs_subs
else:
registry_subs = []
section_definitions = [
('about', 'O firmie', 'Opis firmy, historia, wartości', []),
('services', 'Usługi i kompetencje', 'Oferowane usługi, technologie, obszar działania', []),
('board', 'Zarząd i Wspólnicy', 'Osoby w zarządzie, wspólnicy, prokurenci', []),
('owner_info', 'Właściciel (JDG)', 'Dane właściciela jednoosobowej działalności', []),
('registry', 'Dane z rejestrów urzędowych', 'Dane z CEIDG/KRS, PKD, adresy rejestrowe', registry_subs),
('legal', 'Informacje prawne i biznesowe', 'NIP, REGON, KRS, forma prawna', []),
('contact', 'Dane kontaktowe', 'Telefony, e-mail, adres, dodatkowe kontakty', []),
('social_media', 'Social Media', 'Profile w mediach społecznościowych', []),
('recommendations', 'Rekomendacje', 'Rekomendacje od innych członków Izby', []),
('website', 'Strona WWW', 'Link do strony i podgląd', []),
('seo_audit', 'Analiza SEO', 'Wyniki audytu SEO strony internetowej', []),
('gbp_audit', 'Audyt Google Business Profile', 'Wyniki audytu wizytówki Google', []),
('social_audit', 'Audyt Social Media', 'Wyniki audytu mediów społecznościowych', []),
('it_audit', 'Audyt IT', 'Wyniki audytu infrastruktury IT', []),
('news', 'Aktualności i wydarzenia', 'Najnowsze wydarzenia firmy', []),
]
return render_template(
'company_edit.html',
company=company,
categories=categories,
contacts=editable_contacts,
all_contacts=contacts,
social_media=social_media,
company_websites=company_websites,
permissions=permissions,
section_definitions=section_definitions,
)
finally:
db.close()
@bp.route('/firma/edytuj', methods=['POST'])
@bp.route('/firma/edytuj/<int:company_id>', methods=['POST'])
@login_required
def company_edit_save(company_id=None):
"""Save company profile edits."""
target_company_id = company_id or current_user.company_id
if not target_company_id or not current_user.can_manage_company(target_company_id):
flash('Edycja profilu firmy jest dostępna tylko dla kadry zarządzającej. '
'Skontaktuj się z osobą zarządzającą Twoją firmą.', 'warning')
return redirect(url_for('public.dashboard'))
db = SessionLocal()
try:
company = db.query(Company).get(target_company_id)
if not company:
flash('Nie znaleziono firmy.', 'error')
return redirect(url_for('public.dashboard'))
active_tab = request.form.get('active_tab', 'description')
if active_tab == 'description' and current_user.can_edit_company_field('description', company_id=company.id):
_save_description(db, company)
elif active_tab == 'services' and current_user.can_edit_company_field('services', company_id=company.id):
_save_services(company)
elif active_tab == 'contacts' and current_user.can_edit_company_field('contacts', company_id=company.id):
_save_contacts(db, company)
elif active_tab == 'social' and current_user.can_edit_company_field('social', company_id=company.id):
_save_social_media(db, company)
db.commit()
update_company_data_quality(company, db)
db.commit()
flash('Dane firmy zostały zaktualizowane.', 'success')
return redirect(url_for('public.company_detail', company_id=company.id))
except Exception as e:
db.rollback()
logger.error(f"Error saving company edit for company_id={target_company_id}: {e}")
flash('Wystąpił błąd podczas zapisywania zmian. Spróbuj ponownie.', 'error')
return redirect(url_for('public.company_edit', company_id=company_id))
finally:
db.close()
def _save_description(db, company):
"""Save description tab fields."""
company.description_short = sanitize_input(
request.form.get('description_short', ''), max_length=500
) or None
company.description_full = sanitize_html(
request.form.get('description_full', '')
) or None
company.founding_history = sanitize_html(
request.form.get('founding_history', '')
) or None
company.core_values = sanitize_html(
request.form.get('core_values', '')
) or None
# Year established — only save if not from registry
if not company.krs_registration_date and not company.business_start_date:
year_raw = request.form.get('year_established', '').strip()
if year_raw:
try:
year = int(year_raw)
if 1800 <= year <= 2030:
company.year_established = year
except ValueError:
pass
else:
company.year_established = None
category_id_raw = request.form.get('category_id', '')
if category_id_raw:
try:
category_id = int(category_id_raw)
category = db.query(Category).get(category_id)
if category:
company.category_id = category_id
except (ValueError, TypeError):
pass
else:
company.category_id = None
# Logo upload — always convert to .webp (templates expect {slug}.webp)
logo_file = request.files.get('logo_file')
if logo_file and logo_file.filename:
import os
from PIL import Image
import io
allowed = {'png', 'jpg', 'jpeg', 'svg', 'webp'}
ext = logo_file.filename.rsplit('.', 1)[-1].lower() if '.' in logo_file.filename else ''
if ext in allowed:
logo_dir = os.path.join('static', 'img', 'companies')
os.makedirs(logo_dir, exist_ok=True)
# Remove old logo files for this company (different extensions)
for old_ext in allowed:
old_path = os.path.join(logo_dir, f"{company.slug}.{old_ext}")
if os.path.exists(old_path):
os.remove(old_path)
filepath = os.path.join(logo_dir, f"{company.slug}.webp")
if ext == 'svg':
# SVG stays as-is (can't convert to webp)
filepath = os.path.join(logo_dir, f"{company.slug}.svg")
logo_file.save(filepath)
else:
img = Image.open(logo_file)
img.save(filepath, 'WEBP', quality=85)
logger.info(f"Logo uploaded for company {company.id}: {filepath}")
def _save_services(company):
"""Save services tab fields."""
company.services_offered = sanitize_html(
request.form.get('services_offered', '')
) or None
company.technologies_used = sanitize_html(
request.form.get('technologies_used', '')
) or None
company.operational_area = sanitize_input(
request.form.get('operational_area', ''), max_length=500
) or None
company.languages_offered = sanitize_input(
request.form.get('languages_offered', ''), max_length=200
) or None
employee_count = request.form.get('employee_count_range', '')
if employee_count in EMPLOYEE_COUNT_WHITELIST:
company.employee_count_range = employee_count or None
def _save_contacts(db, company):
"""Save contacts tab fields."""
_save_websites(db, company)
email_raw = sanitize_input(request.form.get('email', ''), max_length=255)
if email_raw:
if validate_email(email_raw):
company.email = email_raw
else:
company.email = None
phone_raw = sanitize_input(request.form.get('phone', ''), max_length=50)
company.phone = phone_raw or None
company.address_street = sanitize_input(
request.form.get('address_street', ''), max_length=255
) or None
company.address_city = sanitize_input(
request.form.get('address_city', ''), max_length=100
) or None
company.address_postal = sanitize_input(
request.form.get('address_postal', ''), max_length=10
) or None
# Delete existing editable contacts (source is NULL, 'manual_edit', or 'manual')
db.query(CompanyContact).filter(
CompanyContact.company_id == company.id,
or_(
CompanyContact.source.in_(['manual_edit', 'manual']),
CompanyContact.source.is_(None)
)
).delete(synchronize_session='fetch')
# Add new contacts from form
contact_types = request.form.getlist('contact_types[]')
contact_values = request.form.getlist('contact_values[]')
contact_purposes = request.form.getlist('contact_purposes[]')
for i, value in enumerate(contact_values):
value = sanitize_input(value, max_length=255)
if not value:
continue
contact_type = sanitize_input(contact_types[i], max_length=20) if i < len(contact_types) else 'phone'
purpose = sanitize_input(contact_purposes[i], max_length=100) if i < len(contact_purposes) else ''
db.add(CompanyContact(
company_id=company.id,
contact_type=contact_type,
value=value,
purpose=purpose or None,
source='manual_edit',
))
def _save_social_media(db, company):
"""Save social media tab fields."""
# Delete existing editable social media (source is NULL, 'manual_edit', or 'manual')
db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company.id,
or_(
CompanySocialMedia.source.in_(['manual_edit', 'manual']),
CompanySocialMedia.source.is_(None)
)
).delete(synchronize_session='fetch')
# Add new social media from form
social_platforms = request.form.getlist('social_platforms[]')
social_urls = request.form.getlist('social_urls[]')
for i, url in enumerate(social_urls):
url = sanitize_input(url, max_length=500)
if not url:
continue
platform = social_platforms[i] if i < len(social_platforms) else ''
if platform not in VALID_SOCIAL_PLATFORMS:
continue
db.add(CompanySocialMedia(
company_id=company.id,
platform=platform,
url=ensure_url(url),
source='manual_edit',
verified_at=datetime.now(),
))
def _save_websites(db, company):
"""Save multiple website URLs from the contacts tab."""
# Delete all existing websites — user edits the full list
db.query(CompanyWebsite).filter(
CompanyWebsite.company_id == company.id
).delete(synchronize_session='fetch')
website_urls = request.form.getlist('website_urls[]')
website_labels = request.form.getlist('website_labels[]')
website_types = request.form.getlist('website_types[]')
primary_idx_raw = request.form.get('website_primary', '0')
try:
primary_idx = int(primary_idx_raw)
except (ValueError, TypeError):
primary_idx = 0
added = 0
primary_url = None
for i, url_raw in enumerate(website_urls):
if added >= 5:
break
url_raw = sanitize_input(url_raw, max_length=500)
if not url_raw:
continue
url = ensure_url(url_raw)
label = sanitize_input(website_labels[i], max_length=100) if i < len(website_labels) else ''
wtype = sanitize_input(website_types[i], max_length=20) if i < len(website_types) else 'website'
if wtype not in VALID_WEBSITE_TYPES:
wtype = 'website'
is_primary = (i == primary_idx)
if is_primary:
primary_url = url
db.add(CompanyWebsite(
company_id=company.id,
url=url,
label=label or None,
website_type=wtype,
is_primary=is_primary,
source='manual_edit',
))
added += 1
# Sync company.website with primary for backward compatibility
if primary_url:
company.website = primary_url
elif added > 0:
# No explicit primary — first one becomes primary
company.website = ensure_url(sanitize_input(website_urls[0], max_length=500))
else:
company.website = None
@bp.route('/firma/edytuj/<int:company_id>/visibility', methods=['POST'])
@login_required
def company_edit_visibility(company_id):
"""Save section visibility preferences via AJAX."""
if not current_user.can_manage_company(company_id):
return jsonify({'error': 'Brak uprawnień'}), 403
db = SessionLocal()
try:
company = db.query(Company).get(company_id)
if not company:
return jsonify({'error': 'Nie znaleziono firmy'}), 404
data = request.get_json()
if not data or 'hidden_sections' not in data:
return jsonify({'error': 'Brak danych'}), 400
hidden = data['hidden_sections']
if not isinstance(hidden, list):
return jsonify({'error': 'Nieprawidłowy format'}), 400
# Validate keys
validated = [k for k in hidden if k in VALID_SECTION_KEYS]
company.hidden_sections = validated
db.commit()
return jsonify({'success': True, 'hidden_sections': validated})
except Exception as e:
db.rollback()
logger.error(f"Error saving visibility for company_id={company_id}: {e}")
return jsonify({'error': 'Wystąpił błąd'}), 500
finally:
db.close()