From 3f9273cff68c1812108742ba4e5d51b12dda9bc5 Mon Sep 17 00:00:00 2001 From: Maciej Pienczyn Date: Sun, 11 Jan 2026 15:32:53 +0100 Subject: [PATCH] feat: Add company logos to search results, hide events section MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add company logo display in search results cards - Make logo clickable (links to company profile) - Temporarily hide "Aktualności i wydarzenia" section on company profiles - Add scripts for KRS PDF download/parsing and CEIDG API Co-Authored-By: Claude Opus 4.5 --- scripts/download_ceidg_data.py | 246 ++++++++++++++ scripts/download_krs_pdf.py | 201 +++++++++++ scripts/fetch_ceidg_api.py | 342 +++++++++++++++++++ scripts/import_krs_people.py | 292 ++++++++++++++++ scripts/parse_krs_pdf.py | 279 ++++++++++++++++ templates/company_detail.html | 3 +- templates/connections_map.html | 589 +++++++++++++++++++++++++++++++++ templates/search_results.html | 30 ++ 8 files changed, 1981 insertions(+), 1 deletion(-) create mode 100644 scripts/download_ceidg_data.py create mode 100644 scripts/download_krs_pdf.py create mode 100644 scripts/fetch_ceidg_api.py create mode 100644 scripts/import_krs_people.py create mode 100644 scripts/parse_krs_pdf.py create mode 100644 templates/connections_map.html diff --git a/scripts/download_ceidg_data.py b/scripts/download_ceidg_data.py new file mode 100644 index 0000000..c59e3f0 --- /dev/null +++ b/scripts/download_ceidg_data.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +CEIDG Data Downloader - pobiera dane JDG z portalu CEIDG + +Używa Playwright do pobierania danych o jednoosobowych działalnościach +gospodarczych z oficjalnego portalu CEIDG (aplikacja.ceidg.gov.pl). 
+ +Dla JDG właściciel = firma, więc wyciągamy: +- Imię i nazwisko właściciela +- Status działalności +- Adres prowadzenia działalności + +Usage: + python scripts/download_ceidg_data.py --nip 5881943861 + python scripts/download_ceidg_data.py --all # wszystkie JDG z bazy +""" + +import os +import sys +import argparse +import time +import json +from pathlib import Path +from datetime import datetime +from dataclasses import dataclass, asdict +from typing import Optional + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +try: + from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout +except ImportError: + print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium") + sys.exit(1) + + +@dataclass +class CEIDGData: + """Dane z CEIDG""" + nip: str + imiona: str = "" + nazwisko: str = "" + nazwa_firmy: str = "" + status: str = "" # AKTYWNY, ZAWIESZONY, WYKREŚLONY + adres: str = "" + data_rozpoczecia: str = "" + zrodlo: str = "ceidg.gov.pl" + pobrano: str = "" + + def to_dict(self): + return asdict(self) + + +def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]: + """ + Pobiera dane z CEIDG dla podanego NIP. 
+ + Returns: + CEIDGData lub None jeśli nie znaleziono + """ + print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...") + + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + context = browser.new_context( + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" + ) + page = context.new_page() + + try: + # Go to CEIDG search page + page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx", timeout=30000) + time.sleep(3) + + # Wait for page to load + page.wait_for_load_state("networkidle", timeout=15000) + + # Find NIP input field + nip_input = page.locator("input[id*='NIP'], input[name*='nip']").first + if not nip_input.is_visible(timeout=5000): + # Try alternative - look for text inputs + nip_input = page.locator("input[type='text']").first + + nip_input.fill(nip) + time.sleep(1) + + # Click search button + search_btn = page.locator("input[type='submit'][value*='Szukaj'], button:has-text('Szukaj')").first + search_btn.click() + + # Wait for results + time.sleep(5) + page.wait_for_load_state("networkidle", timeout=20000) + + # Check if we have results + # Look for "Szczegóły" link or result row + details_link = page.locator("a:has-text('Szczegóły'), a[href*='SearchDetails']").first + + if details_link.is_visible(timeout=5000): + details_link.click() + time.sleep(3) + page.wait_for_load_state("networkidle", timeout=15000) + + # Extract data from details page + data = CEIDGData(nip=nip, pobrano=datetime.now().isoformat()) + + # Get page content + content = page.content() + + # Try to extract data from the page + # Look for specific labels and their values + + # Imię i Nazwisko + name_label = page.locator("span:has-text('Imię i nazwisko')").first + if name_label.is_visible(timeout=2000): + # Get the next sibling or parent's text + name_row = name_label.locator("xpath=ancestor::tr").first + if name_row.is_visible(): + name_text = name_row.inner_text() + # Parse name from text + if "Imię i nazwisko" 
in name_text: + parts = name_text.split("Imię i nazwisko") + if len(parts) > 1: + full_name = parts[1].strip() + # Split into first/last name + name_parts = full_name.split() + if len(name_parts) >= 2: + data.nazwisko = name_parts[-1] + data.imiona = " ".join(name_parts[:-1]) + + # Nazwa firmy + firma_element = page.locator("td:has-text('Firma przedsiębiorcy')").first + if firma_element.is_visible(timeout=2000): + firma_row = firma_element.locator("xpath=following-sibling::td").first + if firma_row.is_visible(): + data.nazwa_firmy = firma_row.inner_text().strip() + + # Status + status_element = page.locator("td:has-text('Status')").first + if status_element.is_visible(timeout=2000): + status_value = status_element.locator("xpath=following-sibling::td").first + if status_value.is_visible(): + data.status = status_value.inner_text().strip() + + # If we didn't get structured data, try to get raw text + if not data.imiona and not data.nazwisko: + # Get all text from the page and parse + page_text = page.inner_text("body") + + # Look for common patterns + import re + + # Pattern: "Imię i nazwisko: JAN KOWALSKI" + name_match = re.search(r'Imię i nazwisko[:\s]+([A-ZĄĆĘŁŃÓŚŹŻ]+\s+[A-ZĄĆĘŁŃÓŚŹŻ]+)', page_text, re.IGNORECASE) + if name_match: + full_name = name_match.group(1).strip() + parts = full_name.split() + if len(parts) >= 2: + data.imiona = " ".join(parts[:-1]) + data.nazwisko = parts[-1] + + if data.imiona or data.nazwisko or data.nazwa_firmy: + print(f" [OK] Znaleziono: {data.imiona} {data.nazwisko}") + return data + else: + print(f" [WARN] Nie udało się wyciągnąć danych ze strony") + # Save screenshot for debugging + page.screenshot(path=f"/tmp/ceidg_debug_{nip}.png") + return None + else: + print(f" [ERROR] Nie znaleziono wpisu dla NIP {nip}") + return None + + except PlaywrightTimeout as e: + print(f" [ERROR] Timeout dla NIP {nip}: {e}") + return None + except Exception as e: + print(f" [ERROR] Błąd dla NIP {nip}: {e}") + return None + finally: + browser.close() 
+ + +def main(): + parser = argparse.ArgumentParser(description="Download CEIDG data for JDG companies") + parser.add_argument("--nip", type=str, help="Single NIP to fetch") + parser.add_argument("--all", action="store_true", help="Fetch all JDG from database") + parser.add_argument("--output", type=str, help="Output JSON file") + args = parser.parse_args() + + results = [] + + if args.nip: + data = fetch_ceidg_data(args.nip) + if data: + results.append(data.to_dict()) + print(f"\n=== {data.imiona} {data.nazwisko} ===") + print(f" Firma: {data.nazwa_firmy}") + print(f" Status: {data.status}") + print(f" NIP: {data.nip}") + + elif args.all: + # Load environment and import database + from dotenv import load_dotenv + load_dotenv(Path(__file__).parent.parent / '.env') + + from database import SessionLocal, Company + + db = SessionLocal() + try: + # Get JDG companies (no KRS) + jdg_companies = db.query(Company).filter( + (Company.krs.is_(None)) | (Company.krs == ''), + Company.nip.isnot(None), + Company.nip != '' + ).all() + + print(f"Znaleziono {len(jdg_companies)} firm JDG\n") + + for i, company in enumerate(jdg_companies): + print(f"[{i+1}/{len(jdg_companies)}] {company.name}") + data = fetch_ceidg_data(company.nip) + if data: + results.append(data.to_dict()) + time.sleep(3) # Rate limiting + + finally: + db.close() + + else: + parser.print_help() + return + + # Save results + if args.output and results: + with open(args.output, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + print(f"\nWyniki zapisane do: {args.output}") + elif results: + print("\n=== JSON OUTPUT ===") + print(json.dumps(results, ensure_ascii=False, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_krs_pdf.py b/scripts/download_krs_pdf.py new file mode 100644 index 0000000..bbc9c1a --- /dev/null +++ b/scripts/download_krs_pdf.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +""" +KRS PDF Downloader - pobiera odpisy pełne z portalu PRS + 
+Używa Playwright do automatycznego pobierania PDF z oficjalnego +portalu Ministerstwa Sprawiedliwości (prs.ms.gov.pl). + +Pliki PDF zawierają PEŁNE dane (niezanonimizowane), w przeciwieństwie +do API które zwraca dane zanonimizowane. + +Usage: + python scripts/download_krs_pdf.py --krs 0000725183 + python scripts/download_krs_pdf.py --all # wszystkie firmy z bazy +""" + +import os +import sys +import argparse +import time +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +try: + from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout +except ImportError: + print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium") + sys.exit(1) + + +# Output directory for PDFs +PDF_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "krs_pdfs" + + +def download_krs_pdf(krs_number: str, output_dir: Path = PDF_OUTPUT_DIR) -> str: + """ + Download full KRS extract PDF from wyszukiwarka-krs.ms.gov.pl + + Args: + krs_number: KRS number (with or without leading zeros) + output_dir: Directory to save PDF + + Returns: + Path to downloaded PDF file + """ + # Normalize KRS number + krs = krs_number.zfill(10) + + # Create output directory + output_dir.mkdir(parents=True, exist_ok=True) + output_file = output_dir / f"odpis_pelny_{krs}.pdf" + + # Skip if already downloaded + if output_file.exists(): + print(f" [SKIP] PDF już istnieje: {output_file}") + return str(output_file) + + print(f" [INFO] Pobieranie odpisu pełnego dla KRS {krs}...") + + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + context = browser.new_context( + accept_downloads=True, + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" + ) + page = context.new_page() + + try: + # Go to KRS search page - wyszukiwarka-krs.ms.gov.pl + page.goto("https://wyszukiwarka-krs.ms.gov.pl/", timeout=30000) + time.sleep(3) + + # Wait 
for page to load + page.wait_for_load_state("networkidle", timeout=15000) + + # Find visible text input (skip hidden checkbox inputs) + # The KRS input is typically the first visible text input + search_inputs = page.locator("input[type='text']:visible") + search_input = search_inputs.first + + # Fill KRS number + search_input.fill(krs) + time.sleep(1) + + # Click search button + search_btn = page.locator("button:has-text('Szukaj')").first + search_btn.click() + + # Wait for results + time.sleep(5) + page.wait_for_load_state("networkidle", timeout=20000) + + # Click on "Wyświetl szczegóły" to see details + details_btn = page.locator("button:has-text('Wyświetl szczegóły'), a:has-text('Wyświetl szczegóły')").first + if details_btn.is_visible(timeout=5000): + details_btn.click() + time.sleep(3) + page.wait_for_load_state("networkidle", timeout=15000) + + # Find PDF download buttons - look for "Pobierz PDF" + # There are usually 2: "Informacja skrócona" and "Informacja pełna" + # We want "Informacja pełna" (the second one) + pdf_buttons = page.locator("button:has-text('Pobierz PDF')") + + if pdf_buttons.count() >= 2: + # Click the second PDF button (Informacja pełna) + with page.expect_download(timeout=30000) as download_info: + pdf_buttons.nth(1).click() + download = download_info.value + download.save_as(str(output_file)) + print(f" [OK] Zapisano: {output_file}") + return str(output_file) + elif pdf_buttons.count() == 1: + # Only one button, use it + with page.expect_download(timeout=30000) as download_info: + pdf_buttons.first.click() + download = download_info.value + download.save_as(str(output_file)) + print(f" [OK] Zapisano: {output_file}") + return str(output_file) + else: + print(f" [ERROR] Nie znaleziono przycisku PDF dla KRS {krs}") + page.screenshot(path=str(output_dir / f"debug_{krs}.png")) + return None + + except PlaywrightTimeout as e: + print(f" [ERROR] Timeout dla KRS {krs}: {e}") + page.screenshot(path=str(output_dir / f"timeout_{krs}.png")) + return 
None + except Exception as e: + print(f" [ERROR] Błąd dla KRS {krs}: {e}") + page.screenshot(path=str(output_dir / f"error_{krs}.png")) + return None + finally: + browser.close() + + +def get_all_krs_numbers(): + """Get all KRS numbers from database""" + from database import SessionLocal, Company + + db = SessionLocal() + try: + companies = db.query(Company).filter( + Company.status == 'active', + Company.krs.isnot(None), + Company.krs != '' + ).all() + return [(c.krs, c.name) for c in companies] + finally: + db.close() + + +def main(): + parser = argparse.ArgumentParser(description="Download KRS PDF extracts") + parser.add_argument("--krs", type=str, help="Single KRS number to download") + parser.add_argument("--all", action="store_true", help="Download all KRS from database") + parser.add_argument("--output", type=str, default=str(PDF_OUTPUT_DIR), help="Output directory") + args = parser.parse_args() + + output_dir = Path(args.output) + + if args.krs: + # Download single KRS + result = download_krs_pdf(args.krs, output_dir) + if result: + print(f"\nPobrano: {result}") + else: + print("\nBłąd pobierania") + sys.exit(1) + + elif args.all: + # Download all from database + print("Pobieranie wszystkich firm z KRS z bazy danych...") + companies = get_all_krs_numbers() + print(f"Znaleziono {len(companies)} firm z numerem KRS\n") + + success = 0 + failed = 0 + + for krs, name in companies: + print(f"[{success + failed + 1}/{len(companies)}] {name}") + result = download_krs_pdf(krs, output_dir) + if result: + success += 1 + else: + failed += 1 + # Rate limiting - be nice to the server + time.sleep(2) + + print(f"\n=== PODSUMOWANIE ===") + print(f"Pobrano: {success}") + print(f"Błędy: {failed}") + print(f"Pliki zapisane w: {output_dir}") + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/scripts/fetch_ceidg_api.py b/scripts/fetch_ceidg_api.py new file mode 100644 index 0000000..db6ed70 --- /dev/null +++ b/scripts/fetch_ceidg_api.py @@ -0,0 
+1,342 @@ +#!/usr/bin/env python3 +""" +CEIDG API v3 Client - pobiera dane właścicieli JDG + +Używa oficjalnego API CEIDG v3 (dane.biznes.gov.pl) do pobierania +danych o jednoosobowych działalnościach gospodarczych. + +Usage: + python scripts/fetch_ceidg_api.py --nip 5881571773 + python scripts/fetch_ceidg_api.py --all # wszystkie JDG z bazy + python scripts/fetch_ceidg_api.py --all --import # pobierz i importuj do bazy +""" + +import os +import sys +import argparse +import json +import time +from pathlib import Path +from datetime import datetime +from dataclasses import dataclass, asdict +from typing import Optional, List +import requests + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +# Load environment +from dotenv import load_dotenv +load_dotenv(Path(__file__).parent.parent / '.env') + +# API Configuration +CEIDG_API_URL = "https://dane.biznes.gov.pl/api/ceidg/v3/firma" +CEIDG_API_KEY = os.getenv("CEIDG_API_KEY") + +# Output directory for JSON cache +JSON_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "ceidg_json" + + +@dataclass +class CEIDGOwner: + """Dane właściciela JDG z CEIDG""" + imie: str + nazwisko: str + nip: str + regon: str = "" + + def to_dict(self): + return asdict(self) + + +@dataclass +class CEIDGData: + """Dane firmy z CEIDG API v3""" + id: str + nazwa: str + nip: str + regon: str = "" + wlasciciel: Optional[CEIDGOwner] = None + adres_miasto: str = "" + adres_ulica: str = "" + adres_kod: str = "" + pkd_glowny: str = "" + pkd_opis: str = "" + data_rozpoczecia: str = "" + status: str = "" + zrodlo: str = "dane.biznes.gov.pl" + pobrano: str = "" + + def to_dict(self): + d = asdict(self) + if self.wlasciciel: + d['wlasciciel'] = self.wlasciciel.to_dict() + return d + + +def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]: + """ + Pobiera dane z CEIDG API v3 dla podanego NIP. 
+ + Returns: + CEIDGData lub None jeśli nie znaleziono + """ + if not CEIDG_API_KEY: + print(" [ERROR] Brak CEIDG_API_KEY w .env") + return None + + print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...") + + headers = { + "Authorization": f"Bearer {CEIDG_API_KEY}", + "Accept": "application/json" + } + + try: + response = requests.get( + CEIDG_API_URL, + params={"nip": nip}, + headers=headers, + timeout=30 + ) + + if response.status_code == 204: + print(f" [WARN] Brak danych w CEIDG dla NIP {nip}") + return None + + if response.status_code == 401: + print(f" [ERROR] Błąd autoryzacji - sprawdź CEIDG_API_KEY") + return None + + if response.status_code != 200: + print(f" [ERROR] HTTP {response.status_code}: {response.text[:100]}") + return None + + data = response.json() + + if "firma" not in data or not data["firma"]: + print(f" [WARN] Brak danych firmy w odpowiedzi") + return None + + firma = data["firma"][0] + + # Parse owner data + owner = None + if "wlasciciel" in firma: + w = firma["wlasciciel"] + owner = CEIDGOwner( + imie=w.get("imie", ""), + nazwisko=w.get("nazwisko", ""), + nip=w.get("nip", nip), + regon=w.get("regon", "") + ) + + # Parse address + adres = firma.get("adresDzialalnosci", {}) + adres_ulica = "" + if adres.get("ulica"): + adres_ulica = adres.get("ulica", "") + if adres.get("budynek"): + adres_ulica += f" {adres.get('budynek')}" + if adres.get("lokal"): + adres_ulica += f"/{adres.get('lokal')}" + + # Parse PKD + pkd_glowny = firma.get("pkdGlowny", {}) + + ceidg_data = CEIDGData( + id=firma.get("id", ""), + nazwa=firma.get("nazwa", ""), + nip=nip, + regon=owner.regon if owner else "", + wlasciciel=owner, + adres_miasto=adres.get("miasto", ""), + adres_ulica=adres_ulica, + adres_kod=adres.get("kod", ""), + pkd_glowny=pkd_glowny.get("kod", ""), + pkd_opis=pkd_glowny.get("nazwa", ""), + data_rozpoczecia=firma.get("dataRozpoczecia", ""), + status=firma.get("status", ""), + pobrano=datetime.now().isoformat() + ) + + if owner: + print(f" [OK] 
{owner.imie} {owner.nazwisko} ({ceidg_data.status})") + else: + print(f" [OK] {ceidg_data.nazwa} ({ceidg_data.status})") + + return ceidg_data + + except requests.RequestException as e: + print(f" [ERROR] Błąd połączenia: {e}") + return None + except json.JSONDecodeError as e: + print(f" [ERROR] Błąd parsowania JSON: {e}") + return None + + +def import_to_database(results: List[CEIDGData]) -> dict: + """ + Importuje dane właścicieli JDG do bazy danych. + + Returns: + dict z podsumowaniem importu + """ + from database import SessionLocal, Company, Person, CompanyPerson + + db = SessionLocal() + stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0} + + try: + for data in results: + if not data.wlasciciel: + stats["skipped"] += 1 + continue + + owner = data.wlasciciel + + # Find company by NIP + company = db.query(Company).filter(Company.nip == data.nip).first() + if not company: + print(f" [SKIP] Firma z NIP {data.nip} nie istnieje w bazie") + stats["skipped"] += 1 + continue + + # Find or create person (by name since JDG owners don't have PESEL in API) + person = db.query(Person).filter( + Person.nazwisko == owner.nazwisko, + Person.imiona == owner.imie + ).first() + + if not person: + person = Person( + imiona=owner.imie, + nazwisko=owner.nazwisko, + pesel=None # CEIDG API doesn't return PESEL + ) + db.add(person) + db.flush() + print(f" [NEW] Utworzono osobę: {owner.imie} {owner.nazwisko}") + + # Check if relationship already exists + existing = db.query(CompanyPerson).filter( + CompanyPerson.company_id == company.id, + CompanyPerson.person_id == person.id, + CompanyPerson.role_category == "wlasciciel_jdg" + ).first() + + if existing: + # Update source if needed + if existing.source != "dane.biznes.gov.pl": + existing.source = "dane.biznes.gov.pl" + existing.fetched_at = datetime.now() + stats["updated"] += 1 + else: + stats["skipped"] += 1 + else: + # Create new relationship + company_person = CompanyPerson( + company_id=company.id, + 
person_id=person.id, + role="WŁAŚCICIEL", + role_category="wlasciciel_jdg", + source="dane.biznes.gov.pl", + fetched_at=datetime.now() + ) + db.add(company_person) + stats["imported"] += 1 + print(f" [ADD] {owner.imie} {owner.nazwisko} → {company.name}") + + db.commit() + + except Exception as e: + db.rollback() + print(f" [ERROR] Błąd importu: {e}") + stats["errors"] += 1 + finally: + db.close() + + return stats + + +def main(): + parser = argparse.ArgumentParser(description="Fetch JDG owner data from CEIDG API v3") + parser.add_argument("--nip", type=str, help="Single NIP to fetch") + parser.add_argument("--all", action="store_true", help="Fetch all JDG from database") + parser.add_argument("--import", dest="do_import", action="store_true", + help="Import fetched data to database") + parser.add_argument("--output", type=str, help="Output JSON file") + args = parser.parse_args() + + results = [] + + if args.nip: + data = fetch_ceidg_data(args.nip) + if data: + results.append(data) + print(f"\n=== {data.nazwa} ===") + if data.wlasciciel: + print(f" Właściciel: {data.wlasciciel.imie} {data.wlasciciel.nazwisko}") + print(f" Status: {data.status}") + print(f" PKD: {data.pkd_glowny} - {data.pkd_opis}") + print(f" Adres: {data.adres_ulica}, {data.adres_kod} {data.adres_miasto}") + + elif args.all: + from database import SessionLocal, Company + + db = SessionLocal() + try: + # Get JDG companies (no KRS) + jdg_companies = db.query(Company).filter( + (Company.krs.is_(None)) | (Company.krs == ''), + Company.nip.isnot(None), + Company.nip != '' + ).all() + + print(f"Znaleziono {len(jdg_companies)} firm JDG\n") + + success = 0 + failed = 0 + + for i, company in enumerate(jdg_companies): + print(f"[{i+1}/{len(jdg_companies)}] {company.name}") + data = fetch_ceidg_data(company.nip) + if data: + results.append(data) + success += 1 + else: + failed += 1 + time.sleep(0.5) # Rate limiting + + print(f"\n=== PODSUMOWANIE ===") + print(f"Pobrano: {success}") + print(f"Błędy/brak 
danych: {failed}") + + finally: + db.close() + + else: + parser.print_help() + return + + # Save to JSON cache + if results: + JSON_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + output_file = args.output or str(JSON_OUTPUT_DIR / f"ceidg_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2) + print(f"\nDane zapisane do: {output_file}") + + # Import to database if requested + if args.do_import and results: + print("\n=== IMPORT DO BAZY ===") + stats = import_to_database(results) + print(f"\nZaimportowano: {stats['imported']}") + print(f"Zaktualizowano: {stats['updated']}") + print(f"Pominięto: {stats['skipped']}") + print(f"Błędy: {stats['errors']}") + + +if __name__ == "__main__": + main() diff --git a/scripts/import_krs_people.py b/scripts/import_krs_people.py new file mode 100644 index 0000000..bd9e385 --- /dev/null +++ b/scripts/import_krs_people.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +""" +Import danych osób z odpisu KRS do bazy danych. + +Używa parse_krs_pdf.py do wyciągania danych z PDF i importuje je do tabel: +- people: osoby (zarząd, wspólnicy, prokurenci) +- company_people: relacje osoba-firma + +Usage: + python scripts/import_krs_people.py --file /path/to/odpis.pdf --company-id 26 + python scripts/import_krs_people.py --dir /path/to/pdfs/ +""" + +import os +import sys +import argparse +from pathlib import Path +from datetime import datetime + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +# Load environment variables +from dotenv import load_dotenv +load_dotenv(Path(__file__).parent.parent / '.env') + +from database import SessionLocal, Company, Person, CompanyPerson +from parse_krs_pdf import parse_krs_pdf, KRSData + + +def get_or_create_person(db, nazwisko: str, imiona: str, pesel: str = None) -> Person: + """ + Znajdź istniejącą osobę lub utwórz nową. 
+ Jeśli PESEL podany, szuka po PESEL (unikalne). + W przeciwnym razie szuka po nazwisku i imionach. + """ + if pesel: + person = db.query(Person).filter(Person.pesel == pesel).first() + if person: + return person + + # Szukaj po nazwisku i imionach (jeśli brak PESEL lub nie znaleziono) + person = db.query(Person).filter( + Person.nazwisko == nazwisko, + Person.imiona == imiona + ).first() + + if person: + # Jeśli znaleziono osobę bez PESEL, a teraz mamy PESEL - aktualizuj + if pesel and not person.pesel: + person.pesel = pesel + db.flush() + return person + + # Utwórz nową osobę + person = Person( + nazwisko=nazwisko, + imiona=imiona, + pesel=pesel + ) + db.add(person) + db.flush() # Aby uzyskać ID + return person + + +def find_company_by_krs(db, krs: str) -> Company: + """Znajdź firmę po numerze KRS.""" + return db.query(Company).filter(Company.krs == krs).first() + + +def find_company_by_nip(db, nip: str) -> Company: + """Znajdź firmę po numerze NIP.""" + return db.query(Company).filter(Company.nip == nip).first() + + +def import_krs_data(db, krs_data: KRSData, company: Company, pdf_filename: str) -> dict: + """ + Importuje dane z odpisu KRS do bazy danych. 
+ + Returns: + dict z podsumowaniem importu + """ + stats = { + 'zarzad_added': 0, + 'wspolnicy_added': 0, + 'prokurenci_added': 0, + 'people_created': 0, + 'people_updated': 0, + 'skipped': 0 + } + + now = datetime.now() + + # Import zarządu + for p in krs_data.zarzad: + person = get_or_create_person(db, p.nazwisko, p.imiona, p.pesel) + + # Sprawdź czy relacja już istnieje + existing = db.query(CompanyPerson).filter( + CompanyPerson.company_id == company.id, + CompanyPerson.person_id == person.id, + CompanyPerson.role_category == 'zarzad', + CompanyPerson.role == p.rola + ).first() + + if not existing: + cp = CompanyPerson( + company_id=company.id, + person_id=person.id, + role=p.rola or 'CZŁONEK ZARZĄDU', + role_category='zarzad', + source='ekrs.ms.gov.pl', + source_document=pdf_filename, + fetched_at=now + ) + db.add(cp) + stats['zarzad_added'] += 1 + else: + stats['skipped'] += 1 + + # Import wspólników + for p in krs_data.wspolnicy: + person = get_or_create_person(db, p.nazwisko, p.imiona, p.pesel) + + existing = db.query(CompanyPerson).filter( + CompanyPerson.company_id == company.id, + CompanyPerson.person_id == person.id, + CompanyPerson.role_category == 'wspolnik' + ).first() + + if not existing: + cp = CompanyPerson( + company_id=company.id, + person_id=person.id, + role='WSPÓLNIK', + role_category='wspolnik', + source='ekrs.ms.gov.pl', + source_document=pdf_filename, + fetched_at=now + ) + db.add(cp) + stats['wspolnicy_added'] += 1 + else: + stats['skipped'] += 1 + + # Import prokurentów + for p in krs_data.prokurenci: + person = get_or_create_person(db, p.nazwisko, p.imiona, p.pesel) + + existing = db.query(CompanyPerson).filter( + CompanyPerson.company_id == company.id, + CompanyPerson.person_id == person.id, + CompanyPerson.role_category == 'prokurent' + ).first() + + if not existing: + cp = CompanyPerson( + company_id=company.id, + person_id=person.id, + role='PROKURENT', + role_category='prokurent', + source='ekrs.ms.gov.pl', + 
source_document=pdf_filename, + fetched_at=now + ) + db.add(cp) + stats['prokurenci_added'] += 1 + else: + stats['skipped'] += 1 + + return stats + + +def import_from_file(pdf_path: str, company_id: int = None, dry_run: bool = False): + """ + Importuje dane z pojedynczego pliku PDF. + """ + print(f"\n{'='*60}") + print(f"Przetwarzanie: {pdf_path}") + print('='*60) + + # Parsuj PDF + try: + krs_data = parse_krs_pdf(pdf_path) + except Exception as e: + print(f" [ERROR] Błąd parsowania: {e}") + return None + + print(f" Nazwa: {krs_data.nazwa}") + print(f" KRS: {krs_data.krs}") + print(f" NIP: {krs_data.nip}") + print(f" Zarząd: {len(krs_data.zarzad)} osób") + print(f" Wspólnicy: {len(krs_data.wspolnicy)} osób") + print(f" Prokurenci: {len(krs_data.prokurenci)} osób") + + if dry_run: + print(" [DRY-RUN] Pomijam zapis do bazy") + return krs_data + + db = SessionLocal() + try: + # Znajdź firmę w bazie + company = None + + if company_id: + company = db.query(Company).filter(Company.id == company_id).first() + if not company: + print(f" [ERROR] Firma o ID {company_id} nie istnieje") + return None + elif krs_data.krs: + company = find_company_by_krs(db, krs_data.krs) + + if not company and krs_data.nip: + company = find_company_by_nip(db, krs_data.nip) + + if not company: + print(f" [ERROR] Nie znaleziono firmy w bazie (KRS: {krs_data.krs}, NIP: {krs_data.nip})") + return None + + print(f" Firma w bazie: {company.name} (ID: {company.id})") + + # Import danych + pdf_filename = Path(pdf_path).name + stats = import_krs_data(db, krs_data, company, pdf_filename) + + db.commit() + + print(f"\n [OK] Import zakończony:") + print(f" Zarząd: +{stats['zarzad_added']}") + print(f" Wspólnicy: +{stats['wspolnicy_added']}") + print(f" Prokurenci: +{stats['prokurenci_added']}") + print(f" Pominięto (duplikaty): {stats['skipped']}") + + return krs_data + + except Exception as e: + db.rollback() + print(f" [ERROR] Błąd importu: {e}") + raise + finally: + db.close() + + +def 
import_from_directory(dir_path: str, dry_run: bool = False): + """ + Importuje dane ze wszystkich PDF w katalogu. + """ + pdf_dir = Path(dir_path) + pdf_files = sorted(pdf_dir.glob("odpis_*.pdf")) + + print(f"Znaleziono {len(pdf_files)} plików PDF") + + success = 0 + errors = 0 + + for pdf_file in pdf_files: + try: + result = import_from_file(str(pdf_file), dry_run=dry_run) + if result: + success += 1 + else: + errors += 1 + except Exception as e: + print(f" [ERROR] {e}") + errors += 1 + + print(f"\n{'='*60}") + print("PODSUMOWANIE") + print('='*60) + print(f" Sukces: {success}") + print(f" Błędy: {errors}") + print(f" Łącznie: {len(pdf_files)}") + + +def main(): + parser = argparse.ArgumentParser(description="Import KRS people data to database") + parser.add_argument("--file", type=str, help="Single PDF file to import") + parser.add_argument("--dir", type=str, help="Directory with PDF files") + parser.add_argument("--company-id", type=int, help="Force company ID (for --file only)") + parser.add_argument("--dry-run", action="store_true", help="Parse only, don't save to database") + args = parser.parse_args() + + if args.file: + import_from_file(args.file, company_id=args.company_id, dry_run=args.dry_run) + elif args.dir: + import_from_directory(args.dir, dry_run=args.dry_run) + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/scripts/parse_krs_pdf.py b/scripts/parse_krs_pdf.py new file mode 100644 index 0000000..b4bedc9 --- /dev/null +++ b/scripts/parse_krs_pdf.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +""" +KRS PDF Parser - wyciąga dane zarządu i wspólników z odpisu KRS + +Parsuje odpisy pełne pobrane z ekrs.ms.gov.pl i wyciąga: +- Członków zarządu (funkcja, imię, nazwisko, PESEL) +- Wspólników (imię, nazwisko, PESEL, udziały) +- Prokurentów + +Usage: + python scripts/parse_krs_pdf.py --file /path/to/odpis.pdf + python scripts/parse_krs_pdf.py --dir /path/to/pdfs/ +""" + +import re +import json +import argparse +from pathlib 
import Path
+from dataclasses import dataclass, asdict
+from typing import List, Optional, Dict, Any
+
+try:
+    import pdfplumber
+except ImportError:
+    print("Wymagana biblioteka pdfplumber. Zainstaluj: pip install pdfplumber")
+    exit(1)
+
+
+@dataclass
+class Person:
+    """A person linked to the company (board member, shareholder or proxy)."""
+    # Surname and given name(s) exactly as printed in the excerpt.
+    nazwisko: str
+    imiona: str
+    # Optional 11-digit PESEL national ID number.
+    pesel: Optional[str] = None
+    rola: str = ""  # PREZES ZARZĄDU, CZŁONEK ZARZĄDU, WSPÓLNIK, PROKURENT
+    udzialy: Optional[str] = None  # share description, shareholders only
+
+    def full_name(self) -> str:
+        """Return "<given names> <surname>"."""
+        return f"{self.imiona} {self.nazwisko}"
+
+
+@dataclass
+class KRSData:
+    """Data extracted from a KRS excerpt."""
+    krs: str
+    nazwa: str
+    nip: Optional[str] = None
+    regon: Optional[str] = None
+    # NOTE(review): list fields default to None and are replaced with
+    # fresh lists in __post_init__ (a bare mutable default is not
+    # allowed on a dataclass field).
+    zarzad: List[Person] = None
+    wspolnicy: List[Person] = None
+    prokurenci: List[Person] = None
+    zrodlo: str = "ekrs.ms.gov.pl"
+
+    def __post_init__(self):
+        # Replace the None placeholders with per-instance lists.
+        if self.zarzad is None:
+            self.zarzad = []
+        if self.wspolnicy is None:
+            self.wspolnicy = []
+        if self.prokurenci is None:
+            self.prokurenci = []
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Serialise to a JSON-ready dict; Person entries become dicts."""
+        return {
+            'krs': self.krs,
+            'nazwa': self.nazwa,
+            'nip': self.nip,
+            'regon': self.regon,
+            'zarzad': [asdict(p) for p in self.zarzad],
+            'wspolnicy': [asdict(p) for p in self.wspolnicy],
+            'prokurenci': [asdict(p) for p in self.prokurenci],
+            'zrodlo': self.zrodlo
+        }
+
+
+def extract_text_from_pdf(pdf_path: str) -> str:
+    """Extract the plain text of every page, newline-joined."""
+    with pdfplumber.open(pdf_path) as pdf:
+        text = ""
+        for page in pdf.pages:
+            page_text = page.extract_text()
+            # Pages with no extractable text contribute nothing.
+            if page_text:
+                text += page_text + "\n"
+        return text
+
+
+def parse_person_block(lines: List[str], start_idx: int) -> Optional[Person]:
+    """
+    Parse one person's data block from the PDF text lines.
+
+    Expected layout in the excerpt text:
+        1.Nazwisko / Nazwa lub firma 1 - NAZWISKO
+        2.Imiona 1 - IMIĘ DRUGIE_IMIĘ
+        3.Numer PESEL/REGON lub data 1 - 12345678901, ------
+
+    Scans at most 10 lines starting at start_idx, stopping early when
+    the next person's block begins.  Returns a Person when both surname
+    and given names were found, otherwise None.
+    """
+    person = Person(nazwisko="", imiona="")
+    found_nazwisko = False
+    found_imiona = False
+    found_pesel = False
+
+    for i in 
range(start_idx, min(start_idx + 10, len(lines))):
+        line = lines[i].strip()
+
+        # Detect the start of the next person - stop parsing this one.
+        if i > start_idx and ('1.Nazwisko' in line or 'Nazwisko / Nazwa' in line):
+            # A new person's block begins - end of the current block.
+            break
+
+        # Surname (only the first occurrence is kept).
+        if not found_nazwisko and 'Nazwisko' in line and ' - ' in line:
+            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ\-]+)$', line)
+            if match:
+                person.nazwisko = match.group(1)
+                found_nazwisko = True
+
+        # Given names (only the first occurrence is kept).
+        if not found_imiona and 'Imiona' in line and ' - ' in line:
+            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
+            if match:
+                person.imiona = match.group(1).strip()
+                found_imiona = True
+
+        # PESEL (only the first occurrence is kept).
+        if not found_pesel and 'PESEL' in line and ' - ' in line:
+            match = re.search(r' - (\d{11})', line)
+            if match:
+                person.pesel = match.group(1)
+                found_pesel = True
+
+        # Role/function (management board entries).
+        if 'Funkcja' in line and ' - ' in line:
+            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
+            if match:
+                person.rola = match.group(1).strip()
+
+    # Both surname and given names are required for a valid person.
+    if person.nazwisko and person.imiona:
+        return person
+    return None
+
+
+def parse_krs_pdf(pdf_path: str) -> KRSData:
+    """
+    Parse a KRS excerpt PDF and extract company and people data.
+
+    Returns a KRSData carrying the KRS number, company name, NIP/REGON
+    and the management board, shareholders and proxies found in the text.
+    """
+    text = extract_text_from_pdf(pdf_path)
+    lines = text.split('\n')
+
+    # Extract basic info
+    krs_match = re.search(r'Numer KRS:\s*(\d{10})', text)
+    krs = krs_match.group(1) if krs_match else ""
+
+    # Find company name - format: "3.Firma, pod którą spółka działa 1 - NAZWA FIRMY"
+    nazwa = ""
+    nazwa_match = re.search(r'3\.Firma,?\s+pod którą spółka działa\s+\d+\s+-\s+([^\n]+)', text)
+    if nazwa_match:
+        nazwa = nazwa_match.group(1).strip()
+
+    # NIP and REGON - format: "REGON: 369796786, NIP: 5862329746"
+    nip_match = re.search(r'NIP:\s*(\d{10})', text)
+    nip = nip_match.group(1) if nip_match else None
+
+    regon_match = re.search(r'REGON:\s*(\d{9,14})', text)
+    regon = regon_match.group(1) if regon_match else 
None
+
+    data = KRSData(krs=krs, nazwa=nazwa, nip=nip, regon=regon)
+
+    # Section flags: which register section the scan is currently inside.
+    in_zarzad = False
+    in_wspolnicy = False
+    in_prokurenci = False
+
+    for i, line in enumerate(lines):
+        line_stripped = line.strip()
+
+        # Detect sections
+        if 'ZARZĄD' in line_stripped.upper() and 'Nazwa organu' in line_stripped:
+            in_zarzad = True
+            in_wspolnicy = False
+            in_prokurenci = False
+            continue
+
+        # Shareholders - match "Dane wspólników" so that prose such as
+        # "Wspólnik może mieć:" is not mistaken for the section header.
+        if 'Dane wspólników' in line_stripped or 'WSPÓLNICY' in line_stripped.upper():
+            in_wspolnicy = True
+            in_zarzad = False
+            in_prokurenci = False
+
+        if 'PROKURENCI' in line_stripped.upper() or 'Prokurent' in line_stripped:
+            in_prokurenci = True
+            in_zarzad = False
+            in_wspolnicy = False
+
+        # Parse person data when we find "Nazwisko"
+        if '1.Nazwisko' in line_stripped or 'Nazwisko / Nazwa' in line_stripped:
+            person = parse_person_block(lines, i)
+            if person:
+                if in_zarzad:
+                    # Look for function in nearby lines
+                    for j in range(i, min(i + 8, len(lines))):
+                        if 'Funkcja' in lines[j] and ' - ' in lines[j]:
+                            func_match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', lines[j])
+                            if func_match:
+                                person.rola = func_match.group(1).strip()
+                            break
+                    if not person.rola:
+                        # Default role when no "Funkcja" line was found.
+                        person.rola = "CZŁONEK ZARZĄDU"
+                    data.zarzad.append(person)
+                elif in_wspolnicy:
+                    person.rola = "WSPÓLNIK"
+                    # Look for share info; the for-else appends the person
+                    # whether or not share text appears nearby.
+                    for j in range(i, min(i + 10, len(lines))):
+                        if 'udziałów' in lines[j].lower() or 'udział' in lines[j].lower():
+                            data.wspolnicy.append(person)
+                            break
+                    else:
+                        data.wspolnicy.append(person)
+                elif in_prokurenci:
+                    person.rola = "PROKURENT"
+                    data.prokurenci.append(person)
+
+    return data
+
+
+def main():
+    """CLI entry point: parse --file or --dir; optionally save JSON via --output."""
+    parser = argparse.ArgumentParser(description="Parse KRS PDF files")
+    parser.add_argument("--file", type=str, help="Single PDF file to parse")
+    parser.add_argument("--dir", type=str, help="Directory with PDF files")
+    parser.add_argument("--output", type=str, help="Output JSON file")
+    args = 
parser.parse_args()
+
+    results = []
+
+    if args.file:
+        print(f"Parsing: {args.file}")
+        data = parse_krs_pdf(args.file)
+        results.append(data.to_dict())
+
+        # Print summary
+        print(f"\n=== {data.nazwa} (KRS: {data.krs}) ===")
+        print(f"NIP: {data.nip}, REGON: {data.regon}")
+
+        print(f"\nZarząd ({len(data.zarzad)} osób):")
+        for p in data.zarzad:
+            print(f"  - {p.full_name()} - {p.rola}")
+
+        print(f"\nWspólnicy ({len(data.wspolnicy)} osób):")
+        for p in data.wspolnicy:
+            print(f"  - {p.full_name()}")
+
+        if data.prokurenci:
+            print(f"\nProkurenci ({len(data.prokurenci)} osób):")
+            for p in data.prokurenci:
+                print(f"  - {p.full_name()}")
+
+    elif args.dir:
+        pdf_dir = Path(args.dir)
+        pdf_files = list(pdf_dir.glob("*.pdf"))
+        print(f"Found {len(pdf_files)} PDF files")
+
+        for pdf_file in pdf_files:
+            print(f"Parsing: {pdf_file.name}...")
+            # One bad PDF must not stop the batch run.
+            try:
+                data = parse_krs_pdf(str(pdf_file))
+                results.append(data.to_dict())
+                print(f"  OK: {data.nazwa}")
+            except Exception as e:
+                print(f"  ERROR: {e}")
+
+    # Save results
+    if args.output and results:
+        with open(args.output, 'w', encoding='utf-8') as f:
+            json.dump(results, f, ensure_ascii=False, indent=2)
+        print(f"\nResults saved to: {args.output}")
+    elif results:
+        # No --output given: dump the JSON to stdout instead.
+        print("\n=== JSON OUTPUT ===")
+        print(json.dumps(results, ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/templates/company_detail.html b/templates/company_detail.html
index 1fb6dd0..137daed 100755
--- a/templates/company_detail.html
+++ b/templates/company_detail.html
@@ -2261,7 +2261,7 @@
 {% endif %}
 </div>
-
+{# Company Events - UKRYTE (2026-01-11) - do przywrócenia w przyszłości
 {% if events %}

@@ -2320,6 +2320,7 @@ {% endfor %}

{% endif %} +#}