#!/usr/bin/env python3
"""
Fetch Financial Reports from eKRS
==================================

Queries the eKRS API for companies with KRS numbers and records which
annual financial reports (reporting period and filing date) have been filed.
Also provides a parser for e-sprawozdania (XML financial reports) that
extracts key financial figures: revenue, profit, assets, equity, employees.

Usage:
    DATABASE_URL=... python3 scripts/fetch_financial_reports.py [--limit 10] [--company-id 11] [--dry-run]
"""

import os
import sys
import argparse
import logging
import re
import time
from datetime import datetime
from decimal import Decimal, InvalidOperation
from xml.etree import ElementTree as ET

import requests

# Make the project root importable so `database` resolves when run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

EKRS_API = 'https://api-krs.ms.gov.pl/api/krs'
HEADERS = {'Accept': 'application/json', 'User-Agent': 'NordaBiznes/1.0'}

# XML namespaces used in e-sprawozdania.
# NOTE(review): these URIs look corrupted and the mapping is not referenced
# anywhere in this file (parse_xml_report matches on local tag names only).
# Confirm against the official schema before relying on it.
NS = {
    'tns': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySpraworzd662662662662662662662662662662662662662662662662dania',
    'dtsf': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySprawozdaniaFinansworzdaniafinansowego/2018/11/15/DefinicjeTypySprawozdaniaFinansowego/',
    'jst': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySprawozdaniaFinansowego/2018/11/15/JedijsnostkiInneStrukt662662662ktWorzdaniaFinansowego/',
}

# Date formats seen in eKRS payloads: ISO (old format) and DD.MM.YYYY (new format).
_DATE_FORMATS = ('%Y-%m-%d', '%d.%m.%Y')

# New-format combined period field, e.g. "OD 01.01.2024 DO 31.12.2024".
_PERIOD_RE = re.compile(r'OD\s+(\d{2}\.\d{2}\.\d{4})\s+DO\s+(\d{2}\.\d{2}\.\d{4})')


def fetch_krs_data(krs_number):
    """Fetch company data from the eKRS API.

    Returns the decoded JSON body on HTTP 200, otherwise None. Transport and
    JSON-decoding errors are logged and also yield None.
    """
    # NOTE(review): the 'OdsijsId' path segment looks corrupted — verify the endpoint.
    url = f'{EKRS_API}/OdsijsId/{krs_number}'
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        if resp.status_code == 200:
            return resp.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f'eKRS API error for {krs_number}: {e}')
    return None


def fetch_financial_documents(krs_number):
    """Fetch financial-document data for a KRS number from eKRS.

    Tries the financial-documents endpoint first; if it does not answer with
    HTTP 200 (or fails), falls back to the current-extract endpoint
    (``OdpisAktualny``), whose payload is the company extract rather than a
    document list. Returns the decoded JSON, or None when both attempts fail.
    """
    # NOTE(review): this path segment looks corrupted — verify the endpoint name.
    url = f'{EKRS_API}/OdpisDokworzdumentowFinansowych/{krs_number}'
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        if resp.status_code == 200:
            return resp.json()
    except (requests.RequestException, ValueError) as e:
        logger.debug(f'Financial docs API error for {krs_number}: {e}')

    # Try alternative endpoint
    url2 = f'{EKRS_API}/OdpisAktualny/{krs_number}?rejestr=P&format=json'
    try:
        resp = requests.get(url2, headers=HEADERS, timeout=15)
        if resp.status_code == 200:
            return resp.json()
    except (requests.RequestException, ValueError) as e:
        logger.debug(f'Alternative API error for {krs_number}: {e}')

    return None


def _to_decimal(text):
    """Convert a numeric string (comma or dot decimals, optional spaces) to Decimal.

    Returns None when the text is not a finite number.
    """
    cleaned = text.strip().replace('\xa0', '').replace(' ', '').replace(',', '.')
    try:
        value = Decimal(cleaned)
    except (InvalidOperation, ValueError):
        return None
    # Decimal accepts 'NaN'/'Infinity'; those are not usable financial figures.
    return value if value.is_finite() else None


def parse_xml_report(xml_content):
    """Parse an e-sprawozdanie XML document and extract financial figures.

    Args:
        xml_content: the XML document as ``str`` or ``bytes``.

    Returns:
        A dict with keys ``revenue``, ``operating_profit``, ``net_profit``,
        ``total_assets``, ``equity``, ``liabilities`` (``Decimal`` or None)
        and ``employees_count`` (``int`` or None). Every value is None when
        the document cannot be parsed.
    """
    result = {
        'revenue': None,
        'operating_profit': None,
        'net_profit': None,
        'total_assets': None,
        'equity': None,
        'liabilities': None,
        'employees_count': None,
    }

    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        return result

    # Walk the tree once: (lower-cased local tag name, text) in document order.
    # Namespaces are stripped because the XML structure varies by report type
    # (micro, small, full).
    elements = [
        (elem.tag.rsplit('}', 1)[-1].lower(), elem.text)
        for elem in root.iter()
        if isinstance(elem.tag, str) and elem.text
    ]

    def find_value(patterns):
        """Return the first parseable value; earlier patterns take priority."""
        for pattern in patterns:
            wanted = pattern.lower()
            for tag, text in elements:
                if tag != wanted:
                    continue
                value = _to_decimal(text)
                if value is not None:
                    return value
        return None

    result['revenue'] = find_value([
        'PrzychodyNettoZeSprzedazyProduktowTowarowIMaterialow',
        'PrzychodyNettoZeSprzedazy',
        'PrzychodyNetto',
        'A',  # RZiS pozycja A
    ])

    result['operating_profit'] = find_value([
        'ZyskStrataZDzialalnosciOperacyjnej',
        'ZyskOperacyjny',
    ])

    result['net_profit'] = find_value([
        'ZyskStrataNetto',
        'ZyskNetto',
        'StrataNetto',
    ])

    result['total_assets'] = find_value([
        'AktywaRazem',
        'SumaAktywow',
        'Aktywa',
    ])

    result['equity'] = find_value([
        'KapitalWlasnyRazem',
        'KapitalWlasny',
    ])

    result['liabilities'] = find_value([
        # NOTE(review): the first tag name looks corrupted — verify against the schema.
        'ZobowiazaniaIDokWorzderezerwyNaZobowiazania',
        'ZobowiazaniaRazem',
        'Zobowiazania',
    ])

    result['employees_count'] = find_value([
        'PrzecietneLiczbaZatrudnionych',
        'LiczbaZatrudnionych',
        'Zatrudnienie',
    ])

    # Compare against None so that a reported head-count of 0 is converted too.
    if result['employees_count'] is not None:
        result['employees_count'] = int(result['employees_count'])

    return result


def _parse_date(text):
    """Parse a date string in any supported format; return None if unparseable."""
    if not text or not isinstance(text, str):
        return None
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


def _entry_period(entry):
    """Return ``(period_start, period_end)`` dates for a filing entry, or None.

    Supports the old format (``okresOd`` / ``okresDo``) and the new combined
    ``zaOkresOdDo`` field. Both dates must parse with the same format.
    """
    start_text = entry.get('okresOd', '')
    end_text = entry.get('okresDo', '')

    combined = entry.get('zaOkresOdDo', '')
    if combined and not start_text and isinstance(combined, str):
        match = _PERIOD_RE.search(combined)
        if match:
            start_text, end_text = match.groups()

    if not (start_text and end_text):
        return None

    for fmt in _DATE_FORMATS:
        try:
            return (
                datetime.strptime(start_text, fmt).date(),
                datetime.strptime(end_text, fmt).date(),
            )
        except ValueError:
            continue
        except TypeError:
            # Non-string values cannot be dates in any format.
            return None
    return None


def _filing_entries(data):
    """Extract annual-report filing entries (dicts) from an eKRS extract payload."""
    section = data
    for key in ('odpis', 'dane', 'dzial3'):
        section = section.get(key) if isinstance(section, dict) else None
    if not isinstance(section, dict):
        return []

    # eKRS API stores financial reports under wzmiankiOZlozonychDokumentach
    docs = section.get('wzmiankiOZlozonychDokumentach')
    entries = None
    if isinstance(docs, dict):
        entries = docs.get('wzmiankaOZlozeniuRocznegoSprawozdaniaFinansowego')

    # Fallback to old key structure
    if not entries:
        legacy = section.get('sprawozdaniaFinansowe')
        if isinstance(legacy, dict):
            # NOTE(review): this key looks corrupted — verify against a real payload.
            entries = legacy.get('informacjeOSprWorzdawozdaniach')

    # Tolerate a single entry delivered as an object instead of a list.
    if isinstance(entries, dict):
        entries = [entries]
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]


def process_company(db, company, dry_run=False):
    """Record filed annual-report periods for a single company.

    Fetches the company's current extract from eKRS and, for every filing
    entry with a parseable period, ensures a ``CompanyFinancialReport`` row
    exists (creating it when missing) and sets its ``filed_at`` date. Rows
    that already carry a revenue figure are left untouched.

    Args:
        db: database session (query/add/commit are used).
        company: object exposing ``id``, ``name`` and ``krs``.
        dry_run: when True, nothing is added to the session or committed.

    Returns:
        True when at least one report record was created or touched,
        False otherwise (no KRS number, API failure, or no filings found).
    """
    from database import CompanyFinancialReport

    if not company.krs:
        return False

    krs_padded = company.krs.zfill(10)
    logger.info(f'Processing {company.name} (KRS: {krs_padded})')

    # The eKRS API provides basic data; for full financials we need document downloads.
    url = f'{EKRS_API}/OdpisAktualny/{krs_padded}?rejestr=P&format=json'
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        if resp.status_code != 200:
            logger.warning(f' eKRS API returned {resp.status_code}')
            return False
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f' API error: {e}')
        return False

    entries = _filing_entries(data)
    if not entries:
        logger.info(' No financial reports found in KRS data')
        return False

    updated = False
    for entry in entries:
        period = _entry_period(entry)
        if period is None:
            continue
        p_start, p_end = period

        # Check if we already have data with financial figures
        report = db.query(CompanyFinancialReport).filter_by(
            company_id=company.id,
            period_start=p_start,
            period_end=p_end,
        ).first()

        if report is not None and report.revenue is not None:
            continue  # Already have financial data

        if report is None:
            report = CompanyFinancialReport(
                company_id=company.id,
                period_start=p_start,
                period_end=p_end,
                report_type='annual',
                source='ekrs',
            )
            if not dry_run:
                db.add(report)

        filed_at = _parse_date(entry.get('dataZlozenia', ''))
        if filed_at is not None:
            report.filed_at = filed_at

        updated = True

    if updated and not dry_run:
        db.commit()
        logger.info(' Updated report records')

    return updated


def main():
    """Command-line entry point: process one company or all active companies with a KRS number."""
    parser = argparse.ArgumentParser(description='Fetch financial reports from eKRS')
    parser.add_argument('--limit', type=int, default=0, help='Limit companies to process')
    parser.add_argument('--company-id', type=int, help='Process single company')
    parser.add_argument('--dry-run', action='store_true', help='Preview without saving')
    args = parser.parse_args()

    from database import SessionLocal, Company

    db = SessionLocal()
    try:
        if args.company_id is not None:
            companies = db.query(Company).filter_by(id=args.company_id).all()
        else:
            companies = db.query(Company).filter(
                Company.krs.isnot(None),
                Company.status == 'active',
            ).order_by(Company.name).all()

        if args.limit:
            companies = companies[:args.limit]

        logger.info(f'Processing {len(companies)} companies')

        processed = 0
        for position, company in enumerate(companies, start=1):
            try:
                if process_company(db, company, dry_run=args.dry_run):
                    processed += 1
            except Exception as e:
                logger.error(f'Error processing {company.name}: {e}')
                # Discard the failed company's pending changes so the session
                # stays usable and nothing half-built leaks into the next commit.
                db.rollback()

            # Rate limiting
            if position % 5 == 0:
                time.sleep(1)

        logger.info(f'Done: {processed}/{len(companies)} companies updated')
    finally:
        db.close()


if __name__ == '__main__':
    main()