Changes:
- Remove `position: sticky` from the konto sidebar (dane, prywatnosc, bezpieczenstwo, blokady).
- Add a "Firmy" link to the admin dropdown menu (before "Użytkownicy").
- Add the `scan_websites_for_nip.py` script for data quality.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

362 lines · 11 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Website NIP Scanner - skanuje strony www firm w poszukiwaniu NIP/REGON
|
|
|
|
Dla firm bez NIP w bazie - pobiera stronę www (z domeny email)
|
|
i szuka numerów NIP/REGON w treści.
|
|
|
|
Usage:
|
|
python scripts/scan_websites_for_nip.py # Skanuj wszystkie
|
|
python scripts/scan_websites_for_nip.py --id 119 # Skanuj konkretną firmę
|
|
python scripts/scan_websites_for_nip.py --apply # Zapisz znalezione NIP do bazy
|
|
"""
|
|
|
|
import argparse
import json
import os
import re
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import requests

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from database import SessionLocal, Company
|
|
|
|
# Output directory for scan results
|
|
RESULTS_DIR = Path(__file__).parent.parent / "data" / "nip_scan_results"
|
|
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Domains to skip (public email providers)
|
|
SKIP_DOMAINS = {
|
|
'gmail.com', 'wp.pl', 'onet.pl', 'op.pl', 'interia.pl',
|
|
'o2.pl', 'poczta.fm', 'yahoo.com', 'hotmail.com', 'outlook.com'
|
|
}
|
|
|
|
# Request timeout
|
|
REQUEST_TIMEOUT = 15
|
|
|
|
# User agent
|
|
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
|
|
|
|
|
@dataclass
class ScanResult:
    """Result of scanning one company website for NIP/REGON identifiers."""
    company_id: int
    company_name: str
    domain: str
    url_scanned: str
    nip_found: Optional[str] = None       # primary (first validated) NIP
    regon_found: Optional[str] = None     # primary (first validated) REGON
    # All validated NIPs/REGONs found (a page may list several).
    # default_factory replaces the `= None` defaults the original used,
    # which relied on __post_init__ to avoid the mutable-default pitfall.
    nips_all: List[str] = field(default_factory=list)
    regons_all: List[str] = field(default_factory=list)
    phone_found: Optional[str] = None
    address_found: Optional[str] = None
    confidence: str = "low"               # one of: "low", "medium", "high"
    error: Optional[str] = None
    scanned_at: str = ""                  # ISO timestamp; filled in __post_init__

    def __post_init__(self):
        # Keep None-coercion for backward compatibility with callers that
        # still pass nips_all=None / regons_all=None explicitly.
        if self.nips_all is None:
            self.nips_all = []
        if self.regons_all is None:
            self.regons_all = []
        if not self.scanned_at:
            self.scanned_at = datetime.now().isoformat()

    def to_dict(self):
        """Return a plain-dict representation suitable for JSON serialization."""
        return asdict(self)
|
|
|
|
|
|
def extract_domain_from_email(email: str) -> Optional[str]:
    """Return the domain part of an e-mail address, or None.

    Public mail-provider domains (SKIP_DOMAINS) yield None, because they
    do not identify a company website.
    """
    if not email or '@' not in email:
        return None
    host = email.split('@')[1].lower()
    return host if host not in SKIP_DOMAINS else None
|
|
|
|
|
|
def normalize_nip(nip: str) -> str:
    """Strip everything except ASCII digits, leaving the bare NIP digits."""
    return ''.join(ch for ch in nip if ch in '0123456789')
|
|
|
|
|
|
def validate_nip(nip: str) -> bool:
    """Validate a Polish NIP using its 10-digit weighted checksum.

    Non-digit characters are stripped first; any length other than 10
    digits is rejected. A computed checksum of 10 can never equal the
    single check digit, so such NIPs are invalid.
    """
    digits = ''.join(ch for ch in nip if ch in '0123456789')
    if len(digits) != 10:
        return False

    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    try:
        total = sum(w * int(d) for w, d in zip(weights, digits))
        return total % 11 == int(digits[9])
    except (ValueError, IndexError):
        return False
|
|
|
|
|
|
def _regon9_checksum_ok(regon: str) -> bool:
    """Check the 9-digit REGON checksum (also the prefix of a 14-digit REGON)."""
    weights = (8, 9, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(regon[i]) * weights[i] for i in range(8)) % 11
    if checksum == 10:
        checksum = 0
    return checksum == int(regon[8])


def validate_regon(regon: str) -> bool:
    """Validate a Polish REGON (9 or 14 digits) via its weighted checksums.

    Non-digit characters are stripped first. A 14-digit REGON must have
    both a valid 9-digit prefix and a valid full-length checksum; every
    other length is rejected. (Refactored to share the 9-digit checksum
    logic that was previously duplicated for the 14-digit case.)
    """
    regon = re.sub(r'[^0-9]', '', regon)

    if len(regon) == 9:
        return _regon9_checksum_ok(regon)

    if len(regon) == 14:
        # The first 9 digits must themselves form a valid REGON.
        if not _regon9_checksum_ok(regon):
            return False

        weights14 = (2, 4, 8, 5, 0, 9, 7, 3, 6, 1, 2, 4, 8)
        checksum = sum(int(regon[i]) * weights14[i] for i in range(13)) % 11
        if checksum == 10:
            checksum = 0
        return checksum == int(regon[13])

    return False
|
|
|
|
|
|
def find_nips_in_text(text: str) -> List[str]:
    """Extract all distinct, checksum-valid NIP numbers from raw page text."""
    nip_patterns = (
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})',  # NIP: 123-456-78-90
        r'NIP[:\s]*(\d{10})',  # NIP: 1234567890
        r'numer\s+identyfikacji\s+podatkowej[:\s]*(\d{10})',
    )

    found: List[str] = []
    for pattern in nip_patterns:
        for raw in re.findall(pattern, text, re.IGNORECASE):
            candidate = normalize_nip(raw)
            if candidate not in found and validate_nip(candidate):
                found.append(candidate)

    return found
|
|
|
|
|
|
def find_regons_in_text(text: str) -> List[str]:
    """Extract all distinct, checksum-valid REGON numbers from raw page text."""
    regon_patterns = (
        r'REGON[:\s]*(\d{9,14})',
        r'rejestr\s+gospodarczy[:\s]*(\d{9,14})',
    )

    found: List[str] = []
    for pattern in regon_patterns:
        for raw in re.findall(pattern, text, re.IGNORECASE):
            candidate = re.sub(r'[^0-9]', '', raw)
            if candidate not in found and validate_regon(candidate):
                found.append(candidate)

    return found
|
|
|
|
|
|
def fetch_website(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Download a web page, following redirects.

    Returns:
        (content, error): the page body on success, or a short error
        description on failure. Exactly one of the two is None.
    """
    request_headers = {
        'User-Agent': USER_AGENT,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
    }

    try:
        resp = requests.get(
            url,
            headers=request_headers,
            timeout=REQUEST_TIMEOUT,
            allow_redirects=True,
        )
        resp.raise_for_status()
        return resp.text, None
    except requests.exceptions.Timeout:
        return None, "Timeout"
    except requests.exceptions.ConnectionError:
        return None, "Connection error"
    except requests.exceptions.HTTPError as exc:
        return None, f"HTTP {exc.response.status_code}"
    except Exception as exc:
        return None, str(exc)
|
|
|
|
|
|
def scan_company_website(company: Company) -> ScanResult:
    """Scan a company's website for NIP/REGON numbers.

    The domain comes from ``company.website`` when present, otherwise it is
    derived from the e-mail address (public mail providers yield no domain).
    Several URL variants (root, contact/about pages in Polish and English)
    are tried in order until one page yields at least one validated NIP or
    REGON, at which point scanning stops early.

    Returns a ScanResult; on failure its ``error`` field is set and the
    identifier fields stay None.
    """
    # Get domain from email or website
    domain = None
    if company.website:
        # Prefix a scheme when missing so urlparse places the host in netloc.
        parsed = urlparse(company.website if company.website.startswith('http') else f'https://{company.website}')
        # Malformed values may still land in .path; fall back to its first segment.
        domain = parsed.netloc or parsed.path.split('/')[0]
    elif company.email:
        domain = extract_domain_from_email(company.email)

    if not domain:
        return ScanResult(
            company_id=company.id,
            company_name=company.name,
            domain="",
            url_scanned="",
            error="No domain available"
        )

    # Clean domain (NOTE: str.replace removes 'www.' anywhere in the string,
    # not only as a prefix — acceptable for typical hostnames).
    domain = domain.lower().replace('www.', '')

    # Try different URL variants
    urls_to_try = [
        f"https://{domain}",
        f"https://www.{domain}",
        f"https://{domain}/kontakt",
        f"https://{domain}/o-nas",
        f"https://{domain}/contact",
        f"https://{domain}/about",
    ]

    result = ScanResult(
        company_id=company.id,
        company_name=company.name,
        domain=domain,
        url_scanned=""
    )

    all_nips = []
    all_regons = []

    for url in urls_to_try:
        print(f"  Scanning: {url}")
        content, error = fetch_website(url)

        if error:
            # Fetch errors are not recorded on the result; just try the next URL.
            continue

        # Remember the last URL that was actually fetched successfully.
        result.url_scanned = url

        # Find NIPs and REGONs
        nips = find_nips_in_text(content)
        regons = find_regons_in_text(content)

        # Accumulate while preserving first-seen order and uniqueness.
        all_nips.extend([n for n in nips if n not in all_nips])
        all_regons.extend([r for r in regons if r not in all_regons])

        # If found, set confidence
        if nips or regons:
            print(f"    Found NIP: {nips}, REGON: {regons}")
            break

        time.sleep(0.5)  # Rate limiting between URL variants of the same host

    # Set results
    if all_nips:
        result.nips_all = all_nips
        result.nip_found = all_nips[0]  # Primary NIP
        # Exactly one NIP on the site is treated as high confidence;
        # multiple candidates drop it to medium.
        result.confidence = "high" if len(all_nips) == 1 else "medium"

    if all_regons:
        result.regons_all = all_regons
        result.regon_found = all_regons[0]

    if not all_nips and not all_regons and not result.error:
        # NOTE(review): fetch errors above were swallowed, so this message is
        # also reported when every URL variant failed to load — confirm that
        # is the intended behavior.
        result.error = "NIP/REGON not found on website"
        result.confidence = "low"

    return result
|
|
|
|
|
|
def get_companies_without_nip(db, company_id: int = None) -> List[Company]:
    """Return companies without a NIP that have a usable company domain.

    Companies whose only contact is a public e-mail provider and that have
    no explicit website are excluded, since there is nothing to scan.
    """
    query = db.query(Company).filter(
        (Company.nip == None) | (Company.nip == '')
    )
    if company_id:
        query = query.filter(Company.id == company_id)

    candidates = query.order_by(Company.name).all()

    # Keep a company when it has a website, or when its e-mail yields a
    # non-public domain.
    return [
        c for c in candidates
        if c.website or (c.email and extract_domain_from_email(c.email))
    ]
|
|
|
|
|
|
def main():
    """CLI entry point: scan companies lacking a NIP, report and optionally persist.

    Flags:
        --id N     scan only the company with that ID
        --apply    write found NIP/REGON back to the database (high/medium
                   confidence only)
        --output   path for the JSON report (defaults to a timestamped file
                   under RESULTS_DIR)
    """
    parser = argparse.ArgumentParser(description="Scan websites for NIP/REGON")
    parser.add_argument('--id', type=int, help="Scan specific company ID")
    parser.add_argument('--apply', action='store_true', help="Apply found NIPs to database")
    parser.add_argument('--output', type=str, help="Output JSON file path")
    args = parser.parse_args()

    db = SessionLocal()

    try:
        companies = get_companies_without_nip(db, args.id)
        print(f"\n=== Skanowanie {len(companies)} firm bez NIP ===\n")

        results = []
        found_count = 0

        for i, company in enumerate(companies, 1):
            print(f"[{i}/{len(companies)}] {company.name}")

            result = scan_company_website(company)
            results.append(result)

            if result.nip_found:
                found_count += 1
                print(f"  ✓ NIP: {result.nip_found} (confidence: {result.confidence})")

                # Persist only confident matches; low-confidence hits remain
                # in the JSON report for manual review.
                if args.apply and result.confidence in ('high', 'medium'):
                    company.nip = result.nip_found
                    # Only fill REGON when the company does not already have one.
                    if result.regon_found and not company.regon:
                        company.regon = result.regon_found
                    db.commit()  # commit per company so progress survives a crash
                    print(f"  → Zapisano do bazy")
            elif result.error:
                print(f"  ✗ {result.error}")

            time.sleep(1)  # Rate limiting between companies

        # Save results to JSON
        output_file = args.output or (RESULTS_DIR / f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2)

        print(f"\n=== Podsumowanie ===")
        print(f"Przeskanowano: {len(companies)} firm")
        print(f"Znaleziono NIP: {found_count}")
        print(f"Wyniki zapisane: {output_file}")

        if found_count > 0 and not args.apply:
            print(f"\nUżyj --apply aby zapisać znalezione NIP do bazy")

    finally:
        db.close()
|
|
|
|
|
|
# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|