#!/usr/bin/env python3
"""
CEIDG Data Downloader - fetches JDG data from the CEIDG portal.

Uses Playwright to download data about sole proprietorships
(jednoosobowe działalności gospodarcze) from the official CEIDG portal
(aplikacja.ceidg.gov.pl).

For a JDG the owner equals the company, so we extract:
- owner's first and last name
- business status
- business address

Usage:
    python scripts/download_ceidg_data.py --nip 5881943861
    python scripts/download_ceidg_data.py --all   # all JDG from the database
"""
import os
import sys
import argparse
import time
import json
import re
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional

# Add parent directory to path for project-local imports (database, .env).
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
    print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
    sys.exit(1)


@dataclass
class CEIDGData:
    """A single CEIDG registry record for one sole proprietorship (JDG)."""
    nip: str
    imiona: str = ""            # owner's given name(s)
    nazwisko: str = ""          # owner's surname
    nazwa_firmy: str = ""       # registered business name
    status: str = ""            # AKTYWNY, ZAWIESZONY, WYKREŚLONY
    adres: str = ""             # business address (not scraped yet)
    data_rozpoczecia: str = ""  # business start date (not scraped yet)
    zrodlo: str = "ceidg.gov.pl"
    pobrano: str = ""           # ISO timestamp of when the record was fetched

    def to_dict(self):
        """Return the record as a plain, JSON-serializable dict."""
        return asdict(self)


# Matches e.g. "Imię i nazwisko: JAN KOWALSKI".
# Compiled once at module level instead of inside the scraping loop.
_NAME_RE = re.compile(
    r'Imię i nazwisko[:\s]+([A-ZĄĆĘŁŃÓŚŹŻ]+\s+[A-ZĄĆĘŁŃÓŚŹŻ]+)',
    re.IGNORECASE,
)


def _split_full_name(full_name: str, data: CEIDGData) -> None:
    """Split "FIRST [MIDDLE...] LAST" into data.imiona / data.nazwisko in place."""
    parts = full_name.split()
    if len(parts) >= 2:
        data.nazwisko = parts[-1]
        data.imiona = " ".join(parts[:-1])


def _parse_details_page(page, data: CEIDGData) -> None:
    """Scrape name, company name and status from the CEIDG details page into *data*."""
    # Imię i Nazwisko — find the label span, then read its table row's text.
    name_label = page.locator("span:has-text('Imię i nazwisko')").first
    if name_label.is_visible(timeout=2000):
        name_row = name_label.locator("xpath=ancestor::tr").first
        if name_row.is_visible():
            name_text = name_row.inner_text()
            if "Imię i nazwisko" in name_text:
                parts = name_text.split("Imię i nazwisko")
                if len(parts) > 1:
                    _split_full_name(parts[1].strip(), data)

    # Nazwa firmy — the value sits in the sibling cell of the label cell.
    firma_element = page.locator("td:has-text('Firma przedsiębiorcy')").first
    if firma_element.is_visible(timeout=2000):
        firma_row = firma_element.locator("xpath=following-sibling::td").first
        if firma_row.is_visible():
            data.nazwa_firmy = firma_row.inner_text().strip()

    # Status — same sibling-cell layout as the company name.
    status_element = page.locator("td:has-text('Status')").first
    if status_element.is_visible(timeout=2000):
        status_value = status_element.locator("xpath=following-sibling::td").first
        if status_value.is_visible():
            data.status = status_value.inner_text().strip()

    # Fallback: if structured extraction found no name, regex-scan the raw body text.
    if not data.imiona and not data.nazwisko:
        page_text = page.inner_text("body")
        name_match = _NAME_RE.search(page_text)
        if name_match:
            _split_full_name(name_match.group(1).strip(), data)


def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]:
    """
    Fetch CEIDG data for the given NIP.

    Drives a headless Chromium through the public CEIDG search form,
    opens the details page of the first hit and scrapes it.

    Args:
        nip: tax identification number to look up.

    Returns:
        CEIDGData on success, or None when the entry was not found,
        scraping failed, or a timeout/error occurred.
    """
    print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )
        page = context.new_page()
        try:
            # Go to the CEIDG search page.
            page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.aspx".replace("ceidg.public.aspx", "ceidg.public.ui/search.aspx"), timeout=30000) if False else \
                page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx", timeout=30000)
            time.sleep(3)  # the ASPX page keeps loading scripts after the initial response
            page.wait_for_load_state("networkidle", timeout=15000)

            # Find the NIP input field.
            nip_input = page.locator("input[id*='NIP'], input[name*='nip']").first
            if not nip_input.is_visible(timeout=5000):
                # Fallback: first plain text input on the page.
                nip_input = page.locator("input[type='text']").first
            nip_input.fill(nip)
            time.sleep(1)

            # Submit the search form.
            search_btn = page.locator("input[type='submit'][value*='Szukaj'], button:has-text('Szukaj')").first
            search_btn.click()

            # Wait for the results to render.
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=20000)

            # Guard clause: no "Szczegóły" (details) link means no matching entry.
            details_link = page.locator("a:has-text('Szczegóły'), a[href*='SearchDetails']").first
            if not details_link.is_visible(timeout=5000):
                print(f" [ERROR] Nie znaleziono wpisu dla NIP {nip}")
                return None

            details_link.click()
            time.sleep(3)
            page.wait_for_load_state("networkidle", timeout=15000)

            # Extract data from the details page.
            data = CEIDGData(nip=nip, pobrano=datetime.now().isoformat())
            _parse_details_page(page, data)

            if data.imiona or data.nazwisko or data.nazwa_firmy:
                print(f" [OK] Znaleziono: {data.imiona} {data.nazwisko}")
                return data

            print(f" [WARN] Nie udało się wyciągnąć danych ze strony")
            # Save a screenshot so the selector mismatch can be debugged offline.
            page.screenshot(path=f"/tmp/ceidg_debug_{nip}.png")
            return None
        except PlaywrightTimeout as e:
            print(f" [ERROR] Timeout dla NIP {nip}: {e}")
            return None
        except Exception as e:
            # Broad catch at the script boundary: log and let the caller
            # continue with the remaining NIPs instead of aborting the batch.
            print(f" [ERROR] Błąd dla NIP {nip}: {e}")
            return None
        finally:
            browser.close()


def main():
    """CLI entry point: fetch a single NIP or all JDG companies from the database."""
    parser = argparse.ArgumentParser(description="Download CEIDG data for JDG companies")
    parser.add_argument("--nip", type=str, help="Single NIP to fetch")
    parser.add_argument("--all", action="store_true", help="Fetch all JDG from database")
    parser.add_argument("--output", type=str, help="Output JSON file")
    args = parser.parse_args()

    results = []

    if args.nip:
        data = fetch_ceidg_data(args.nip)
        if data:
            results.append(data.to_dict())
            print(f"\n=== {data.imiona} {data.nazwisko} ===")
            print(f" Firma: {data.nazwa_firmy}")
            print(f" Status: {data.status}")
            print(f" NIP: {data.nip}")
    elif args.all:
        # Environment and database imports are deliberately lazy:
        # only the --all path needs them.
        from dotenv import load_dotenv
        load_dotenv(Path(__file__).parent.parent / '.env')
        from database import SessionLocal, Company

        db = SessionLocal()
        try:
            # JDG companies are those with a NIP but no KRS number.
            jdg_companies = db.query(Company).filter(
                (Company.krs.is_(None)) | (Company.krs == ''),
                Company.nip.isnot(None),
                Company.nip != ''
            ).all()
            print(f"Znaleziono {len(jdg_companies)} firm JDG\n")
            for i, company in enumerate(jdg_companies):
                print(f"[{i+1}/{len(jdg_companies)}] {company.name}")
                data = fetch_ceidg_data(company.nip)
                if data:
                    results.append(data.to_dict())
                time.sleep(3)  # rate limiting — be polite to the public portal
        finally:
            db.close()
    else:
        parser.print_help()
        return

    # Save results to a file, or dump them to stdout.
    if args.output and results:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"\nWyniki zapisane do: {args.output}")
    elif results:
        print("\n=== JSON OUTPUT ===")
        print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()