""" Company Edit Routes =================== Routes for editing company profiles by authorized users. """ from flask import render_template, request, redirect, url_for, flash, jsonify from flask_login import login_required, current_user from blueprints.public import bp from sqlalchemy import or_ from database import SessionLocal, Company, CompanyContact, CompanySocialMedia, CompanyWebsite, Category from utils.helpers import sanitize_input, sanitize_html, validate_email, ensure_url from utils.data_quality import update_company_data_quality from datetime import datetime import logging logger = logging.getLogger(__name__) EMPLOYEE_COUNT_WHITELIST = ['1-5', '6-10', '11-25', '26-50', '51-100', '101-250', '250+', ''] VALID_SOCIAL_PLATFORMS = ['facebook', 'linkedin', 'instagram', 'youtube', 'twitter', 'tiktok'] VALID_WEBSITE_TYPES = ['website', 'store', 'booking', 'blog', 'portfolio', 'other'] EDITABLE_SOURCES = [None, 'manual_edit', 'manual'] VALID_SECTION_KEYS = [ 'about', 'services', 'board', 'owner_info', 'registry', 'legal', 'contact', 'social_media', 'recommendations', 'website', 'seo_audit', 'gbp_audit', 'social_audit', 'it_audit', 'news', # Registry sub-sections (CEIDG) 'registry.ceidg_status', 'registry.ceidg_owner', 'registry.ceidg_start_date', 'registry.ceidg_pkd', 'registry.ceidg_contacts', 'registry.ceidg_succession', 'registry.ceidg_locations', # Registry sub-sections (KRS) 'registry.krs_legal_form', 'registry.krs_capital', 'registry.krs_identity', 'registry.krs_contacts', 'registry.krs_agreement', 'registry.krs_representation', 'registry.krs_board', 'registry.krs_partners', 'registry.krs_activities', 'registry.krs_financials', 'registry.krs_address', 'registry.krs_register', ] @bp.route('/firma/edytuj') @bp.route('/firma/edytuj/') @login_required def company_edit(company_id=None): """Display the company profile edit form.""" target_company_id = company_id or current_user.company_id if not target_company_id or not current_user.can_manage_company(target_company_id): flash('Edycja profilu firmy jest dostępna tylko dla kadry zarządzającej. ' 'Skontaktuj się z osobą zarządzającą Twoją firmą.', 'warning') return redirect(url_for('public.dashboard')) db = SessionLocal() try: company = db.query(Company).get(target_company_id) if not company: flash('Nie znaleziono firmy.', 'error') return redirect(url_for('public.dashboard')) categories = db.query(Category).order_by(Category.name).all() contacts = db.query(CompanyContact).filter_by( company_id=company.id ).order_by( CompanyContact.contact_type, CompanyContact.is_primary.desc() ).all() social_media = db.query(CompanySocialMedia).filter_by( company_id=company.id ).all() company_websites = db.query(CompanyWebsite).filter_by( company_id=company.id ).order_by(CompanyWebsite.is_primary.desc()).all() permissions = { 'description': current_user.can_edit_company_field('description', company_id=company.id), 'services': current_user.can_edit_company_field('services', company_id=company.id), 'contacts': current_user.can_edit_company_field('contacts', company_id=company.id), 'social': current_user.can_edit_company_field('social', company_id=company.id), } editable_contacts = [c for c in contacts if c.source in EDITABLE_SOURCES] registry_ceidg_subs = [ ('registry.ceidg_status', 'Status działalności'), ('registry.ceidg_owner', 'Właściciel JDG'), ('registry.ceidg_start_date', 'Data rozpoczęcia'), ('registry.ceidg_pkd', 'Kody PKD'), ('registry.ceidg_contacts', 'Dane kontaktowe z CEIDG'), ('registry.ceidg_succession', 'Zarządca sukcesyjny'), ('registry.ceidg_locations', 'Dodatkowe miejsca działalności'), ] registry_krs_subs = [ ('registry.krs_legal_form', 'Forma prawna'), ('registry.krs_capital', 'Kapitał zakładowy'), ('registry.krs_identity', 'Identyfikatory (KRS, NIP, REGON, data)'), ('registry.krs_contacts', 'Dane kontaktowe'), ('registry.krs_agreement', 'Umowa spółki'), ('registry.krs_representation', 'Sposób reprezentacji'), ('registry.krs_board', 'Zarząd'), ('registry.krs_partners', 'Wspólnicy'), ('registry.krs_activities', 'Przedmiot działalności (PKD)'), ('registry.krs_financials', 'Sprawozdania finansowe'), ('registry.krs_address', 'Adres siedziby z KRS'), ('registry.krs_register', 'Dane rejestrowe i odpis KRS'), ] # Choose sub-sections based on company type if company.ceidg_id: registry_subs = registry_ceidg_subs elif company.krs and company.data_source == 'KRS API': registry_subs = registry_krs_subs else: registry_subs = [] section_definitions = [ ('about', 'O firmie', 'Opis firmy, historia, wartości', []), ('services', 'Usługi i kompetencje', 'Oferowane usługi, technologie, obszar działania', []), ('board', 'Zarząd i Wspólnicy', 'Osoby w zarządzie, wspólnicy, prokurenci', []), ('owner_info', 'Właściciel (JDG)', 'Dane właściciela jednoosobowej działalności', []), ('registry', 'Dane z rejestrów urzędowych', 'Dane z CEIDG/KRS, PKD, adresy rejestrowe', registry_subs), ('legal', 'Informacje prawne i biznesowe', 'NIP, REGON, KRS, forma prawna', []), ('contact', 'Dane kontaktowe', 'Telefony, e-mail, adres, dodatkowe kontakty', []), ('social_media', 'Social Media', 'Profile w mediach społecznościowych', []), ('recommendations', 'Rekomendacje', 'Rekomendacje od innych członków Izby', []), ('website', 'Strona WWW', 'Link do strony i podgląd', []), ('seo_audit', 'Analiza SEO', 'Wyniki audytu SEO strony internetowej', []), ('gbp_audit', 'Audyt Google Business Profile', 'Wyniki audytu wizytówki Google', []), ('social_audit', 'Audyt Social Media', 'Wyniki audytu mediów społecznościowych', []), ('it_audit', 'Audyt IT', 'Wyniki audytu infrastruktury IT', []), ('news', 'Aktualności i wydarzenia', 'Najnowsze wydarzenia firmy', []), ] return render_template( 'company_edit.html', company=company, categories=categories, contacts=editable_contacts, all_contacts=contacts, social_media=social_media, company_websites=company_websites, permissions=permissions, section_definitions=section_definitions, ) finally: db.close() @bp.route('/firma/edytuj', methods=['POST']) @bp.route('/firma/edytuj/', methods=['POST']) @login_required def company_edit_save(company_id=None): """Save company profile edits.""" target_company_id = company_id or current_user.company_id if not target_company_id or not current_user.can_manage_company(target_company_id): flash('Edycja profilu firmy jest dostępna tylko dla kadry zarządzającej. ' 'Skontaktuj się z osobą zarządzającą Twoją firmą.', 'warning') return redirect(url_for('public.dashboard')) db = SessionLocal() try: company = db.query(Company).get(target_company_id) if not company: flash('Nie znaleziono firmy.', 'error') return redirect(url_for('public.dashboard')) active_tab = request.form.get('active_tab', 'description') if active_tab == 'description' and current_user.can_edit_company_field('description', company_id=company.id): _save_description(db, company) elif active_tab == 'services' and current_user.can_edit_company_field('services', company_id=company.id): _save_services(company) elif active_tab == 'contacts' and current_user.can_edit_company_field('contacts', company_id=company.id): _save_contacts(db, company) elif active_tab == 'social' and current_user.can_edit_company_field('social', company_id=company.id): _save_social_media(db, company) db.commit() update_company_data_quality(company, db) db.commit() flash('Dane firmy zostały zaktualizowane.', 'success') return redirect(url_for('public.company_detail', company_id=company.id)) except Exception as e: db.rollback() logger.error(f"Error saving company edit for company_id={target_company_id}: {e}") flash('Wystąpił błąd podczas zapisywania zmian. Spróbuj ponownie.', 'error') return redirect(url_for('public.company_edit', company_id=company_id)) finally: db.close() def _save_description(db, company): """Save description tab fields.""" company.description_short = sanitize_input( request.form.get('description_short', ''), max_length=500 ) or None company.description_full = sanitize_html( request.form.get('description_full', '') ) or None company.founding_history = sanitize_html( request.form.get('founding_history', '') ) or None company.core_values = sanitize_html( request.form.get('core_values', '') ) or None # Year established — only save if not from registry if not company.krs_registration_date and not company.business_start_date: year_raw = request.form.get('year_established', '').strip() if year_raw: try: year = int(year_raw) if 1800 <= year <= 2030: company.year_established = year except ValueError: pass else: company.year_established = None category_id_raw = request.form.get('category_id', '') if category_id_raw: try: category_id = int(category_id_raw) category = db.query(Category).get(category_id) if category: company.category_id = category_id except (ValueError, TypeError): pass else: company.category_id = None # Logo upload — always convert to .webp (templates expect {slug}.webp) logo_file = request.files.get('logo_file') if logo_file and logo_file.filename: import os from PIL import Image allowed = {'png', 'jpg', 'jpeg', 'svg', 'webp'} ext = logo_file.filename.rsplit('.', 1)[-1].lower() if '.' in logo_file.filename else '' if ext in allowed: logo_dir = os.path.join('static', 'img', 'companies') os.makedirs(logo_dir, exist_ok=True) # Remove old logo files for this company (different extensions) for old_ext in allowed: old_path = os.path.join(logo_dir, f"{company.slug}.{old_ext}") if os.path.exists(old_path): os.remove(old_path) try: if ext == 'svg': filepath = os.path.join(logo_dir, f"{company.slug}.svg") logo_file.save(filepath) flash('Logo zostało zapisane (format SVG).', 'success') else: filepath = os.path.join(logo_dir, f"{company.slug}.webp") img = Image.open(logo_file) img.save(filepath, 'WEBP', quality=85) flash(f'Logo zostało przekonwertowane z {ext.upper()} do WebP i zapisane.', 'success') logger.info(f"Logo uploaded for company {company.id}: {filepath}") except Exception as e: logger.error(f"Logo conversion failed for company {company.id}: {e}") flash('Nie udało się przetworzyć pliku logo. Spróbuj inny plik (PNG, JPG lub WebP).', 'error') else: flash(f'Nieobsługiwany format pliku (.{ext}). Dozwolone: PNG, JPG, SVG, WebP.', 'warning') def _save_services(company): """Save services tab fields.""" company.services_offered = sanitize_html( request.form.get('services_offered', '') ) or None company.technologies_used = sanitize_html( request.form.get('technologies_used', '') ) or None company.operational_area = sanitize_input( request.form.get('operational_area', ''), max_length=500 ) or None company.languages_offered = sanitize_input( request.form.get('languages_offered', ''), max_length=200 ) or None employee_count = request.form.get('employee_count_range', '') if employee_count in EMPLOYEE_COUNT_WHITELIST: company.employee_count_range = employee_count or None def _save_contacts(db, company): """Save contacts tab fields.""" _save_websites(db, company) email_raw = sanitize_input(request.form.get('email', ''), max_length=255) if email_raw: if validate_email(email_raw): company.email = email_raw else: company.email = None phone_raw = sanitize_input(request.form.get('phone', ''), max_length=50) company.phone = phone_raw or None company.address_street = sanitize_input( request.form.get('address_street', ''), max_length=255 ) or None company.address_city = sanitize_input( request.form.get('address_city', ''), max_length=100 ) or None company.address_postal = sanitize_input( request.form.get('address_postal', ''), max_length=10 ) or None # Delete existing editable contacts (source is NULL, 'manual_edit', or 'manual') db.query(CompanyContact).filter( CompanyContact.company_id == company.id, or_( CompanyContact.source.in_(['manual_edit', 'manual']), CompanyContact.source.is_(None) ) ).delete(synchronize_session='fetch') # Add new contacts from form contact_types = request.form.getlist('contact_types[]') contact_values = request.form.getlist('contact_values[]') contact_purposes = request.form.getlist('contact_purposes[]') for i, value in enumerate(contact_values): value = sanitize_input(value, max_length=255) if not value: continue contact_type = sanitize_input(contact_types[i], max_length=20) if i < len(contact_types) else 'phone' purpose = sanitize_input(contact_purposes[i], max_length=100) if i < len(contact_purposes) else '' db.add(CompanyContact( company_id=company.id, contact_type=contact_type, value=value, purpose=purpose or None, source='manual_edit', )) def _save_social_media(db, company): """Save social media tab fields.""" # Delete existing editable social media (source is NULL, 'manual_edit', or 'manual') db.query(CompanySocialMedia).filter( CompanySocialMedia.company_id == company.id, or_( CompanySocialMedia.source.in_(['manual_edit', 'manual']), CompanySocialMedia.source.is_(None) ) ).delete(synchronize_session='fetch') # Add new social media from form social_platforms = request.form.getlist('social_platforms[]') social_urls = request.form.getlist('social_urls[]') for i, url in enumerate(social_urls): url = sanitize_input(url, max_length=500) if not url: continue platform = social_platforms[i] if i < len(social_platforms) else '' if platform not in VALID_SOCIAL_PLATFORMS: continue db.add(CompanySocialMedia( company_id=company.id, platform=platform, url=ensure_url(url), source='manual_edit', verified_at=datetime.now(), )) def _save_websites(db, company): """Save multiple website URLs from the contacts tab.""" # Delete all existing websites — user edits the full list db.query(CompanyWebsite).filter( CompanyWebsite.company_id == company.id ).delete(synchronize_session='fetch') website_urls = request.form.getlist('website_urls[]') website_labels = request.form.getlist('website_labels[]') website_types = request.form.getlist('website_types[]') primary_idx_raw = request.form.get('website_primary', '0') try: primary_idx = int(primary_idx_raw) except (ValueError, TypeError): primary_idx = 0 added = 0 primary_url = None for i, url_raw in enumerate(website_urls): if added >= 5: break url_raw = sanitize_input(url_raw, max_length=500) if not url_raw: continue url = ensure_url(url_raw) label = sanitize_input(website_labels[i], max_length=100) if i < len(website_labels) else '' wtype = sanitize_input(website_types[i], max_length=20) if i < len(website_types) else 'website' if wtype not in VALID_WEBSITE_TYPES: wtype = 'website' is_primary = (i == primary_idx) if is_primary: primary_url = url db.add(CompanyWebsite( company_id=company.id, url=url, label=label or None, website_type=wtype, is_primary=is_primary, source='manual_edit', )) added += 1 # Sync company.website with primary for backward compatibility if primary_url: company.website = primary_url elif added > 0: # No explicit primary — first one becomes primary company.website = ensure_url(sanitize_input(website_urls[0], max_length=500)) else: company.website = None @bp.route('/firma/edytuj//visibility', methods=['POST']) @login_required def company_edit_visibility(company_id): """Save section visibility preferences via AJAX.""" if not current_user.can_manage_company(company_id): return jsonify({'error': 'Brak uprawnień'}), 403 db = SessionLocal() try: company = db.query(Company).get(company_id) if not company: return jsonify({'error': 'Nie znaleziono firmy'}), 404 data = request.get_json() if not data or 'hidden_sections' not in data: return jsonify({'error': 'Brak danych'}), 400 hidden = data['hidden_sections'] if not isinstance(hidden, list): return jsonify({'error': 'Nieprawidłowy format'}), 400 # Validate keys validated = [k for k in hidden if k in VALID_SECTION_KEYS] company.hidden_sections = validated db.commit() return jsonify({'success': True, 'hidden_sections': validated}) except Exception as e: db.rollback() logger.error(f"Error saving visibility for company_id={company_id}: {e}") return jsonify({'error': 'Wystąpił błąd'}), 500 finally: db.close()