#!/usr/bin/env python3 """ Skrypt do pobierania obrazków dla newsów z Google News RSS. Problem: Google News RSS nie zawiera obrazków, a URL-e są przekierowaniami. Rozwiązanie: Zdekoduj URL Google News → pobierz og:image z oryginalnej strony. Użycie: python scripts/fix_google_news_images.py --dry-run # Test python scripts/fix_google_news_images.py # Produkcja python scripts/fix_google_news_images.py --limit 20 # Ogranicz """ import os import sys import re import base64 import argparse import requests from urllib.parse import urlparse, urljoin, parse_qs, unquote from bs4 import BeautifulSoup import time # Dodaj ścieżkę projektu PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, PROJECT_ROOT) from dotenv import load_dotenv load_dotenv(os.path.join(PROJECT_ROOT, '.env')) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker DATABASE_URL = os.getenv('DATABASE_URL') if not DATABASE_URL: print("❌ Błąd: Brak zmiennej DATABASE_URL w .env") sys.exit(1) HEADERS = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'pl,en;q=0.5', 'Cookie': 'CONSENT=YES+cb.20210720-07-p0.en+FX+410' # Pre-accept Google consent } REQUEST_TIMEOUT = 15 def decode_google_news_url(google_url: str, max_depth: int = 3) -> str | None: """ Dekoduj URL Google News do oryginalnego źródła. Google News używa różnych formatów: 1. /rss/articles/CBMi... - Base64 encoded 2. /articles/CBMi... - Base64 encoded 3. Przekierowania przez consent.google.com Args: google_url: URL do zdekodowania max_depth: Maksymalna głębokość (zabezpieczenie przed nieskończoną pętlą) """ if max_depth <= 0: return None # Metoda 1: Dekodowanie Base64 z URL (preferowana - bez HTTP request) try: # Znajdź zakodowaną część match = re.search(r'/articles/([A-Za-z0-9_-]+)', google_url) if match: encoded = match.group(1) # Dodaj padding padding = 4 - len(encoded) % 4 if padding != 4: encoded += '=' * padding # Dekoduj try: decoded = base64.urlsafe_b64decode(encoded) # Szukaj URL-ów w zdekodowanych danych urls = re.findall(rb'https?://[^\x00-\x1f\s"\'<>]+', decoded) for url in urls: try: url_str = url.decode('utf-8', errors='ignore').rstrip('/') # Pomijamy URL-e Google if 'google.' not in url_str and len(url_str) > 20: # Wyczyść URL url_str = url_str.split('\x00')[0] url_str = url_str.split('\r')[0] url_str = url_str.split('\n')[0] if url_str.startswith('http'): return url_str except: continue except: pass except Exception as e: pass # Metoda 2: Podążaj za przekierowaniami (tylko jeśli Base64 nie zadziałał) # UWAGA: Ta metoda wykonuje HTTP request try: response = requests.get( google_url, headers=HEADERS, timeout=REQUEST_TIMEOUT, allow_redirects=True ) final_url = response.url response.close() # Jeśli wylądowaliśmy na consent.google.com, wyciągnij URL z parametrów if 'consent.google.com' in final_url: parsed = urlparse(final_url) params = parse_qs(parsed.query) if 'continue' in params: continue_url = unquote(params['continue'][0]) # Iteracyjnie dekoduj (nie rekurencyjnie!) if 'news.google.com' in continue_url: return decode_google_news_url(continue_url, max_depth - 1) return continue_url # Jeśli to nie jest Google, mamy oryginalny URL if 'google.com' not in final_url: return final_url except Exception as e: pass return None def extract_og_image(url: str) -> str | None: """Pobierz og:image z podanej strony.""" try: # Użyj context manager i zamknij połączenie with requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, stream=False) as response: response.raise_for_status() html_content = response.text soup = BeautifulSoup(html_content, 'html.parser') # Szukaj og:image og_image = soup.find('meta', property='og:image') if og_image and og_image.get('content'): image_url = og_image['content'] if not image_url.startswith('http'): image_url = urljoin(url, image_url) return image_url # Fallback: twitter:image twitter_image = soup.find('meta', attrs={'name': 'twitter:image'}) if twitter_image and twitter_image.get('content'): image_url = twitter_image['content'] if not image_url.startswith('http'): image_url = urljoin(url, image_url) return image_url return None except Exception as e: return None def get_domain_favicon(domain: str) -> str: """Fallback: favicon domeny przez Google API.""" return f"https://www.google.com/s2/favicons?domain={domain}&sz=128" def main(): parser = argparse.ArgumentParser(description='Pobierz obrazki dla newsów z Google News') parser.add_argument('--dry-run', action='store_true', help='Tryb testowy') parser.add_argument('--limit', type=int, default=None, help='Limit newsów') args = parser.parse_args() print("=" * 70) print("Google News Image Fixer") print("=" * 70) if args.dry_run: print("🔍 TRYB TESTOWY - zmiany NIE będą zapisane\n") engine = create_engine(DATABASE_URL) Session = sessionmaker(bind=engine) session = Session() try: from database import ZOPKNews # Pobierz newsy z Google News które mają tylko favicon query = session.query(ZOPKNews).filter( ZOPKNews.status.in_(['approved', 'auto_approved']), ZOPKNews.source_domain == 'news.google.com', ZOPKNews.image_url.like('%s2/favicons%') ).order_by(ZOPKNews.published_at.desc()) if args.limit: query = query.limit(args.limit) news_items = query.all() print(f"📰 Znaleziono {len(news_items)} newsów do przetworzenia\n") stats = { 'processed': 0, 'og_image': 0, 'favicon_original': 0, 'failed': 0, 'decode_failed': 0 } for i, news in enumerate(news_items, 1): print(f"[{i}/{len(news_items)}] {news.title[:55]}...", flush=True) # Dekoduj URL Google News print(f" → Dekodowanie URL...", flush=True) original_url = decode_google_news_url(news.url) if not original_url: print(f" ✗ Nie udało się zdekodować URL") stats['decode_failed'] += 1 stats['failed'] += 1 print() continue print(f" → Oryginalny URL: {original_url[:60]}...") # Pobierz og:image print(f" → Pobieranie og:image...") og_image = extract_og_image(original_url) if og_image: stats['processed'] += 1 stats['og_image'] += 1 # Aktualizuj też source_domain na prawdziwą domenę parsed = urlparse(original_url) real_domain = parsed.netloc if not args.dry_run: news.image_url = og_image news.source_domain = real_domain session.commit() print(f" ✓ Zapisano og:image + domena: {real_domain}") else: print(f" [DRY-RUN] og:image: {og_image[:50]}...") print(f" [DRY-RUN] domena: {real_domain}") else: # Fallback: favicon oryginalnej domeny parsed = urlparse(original_url) real_domain = parsed.netloc favicon = get_domain_favicon(real_domain) stats['processed'] += 1 stats['favicon_original'] += 1 if not args.dry_run: news.image_url = favicon news.source_domain = real_domain session.commit() print(f" ✓ Użyto favicon + domena: {real_domain}") else: print(f" [DRY-RUN] favicon: {favicon[:50]}...") print() time.sleep(0.3) # Rate limiting print("=" * 70) print("PODSUMOWANIE") print("=" * 70) print(f"Przetworzono: {stats['processed']}") print(f" - og:image (prawdziwe grafiki): {stats['og_image']}") print(f" - favicon oryginalnej domeny: {stats['favicon_original']}") print(f"Nieudane dekodowanie URL: {stats['decode_failed']}") print(f"Nieudane ogółem: {stats['failed']}") if args.dry_run: print("\n⚠️ To był tryb testowy. Uruchom bez --dry-run aby zapisać.") except Exception as e: print(f"❌ Błąd: {e}") import traceback traceback.print_exc() session.rollback() finally: session.close() if __name__ == '__main__': main()