#!/usr/bin/env python3 """ CEIDG API Client - pobiera dane właścicieli JDG Używa oficjalnego API CEIDG (dane.biznes.gov.pl) do pobierania danych o jednoosobowych działalnościach gospodarczych. Usage: python scripts/fetch_ceidg_api.py --nip 5881571773 python scripts/fetch_ceidg_api.py --name "Kancelaria Notarialna" python scripts/fetch_ceidg_api.py --all # wszystkie JDG z bazy python scripts/fetch_ceidg_api.py --missing-nip # firmy bez NIP python scripts/fetch_ceidg_api.py --all --import # pobierz i importuj do bazy """ import os import sys import argparse import json import time from pathlib import Path from datetime import datetime from dataclasses import dataclass, asdict from typing import Optional, List import requests # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) # Load environment from dotenv import load_dotenv load_dotenv(Path(__file__).parent.parent / '.env') # API Configuration # API v3 - main endpoint for company queries (supports NIP, REGON, nazwa, etc.) CEIDG_API_V3_URL = "https://dane.biznes.gov.pl/api/ceidg/v3/firmy" CEIDG_API_KEY = os.getenv("CEIDG_API_KEY") # Output directory for JSON cache JSON_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "ceidg_json" @dataclass class CEIDGOwner: """Dane właściciela JDG z CEIDG""" imie: str nazwisko: str nip: str regon: str = "" def to_dict(self): return asdict(self) @dataclass class CEIDGData: """Dane firmy z CEIDG API v3""" id: str nazwa: str nip: str regon: str = "" wlasciciel: Optional[CEIDGOwner] = None adres_miasto: str = "" adres_ulica: str = "" adres_kod: str = "" pkd_glowny: str = "" pkd_opis: str = "" data_rozpoczecia: str = "" status: str = "" zrodlo: str = "dane.biznes.gov.pl" pobrano: str = "" def to_dict(self): d = asdict(self) if self.wlasciciel: d['wlasciciel'] = self.wlasciciel.to_dict() return d def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]: """ Pobiera dane z CEIDG API v3 dla podanego NIP. Returns: CEIDGData lub None jeśli nie znaleziono """ if not CEIDG_API_KEY: print(" [ERROR] Brak CEIDG_API_KEY w .env") return None print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...") headers = { "Authorization": f"Bearer {CEIDG_API_KEY}", "Accept": "application/json" } try: response = requests.get( CEIDG_API_V3_URL, params={"nip": nip}, headers=headers, timeout=30 ) if response.status_code == 204: print(f" [WARN] Brak danych w CEIDG dla NIP {nip}") return None if response.status_code == 401: print(f" [ERROR] Błąd autoryzacji - sprawdź CEIDG_API_KEY") return None if response.status_code != 200: print(f" [ERROR] HTTP {response.status_code}: {response.text[:100]}") return None data = response.json() if "firma" not in data or not data["firma"]: print(f" [WARN] Brak danych firmy w odpowiedzi") return None firma = data["firma"][0] # Parse owner data owner = None if "wlasciciel" in firma: w = firma["wlasciciel"] owner = CEIDGOwner( imie=w.get("imie", ""), nazwisko=w.get("nazwisko", ""), nip=w.get("nip", nip), regon=w.get("regon", "") ) # Parse address adres = firma.get("adresDzialalnosci", {}) adres_ulica = "" if adres.get("ulica"): adres_ulica = adres.get("ulica", "") if adres.get("budynek"): adres_ulica += f" {adres.get('budynek')}" if adres.get("lokal"): adres_ulica += f"/{adres.get('lokal')}" # Parse PKD pkd_glowny = firma.get("pkdGlowny", {}) ceidg_data = CEIDGData( id=firma.get("id", ""), nazwa=firma.get("nazwa", ""), nip=nip, regon=owner.regon if owner else "", wlasciciel=owner, adres_miasto=adres.get("miasto", ""), adres_ulica=adres_ulica, adres_kod=adres.get("kod", ""), pkd_glowny=pkd_glowny.get("kod", ""), pkd_opis=pkd_glowny.get("nazwa", ""), data_rozpoczecia=firma.get("dataRozpoczecia", ""), status=firma.get("status", ""), pobrano=datetime.now().isoformat() ) if owner: print(f" [OK] {owner.imie} {owner.nazwisko} ({ceidg_data.status})") else: print(f" [OK] {ceidg_data.nazwa} ({ceidg_data.status})") return ceidg_data except requests.RequestException as e: print(f" [ERROR] Błąd połączenia: {e}") return None except json.JSONDecodeError as e: print(f" [ERROR] Błąd parsowania JSON: {e}") return None def search_ceidg_by_name(nazwa: str, miasto: str = None) -> List[dict]: """ Wyszukuje firmy w CEIDG po nazwie używając API v3. Args: nazwa: Nazwa firmy do wyszukania miasto: Opcjonalnie miasto do zawężenia wyników Returns: Lista słowników z danymi firm """ if not CEIDG_API_KEY: print(" [ERROR] Brak CEIDG_API_KEY w .env") return [] print(f" [INFO] Wyszukiwanie w CEIDG: '{nazwa}'...") headers = { "Authorization": f"Bearer {CEIDG_API_KEY}", "Accept": "application/json" } params = {"nazwa": nazwa} if miasto: params["miasto"] = miasto try: response = requests.get( CEIDG_API_V3_URL, params=params, headers=headers, timeout=30 ) if response.status_code == 204: print(f" [WARN] Brak wyników dla '{nazwa}'") return [] if response.status_code == 401: print(f" [ERROR] Błąd autoryzacji - sprawdź CEIDG_API_KEY") return [] if response.status_code != 200: print(f" [ERROR] HTTP {response.status_code}: {response.text[:200]}") return [] data = response.json() if "firmy" not in data or not data["firmy"]: print(f" [WARN] Brak wyników dla '{nazwa}'") return [] results = data["firmy"] print(f" [OK] Znaleziono {len(results)} wyników") return results except requests.RequestException as e: print(f" [ERROR] Błąd połączenia: {e}") return [] except json.JSONDecodeError as e: print(f" [ERROR] Błąd parsowania JSON: {e}") return [] def search_missing_nip_companies() -> List[dict]: """ Wyszukuje NIP dla firm bez NIP w bazie, używając nazwy firmy. Returns: Lista znalezionych dopasowań """ from database import SessionLocal, Company from difflib import SequenceMatcher db = SessionLocal() results = [] try: # Pobierz firmy bez NIP companies = db.query(Company).filter( (Company.nip.is_(None)) | (Company.nip == '') ).order_by(Company.name).all() print(f"\n=== Wyszukiwanie NIP dla {len(companies)} firm ===\n") for i, company in enumerate(companies): print(f"[{i+1}/{len(companies)}] {company.name}") # Szukaj po nazwie search_results = search_ceidg_by_name( company.name, miasto=company.address_city ) if not search_results: time.sleep(0.5) continue # Szukaj najlepszego dopasowania best_match = None best_score = 0.0 for result in search_results: result_name = result.get("nazwa", "") # Oblicz podobieństwo nazwy score = SequenceMatcher( None, company.name.lower(), result_name.lower() ).ratio() # Bonus za zgodne miasto if company.address_city and result.get("adresDzialalnosci", {}).get("miasto"): if company.address_city.lower() in result["adresDzialalnosci"]["miasto"].lower(): score += 0.2 if score > best_score: best_score = score best_match = result if best_match and best_score >= 0.6: # NIP is in wlasciciel object wlasciciel = best_match.get("wlasciciel", {}) nip = wlasciciel.get("nip") regon = wlasciciel.get("regon") found_name = best_match.get("nazwa", "") status = best_match.get("status", "") if not nip: print(f" ✗ Znaleziono firmę ale brak NIP w odpowiedzi") continue confidence = "high" if best_score >= 0.8 else "medium" results.append({ "company_id": company.id, "company_name": company.name, "found_nip": nip, "found_regon": regon, "found_name": found_name, "found_status": status, "score": round(best_score, 2), "confidence": confidence }) print(f" ✓ Znaleziono: NIP {nip} ({confidence}, score: {best_score:.2f})") print(f" → {found_name}") else: print(f" ✗ Brak dopasowania (najlepszy score: {best_score:.2f})") time.sleep(0.5) # Rate limiting finally: db.close() return results def fetch_full_ceidg_details(ceidg_id: str) -> Optional[dict]: """ Pobiera pełne szczegóły firmy z CEIDG API po ID. Args: ceidg_id: GUID firmy w CEIDG Returns: Pełny słownik z danymi firmy lub None """ if not CEIDG_API_KEY: print(" [ERROR] Brak CEIDG_API_KEY w .env") return None url = f"{CEIDG_API_V3_URL.replace('/firmy', '/firma')}/{ceidg_id}" headers = { "Authorization": f"Bearer {CEIDG_API_KEY}", "Accept": "application/json" } try: response = requests.get(url, headers=headers, timeout=30) if response.status_code == 204: return None if response.status_code != 200: print(f" [ERROR] HTTP {response.status_code}") return None data = response.json() if "firma" in data and data["firma"]: return data["firma"][0] return None except requests.RequestException as e: print(f" [ERROR] Błąd połączenia: {e}") return None def update_company_from_ceidg(company_id: int, ceidg_data: dict, db) -> bool: """ Aktualizuje firmę w bazie wszystkimi danymi z CEIDG. Args: company_id: ID firmy w naszej bazie ceidg_data: Słownik z danymi z CEIDG API db: Sesja SQLAlchemy Returns: True jeśli sukces """ from database import Company from datetime import datetime company = db.query(Company).filter(Company.id == company_id).first() if not company: print(f" [ERROR] Firma {company_id} nie istnieje") return False try: # CEIDG ID i status company.ceidg_id = ceidg_data.get("id") company.ceidg_status = ceidg_data.get("status") # NIP i REGON z właściciela wlasciciel = ceidg_data.get("wlasciciel", {}) if wlasciciel.get("nip") and not company.nip: company.nip = wlasciciel.get("nip") if wlasciciel.get("regon") and not company.regon: company.regon = wlasciciel.get("regon") # Właściciel company.owner_first_name = wlasciciel.get("imie") company.owner_last_name = wlasciciel.get("nazwisko") # Obywatelstwa if ceidg_data.get("obywatelstwa"): company.owner_citizenships = ceidg_data.get("obywatelstwa") # Adres działalności adres = ceidg_data.get("adresDzialalnosci", {}) if adres: ulica = adres.get("ulica", "") budynek = adres.get("budynek", "") lokal = adres.get("lokal", "") street = ulica if budynek: street += f" {budynek}" if lokal: street += f"/{lokal}" if not company.address_street: company.address_street = street if not company.address_city: company.address_city = adres.get("miasto") if not company.address_postal: company.address_postal = adres.get("kod") # Adres korespondencyjny koresp = ceidg_data.get("adresKorespondencyjny", {}) if koresp: k_ulica = koresp.get("ulica", "") k_budynek = koresp.get("budynek", "") k_lokal = koresp.get("lokal", "") k_street = k_ulica if k_budynek: k_street += f" {k_budynek}" if k_lokal: k_street += f"/{k_lokal}" company.correspondence_street = k_street company.correspondence_city = koresp.get("miasto") company.correspondence_postal = koresp.get("kod") # PKD główny pkd_glowny = ceidg_data.get("pkdGlowny", {}) if pkd_glowny: company.pkd_code = pkd_glowny.get("kod") company.pkd_description = pkd_glowny.get("nazwa") # Wszystkie PKD z CEIDG if ceidg_data.get("pkd"): company.ceidg_pkd_list = ceidg_data.get("pkd") # Data rozpoczęcia działalności if ceidg_data.get("dataRozpoczecia"): from datetime import datetime as dt try: company.business_start_date = dt.strptime( ceidg_data.get("dataRozpoczecia"), "%Y-%m-%d" ).date() except: pass # Surowe dane company.ceidg_raw_data = ceidg_data company.ceidg_fetched_at = datetime.now() # Data source company.data_source = "CEIDG API" company.last_verified_at = datetime.now() return True except Exception as e: print(f" [ERROR] Błąd aktualizacji: {e}") return False def enrich_companies_from_ceidg(apply: bool = False) -> dict: """ Wzbogaca dane firm w bazie o informacje z CEIDG. Używa NIP do wyszukania, potem pobiera pełne szczegóły. Args: apply: Czy zapisać zmiany do bazy Returns: Statystyki operacji """ from database import SessionLocal, Company db = SessionLocal() stats = {"searched": 0, "found": 0, "updated": 0, "errors": 0} try: # Pobierz firmy z NIP ale bez ceidg_id companies = db.query(Company).filter( Company.nip.isnot(None), Company.nip != '', (Company.ceidg_id.is_(None)) | (Company.ceidg_id == '') ).all() print(f"\n=== Wzbogacanie danych dla {len(companies)} firm z NIP ===\n") for i, company in enumerate(companies): print(f"[{i+1}/{len(companies)}] {company.name} (NIP: {company.nip})") stats["searched"] += 1 # Szukaj po NIP search_results = search_ceidg_by_name(company.nip) # NIP też działa jako nazwa # Lepiej: użyj dedykowanego endpointu po NIP headers = { "Authorization": f"Bearer {CEIDG_API_KEY}", "Accept": "application/json" } response = requests.get( CEIDG_API_V3_URL, params={"nip": company.nip}, headers=headers, timeout=30 ) if response.status_code != 200: print(f" ✗ Nie znaleziono w CEIDG") time.sleep(0.3) continue data = response.json() if "firmy" not in data or not data["firmy"]: print(f" ✗ Brak danych w CEIDG") time.sleep(0.3) continue firma = data["firmy"][0] ceidg_id = firma.get("id") if not ceidg_id: print(f" ✗ Brak CEIDG ID") time.sleep(0.3) continue stats["found"] += 1 # Pobierz pełne szczegóły print(f" → Pobieram szczegóły (ID: {ceidg_id})...") full_data = fetch_full_ceidg_details(ceidg_id) if not full_data: full_data = firma # Użyj danych z wyszukiwania # Aktualizuj firmę if update_company_from_ceidg(company.id, full_data, db): stats["updated"] += 1 print(f" ✓ Zaktualizowano dane CEIDG") if apply: db.commit() else: stats["errors"] += 1 time.sleep(0.5) # Rate limiting if not apply: db.rollback() print("\n[INFO] Zmiany NIE zostały zapisane (użyj --apply)") else: db.commit() finally: db.close() return stats def import_to_database(results: List[CEIDGData]) -> dict: """ Importuje dane właścicieli JDG do bazy danych. Returns: dict z podsumowaniem importu """ from database import SessionLocal, Company, Person, CompanyPerson db = SessionLocal() stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0} try: for data in results: if not data.wlasciciel: stats["skipped"] += 1 continue owner = data.wlasciciel # Find company by NIP company = db.query(Company).filter(Company.nip == data.nip).first() if not company: print(f" [SKIP] Firma z NIP {data.nip} nie istnieje w bazie") stats["skipped"] += 1 continue # Find or create person (by name since JDG owners don't have PESEL in API) person = db.query(Person).filter( Person.nazwisko == owner.nazwisko, Person.imiona == owner.imie ).first() if not person: person = Person( imiona=owner.imie, nazwisko=owner.nazwisko, pesel=None # CEIDG API doesn't return PESEL ) db.add(person) db.flush() print(f" [NEW] Utworzono osobę: {owner.imie} {owner.nazwisko}") # Check if relationship already exists existing = db.query(CompanyPerson).filter( CompanyPerson.company_id == company.id, CompanyPerson.person_id == person.id, CompanyPerson.role_category == "wlasciciel_jdg" ).first() if existing: # Update source if needed if existing.source != "dane.biznes.gov.pl": existing.source = "dane.biznes.gov.pl" existing.fetched_at = datetime.now() stats["updated"] += 1 else: stats["skipped"] += 1 else: # Create new relationship company_person = CompanyPerson( company_id=company.id, person_id=person.id, role="WŁAŚCICIEL", role_category="wlasciciel_jdg", source="dane.biznes.gov.pl", fetched_at=datetime.now() ) db.add(company_person) stats["imported"] += 1 print(f" [ADD] {owner.imie} {owner.nazwisko} → {company.name}") db.commit() except Exception as e: db.rollback() print(f" [ERROR] Błąd importu: {e}") stats["errors"] += 1 finally: db.close() return stats def main(): parser = argparse.ArgumentParser(description="Fetch JDG owner data from CEIDG API") parser.add_argument("--nip", type=str, help="Single NIP to fetch") parser.add_argument("--name", type=str, help="Search by company name") parser.add_argument("--city", type=str, help="City for name search (optional)") parser.add_argument("--all", action="store_true", help="Fetch all JDG from database") parser.add_argument("--missing-nip", action="store_true", help="Search NIP for companies without NIP") parser.add_argument("--enrich", action="store_true", help="Enrich companies with NIP with full CEIDG data") parser.add_argument("--import", dest="do_import", action="store_true", help="Import fetched data to database") parser.add_argument("--apply-nip", action="store_true", help="Apply found NIPs to database (with --missing-nip)") parser.add_argument("--apply", action="store_true", help="Apply changes to database (with --enrich)") parser.add_argument("--output", type=str, help="Output JSON file") args = parser.parse_args() results = [] if args.nip: data = fetch_ceidg_data(args.nip) if data: results.append(data) print(f"\n=== {data.nazwa} ===") if data.wlasciciel: print(f" Właściciel: {data.wlasciciel.imie} {data.wlasciciel.nazwisko}") print(f" Status: {data.status}") print(f" PKD: {data.pkd_glowny} - {data.pkd_opis}") print(f" Adres: {data.adres_ulica}, {data.adres_kod} {data.adres_miasto}") elif args.name: # Search by name using API v3 search_results = search_ceidg_by_name(args.name, miasto=args.city) if search_results: print(f"\n=== Wyniki dla '{args.name}' ===\n") for i, r in enumerate(search_results[:10]): # NIP is in wlasciciel object wlasciciel = r.get("wlasciciel", {}) nip = wlasciciel.get("nip", "?") regon = wlasciciel.get("regon", "?") nazwa = r.get("nazwa", "?") status = r.get("status", "?") adres = r.get("adresDzialalnosci", {}) miasto = adres.get("miasto", "") print(f"{i+1}. NIP: {nip} | REGON: {regon}") print(f" Nazwa: {nazwa}") print(f" Status: {status}") print(f" Miasto: {miasto}") if wlasciciel.get("imie"): print(f" Właściciel: {wlasciciel.get('imie')} {wlasciciel.get('nazwisko')}") print() elif args.missing_nip: # Search NIP for companies without NIP found_results = search_missing_nip_companies() if found_results: # Summary high_conf = [r for r in found_results if r["confidence"] == "high"] medium_conf = [r for r in found_results if r["confidence"] == "medium"] print(f"\n=== PODSUMOWANIE ===") print(f"Znaleziono: {len(found_results)} NIP") print(f" - Wysoka pewność: {len(high_conf)}") print(f" - Średnia pewność: {len(medium_conf)}") # Save results JSON_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) output_file = args.output or str(JSON_OUTPUT_DIR / f"missing_nip_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") with open(output_file, 'w', encoding='utf-8') as f: json.dump(found_results, f, ensure_ascii=False, indent=2) print(f"\nWyniki zapisane: {output_file}") # Apply NIPs if requested if args.apply_nip and high_conf: from database import SessionLocal, Company db = SessionLocal() try: print(f"\n=== Zapisywanie {len(high_conf)} NIP (wysoka pewność) ===") for r in high_conf: company = db.query(Company).filter(Company.id == r["company_id"]).first() if company: company.nip = r["found_nip"] print(f" ✓ {company.name} → NIP {r['found_nip']}") db.commit() print(f"\nZapisano {len(high_conf)} NIP do bazy") except Exception as e: db.rollback() print(f" [ERROR] Błąd zapisu: {e}") finally: db.close() return elif args.enrich: # Enrich companies with full CEIDG data stats = enrich_companies_from_ceidg(apply=args.apply) print(f"\n=== PODSUMOWANIE WZBOGACANIA ===") print(f"Przeszukano: {stats['searched']}") print(f"Znaleziono w CEIDG: {stats['found']}") print(f"Zaktualizowano: {stats['updated']}") print(f"Błędy: {stats['errors']}") if not args.apply: print("\n[UWAGA] Użyj --apply aby zapisać zmiany do bazy") return elif args.all: from database import SessionLocal, Company db = SessionLocal() try: # Get JDG companies (no KRS) jdg_companies = db.query(Company).filter( (Company.krs.is_(None)) | (Company.krs == ''), Company.nip.isnot(None), Company.nip != '' ).all() print(f"Znaleziono {len(jdg_companies)} firm JDG\n") success = 0 failed = 0 for i, company in enumerate(jdg_companies): print(f"[{i+1}/{len(jdg_companies)}] {company.name}") data = fetch_ceidg_data(company.nip) if data: results.append(data) success += 1 else: failed += 1 time.sleep(0.5) # Rate limiting print(f"\n=== PODSUMOWANIE ===") print(f"Pobrano: {success}") print(f"Błędy/brak danych: {failed}") finally: db.close() else: parser.print_help() return # Save to JSON cache if results: JSON_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) output_file = args.output or str(JSON_OUTPUT_DIR / f"ceidg_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") with open(output_file, 'w', encoding='utf-8') as f: json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2) print(f"\nDane zapisane do: {output_file}") # Import to database if requested if args.do_import and results: print("\n=== IMPORT DO BAZY ===") stats = import_to_database(results) print(f"\nZaimportowano: {stats['imported']}") print(f"Zaktualizowano: {stats['updated']}") print(f"Pominięto: {stats['skipped']}") print(f"Błędy: {stats['errors']}") if __name__ == "__main__": main()