- Add company logo display in search results cards
- Make logo clickable (links to company profile)
- Temporarily hide "Aktualności i wydarzenia" section on company profiles
- Add scripts for KRS PDF download/parsing and CEIDG API

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
247 lines
8.9 KiB
Python
247 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
CEIDG Data Downloader - fetches sole-proprietorship (JDG) data from the CEIDG portal.

Uses Playwright to retrieve data about sole proprietorships (jednoosobowe
działalności gospodarcze, JDG) from the official CEIDG portal
(aplikacja.ceidg.gov.pl).

For a JDG the owner is the company, so we extract:
- The owner's first name(s) and surname
- The business status
- The address where the business is run

Usage:
    python scripts/download_ceidg_data.py --nip 5881943861
    python scripts/download_ceidg_data.py --all  # all JDG from the database
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict
|
|
from typing import Optional
|
|
|
|
# Put the directory above this script's own directory at the front of the
# import path (presumably so project-local modules such as `database`
# resolve when the script is run directly -- confirm against the repo layout).
_parent_dir = Path(__file__).parent.parent
sys.path.insert(0, str(_parent_dir))

# Playwright is mandatory: if it cannot be imported, print the install
# instructions and terminate with exit status 1.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
    print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
    sys.exit(1)
|
|
|
|
|
|
@dataclass
class CEIDGData:
    """Record holding the data scraped from CEIDG for a single NIP.

    Only ``nip`` is required; every other field defaults to an empty
    string (``zrodlo`` defaults to the portal's host name).
    """

    # Tax identification number (NIP) the record was looked up by.
    nip: str
    # Owner's first name(s).
    imiona: str = ""
    # Owner's surname.
    nazwisko: str = ""
    # Registered name of the business.
    nazwa_firmy: str = ""
    # Business status, e.g. AKTYWNY, ZAWIESZONY, WYKREŚLONY.
    status: str = ""
    # Business address.
    adres: str = ""
    # Business start date (stored as text).
    data_rozpoczecia: str = ""
    # Origin of the data.
    zrodlo: str = "ceidg.gov.pl"
    # Timestamp of when the record was fetched (stored as text).
    pobrano: str = ""

    def to_dict(self) -> dict:
        """Return all fields as a plain ``dict`` keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
|
|
|
|
|
|
def _wait_visible(locator, timeout_ms: int) -> bool:
    """Wait up to ``timeout_ms`` milliseconds for ``locator`` to become visible.

    ``Locator.is_visible(timeout=...)`` ignores its timeout and answers
    immediately, so the actual waiting is done with ``Locator.wait_for``.

    Returns:
        True if the element became visible in time, False on timeout.
    """
    try:
        locator.wait_for(state="visible", timeout=timeout_ms)
    except PlaywrightTimeout:
        return False
    return True


def _split_full_name(full_name: str) -> tuple:
    """Split a full name into ``(first_names, surname)``.

    The last whitespace-separated token is taken as the surname and all
    preceding tokens as the first name(s). Returns ``("", "")`` when the
    text has fewer than two tokens.
    """
    tokens = full_name.split()
    if len(tokens) < 2:
        return "", ""
    return " ".join(tokens[:-1]), tokens[-1]


def _cell_text_after(page, label_selector: str, timeout_ms: int = 2000) -> str:
    """Return the stripped text of the ``<td>`` that follows a label cell.

    Args:
        page: Playwright page showing the entry's details.
        label_selector: Selector of the label element; the first match is used.
        timeout_ms: How long to wait for the label to become visible.

    Returns:
        The text of the label's first following-sibling ``<td>``, or ``""``
        when the label or the value cell is not visible.
    """
    label = page.locator(label_selector).first
    if not _wait_visible(label, timeout_ms):
        return ""
    value_cell = label.locator("xpath=following-sibling::td").first
    if not value_cell.is_visible():
        return ""
    return value_cell.inner_text().strip()


def _extract_details(page, nip: str) -> "CEIDGData":
    """Build a ``CEIDGData`` record from the currently open details page.

    Fills ``imiona``, ``nazwisko``, ``nazwa_firmy`` and ``status`` where the
    corresponding labels are found; ``adres`` and ``data_rozpoczecia`` are
    not extracted and keep their defaults.
    """
    name_label_text = "Imię i nazwisko"
    data = CEIDGData(nip=nip, pobrano=datetime.now().isoformat())

    # Owner's name: read the whole table row containing the label and take
    # the text that follows the label.
    name_label = page.locator("span:has-text('Imię i nazwisko')").first
    if _wait_visible(name_label, 2000):
        name_row = name_label.locator("xpath=ancestor::tr").first
        if name_row.is_visible():
            row_parts = name_row.inner_text().split(name_label_text)
            if len(row_parts) > 1:
                data.imiona, data.nazwisko = _split_full_name(row_parts[1])

    # NOTE(review): `td:has-text(...)` matches any cell containing the text,
    # so the first match may not be the intended label cell -- confirm
    # against the live page markup.
    data.nazwa_firmy = _cell_text_after(page, "td:has-text('Firma przedsiębiorcy')")
    data.status = _cell_text_after(page, "td:has-text('Status')")

    # Fallback: no structured name found, so search the raw page text.
    if not data.imiona and not data.nazwisko:
        import re

        page_text = page.inner_text("body")
        # Pattern: "Imię i nazwisko: JAN KOWALSKI"
        # NOTE(review): captures exactly two words of letters, so hyphenated
        # surnames or more than one first name are truncated.
        name_match = re.search(r'Imię i nazwisko[:\s]+([A-ZĄĆĘŁŃÓŚŹŻ]+\s+[A-ZĄĆĘŁŃÓŚŹŻ]+)', page_text, re.IGNORECASE)
        if name_match:
            data.imiona, data.nazwisko = _split_full_name(name_match.group(1))

    return data


def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]:
    """Fetch CEIDG data for the given NIP by driving the public search page.

    Opens the CEIDG search form in headless Chromium, submits the NIP,
    follows the details link of the first result and scrapes the owner's
    name, the business name and the status. Progress and errors are
    printed to stdout.

    Args:
        nip: Tax identification number to search for.

    Returns:
        A ``CEIDGData`` record when at least one of first name, surname or
        business name was extracted. ``None`` when no entry was found, when
        nothing could be extracted (a debug screenshot is then written to
        ``/tmp/ceidg_debug_<nip>.png``), or when a timeout or any other
        error occurred while scraping.
    """
    print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )
        page = context.new_page()

        try:
            # Open the search page. The fixed sleeps are kept from the
            # original flow as extra settling time on top of the waits.
            page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx", timeout=30000)
            time.sleep(3)
            page.wait_for_load_state("networkidle", timeout=15000)

            # Locate the NIP field; fall back to the first text input if
            # the dedicated field does not show up within 5 seconds.
            nip_input = page.locator("input[id*='NIP'], input[name*='nip']").first
            if not _wait_visible(nip_input, 5000):
                nip_input = page.locator("input[type='text']").first

            nip_input.fill(nip)
            time.sleep(1)

            # Submit the search.
            search_btn = page.locator("input[type='submit'][value*='Szukaj'], button:has-text('Szukaj')").first
            search_btn.click()

            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=20000)

            # A visible details link means the search returned an entry.
            details_link = page.locator("a:has-text('Szczegóły'), a[href*='SearchDetails']").first
            if not _wait_visible(details_link, 5000):
                print(f" [ERROR] Nie znaleziono wpisu dla NIP {nip}")
                return None

            details_link.click()
            time.sleep(3)
            page.wait_for_load_state("networkidle", timeout=15000)

            data = _extract_details(page, nip)
            if data.imiona or data.nazwisko or data.nazwa_firmy:
                print(f" [OK] Znaleziono: {data.imiona} {data.nazwisko}")
                return data

            print(" [WARN] Nie udało się wyciągnąć danych ze strony")
            # Keep a screenshot so the failed extraction can be debugged.
            page.screenshot(path=f"/tmp/ceidg_debug_{nip}.png")
            return None

        except PlaywrightTimeout as e:
            print(f" [ERROR] Timeout dla NIP {nip}: {e}")
            return None
        except Exception as e:
            # Scraping is best-effort: any other failure is reported and
            # turned into "no data" so batch runs can continue.
            print(f" [ERROR] Błąd dla NIP {nip}: {e}")
            return None
        finally:
            browser.close()
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Modes (checked in this order):
      * ``--nip NIP``: fetch a single entry and print a short summary.
      * ``--all``: load every company from the database that has a NIP but
        no KRS number and fetch each one, pausing 3 seconds between them.
      * neither: print the help text and return.

    Collected records are written as JSON to ``--output`` when given,
    otherwise printed to stdout. Nothing is written when no record was
    collected.
    """
    parser = argparse.ArgumentParser(description="Download CEIDG data for JDG companies")
    parser.add_argument("--nip", type=str, help="Single NIP to fetch")
    parser.add_argument("--all", action="store_true", help="Fetch all JDG from database")
    parser.add_argument("--output", type=str, help="Output JSON file")
    args = parser.parse_args()

    # No mode selected: show usage and stop.
    if not (args.nip or args.all):
        parser.print_help()
        return

    results = []

    if args.nip:
        record = fetch_ceidg_data(args.nip)
        if record:
            results.append(record.to_dict())
            print(f"\n=== {record.imiona} {record.nazwisko} ===")
            print(f" Firma: {record.nazwa_firmy}")
            print(f" Status: {record.status}")
            print(f" NIP: {record.nip}")
    else:
        # The environment is loaded before the database module is imported.
        from dotenv import load_dotenv
        load_dotenv(Path(__file__).parent.parent / '.env')

        from database import SessionLocal, Company

        db = SessionLocal()
        try:
            # JDG companies: no KRS number, but a non-empty NIP.
            without_krs = (Company.krs.is_(None)) | (Company.krs == '')
            query = db.query(Company).filter(
                without_krs,
                Company.nip.isnot(None),
                Company.nip != '',
            )
            jdg_companies = query.all()
            total = len(jdg_companies)

            print(f"Znaleziono {total} firm JDG\n")

            for position, company in enumerate(jdg_companies, start=1):
                print(f"[{position}/{total}] {company.name}")
                record = fetch_ceidg_data(company.nip)
                if record:
                    results.append(record.to_dict())
                time.sleep(3)  # Rate limiting
        finally:
            db.close()

    # Nothing collected: nothing to save or print.
    if not results:
        return

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"\nWyniki zapisane do: {args.output}")
    else:
        print("\n=== JSON OUTPUT ===")
        print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
|