nordabiz/scripts/arm_company.py
Maciej Pienczyn 4daf43f9f7
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: GBP audit result is dataclass not dict, fix social count
- GBP: access .completeness_score attribute + call save_audit()
- Social: count saved DB records instead of parsing audit result dict

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 16:38:18 +01:00

297 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Command-line script for automatically "arming" (enriching) companies.
Equivalent of the "Uzbrój firmę" button in the admin panel.

Usage:
    python3 scripts/arm_company.py <company_id> [--force]
    python3 scripts/arm_company.py 120 121 122 --force  # several companies at once

Options:
    --force  Forces every step to run again (like "Zaktualizuj dane")
"""
import sys
import os
import logging
# Setup path: BASE_DIR is the parent of the directory that holds this file.
# It and its scripts/ subdirectory are pushed to the front of sys.path BEFORE
# the project-local imports below (presumably so that `database` and the
# lazily imported service modules resolve regardless of the working directory).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)
scripts_dir = os.path.join(BASE_DIR, 'scripts')
if scripts_dir not in sys.path:
    sys.path.insert(0, scripts_dir)
# Project-local imports; these must stay below the sys.path set-up above.
from database import SessionLocal, Company, CompanyWebsiteAnalysis, CompanySocialMedia, CompanyPKD, CompanyPerson
from database import GBPAudit
# Configure root logging once at import time; this script logs under 'arm_company'.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('arm_company')
def _company_audit_payload(company):
    """Return the plain-dict view of *company* handed to the SEO and social media auditors."""
    return {
        'id': company.id,
        'name': company.name,
        'slug': company.slug,
        'website': company.website,
        'address_city': company.address_city or '',
    }


def _apply_ceidg_record(company, firma):
    """Copy one CEIDG record (*firma*, a dict) onto *company*.

    The raw record and the fetch timestamp are always stored, and the legal
    form is set to 'JDG' when an owner section is present. Contact and address
    fields are only filled in when the company does not have a value yet, so
    existing data is never overwritten.
    """
    from datetime import datetime

    # Keep the raw payload and remember when it was fetched.
    company.ceidg_raw_data = firma
    company.ceidg_fetched_at = datetime.utcnow()

    owner = firma.get('wlasciciel')
    if owner:
        company.legal_form = 'JDG'
        if not company.owner_name:
            company.owner_name = '%s %s' % (owner.get('imie', ''), owner.get('nazwisko', ''))

    addr = firma.get('adresDzialalnosci')
    if addr:
        if not company.address_street:
            ulica = addr.get('ulica', '')
            nr = addr.get('budynek', '')
            lokal = addr.get('lokal', '')
            company.address_street = ('%s %s%s' % (ulica, nr, '/%s' % lokal if lokal else '')).strip()
        if not company.address_city:
            company.address_city = addr.get('miasto', '')
        if not company.address_zip:
            company.address_zip = addr.get('kodPocztowy', '')

    if firma.get('email') and not company.email:
        company.email = firma['email']
    if firma.get('telefon') and not company.phone:
        company.phone = firma['telefon']
    if firma.get('adresStronyInternetowej') and not company.website:
        company.website = firma['adresStronyInternetowej']


def arm_company(company_id: int, force: bool = False) -> bool:
    """Run the five enrichment ("arming") steps for a single company.

    Steps, in order: (1) official registry data from KRS or CEIDG, (2) SEO
    audit, (3) social media audit, (4) GBP audit, (5) logo download. Progress
    and a final summary are printed to stdout. Each step catches its own
    exceptions and records an ``ERROR: ...`` status, so one failing step does
    not stop the following ones.

    Args:
        company_id: Primary key of the company to enrich.
        force: When true, every step is executed even if its result already
            exists; otherwise steps that are already done are skipped.

    Returns:
        True when the company was found and all steps were attempted (whatever
        their individual outcome); False when the company does not exist or an
        unexpected error escaped the per-step handling.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            print("Firma ID %d nie znaleziona!" % company_id)
            return False

        print("=" * 60)
        print("Uzbrajam: %s (ID: %d)" % (company.name, company.id))
        print("NIP: %s | WWW: %s" % (company.nip or '-', company.website or '-'))
        print("Tryb: %s" % ("FORCE (wszystkie kroki)" if force else "SMART (tylko brakujące)"))
        print("=" * 60)

        results = {}

        # --- Step 1: official registry data (KRS, falling back to CEIDG) ---
        registry_done = bool(company.krs_fetched_at or company.ceidg_fetched_at)
        if force or not registry_done:
            if company.nip:
                print("\n[1/5] Pobieranie danych urzędowych...")
                try:
                    from blueprints.admin.routes_membership import _enrich_company_from_krs
                    from krs_api_service import KRSApiService

                    krs_service = KRSApiService()

                    # Look the KRS number up by NIP (search_by_nip returns a dict or None).
                    if not company.krs:
                        krs_data = krs_service.search_by_nip(company.nip)
                        if krs_data and krs_data.get('krs'):
                            company.krs = krs_data['krs']
                            db.flush()
                            logger.info("Znaleziono KRS %s dla NIP %s", company.krs, company.nip)

                    if company.krs:
                        success = _enrich_company_from_krs(company, db)
                        if success:
                            db.commit()
                            results['registry'] = 'OK (KRS)'
                            print(" -> OK: Dane z KRS pobrane")
                        else:
                            results['registry'] = 'FAIL (KRS)'
                            print(" -> FAIL: Nie udało się pobrać z KRS")
                    else:
                        # No KRS number: try CEIDG instead.
                        import requests as req

                        ceidg_key = os.getenv("CEIDG_API_KEY", "")
                        nip_clean = company.nip.replace('-', '').replace(' ', '')
                        ceidg_url = "https://dane.biznes.gov.pl/api/ceidg/v3/firma?nip=%s" % nip_clean
                        headers = {"Authorization": "Bearer %s" % ceidg_key}
                        resp = req.get(ceidg_url, headers=headers, timeout=10)
                        if resp.status_code == 200:
                            firmy = resp.json().get('firmy', [])
                            if firmy:
                                # Only the first returned record is used.
                                _apply_ceidg_record(company, firmy[0])
                                db.commit()
                                results['registry'] = 'OK (CEIDG)'
                                print(" -> OK: Dane z CEIDG pobrane")
                            else:
                                results['registry'] = 'NOT FOUND'
                                print(" -> Nie znaleziono w CEIDG")
                        else:
                            results['registry'] = 'NOT FOUND'
                            print(" -> Nie znaleziono w żadnym rejestrze")
                except Exception as e:
                    results['registry'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
                    # A failed flush/commit leaves the session unusable until it is
                    # rolled back; without this the refresh below would raise and
                    # abort every remaining step.
                    db.rollback()
            else:
                results['registry'] = 'SKIP (brak NIP)'
                print("\n[1/5] Pominięto - brak NIP")
        else:
            results['registry'] = 'SKIP (done)'
            print("\n[1/5] Dane urzędowe - już wykonane")

        # Reload the company so the later steps see what the registry step stored.
        db.refresh(company)

        # --- Step 2: SEO audit ---
        seo_done = db.query(CompanyWebsiteAnalysis).filter_by(company_id=company.id).first() is not None
        if force or not seo_done:
            if company.website:
                print("\n[2/5] Audyt SEO...")
                try:
                    from seo_audit import SEOAuditor

                    seo_service = SEOAuditor()
                    audit_result = seo_service.audit_company(_company_audit_payload(company))
                    scores = audit_result.get('scores', {})
                    seo_score = scores.get('pagespeed_seo', '?')
                    perf_score = scores.get('pagespeed_performance', '?')
                    results['seo'] = 'OK (SEO: %s, Perf: %s)' % (seo_score, perf_score)
                    print(" -> OK: SEO=%s, Perf=%s" % (seo_score, perf_score))
                except Exception as e:
                    results['seo'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
            else:
                results['seo'] = 'SKIP (brak WWW)'
                print("\n[2/5] Audyt SEO - pominięto (brak strony WWW)")
        else:
            results['seo'] = 'SKIP (done)'
            print("\n[2/5] Audyt SEO - już wykonane")

        # --- Step 3: social media ---
        social_done = db.query(CompanySocialMedia).filter_by(company_id=company.id).count() > 0
        if force or not social_done:
            print("\n[3/5] Audyt Social Media...")
            try:
                from social_media_audit import SocialMediaAuditor

                auditor = SocialMediaAuditor()  # uses DATABASE_URL from env
                auditor.audit_company(_company_audit_payload(company))
                # Report what actually landed in the database rather than parsing
                # the audit result; expire cached state first so the count is fresh.
                db.expire_all()
                saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count()
                results['social'] = 'OK (%d profili)' % saved_count
                print(" -> OK: %d profili zapisanych w bazie" % saved_count)
            except Exception as e:
                results['social'] = 'ERROR: %s' % str(e)[:80]
                print(" -> ERROR: %s" % str(e)[:80])
        else:
            results['social'] = 'SKIP (done)'
            print("\n[3/5] Social Media - już wykonane")

        # --- Step 4: GBP audit ---
        gbp_done = db.query(GBPAudit).filter_by(company_id=company.id).first() is not None
        if force or not gbp_done:
            print("\n[4/5] Audyt GBP...")
            try:
                from gbp_audit_service import GBPAuditService

                gbp_service = GBPAuditService(db)
                gbp_result = gbp_service.audit_company(company.id)
                if gbp_result:
                    # The audit result exposes its score as an attribute, not a dict key.
                    score = gbp_result.completeness_score
                    # Save to database
                    gbp_service.save_audit(gbp_result, source='script')
                    results['gbp'] = 'OK (score: %s)' % score
                    print(" -> OK: Score=%s" % score)
                else:
                    results['gbp'] = 'FAIL'
                    print(" -> FAIL: brak wyniku")
            except Exception as e:
                results['gbp'] = 'ERROR: %s' % str(e)[:80]
                print(" -> ERROR: %s" % str(e)[:80])
                # The GBP service works on this session; reset it after a failure
                # so later attribute loads on `company` still work.
                db.rollback()
        else:
            results['gbp'] = 'SKIP (done)'
            print("\n[4/5] Audyt GBP - już wykonane")

        # --- Step 5: logo ---
        # Anchor the check on BASE_DIR so it does not depend on the directory the
        # script was launched from.
        logo_done = any(
            os.path.isfile(
                os.path.join(BASE_DIR, 'static', 'img', 'companies', '%s.%s' % (company.slug, ext))
            )
            for ext in ('webp', 'svg')
        )
        if force or not logo_done:
            if company.website:
                print("\n[5/5] Pobieranie logo...")
                try:
                    from logo_fetch_service import LogoFetchService

                    service = LogoFetchService()
                    fetch_result = service.fetch_candidates(company.website, company.slug)
                    candidates = fetch_result.get('candidates', [])
                    if candidates:
                        # A missing or None recommendation falls back to the first candidate.
                        pick = fetch_result.get('recommended_index', 0) or 0
                        # NOTE(review): assumes confirm_candidate returns a truthy
                        # value on success - confirm against LogoFetchService.
                        if service.confirm_candidate(company.slug, pick):
                            results['logo'] = 'OK (kandydat #%d z %d)' % (pick, len(candidates))
                            print(" -> OK: Wybrano kandydata #%d z %d" % (pick, len(candidates)))
                        else:
                            results['logo'] = 'FAIL (kandydat #%d niezapisany)' % pick
                            print(" -> FAIL: Nie udało się zapisać kandydata #%d" % pick)
                    else:
                        results['logo'] = 'FAIL (0 kandydatów)'
                        print(" -> FAIL: Nie znaleziono kandydatów na logo")
                except Exception as e:
                    results['logo'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
            else:
                results['logo'] = 'SKIP (brak WWW)'
                print("\n[5/5] Logo - pominięto (brak strony WWW)")
        else:
            results['logo'] = 'SKIP (done)'
            print("\n[5/5] Logo - już istnieje")

        # Summary: a step counts as passed when it succeeded or was already done.
        print("\n" + "=" * 60)
        print("PODSUMOWANIE: %s (ID: %d)" % (company.name, company.id))
        print("-" * 60)
        for step, status in results.items():
            print(" %-12s: %s" % (step, status))
        ok_count = sum(1 for status in results.values() if status.startswith(('OK', 'SKIP (done)')))
        print("-" * 60)
        print(" Wynik: %d/5 kroków zaliczonych" % ok_count)
        print("=" * 60)
        return True
    except Exception as e:
        # Last-resort boundary: anything that escaped the per-step handlers.
        logger.error("Błąd uzbrajania firmy %d: %s" % (company_id, str(e)))
        print("\nBŁĄD KRYTYCZNY: %s" % str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
if __name__ == '__main__':
    # CLI entry point: every numeric argument is a company ID, `--force`
    # re-runs all steps. The exit status is 1 when no valid ID was given or
    # when arming at least one company failed, 0 otherwise.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Użycie: python3 scripts/arm_company.py <company_id> [<id2> ...] [--force]")
        print(" --force Wymusza ponowne wykonanie wszystkich kroków")
        sys.exit(1)
    force = '--force' in cli_args
    positional = [a for a in cli_args if a != '--force']
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    ids = [int(a) for a in positional if a.isdecimal()]
    ignored = [a for a in positional if not a.isdecimal()]
    if ignored:
        # Do not drop bad arguments silently; report them on stderr.
        print("Pominięto nieprawidłowe argumenty: %s" % ', '.join(ignored), file=sys.stderr)
    if not ids:
        print("Użycie: python3 scripts/arm_company.py <company_id> [<id2> ...] [--force]")
        print(" --force Wymusza ponowne wykonanie wszystkich kroków")
        sys.exit(1)
    failed = []
    for cid in ids:
        if not arm_company(cid, force=force):
            failed.append(cid)
        if len(ids) > 1:
            # Visual separator between companies in a batch run.
            print("\n")
    if failed:
        sys.exit(1)