Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
1. Auto-strip legal form from name (CONSTELLATION SP. Z O.O. → Constellation) 2. Auto-suggest category from main PKD code (62.02.Z → IT) 3. Clean empty 'https://' from WWW field 4. Rename button to 'Wyszukaj w internecie' 5. Auto-advance to step 4 after logo confirmation 6. Larger logo preview in summary step Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1009 lines
34 KiB
Python
1009 lines
34 KiB
Python
"""
|
|
Admin Routes - Company Creation Wizard
|
|
=======================================
|
|
|
|
Multi-step wizard for adding new companies with automatic
|
|
data enrichment from KRS, CEIDG, SEO, GBP, and Social Media.
|
|
"""
|
|
|
|
import re
|
|
import os
|
|
import sys
|
|
import logging
|
|
import threading
|
|
from datetime import datetime, date as date_type
|
|
|
|
from flask import render_template, request, jsonify
|
|
from flask_login import login_required, current_user
|
|
from sqlalchemy.orm.attributes import flag_modified
|
|
|
|
from . import bp
|
|
from database import (
|
|
SessionLocal, Company, Category, CompanyPKD, CompanyPerson, Person,
|
|
CompanyWebsiteAnalysis, CompanySocialMedia, GBPAudit,
|
|
SystemRole
|
|
)
|
|
from utils.decorators import role_required
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _delete_draft_company(db, company_id):
|
|
"""Delete a wizard_draft company using raw SQL to trigger DB-level CASCADE."""
|
|
from sqlalchemy import text
|
|
db.execute(text("DELETE FROM companies WHERE id = :cid AND status = 'wizard_draft'"),
|
|
{'cid': company_id})
|
|
|
|
|
|
def _strip_legal_form(name):
|
|
"""Strip legal form suffix from company name to get business name."""
|
|
suffixes = [
|
|
'SPÓŁKA Z OGRANICZONĄ ODPOWIEDZIALNOŚCIĄ',
|
|
'SPÓŁKA AKCYJNA', 'SPÓŁKA KOMANDYTOWA',
|
|
'SPÓŁKA KOMANDYTOWO-AKCYJNA', 'SPÓŁKA JAWNA',
|
|
'SPÓŁKA PARTNERSKA', 'SP. Z O.O.', 'S.A.', 'SP.K.',
|
|
'SP.J.', 'SP.P.', 'S.K.A.',
|
|
]
|
|
result = name
|
|
for suffix in suffixes:
|
|
result = result.replace(suffix, '').strip()
|
|
result = result.strip(' ,-').title()
|
|
return result or name
|
|
|
|
|
|
# PKD prefix → category_id mapping
|
|
_PKD_CATEGORY_MAP = {
|
|
'62': 1, # IT i Technologie
|
|
'63': 1, # IT
|
|
'26': 1, # IT / electronics
|
|
'61': 6, # IT i Telekomunikacja
|
|
'41': 2, # Budownictwo
|
|
'42': 2, # Budownictwo inżynieryjne
|
|
'43': 2, # Budownictwo specjalistyczne
|
|
'71': 19, # Architektura i Projektowanie
|
|
'69': 3, # Usługi prawne
|
|
'73': 4, # Marketing i Reklama
|
|
'10': 5, # Produkcja spożywcza
|
|
'25': 5, # Produkcja metalowa
|
|
'22': 5, # Produkcja
|
|
'46': 18, # Handel hurtowy
|
|
'47': 27, # Handel detaliczny
|
|
'35': 9, # Energia
|
|
'55': 10, # Hotelarstwo
|
|
'56': 10, # Gastronomia/turystyka
|
|
'66': 31, # Usługi finansowe
|
|
'64': 31, # Finanse
|
|
'68': 15, # Nieruchomości
|
|
'58': 13, # Wydawnictwa / media
|
|
'45': 14, # Motoryzacja
|
|
'01': 16, # Rolnictwo
|
|
'80': 23, # Bezpieczeństwo
|
|
'70': 7, # Usługi biznesowe / doradztwo
|
|
'74': 7, # Usługi specjalistyczne
|
|
'78': 7, # Usługi HR
|
|
'82': 7, # Usługi biurowe
|
|
}
|
|
|
|
|
|
def _pkd_to_category_id(pkd_code):
|
|
"""Map PKD code to category ID. Returns None if no match."""
|
|
if not pkd_code:
|
|
return None
|
|
prefix = pkd_code[:2]
|
|
return _PKD_CATEGORY_MAP.get(prefix)
|
|
|
|
|
|
def _validate_nip(nip: str) -> bool:
|
|
"""Validate Polish NIP number (10 digits, checksum)."""
|
|
if not nip or not re.match(r'^\d{10}$', nip):
|
|
return False
|
|
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
|
|
checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
|
|
return checksum == int(nip[9])
|
|
|
|
|
|
def _generate_slug(name):
|
|
"""Generate URL-safe slug from company name."""
|
|
slug = re.sub(r'[^\w\s-]', '', name.lower())
|
|
slug = re.sub(r'[\s_]+', '-', slug)
|
|
slug = re.sub(r'-+', '-', slug).strip('-')
|
|
return slug
|
|
|
|
|
|
def _ensure_unique_slug(db, slug, exclude_id=None):
|
|
"""Ensure slug is unique, appending -2, -3 etc. if needed."""
|
|
base_slug = slug
|
|
counter = 2
|
|
while True:
|
|
q = db.query(Company).filter(Company.slug == slug)
|
|
if exclude_id:
|
|
q = q.filter(Company.id != exclude_id)
|
|
if not q.first():
|
|
return slug
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
|
|
# ============================================================
|
|
# WIZARD PAGE
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def company_wizard():
|
|
"""Render the company creation wizard page."""
|
|
db = SessionLocal()
|
|
try:
|
|
categories = db.query(Category).order_by(Category.name).all()
|
|
categories_list = [{'id': c.id, 'name': c.name} for c in categories]
|
|
|
|
# Check for existing wizard drafts by this user
|
|
draft = db.query(Company).filter(
|
|
Company.status == 'wizard_draft',
|
|
Company.wizard_started_by == current_user.id
|
|
).first()
|
|
draft_info = None
|
|
if draft:
|
|
draft_info = {
|
|
'id': draft.id,
|
|
'name': draft.name,
|
|
'nip': draft.nip,
|
|
'step': draft.wizard_step or 1,
|
|
'created_at': draft.created_at.strftime('%Y-%m-%d %H:%M') if draft.created_at else ''
|
|
}
|
|
|
|
return render_template(
|
|
'admin/company_wizard.html',
|
|
categories=categories_list,
|
|
draft=draft_info
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# STEP 1: NIP LOOKUP & REGISTRY FETCH
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/step1', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step1():
|
|
"""Validate NIP, fetch registry data, create draft company."""
|
|
data = request.get_json()
|
|
nip = (data.get('nip') or '').strip().replace('-', '').replace(' ', '')
|
|
|
|
if not _validate_nip(nip):
|
|
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP (10 cyfr, suma kontrolna)'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Check for duplicates (excluding wizard_draft)
|
|
existing = db.query(Company).filter(
|
|
Company.nip == nip,
|
|
Company.status != 'wizard_draft'
|
|
).first()
|
|
if existing:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Firma z NIP {nip} już istnieje: {existing.name} (ID: {existing.id})',
|
|
'existing_id': existing.id,
|
|
'existing_name': existing.name
|
|
}), 409
|
|
|
|
# Delete any existing wizard_draft with this NIP by this user
|
|
old_draft = db.query(Company).filter(
|
|
Company.nip == nip,
|
|
Company.status == 'wizard_draft'
|
|
).first()
|
|
if old_draft:
|
|
_delete_draft_company(db, old_draft.id)
|
|
db.commit()
|
|
|
|
# Try KRS first (via Biala Lista → KRS Open API)
|
|
registry_source = None
|
|
registry_data = {}
|
|
company_name = ''
|
|
krs_number = None
|
|
|
|
try:
|
|
from krs_api_service import KRSApiService
|
|
krs_service = KRSApiService()
|
|
biala_lista = krs_service.search_by_nip(nip)
|
|
if biala_lista and biala_lista.get('krs'):
|
|
krs_number = biala_lista['krs']
|
|
company_name = biala_lista.get('name', '')
|
|
registry_source = 'KRS'
|
|
registry_data = biala_lista
|
|
except Exception as e:
|
|
logger.warning(f"KRS lookup failed for NIP {nip}: {e}")
|
|
|
|
# If no KRS, try CEIDG
|
|
ceidg_data = None
|
|
if not registry_source:
|
|
try:
|
|
from ceidg_api_service import fetch_ceidg_by_nip
|
|
ceidg_data = fetch_ceidg_by_nip(nip)
|
|
if ceidg_data:
|
|
registry_source = 'CEIDG'
|
|
company_name = ceidg_data.get('firma', ceidg_data.get('nazwa', ''))
|
|
registry_data = ceidg_data
|
|
except Exception as e:
|
|
logger.warning(f"CEIDG lookup failed for NIP {nip}: {e}")
|
|
|
|
# Create draft company
|
|
slug = _generate_slug(company_name or f'firma-{nip}')
|
|
slug = _ensure_unique_slug(db, slug)
|
|
|
|
company = Company(
|
|
name=company_name or f'Firma NIP {nip}',
|
|
slug=slug,
|
|
nip=nip,
|
|
status='wizard_draft',
|
|
data_quality='basic',
|
|
wizard_step=1,
|
|
wizard_started_by=current_user.id,
|
|
created_at=datetime.now(),
|
|
last_updated=datetime.now()
|
|
)
|
|
db.add(company)
|
|
db.flush() # Get company.id
|
|
|
|
# Enrich from KRS if found
|
|
if registry_source == 'KRS' and krs_number:
|
|
company.krs = krs_number
|
|
try:
|
|
from .routes_membership import _enrich_company_from_krs
|
|
_enrich_company_from_krs(company, db)
|
|
except Exception as e:
|
|
logger.warning(f"KRS enrichment failed: {e}")
|
|
|
|
# Enrich from CEIDG if found
|
|
elif registry_source == 'CEIDG' and ceidg_data:
|
|
_apply_ceidg_data(company, ceidg_data, db)
|
|
|
|
# Set short business name (strip legal form), keep legal_name as full
|
|
if company.legal_name and (company.name.startswith('Firma NIP') or company.name == company.legal_name):
|
|
company.name = _strip_legal_form(company.legal_name)
|
|
company.slug = _generate_slug(company.name)
|
|
company.slug = _ensure_unique_slug(db, company.slug, exclude_id=company.id)
|
|
|
|
# Auto-suggest category from main PKD code
|
|
if not company.category_id and company.pkd_code:
|
|
suggested = _pkd_to_category_id(company.pkd_code)
|
|
if suggested:
|
|
company.category_id = suggested
|
|
|
|
company.wizard_step = 2
|
|
db.commit()
|
|
|
|
# Build response with company data
|
|
response_data = _company_to_dict(company)
|
|
response_data['registry_source'] = registry_source or 'manual'
|
|
|
|
# Add PKD codes
|
|
pkd_codes = db.query(CompanyPKD).filter_by(company_id=company.id).all()
|
|
response_data['pkd_codes'] = [
|
|
{'code': p.pkd_code, 'description': p.pkd_description, 'is_primary': p.is_primary}
|
|
for p in pkd_codes
|
|
]
|
|
|
|
# Add people (board members)
|
|
people = db.query(CompanyPerson).filter_by(company_id=company.id).all()
|
|
response_data['people'] = [
|
|
{
|
|
'name': cp.person.full_name() if cp.person else '',
|
|
'role': cp.role or ''
|
|
}
|
|
for cp in people if cp.person
|
|
]
|
|
|
|
return jsonify({'success': True, 'company': response_data})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard step 1 error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _apply_ceidg_data(company, ceidg_data, db):
|
|
"""Apply CEIDG API data to company record. Mirrors arm_company.py logic."""
|
|
company.ceidg_id = ceidg_data.get('ceidg_id')
|
|
company.ceidg_status = ceidg_data.get('status')
|
|
company.ceidg_raw_data = ceidg_data.get('raw')
|
|
company.ceidg_fetched_at = datetime.now()
|
|
company.data_source = 'CEIDG API'
|
|
company.last_verified_at = datetime.now()
|
|
|
|
# Owner
|
|
wlasciciel = ceidg_data.get('wlasciciel', {})
|
|
if wlasciciel.get('imie'):
|
|
company.owner_first_name = wlasciciel['imie']
|
|
if wlasciciel.get('nazwisko'):
|
|
company.owner_last_name = wlasciciel['nazwisko']
|
|
if ceidg_data.get('obywatelstwa'):
|
|
company.owner_citizenships = ceidg_data['obywatelstwa']
|
|
|
|
# Legal name
|
|
if ceidg_data.get('firma'):
|
|
company.legal_name = ceidg_data['firma']
|
|
|
|
# REGON
|
|
regon = ceidg_data.get('regon') or wlasciciel.get('regon')
|
|
if regon:
|
|
company.regon = regon
|
|
|
|
# Business start date
|
|
if ceidg_data.get('dataRozpoczecia'):
|
|
try:
|
|
d = ceidg_data['dataRozpoczecia']
|
|
if isinstance(d, str):
|
|
company.business_start_date = date_type.fromisoformat(d)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Legal form
|
|
company.legal_form = 'JEDNOOSOBOWA DZIALALNOSC GOSPODARCZA'
|
|
|
|
# PKD (main)
|
|
pkd_gl = ceidg_data.get('pkdGlowny', {})
|
|
if pkd_gl and pkd_gl.get('kod'):
|
|
company.pkd_code = pkd_gl['kod']
|
|
company.pkd_description = pkd_gl.get('nazwa')
|
|
|
|
# PKD (full list)
|
|
pkd_lista = ceidg_data.get('pkd', [])
|
|
if pkd_lista:
|
|
company.ceidg_pkd_list = pkd_lista
|
|
pkd_main_code = pkd_gl.get('kod', '') if pkd_gl else ''
|
|
for pkd_item in pkd_lista:
|
|
kod = pkd_item.get('kod', '')
|
|
if not kod:
|
|
continue
|
|
existing_pkd = db.query(CompanyPKD).filter(
|
|
CompanyPKD.company_id == company.id,
|
|
CompanyPKD.pkd_code == kod
|
|
).first()
|
|
if not existing_pkd:
|
|
db.add(CompanyPKD(
|
|
company_id=company.id,
|
|
pkd_code=kod,
|
|
pkd_description=pkd_item.get('nazwa', ''),
|
|
is_primary=(kod == pkd_main_code)
|
|
))
|
|
|
|
# Business address
|
|
adres = ceidg_data.get('adresDzialalnosci', {})
|
|
ulica = adres.get('ulica', '')
|
|
budynek = adres.get('budynek', '')
|
|
lokal = adres.get('lokal', '')
|
|
if ulica or budynek:
|
|
street_parts = [ulica, budynek]
|
|
if lokal:
|
|
street_parts[-1] = (budynek + '/' + lokal) if budynek else lokal
|
|
company.address_street = ' '.join(p for p in street_parts if p)
|
|
if adres.get('kod') or adres.get('kodPocztowy'):
|
|
company.address_postal = adres.get('kod') or adres.get('kodPocztowy')
|
|
if adres.get('miasto') or adres.get('miejscowosc'):
|
|
company.address_city = adres.get('miasto') or adres.get('miejscowosc')
|
|
if company.address_street and company.address_postal and company.address_city:
|
|
company.address_full = f'{company.address_street}, {company.address_postal} {company.address_city}'
|
|
|
|
# Contact
|
|
if ceidg_data.get('email'):
|
|
company.email = ceidg_data['email']
|
|
if ceidg_data.get('stronaWWW'):
|
|
company.website = ceidg_data['stronaWWW']
|
|
if ceidg_data.get('telefon'):
|
|
company.phone = ceidg_data['telefon']
|
|
|
|
|
|
def _company_to_dict(company):
|
|
"""Convert Company to a dictionary for JSON response."""
|
|
return {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'legal_name': company.legal_name,
|
|
'slug': company.slug,
|
|
'nip': company.nip,
|
|
'regon': company.regon,
|
|
'krs': company.krs,
|
|
'website': company.website,
|
|
'email': company.email,
|
|
'phone': company.phone,
|
|
'address_street': company.address_street,
|
|
'address_city': company.address_city,
|
|
'address_postal': company.address_postal,
|
|
'address_full': company.address_full,
|
|
'legal_form': company.legal_form,
|
|
'pkd_code': company.pkd_code,
|
|
'pkd_description': company.pkd_description,
|
|
'capital_amount': float(company.capital_amount) if company.capital_amount else None,
|
|
'capital_currency': company.capital_currency,
|
|
'year_established': company.year_established,
|
|
'owner_first_name': company.owner_first_name,
|
|
'owner_last_name': company.owner_last_name,
|
|
'category_id': company.category_id,
|
|
'description_short': company.description_short,
|
|
'description_full': company.description_full,
|
|
'data_source': company.data_source,
|
|
'wizard_step': company.wizard_step,
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# STEP 2: REVIEW & EDIT REGISTRY DATA
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/step2', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step2():
|
|
"""Update company with edited registry data."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# Update editable fields
|
|
editable_fields = [
|
|
'name', 'legal_name', 'website', 'email', 'phone',
|
|
'address_street', 'address_city', 'address_postal',
|
|
'description_short', 'description_full'
|
|
]
|
|
for field in editable_fields:
|
|
if field in data:
|
|
setattr(company, field, data[field] or None)
|
|
|
|
# Category
|
|
if 'category_id' in data and data['category_id']:
|
|
company.category_id = int(data['category_id'])
|
|
|
|
# Rebuild address_full
|
|
if company.address_street and company.address_postal and company.address_city:
|
|
company.address_full = f'{company.address_street}, {company.address_postal} {company.address_city}'
|
|
|
|
# Update slug if name changed
|
|
if 'name' in data and data['name']:
|
|
new_slug = _generate_slug(data['name'])
|
|
new_slug = _ensure_unique_slug(db, new_slug, exclude_id=company.id)
|
|
company.slug = new_slug
|
|
|
|
company.wizard_step = 3
|
|
company.last_updated = datetime.now()
|
|
db.commit()
|
|
|
|
return jsonify({'success': True, 'company': _company_to_dict(company)})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard step 2 error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# STEP 3: WEBSITE & LOGO
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/discover-website', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_discover_website():
|
|
"""Search for company website using Brave Search."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# If website already known from registry, return it
|
|
if company.website and company.website not in ('https://', 'http://'):
|
|
return jsonify({'success': True, 'website': company.website, 'confidence': 'registry'})
|
|
|
|
# Build a clean short name for search
|
|
search_name = company.name
|
|
for suffix in ['SPÓŁKA Z OGRANICZONĄ ODPOWIEDZIALNOŚCIĄ', 'SP. Z O.O.',
|
|
'SPÓŁKA AKCYJNA', 'S.A.', 'SPÓŁKA KOMANDYTOWA', 'SP.K.',
|
|
'SPÓŁKA JAWNA', 'SP.J.']:
|
|
search_name = search_name.replace(suffix, '').strip()
|
|
search_name = search_name.strip(' ,-')
|
|
|
|
from services.website_discovery_service import WebsiteDiscoveryService
|
|
service = WebsiteDiscoveryService(db=db)
|
|
|
|
# Temporarily override company.name — discover_for_company uses it for query
|
|
original_name = company.name
|
|
company.name = search_name
|
|
|
|
result = service.discover_for_company(company)
|
|
company.name = original_name # Restore
|
|
|
|
if result and result.get('error'):
|
|
error_msg = result['error']
|
|
if error_msg == 'Brak wyników':
|
|
error_msg = 'Nie znaleziono strony internetowej tej firmy w internecie. Firma może nie posiadać strony WWW.'
|
|
return jsonify({'success': False, 'error': error_msg})
|
|
|
|
# Apply discovered website to company
|
|
website = result.get('url')
|
|
if website:
|
|
company.website = website
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': bool(website),
|
|
'website': website,
|
|
'error': 'Nie znaleziono strony internetowej tej firmy.' if not website else None,
|
|
'confidence': result.get('confidence'),
|
|
})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Website discovery error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/wizard/step3/fetch-logos', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step3_fetch_logos():
|
|
"""Fetch logo candidates from company website."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
website_url = data.get('website_url', '').strip()
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# Update website URL if provided
|
|
if website_url:
|
|
if not website_url.startswith('http'):
|
|
website_url = 'https://' + website_url
|
|
company.website = website_url
|
|
db.commit()
|
|
|
|
if not company.website:
|
|
return jsonify({'success': False, 'error': 'Brak adresu strony WWW'}), 400
|
|
|
|
from logo_fetch_service import LogoFetchService
|
|
service = LogoFetchService()
|
|
result = service.fetch_candidates(company.website, company.slug)
|
|
|
|
candidates = result.get('candidates', [])
|
|
return jsonify({
|
|
'success': True,
|
|
'candidates': candidates,
|
|
'recommended_index': result.get('recommended_index', 0),
|
|
'message': result.get('message', '')
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Logo fetch error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/wizard/step3/select-logo', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step3_select_logo():
|
|
"""Confirm selected logo candidate."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
candidate_index = data.get('candidate_index', 0)
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
from logo_fetch_service import LogoFetchService
|
|
service = LogoFetchService()
|
|
success = service.confirm_candidate(company.slug, candidate_index)
|
|
|
|
if success:
|
|
company.wizard_step = 4
|
|
db.commit()
|
|
return jsonify({'success': True, 'logo_path': f'/static/img/companies/{company.slug}.webp'})
|
|
else:
|
|
return jsonify({'success': False, 'error': 'Nie udało się zapisać logo'}), 500
|
|
|
|
except Exception as e:
|
|
logger.error(f"Logo select error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/wizard/step3/skip', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step3_skip():
|
|
"""Skip logo step."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
company.wizard_step = 4
|
|
db.commit()
|
|
return jsonify({'success': True})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard step3 skip error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# STEP 4: AUDITS (BACKGROUND)
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/step4/start-audits', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step4_start_audits():
|
|
"""Start background audit thread for SEO, GBP, and Social Media."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# Initialize audit status
|
|
company.wizard_audit_status = {
|
|
'seo': 'pending' if company.website else 'skipped',
|
|
'gbp': 'pending',
|
|
'social': 'pending'
|
|
}
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
# Launch background thread
|
|
thread = threading.Thread(
|
|
target=_run_wizard_audits,
|
|
args=(company_id,),
|
|
daemon=True
|
|
)
|
|
thread.start()
|
|
|
|
return jsonify({'success': True, 'audit_status': company.wizard_audit_status})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Audit start error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _run_wizard_audits(company_id):
|
|
"""Background worker: run SEO, GBP, and Social Media audits."""
|
|
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
scripts_dir = os.path.join(base_dir, 'scripts')
|
|
if base_dir not in sys.path:
|
|
sys.path.insert(0, base_dir)
|
|
if scripts_dir not in sys.path:
|
|
sys.path.insert(0, scripts_dir)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
if not company:
|
|
return
|
|
|
|
audit_status = dict(company.wizard_audit_status or {})
|
|
|
|
# --- SEO Audit ---
|
|
if company.website and audit_status.get('seo') == 'pending':
|
|
audit_status['seo'] = 'running'
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
try:
|
|
from seo_audit import SEOAuditor
|
|
seo_service = SEOAuditor()
|
|
company_dict = {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'slug': company.slug,
|
|
'website': company.website,
|
|
'address_city': company.address_city or '',
|
|
}
|
|
result = seo_service.audit_company(company_dict)
|
|
seo_score = result.get('scores', {}).get('pagespeed_seo', 0)
|
|
audit_status['seo'] = 'completed'
|
|
audit_status['seo_score'] = seo_score
|
|
except Exception as e:
|
|
audit_status['seo'] = f'error: {str(e)[:80]}'
|
|
logger.error(f"Wizard SEO audit error: {e}")
|
|
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
# --- GBP Audit ---
|
|
if audit_status.get('gbp') == 'pending':
|
|
audit_status['gbp'] = 'running'
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
try:
|
|
db.expire_all()
|
|
from gbp_audit_service import GBPAuditService
|
|
gbp_service = GBPAuditService(db)
|
|
gbp_result = gbp_service.audit_company(company.id)
|
|
if gbp_result:
|
|
gbp_service.save_audit(gbp_result, source='wizard')
|
|
audit_status['gbp'] = 'completed'
|
|
audit_status['gbp_score'] = gbp_result.completeness_score
|
|
else:
|
|
audit_status['gbp'] = 'not_found'
|
|
except Exception as e:
|
|
audit_status['gbp'] = f'error: {str(e)[:80]}'
|
|
logger.error(f"Wizard GBP audit error: {e}")
|
|
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
# --- Social Media Audit ---
|
|
if audit_status.get('social') == 'pending':
|
|
audit_status['social'] = 'running'
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
try:
|
|
from social_media_audit import SocialMediaAuditor
|
|
auditor = SocialMediaAuditor()
|
|
company_dict = {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'slug': company.slug,
|
|
'website': company.website,
|
|
'address_city': company.address_city or '',
|
|
}
|
|
result = auditor.audit_company(company_dict)
|
|
if result:
|
|
auditor.save_audit_result(result)
|
|
db.expire_all()
|
|
saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count()
|
|
audit_status['social'] = 'completed'
|
|
audit_status['social_count'] = saved_count
|
|
except Exception as e:
|
|
audit_status['social'] = f'error: {str(e)[:80]}'
|
|
logger.error(f"Wizard Social audit error: {e}")
|
|
|
|
company.wizard_audit_status = audit_status
|
|
flag_modified(company, 'wizard_audit_status')
|
|
db.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Wizard audit thread error: {e}")
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/wizard/step4/audit-status')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step4_audit_status():
|
|
"""Poll audit progress."""
|
|
company_id = request.args.get('company_id', type=int)
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
audit_status = company.wizard_audit_status or {}
|
|
all_done = all(
|
|
v not in ('pending', 'running')
|
|
for v in [audit_status.get('seo', 'skipped'),
|
|
audit_status.get('gbp', 'skipped'),
|
|
audit_status.get('social', 'skipped')]
|
|
)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'audit_status': audit_status,
|
|
'all_done': all_done
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/wizard/step4/skip', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step4_skip():
|
|
"""Skip audits step."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
company.wizard_step = 5
|
|
db.commit()
|
|
return jsonify({'success': True})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard step4 skip error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# STEP 5: FINALIZE
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/step5/finalize', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_step5_finalize():
|
|
"""Finalize company: change status, compute data quality."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
target_status = data.get('status', 'active')
|
|
category_id = data.get('category_id')
|
|
|
|
if target_status not in ('active', 'pending'):
|
|
target_status = 'active'
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# Apply final settings
|
|
company.status = target_status
|
|
if category_id:
|
|
company.category_id = int(category_id)
|
|
|
|
# Clean up wizard fields
|
|
company.wizard_step = None
|
|
company.wizard_started_by = None
|
|
company.wizard_audit_status = None
|
|
|
|
# Update data quality
|
|
from utils.data_quality import update_company_data_quality
|
|
dq = update_company_data_quality(company, db)
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'slug': company.slug,
|
|
'name': company.name,
|
|
'status': company.status,
|
|
'data_quality': company.data_quality,
|
|
'data_quality_score': dq.get('score', 0)
|
|
})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard finalize error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# CANCEL / CLEANUP
|
|
# ============================================================
|
|
|
|
@bp.route('/companies/wizard/cancel', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def wizard_cancel():
|
|
"""Delete wizard draft company."""
|
|
data = request.get_json()
|
|
company_id = data.get('company_id')
|
|
|
|
if not company_id:
|
|
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(
|
|
id=company_id, status='wizard_draft'
|
|
).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
|
|
|
|
# Clean up logo candidates
|
|
from logo_fetch_service import LogoFetchService
|
|
LogoFetchService.cleanup_candidates(company.slug)
|
|
|
|
_delete_draft_company(db, company.id)
|
|
db.commit()
|
|
|
|
return jsonify({'success': True})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Wizard cancel error: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|