feat: Extend CEIDG data storage and enrichment

- Add new Company fields: ceidg_id, ceidg_status, pkd_codes (JSONB),
  correspondence address, owner_citizenships, ceidg_raw_data
- Add enrich_companies_from_ceidg() to fetch full CEIDG details
- Add fetch_full_ceidg_details() for detailed API calls
- Add update_company_from_ceidg() to save all CEIDG fields
- Add --enrich and --apply flags for batch enrichment
- Add migration 036_ceidg_extended_data.sql

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-02-01 07:21:21 +01:00
parent 79a1a60621
commit 9f2b261df2
3 changed files with 328 additions and 1 deletions

View File

@ -666,9 +666,15 @@ class Company(Base):
branch_count = Column(Integer)
employee_count_range = Column(String(50))
# === CEIDG DATA (API dane.biznes.gov.pl) ===
# Podstawowe dane CEIDG
ceidg_id = Column(String(50)) # GUID firmy w CEIDG
ceidg_status = Column(String(50)) # AKTYWNY, ZAWIESZONY, WYKREŚLONY
# PKD (kod działalności gospodarczej) - z CEIDG
pkd_code = Column(String(10)) # np. "6201Z"
pkd_code = Column(String(10)) # np. "6201Z" (główny PKD)
pkd_description = Column(Text) # np. "Działalność związana z oprogramowaniem"
pkd_codes = Column(PG_JSONB, default=[]) # Wszystkie PKD jako [{kod, nazwa}]
# Data rozpoczęcia działalności - z CEIDG
business_start_date = Column(Date) # np. 2021-02-10
@ -676,6 +682,16 @@ class Company(Base):
# Właściciel JDG - z CEIDG (tylko dla jednoosobowych działalności)
owner_first_name = Column(String(100))
owner_last_name = Column(String(100))
owner_citizenships = Column(PG_JSONB, default=[]) # [{symbol, kraj}]
# Adres korespondencyjny (z CEIDG)
correspondence_street = Column(String(255))
correspondence_city = Column(String(100))
correspondence_postal = Column(String(10))
# Surowe dane z CEIDG API
ceidg_raw_data = Column(PG_JSONB)
ceidg_fetched_at = Column(DateTime)
# Data source tracking
data_source = Column(String(100))

View File

@ -0,0 +1,40 @@
-- ============================================================
-- 036_ceidg_extended_data.sql
-- Rozszerzone dane z CEIDG API
-- ============================================================
-- CEIDG ID (GUID z rejestru)
ALTER TABLE companies ADD COLUMN IF NOT EXISTS ceidg_id VARCHAR(50);
-- Status z CEIDG (AKTYWNY, ZAWIESZONY, WYKREŚLONY, etc.)
ALTER TABLE companies ADD COLUMN IF NOT EXISTS ceidg_status VARCHAR(50);
-- Pełny adres korespondencyjny
ALTER TABLE companies ADD COLUMN IF NOT EXISTS correspondence_street VARCHAR(255);
ALTER TABLE companies ADD COLUMN IF NOT EXISTS correspondence_city VARCHAR(100);
ALTER TABLE companies ADD COLUMN IF NOT EXISTS correspondence_postal VARCHAR(10);
-- PKD - wszystkie kody (JSONB array)
ALTER TABLE companies ADD COLUMN IF NOT EXISTS pkd_codes JSONB DEFAULT '[]';
-- Obywatelstwa właściciela (JSONB array)
ALTER TABLE companies ADD COLUMN IF NOT EXISTS owner_citizenships JSONB DEFAULT '[]';
-- Surowe dane z API (dla przyszłych potrzeb)
ALTER TABLE companies ADD COLUMN IF NOT EXISTS ceidg_raw_data JSONB;
-- Timestamp ostatniego pobrania z CEIDG
ALTER TABLE companies ADD COLUMN IF NOT EXISTS ceidg_fetched_at TIMESTAMP;
-- Indeks na ceidg_id dla szybkiego wyszukiwania
CREATE INDEX IF NOT EXISTS idx_companies_ceidg_id ON companies(ceidg_id);
-- Komentarze
COMMENT ON COLUMN companies.ceidg_id IS 'GUID firmy w rejestrze CEIDG';
COMMENT ON COLUMN companies.ceidg_status IS 'Status z CEIDG: AKTYWNY, ZAWIESZONY, WYKREŚLONY';
COMMENT ON COLUMN companies.pkd_codes IS 'Wszystkie kody PKD jako JSON array [{kod, nazwa}]';
COMMENT ON COLUMN companies.ceidg_raw_data IS 'Pełna odpowiedź z API CEIDG (JSON)';
COMMENT ON COLUMN companies.ceidg_fetched_at IS 'Data ostatniego pobrania danych z CEIDG';
-- Grant permissions
GRANT ALL ON TABLE companies TO nordabiz_app;

View File

@ -338,6 +338,259 @@ def search_missing_nip_companies() -> List[dict]:
return results
def fetch_full_ceidg_details(ceidg_id: str) -> Optional[dict]:
"""
Pobiera pełne szczegóły firmy z CEIDG API po ID.
Args:
ceidg_id: GUID firmy w CEIDG
Returns:
Pełny słownik z danymi firmy lub None
"""
if not CEIDG_API_KEY:
print(" [ERROR] Brak CEIDG_API_KEY w .env")
return None
url = f"{CEIDG_API_V3_URL.replace('/firmy', '/firma')}/{ceidg_id}"
headers = {
"Authorization": f"Bearer {CEIDG_API_KEY}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 204:
return None
if response.status_code != 200:
print(f" [ERROR] HTTP {response.status_code}")
return None
data = response.json()
if "firma" in data and data["firma"]:
return data["firma"][0]
return None
except requests.RequestException as e:
print(f" [ERROR] Błąd połączenia: {e}")
return None
def update_company_from_ceidg(company_id: int, ceidg_data: dict, db) -> bool:
"""
Aktualizuje firmę w bazie wszystkimi danymi z CEIDG.
Args:
company_id: ID firmy w naszej bazie
ceidg_data: Słownik z danymi z CEIDG API
db: Sesja SQLAlchemy
Returns:
True jeśli sukces
"""
from database import Company
from datetime import datetime
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
print(f" [ERROR] Firma {company_id} nie istnieje")
return False
try:
# CEIDG ID i status
company.ceidg_id = ceidg_data.get("id")
company.ceidg_status = ceidg_data.get("status")
# NIP i REGON z właściciela
wlasciciel = ceidg_data.get("wlasciciel", {})
if wlasciciel.get("nip") and not company.nip:
company.nip = wlasciciel.get("nip")
if wlasciciel.get("regon") and not company.regon:
company.regon = wlasciciel.get("regon")
# Właściciel
company.owner_first_name = wlasciciel.get("imie")
company.owner_last_name = wlasciciel.get("nazwisko")
# Obywatelstwa
if ceidg_data.get("obywatelstwa"):
company.owner_citizenships = ceidg_data.get("obywatelstwa")
# Adres działalności
adres = ceidg_data.get("adresDzialalnosci", {})
if adres:
ulica = adres.get("ulica", "")
budynek = adres.get("budynek", "")
lokal = adres.get("lokal", "")
street = ulica
if budynek:
street += f" {budynek}"
if lokal:
street += f"/{lokal}"
if not company.address_street:
company.address_street = street
if not company.address_city:
company.address_city = adres.get("miasto")
if not company.address_postal:
company.address_postal = adres.get("kod")
# Adres korespondencyjny
koresp = ceidg_data.get("adresKorespondencyjny", {})
if koresp:
k_ulica = koresp.get("ulica", "")
k_budynek = koresp.get("budynek", "")
k_lokal = koresp.get("lokal", "")
k_street = k_ulica
if k_budynek:
k_street += f" {k_budynek}"
if k_lokal:
k_street += f"/{k_lokal}"
company.correspondence_street = k_street
company.correspondence_city = koresp.get("miasto")
company.correspondence_postal = koresp.get("kod")
# PKD główny
pkd_glowny = ceidg_data.get("pkdGlowny", {})
if pkd_glowny:
company.pkd_code = pkd_glowny.get("kod")
company.pkd_description = pkd_glowny.get("nazwa")
# Wszystkie PKD
if ceidg_data.get("pkd"):
company.pkd_codes = ceidg_data.get("pkd")
# Data rozpoczęcia działalności
if ceidg_data.get("dataRozpoczecia"):
from datetime import datetime as dt
try:
company.business_start_date = dt.strptime(
ceidg_data.get("dataRozpoczecia"), "%Y-%m-%d"
).date()
except:
pass
# Surowe dane
company.ceidg_raw_data = ceidg_data
company.ceidg_fetched_at = datetime.now()
# Data source
company.data_source = "CEIDG API"
company.last_verified_at = datetime.now()
return True
except Exception as e:
print(f" [ERROR] Błąd aktualizacji: {e}")
return False
def enrich_companies_from_ceidg(apply: bool = False) -> dict:
"""
Wzbogaca dane firm w bazie o informacje z CEIDG.
Używa NIP do wyszukania, potem pobiera pełne szczegóły.
Args:
apply: Czy zapisać zmiany do bazy
Returns:
Statystyki operacji
"""
from database import SessionLocal, Company
db = SessionLocal()
stats = {"searched": 0, "found": 0, "updated": 0, "errors": 0}
try:
# Pobierz firmy z NIP ale bez ceidg_id
companies = db.query(Company).filter(
Company.nip.isnot(None),
Company.nip != '',
(Company.ceidg_id.is_(None)) | (Company.ceidg_id == '')
).all()
print(f"\n=== Wzbogacanie danych dla {len(companies)} firm z NIP ===\n")
for i, company in enumerate(companies):
print(f"[{i+1}/{len(companies)}] {company.name} (NIP: {company.nip})")
stats["searched"] += 1
# Szukaj po NIP
search_results = search_ceidg_by_name(company.nip) # NIP też działa jako nazwa
# Lepiej: użyj dedykowanego endpointu po NIP
headers = {
"Authorization": f"Bearer {CEIDG_API_KEY}",
"Accept": "application/json"
}
response = requests.get(
CEIDG_API_V3_URL,
params={"nip": company.nip},
headers=headers,
timeout=30
)
if response.status_code != 200:
print(f" ✗ Nie znaleziono w CEIDG")
time.sleep(0.3)
continue
data = response.json()
if "firmy" not in data or not data["firmy"]:
print(f" ✗ Brak danych w CEIDG")
time.sleep(0.3)
continue
firma = data["firmy"][0]
ceidg_id = firma.get("id")
if not ceidg_id:
print(f" ✗ Brak CEIDG ID")
time.sleep(0.3)
continue
stats["found"] += 1
# Pobierz pełne szczegóły
print(f" → Pobieram szczegóły (ID: {ceidg_id})...")
full_data = fetch_full_ceidg_details(ceidg_id)
if not full_data:
full_data = firma # Użyj danych z wyszukiwania
# Aktualizuj firmę
if update_company_from_ceidg(company.id, full_data, db):
stats["updated"] += 1
print(f" ✓ Zaktualizowano dane CEIDG")
if apply:
db.commit()
else:
stats["errors"] += 1
time.sleep(0.5) # Rate limiting
if not apply:
db.rollback()
print("\n[INFO] Zmiany NIE zostały zapisane (użyj --apply)")
else:
db.commit()
finally:
db.close()
return stats
def import_to_database(results: List[CEIDGData]) -> dict:
"""
Importuje dane właścicieli JDG do bazy danych.
@ -429,10 +682,14 @@ def main():
parser.add_argument("--city", type=str, help="City for name search (optional)")
parser.add_argument("--all", action="store_true", help="Fetch all JDG from database")
parser.add_argument("--missing-nip", action="store_true", help="Search NIP for companies without NIP")
parser.add_argument("--enrich", action="store_true",
help="Enrich companies with NIP with full CEIDG data")
parser.add_argument("--import", dest="do_import", action="store_true",
help="Import fetched data to database")
parser.add_argument("--apply-nip", action="store_true",
help="Apply found NIPs to database (with --missing-nip)")
parser.add_argument("--apply", action="store_true",
help="Apply changes to database (with --enrich)")
parser.add_argument("--output", type=str, help="Output JSON file")
args = parser.parse_args()
@ -512,6 +769,20 @@ def main():
db.close()
return
elif args.enrich:
# Enrich companies with full CEIDG data
stats = enrich_companies_from_ceidg(apply=args.apply)
print(f"\n=== PODSUMOWANIE WZBOGACANIA ===")
print(f"Przeszukano: {stats['searched']}")
print(f"Znaleziono w CEIDG: {stats['found']}")
print(f"Zaktualizowano: {stats['updated']}")
print(f"Błędy: {stats['errors']}")
if not args.apply:
print("\n[UWAGA] Użyj --apply aby zapisać zmiany do bazy")
return
elif args.all:
from database import SessionLocal, Company