#!/usr/bin/env python3
"""
KRS PDF Downloader - downloads full KRS extracts ("odpis pełny") as PDF files.

Uses Playwright to drive the Ministry of Justice KRS search site and download
the PDF offered there. Note: the original header referred to the PRS portal
(prs.ms.gov.pl); the code below opens https://wyszukiwarka-krs.ms.gov.pl/.

Per the original author's note, the PDF files contain the FULL
(non-anonymized) data, unlike the API, which returns anonymized data.

Usage:
    python scripts/download_krs_pdf.py --krs 0000725183
    python scripts/download_krs_pdf.py --all  # every company in the database
"""

import os
import sys
import argparse
import time
from pathlib import Path
from typing import Optional

# Add parent directory to path so project-local modules (e.g. `database`)
# can be imported when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
    print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
    sys.exit(1)

# Output directory for PDFs
PDF_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "krs_pdfs"


def _save_debug_screenshot(page, target: Path) -> None:
    """Best-effort screenshot of *page* to *target*; never raises.

    Screenshots are diagnostics only. A failure here (e.g. the page already
    crashed) must not mask the original error or abort a batch run.
    """
    try:
        page.screenshot(path=str(target))
    except Exception as exc:  # deliberate: diagnostics must not raise
        print(f" [WARN] Nie udało się zapisać zrzutu ekranu {target}: {exc}")


def download_krs_pdf(krs_number: str, output_dir: Path = PDF_OUTPUT_DIR) -> Optional[str]:
    """
    Download the full KRS extract PDF from wyszukiwarka-krs.ms.gov.pl.

    Args:
        krs_number: KRS number (with or without leading zeros).
        output_dir: Directory to save the PDF in (created if missing).

    Returns:
        Path to the PDF file as a string (also when the file already existed
        and the download was skipped), or None when the number is invalid or
        the download failed.
    """
    # Normalize the KRS number: trim whitespace and left-pad to 10 characters.
    krs = str(krs_number).strip().zfill(10)

    # The value ends up in file names and in the search form, so accept
    # plain ASCII digits only (this also rejects path-like input).
    if not (krs.isascii() and krs.isdigit()):
        print(f" [ERROR] Nieprawidłowy numer KRS: {krs_number!r}")
        return None

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"odpis_pelny_{krs}.pdf"

    # Skip if already downloaded
    if output_file.exists():
        print(f" [SKIP] PDF już istnieje: {output_file}")
        return str(output_file)

    print(f" [INFO] Pobieranie odpisu pełnego dla KRS {krs}...")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            accept_downloads=True,
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )
        page = context.new_page()

        try:
            # Go to the KRS search page
            page.goto("https://wyszukiwarka-krs.ms.gov.pl/", timeout=30000)
            time.sleep(3)

            # Wait for page to load
            page.wait_for_load_state("networkidle", timeout=15000)

            # The first visible text input is used as the KRS field
            # (the selector skips hidden inputs).
            search_input = page.locator("input[type='text']:visible").first

            # Fill KRS number
            search_input.fill(krs)
            time.sleep(1)

            # Click search button
            page.locator("button:has-text('Szukaj')").first.click()

            # Wait for results
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=20000)

            # Open the details view if a "Wyświetl szczegóły" control shows up.
            details_btn = page.locator(
                "button:has-text('Wyświetl szczegóły'), a:has-text('Wyświetl szczegóły')"
            ).first
            try:
                # is_visible() ignores its timeout and returns immediately,
                # so wait explicitly for the control to become visible.
                details_btn.wait_for(state="visible", timeout=5000)
            except PlaywrightTimeout:
                # No details control appeared; fall through and look for the
                # PDF buttons on the current view.
                pass
            else:
                details_btn.click()
                time.sleep(3)
                page.wait_for_load_state("networkidle", timeout=15000)

            # Find the "Pobierz PDF" buttons. Per the original author's note
            # there are usually two: "Informacja skrócona" and
            # "Informacja pełna"; the second one is the full extract.
            pdf_buttons = page.locator("button:has-text('Pobierz PDF')")
            button_count = pdf_buttons.count()

            if button_count == 0:
                print(f" [ERROR] Nie znaleziono przycisku PDF dla KRS {krs}")
                _save_debug_screenshot(page, output_dir / f"debug_{krs}.png")
                return None

            # Prefer the second button; with a single button, use that one.
            target_btn = pdf_buttons.nth(1) if button_count >= 2 else pdf_buttons.first

            with page.expect_download(timeout=30000) as download_info:
                target_btn.click()

            download_info.value.save_as(str(output_file))
            print(f" [OK] Zapisano: {output_file}")
            return str(output_file)

        except PlaywrightTimeout as e:
            print(f" [ERROR] Timeout dla KRS {krs}: {e}")
            _save_debug_screenshot(page, output_dir / f"timeout_{krs}.png")
            return None
        except Exception as e:
            # Per-item boundary: report and return None so that a batch run
            # can continue with the next company.
            print(f" [ERROR] Błąd dla KRS {krs}: {e}")
            _save_debug_screenshot(page, output_dir / f"error_{krs}.png")
            return None
        finally:
            browser.close()


def get_all_krs_numbers():
    """Return a list of (krs, name) tuples for companies in the database.

    Only companies with status 'active' and a non-null, non-empty KRS value
    are included. The database session is always closed before returning.
    """
    # Imported lazily so that --krs mode works without the project database.
    from database import SessionLocal, Company

    db = SessionLocal()
    try:
        companies = db.query(Company).filter(
            Company.status == 'active',
            Company.krs.isnot(None),
            Company.krs != ''
        ).all()
        return [(c.krs, c.name) for c in companies]
    finally:
        db.close()


def main():
    """Parse command-line arguments and download one or all KRS PDFs.

    With --krs, downloads a single extract and exits with status 1 on
    failure. With --all, downloads extracts for every company returned by
    get_all_krs_numbers() and prints a summary. With neither, prints help.
    """
    parser = argparse.ArgumentParser(description="Download KRS PDF extracts")
    parser.add_argument("--krs", type=str, help="Single KRS number to download")
    parser.add_argument("--all", action="store_true", help="Download all KRS from database")
    parser.add_argument("--output", type=str, default=str(PDF_OUTPUT_DIR), help="Output directory")

    args = parser.parse_args()

    output_dir = Path(args.output)

    if args.krs:
        # Download single KRS
        result = download_krs_pdf(args.krs, output_dir)
        if result:
            print(f"\nPobrano: {result}")
        else:
            print("\nBłąd pobierania")
            sys.exit(1)

    elif args.all:
        # Download all from database
        print("Pobieranie wszystkich firm z KRS z bazy danych...")
        companies = get_all_krs_numbers()
        total = len(companies)
        print(f"Znaleziono {total} firm z numerem KRS\n")

        success = 0
        failed = 0

        for position, (krs, name) in enumerate(companies, start=1):
            print(f"[{position}/{total}] {name}")
            if download_krs_pdf(krs, output_dir):
                success += 1
            else:
                failed += 1

            # Rate limiting - be nice to the server
            time.sleep(2)

        print("\n=== PODSUMOWANIE ===")
        print(f"Pobrano: {success}")
        print(f"Błędy: {failed}")
        print(f"Pliki zapisane w: {output_dir}")

    else:
        parser.print_help()


if __name__ == "__main__":
    main()