""" ZOPK Content Scraper - Pobieranie pełnej treści artykułów dla bazy wiedzy. Scraper respektuje robots.txt i stosuje rate limiting. Obsługuje główne polskie portale newsowe. Usage: from zopk_content_scraper import ZOPKContentScraper scraper = ZOPKContentScraper(db_session) result = scraper.scrape_article(news_id=123) # lub batch: result = scraper.batch_scrape(limit=50) """ import re import time import logging import hashlib import base64 from datetime import datetime from typing import Dict, List, Optional, Tuple, Callable, Any from urllib.parse import urlparse, parse_qs, unquote from dataclasses import dataclass, field import requests from bs4 import BeautifulSoup, Comment, NavigableString from database import ZOPKNews # Configure logging logger = logging.getLogger(__name__) # ============================================================ # CONFIGURATION # ============================================================ # User-Agent identifying the bot USER_AGENT = 'NordaBizBot/1.0 (+https://nordabiznes.pl/bot; kontakt@nordabiznes.pl)' # Request timeout in seconds REQUEST_TIMEOUT = 15 # Maximum content length (chars) to avoid memory issues MAX_CONTENT_LENGTH = 100000 # ~100KB of text # Rate limiting: seconds between requests per domain RATE_LIMITS = { 'trojmiasto.pl': 2.0, 'dziennikbaltycki.pl': 2.0, 'nordafm.pl': 1.5, 'ttm24.pl': 1.5, 'radiogdansk.pl': 1.5, 'portalmorski.pl': 1.5, 'biznes.pap.pl': 2.0, 'default': 3.0 } # Maximum retry attempts MAX_RETRY_ATTEMPTS = 3 # ============================================================ # CONTENT SELECTORS PER DOMAIN # ============================================================ # CSS selectors for article content extraction # Order matters - first match wins CONTENT_SELECTORS = { 'trojmiasto.pl': [ 'article.article-content', 'div.article-body', 'div.article__content', 'div[itemprop="articleBody"]', ], 'dziennikbaltycki.pl': [ 'div.article-body', 'article.article-main', 'div[itemprop="articleBody"]', 'div.art-content', ], 
'nordafm.pl': [ 'div.entry-content', 'article.post-content', 'div.post-body', ], 'ttm24.pl': [ 'div.post-content', 'article.entry-content', 'div.article-content', ], 'radiogdansk.pl': [ 'div.article-content', 'div.entry-content', 'article.post', ], 'portalmorski.pl': [ 'div.article-content', 'div.entry-content', 'article.post-content', ], 'biznes.pap.pl': [ 'div.article-content', 'div.news-content', 'article.content', ], 'gov.pl': [ 'div.article-content', 'main.main-content', 'div.content', ], 'default': [ 'article', 'div[itemprop="articleBody"]', 'div.article-content', 'div.article-body', 'div.entry-content', 'div.post-content', 'main.content', 'main', ] } # Elements to remove from content ELEMENTS_TO_REMOVE = [ 'script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'iframe', 'noscript', 'svg', 'canvas', '.advertisement', '.ad', '.ads', '.advert', '.banner', '.social-share', '.share-buttons', '.sharing', '.related-articles', '.related-posts', '.recommendations', '.comments', '.comment-section', '#comments', '.newsletter', '.subscription', '.subscribe', '.cookie-notice', '.cookie-banner', '.gdpr', '.popup', '.modal', '.overlay', '.sidebar', '.widget', '.navigation', '.breadcrumb', '.breadcrumbs', '.author-bio', '.author-box', '.tags', '.tag-list', '.categories', '.pagination', '.pager', '[data-ad]', '[data-advertisement]', ] # Domains that are not scrapeable (paywalls, dynamic content, etc.) 
SKIP_DOMAINS = [
    # Social media
    'facebook.com', 'twitter.com', 'x.com', 'linkedin.com',
    'youtube.com', 'instagram.com',
    # Paywalled news sites (require login, return cookie dialogs)
    'wyborcza.pl',  # Gazeta Wyborcza paywall
    'rp.pl',        # Rzeczpospolita paywall
    # Aggregators (no original content)
    'wykop.pl',     # Social news aggregator
    'reddit.com',
    # Google News aggregator (URLs need decoding first)
    'news.google.com',
]

# ============================================================
# GOOGLE NEWS URL DECODING
# ============================================================

# Headers for Google News requests (browser-like UA + pre-set consent cookie
# to reduce the chance of being bounced to consent.google.com)
GOOGLE_NEWS_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'pl,en;q=0.5',
    'Cookie': 'CONSENT=YES+cb.20210720-07-p0.en+FX+410'
}


def decode_google_news_url(google_url: str, max_depth: int = 3) -> Optional[str]:
    """
    Decode Google News URL to original source URL.

    Google News uses Protocol Buffer encoding (not simple Base64).
    The googlenewsdecoder library handles this correctly.

    Args:
        google_url: URL to decode
        max_depth: Maximum recursion depth (protection against infinite loops)

    Returns:
        Original URL or None if decoding failed
    """
    if max_depth <= 0:
        return None

    # Method 1: Use googlenewsdecoder library (PREFERRED - handles Protocol
    # Buffer encoding). This is the most reliable method for modern URLs.
    decoded = decode_google_news_url_with_library(google_url)
    if decoded:
        logger.debug(f"googlenewsdecoder succeeded: {decoded[:80]}...")
        return decoded

    # Method 2: Try Base64 decode (fallback for older URL formats)
    try:
        # Find encoded part (supports both /articles/ and /rss/articles/)
        match = re.search(r'/(?:rss/)?articles/([A-Za-z0-9_-]+)', google_url)
        if match:
            encoded = match.group(1)
            # Add Base64 padding if needed
            padding = 4 - len(encoded) % 4
            if padding != 4:
                encoded += '=' * padding
            # Decode and hunt for embedded URLs in the binary payload
            try:
                decoded_bytes = base64.urlsafe_b64decode(encoded)
                # Find URLs in decoded data
                urls = re.findall(rb'https?://[^\x00-\x1f\s"\'<>]+', decoded_bytes)
                for url in urls:
                    try:
                        url_str = url.decode('utf-8', errors='ignore').rstrip('/')
                        # Skip Google URLs; very short matches are noise
                        if 'google.' not in url_str and len(url_str) > 20:
                            # Clean URL of stray control characters
                            url_str = url_str.split('\x00')[0]
                            url_str = url_str.split('\r')[0]
                            url_str = url_str.split('\n')[0]
                            if url_str.startswith('http'):
                                logger.debug(f"Base64 decode succeeded: {url_str[:80]}...")
                                return url_str
                    except Exception:
                        # Fixed: was a bare `except:` which also swallows
                        # KeyboardInterrupt/SystemExit
                        continue
            except Exception:
                # Fixed: was a bare `except:` - narrow to Exception
                pass
    except Exception:
        pass

    # Method 3: Follow redirects (last resort - often fails due to
    # consent.google.com). Only try this if max_depth allows.
    if max_depth >= 2:
        try:
            response = requests.get(
                google_url,
                headers=GOOGLE_NEWS_HEADERS,
                timeout=10,
                allow_redirects=True
            )
            final_url = response.url
            response.close()
            # If it's not Google, we have the original URL
            if 'google.com' not in final_url:
                logger.debug(f"Redirect follow succeeded: {final_url[:80]}...")
                return final_url
            # If we landed on consent.google.com, don't recurse - it doesn't
            # help. The consent page doesn't redirect to the actual article.
        except Exception as e:
            logger.debug(f"Redirect follow failed: {e}")

    logger.warning(f"All Google News URL decoding methods failed for: {google_url[:80]}...")
    return None


def is_google_news_url(url: str) -> bool:
    """Check if URL is a Google News URL that needs decoding."""
    if not url:
        return False
    return 'news.google.com' in url.lower()


def decode_google_news_url_with_library(google_url: str) -> Optional[str]:
    """
    Decode Google News URL using googlenewsdecoder library.

    The library handles Protocol Buffer encoded URLs, which the Base64
    fallback in decode_google_news_url() cannot.

    Args:
        google_url: Google News URL to decode

    Returns:
        Original URL or None if decoding failed (or library not installed)
    """
    try:
        # Imported lazily so the module works without the optional dependency
        from googlenewsdecoder import gnewsdecoder
        result = gnewsdecoder(google_url, interval=0.5)
        if result and result.get('status') and result.get('decoded_url'):
            return result['decoded_url']
    except ImportError:
        logger.warning("googlenewsdecoder library not installed")
    except Exception as e:
        logger.debug(f"googlenewsdecoder failed: {e}")
    return None


# ============================================================
# DATA CLASSES
# ============================================================

@dataclass
class ScrapeResult:
    """Result of scraping an article."""
    success: bool
    content: Optional[str] = None
    word_count: int = 0
    error: Optional[str] = None
    status: str = 'pending'  # scraped, failed, skipped


@dataclass
class ProgressUpdate:
    """Progress update for batch operations."""
    current: int
    total: int
    percent: float
    stage: str   # 'scraping', 'extracting', 'embedding'
    status: str  # 'processing', 'success', 'failed', 'skipped', 'complete'
    message: str
    details: Dict[str, Any] = field(default_factory=dict)
    article_id: Optional[int] = None
    article_title: Optional[str] = None


# Type alias for progress callback
ProgressCallback = Optional[Callable[[ProgressUpdate], None]]


# ============================================================
# SCRAPER CLASS
# ============================================================

class ZOPKContentScraper:
    """
    Scraper for ZOPK news article content.

    Features:
    - Domain-specific content selectors
    - Rate limiting per domain
    - HTML cleaning (removes ads, navigation, etc.)
    - Retry logic with exponential backoff
    - robots.txt respect (via User-Agent)
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize scraper.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for audit logging
        """
        self.db = db_session
        self.user_id = user_id
        # Timestamp of the last request per domain, for rate limiting
        self._last_request_time: Dict[str, float] = {}
        self._session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create requests session with proper headers."""
        session = requests.Session()
        session.headers.update({
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })
        return session

    def _get_domain(self, url: str) -> str:
        """Extract domain from URL (lowercased, without 'www.' prefix)."""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Remove www. prefix
            if domain.startswith('www.'):
                domain = domain[4:]
            return domain
        except Exception:
            return 'unknown'

    def _get_rate_limit(self, domain: str) -> float:
        """Get rate limit (seconds between requests) for domain."""
        # Check exact domain first
        if domain in RATE_LIMITS:
            return RATE_LIMITS[domain]
        # Check if domain ends with known domain (covers subdomains)
        for known_domain, limit in RATE_LIMITS.items():
            if domain.endswith(known_domain):
                return limit
        return RATE_LIMITS['default']

    def _wait_for_rate_limit(self, domain: str) -> None:
        """Sleep if needed so requests to a domain stay below its rate limit."""
        limit = self._get_rate_limit(domain)
        last_time = self._last_request_time.get(domain, 0)
        elapsed = time.time() - last_time
        if elapsed < limit:
            wait_time = limit - elapsed
            logger.debug(f"Rate limiting: waiting {wait_time:.2f}s for {domain}")
            time.sleep(wait_time)
        self._last_request_time[domain] = time.time()

    def _should_skip_domain(self, domain: str) -> bool:
        """Check if domain should be skipped (social media / paywall / aggregator)."""
        for skip in SKIP_DOMAINS:
            if skip in domain:
                return True
        return False

    def _get_content_selectors(self, domain: str) -> List[str]:
        """Get CSS selectors for domain, falling back to the defaults."""
        # Check exact domain
        if domain in CONTENT_SELECTORS:
            return CONTENT_SELECTORS[domain]
        # Check if domain ends with known domain (covers subdomains)
        for known_domain, selectors in CONTENT_SELECTORS.items():
            if known_domain != 'default' and domain.endswith(known_domain):
                return selectors
        return CONTENT_SELECTORS['default']

    def _fetch_html(self, url: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Fetch HTML content from URL.

        Applies domain skipping and rate limiting before the request.

        Returns:
            Tuple of (html_content, error_message); exactly one is non-None.
        """
        domain = self._get_domain(url)

        # Check if domain should be skipped
        if self._should_skip_domain(domain):
            return None, f"Domain {domain} is not scrapeable (social media/paywall)"

        # Apply rate limiting
        self._wait_for_rate_limit(domain)

        try:
            response = self._session.get(
                url,
                timeout=REQUEST_TIMEOUT,
                allow_redirects=True
            )
            response.raise_for_status()

            # Check content type - we only want HTML
            content_type = response.headers.get('Content-Type', '')
            if 'text/html' not in content_type and 'application/xhtml' not in content_type:
                return None, f"Not HTML content: {content_type}"

            # Detect encoding (apparent_encoding sniffs the body)
            response.encoding = response.apparent_encoding or 'utf-8'
            return response.text, None

        except requests.exceptions.Timeout:
            return None, "Request timeout"
        except requests.exceptions.TooManyRedirects:
            return None, "Too many redirects"
        except requests.exceptions.HTTPError as e:
            return None, f"HTTP error: {e.response.status_code}"
        except requests.exceptions.ConnectionError:
            return None, "Connection error"
        except requests.exceptions.RequestException as e:
            return None, f"Request error: {str(e)}"

    def _clean_html(self, soup: BeautifulSoup) -> BeautifulSoup:
        """Remove unwanted elements (ads, nav, comments, ...) from HTML in place."""
        # Remove HTML comments
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Remove unwanted elements
        for selector in ELEMENTS_TO_REMOVE:
            if selector.startswith('.') or selector.startswith('#') or selector.startswith('['):
                # CSS selector
                for element in soup.select(selector):
                    element.decompose()
            else:
                # Plain tag name
                for element in soup.find_all(selector):
                    element.decompose()
        return soup

    def _extract_content(self, html: str, domain: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Extract article content from HTML.

        Returns:
            Tuple of (content_text, error_message); exactly one is non-None.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Clean HTML first
            soup = self._clean_html(soup)

            # Try domain-specific selectors, first match wins
            selectors = self._get_content_selectors(domain)
            content_element = None
            for selector in selectors:
                content_element = soup.select_one(selector)
                if content_element:
                    logger.debug(f"Found content with selector: {selector}")
                    break

            if not content_element:
                # Fallback: try to find largest text block
                content_element = self._find_largest_text_block(soup)

            if not content_element:
                return None, "Could not find article content"

            # Extract text
            text = self._extract_text(content_element)
            if not text or len(text) < 100:
                return None, "Extracted content too short"

            # Truncate if too long
            if len(text) > MAX_CONTENT_LENGTH:
                text = text[:MAX_CONTENT_LENGTH] + "..."
                logger.warning(f"Content truncated to {MAX_CONTENT_LENGTH} chars")

            return text, None

        except Exception as e:
            logger.error(f"Error extracting content: {e}")
            return None, f"Extraction error: {str(e)}"

    def _find_largest_text_block(self, soup: BeautifulSoup) -> Optional[BeautifulSoup]:
        """Find the largest text block in the page (fallback method)."""
        candidates = soup.find_all(['article', 'main', 'div', 'section'])
        best_element = None
        best_score = 0
        for element in candidates:
            # Skip small elements
            text = element.get_text(strip=True)
            if len(text) < 200:
                continue
            # Score by text length, with a bonus per paragraph so that
            # paragraph-rich containers (article bodies) beat link farms
            paragraphs = len(element.find_all('p'))
            text_length = len(text)
            score = text_length + (paragraphs * 100)
            if score > best_score:
                best_score = score
                best_element = element
        return best_element

    def _extract_text(self, element: BeautifulSoup) -> str:
        """Extract clean text from element, keeping block-level breaks."""
        # Walk the tree, collecting text fragments and inserting '\n'
        # at block-level boundaries
        lines = []
        for child in element.descendants:
            if isinstance(child, NavigableString):
                text = str(child).strip()
                if text:
                    lines.append(text)
            elif child.name in ['br', 'p', 'div', 'h1', 'h2', 'h3',
                                'h4', 'h5', 'h6', 'li']:
                lines.append('\n')

        text = ' '.join(lines)
        # Fixed: the original collapsed ALL whitespace (r'\s+') to single
        # spaces, which destroyed the '\n' separators inserted above and made
        # the blank-line cleanup below dead code. Collapse only horizontal
        # whitespace, then normalize the newlines.
        text = re.sub(r'[^\S\n]+', ' ', text)
        text = re.sub(r' ?\n ?', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = text.strip()
        return text

    def _count_words(self, text: str) -> int:
        """Count words in text (0 for empty/None input)."""
        if not text:
            return 0
        words = re.findall(r'\b\w+\b', text)
        return len(words)

    def scrape_article(self, news_id: int) -> ScrapeResult:
        """
        Scrape content for a single article.

        Decodes Google News URLs to the original source first; updates the
        ZOPKNews record (full_content, scrape_status, scrape_attempts, ...).

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ScrapeResult with content or error
        """
        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
        if not news:
            return ScrapeResult(
                success=False,
                error=f"News record {news_id} not found",
                status='failed'
            )

        # Check if already scraped - return cached content
        if news.scrape_status == 'scraped' and news.full_content:
            return ScrapeResult(
                success=True,
                content=news.full_content,
                word_count=news.content_word_count or 0,
                status='scraped'
            )

        url = news.url
        original_google_url = None

        # Handle Google News URLs - decode to original source
        if is_google_news_url(url):
            logger.info(f"Decoding Google News URL for article {news_id}")
            original_google_url = url
            decoded_url = decode_google_news_url(url)
            if decoded_url:
                url = decoded_url
                logger.info(f"Decoded to: {url}")
                # Update news record with original URL and domain
                parsed = urlparse(url)
                real_domain = parsed.netloc.lower()
                if real_domain.startswith('www.'):
                    real_domain = real_domain[4:]
                news.url = url
                news.source_domain = real_domain
                # Commit the URL update immediately so it survives a later failure
                self.db.commit()
            else:
                # Could not decode - mark as failed
                news.scrape_status = 'failed'
                news.scrape_error = 'Could not decode Google News URL'
                news.scrape_attempts = (news.scrape_attempts or 0) + 1
                self.db.commit()
                return ScrapeResult(
                    success=False,
                    error='Could not decode Google News URL',
                    status='failed'
                )

        domain = self._get_domain(url)
        logger.info(f"Scraping article {news_id}: {url}")

        # Check if should skip
        if self._should_skip_domain(domain):
            news.scrape_status = 'skipped'
            news.scrape_error = f"Domain {domain} not scrapeable"
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=f"Domain {domain} not scrapeable",
                status='skipped'
            )

        # Fetch HTML
        html, fetch_error = self._fetch_html(url)
        if fetch_error:
            news.scrape_status = 'failed'
            news.scrape_error = fetch_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=fetch_error,
                status='failed'
            )

        # Extract content
        content, extract_error = self._extract_content(html, domain)
        if extract_error:
            news.scrape_status = 'failed'
            news.scrape_error = extract_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=extract_error,
                status='failed'
            )

        # Success - update database
        word_count = self._count_words(content)
        news.full_content = content
        news.content_word_count = word_count
        news.content_scraped_at = datetime.now()
        news.scrape_status = 'scraped'
        news.scrape_error = None
        news.scrape_attempts = (news.scrape_attempts or 0) + 1
        self.db.commit()

        logger.info(f"Successfully scraped article {news_id}: {word_count} words")
        return ScrapeResult(
            success=True,
            content=content,
            word_count=word_count,
            status='scraped'
        )

    def batch_scrape(
        self,
        limit: int = 50,
        status_filter: Optional[str] = None,
        force: bool = False,
        progress_callback: ProgressCallback = None
    ) -> Dict:
        """
        Batch scrape articles.

        Args:
            limit: Maximum number of articles to scrape
            status_filter: Filter by approval status (approved, auto_approved)
            force: If True, re-scrape even already scraped articles
            progress_callback: Optional callback for progress updates

        Returns:
            Dict with statistics (totals, per-status counts, errors, timing)
        """
        logger.info(f"Starting batch scrape: limit={limit}, force={force}")

        # Build query
        query = self.db.query(ZOPKNews)

        # Filter by approval status
        if status_filter:
            query = query.filter(ZOPKNews.status == status_filter)
        else:
            # Default: only approved/auto_approved articles
            query = query.filter(ZOPKNews.status.in_(['approved', 'auto_approved']))

        # Filter by scrape status
        if not force:
            query = query.filter(ZOPKNews.scrape_status.in_(['pending', 'failed']))
            # Limit retry attempts for failed articles
            query = query.filter(
                (ZOPKNews.scrape_status == 'pending') |
                ((ZOPKNews.scrape_status == 'failed') &
                 (ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS))
            )

        # Order by creation date (newest first)
        query = query.order_by(ZOPKNews.created_at.desc())

        # Limit
        articles = query.limit(limit).all()
        total = len(articles)

        # Statistics accumulator
        stats = {
            'total': total,
            'scraped': 0,
            'failed': 0,
            'skipped': 0,
            'errors': [],
            'scraped_articles': [],
            'processing_time': 0
        }

        # Send initial progress
        if progress_callback and total > 0:
            progress_callback(ProgressUpdate(
                current=0,
                total=total,
                percent=0.0,
                stage='scraping',
                status='processing',
                message=f'Rozpoczynam scraping {total} artykułów...',
                details={'scraped': 0, 'failed': 0, 'skipped': 0}
            ))

        start_time = time.time()

        for idx, article in enumerate(articles, 1):
            # Send progress update before processing
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round((idx - 1) / total * 100, 1),
                    stage='scraping',
                    status='processing',
                    message=f'Pobieram treść: {article.title[:50]}...',
                    article_id=article.id,
                    article_title=article.title[:80],
                    details={
                        'scraped': stats['scraped'],
                        'failed': stats['failed'],
                        'skipped': stats['skipped'],
                        'source': article.source_name or 'nieznane'
                    }
                ))

            result = self.scrape_article(article.id)

            if result.status == 'scraped':
                stats['scraped'] += 1
                stats['scraped_articles'].append({
                    'id': article.id,
                    'title': article.title[:100],
                    'word_count': result.word_count,
                    'source': article.source_name
                })
                # Send success progress
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='scraping',
                        status='success',
                        message=f'✓ Pobrano {result.word_count} słów: {article.title[:40]}...',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'scraped': stats['scraped'],
                            'failed': stats['failed'],
                            'skipped': stats['skipped'],
                            'word_count': result.word_count
                        }
                    ))
            elif result.status == 'skipped':
                stats['skipped'] += 1
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='scraping',
                        status='skipped',
                        message=f'⊘ Pominięto: {article.title[:40]}...',
                        article_id=article.id,
                        details={'scraped': stats['scraped'],
                                 'failed': stats['failed'],
                                 'skipped': stats['skipped']}
                    ))
            else:
                stats['failed'] += 1
                stats['errors'].append({
                    'id': article.id,
                    'url': article.url,
                    'error': result.error
                })
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='scraping',
                        status='failed',
                        message=f'✗ Błąd: {result.error[:50]}...' if result.error else '✗ Błąd',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'scraped': stats['scraped'],
                            'failed': stats['failed'],
                            'skipped': stats['skipped'],
                            'error': result.error
                        }
                    ))

        stats['processing_time'] = round(time.time() - start_time, 2)

        # Send completion progress
        if progress_callback:
            progress_callback(ProgressUpdate(
                current=total,
                total=total,
                percent=100.0,
                stage='scraping',
                status='complete',
                message=f'Zakończono: {stats["scraped"]} pobrano, {stats["failed"]} błędów, {stats["skipped"]} pominięto',
                details={
                    'scraped': stats['scraped'],
                    'failed': stats['failed'],
                    'skipped': stats['skipped'],
                    'processing_time': stats['processing_time']
                }
            ))

        logger.info(
            f"Batch scrape complete: {stats['scraped']} scraped, "
            f"{stats['failed']} failed, {stats['skipped']} skipped "
            f"in {stats['processing_time']}s"
        )
        return stats

    def get_scrape_statistics(self) -> Dict:
        """Get scraping statistics for approved/auto_approved articles."""
        from sqlalchemy import func

        # Count by scrape_status
        status_counts = self.db.query(
            ZOPKNews.scrape_status,
            func.count(ZOPKNews.id)
        ).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).group_by(ZOPKNews.scrape_status).all()
        status_dict = {status: count for status, count in status_counts}

        # Total approved articles
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()

        # Articles ready for knowledge extraction
        # (== False is intentional: SQLAlchemy column comparison, not identity)
        ready_for_extraction = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).scalar()

        # Average word count of scraped articles
        avg_word_count = self.db.query(func.avg(ZOPKNews.content_word_count)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()

        return {
            'total_approved': total_approved or 0,
            'scraped': status_dict.get('scraped', 0),
            # NULL scrape_status counts as pending
            'pending': status_dict.get('pending', 0) + status_dict.get(None, 0),
            'failed': status_dict.get('failed', 0),
            'skipped': status_dict.get('skipped', 0),
            'ready_for_extraction': ready_for_extraction or 0,
            'avg_word_count': round(avg_word_count or 0, 0)
        }


# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================

def scrape_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience function for cron jobs.

    Usage:
        from zopk_content_scraper import scrape_pending_articles
        result = scrape_pending_articles(db_session, limit=50)
    """
    scraper = ZOPKContentScraper(db_session)
    return scraper.batch_scrape(limit=limit)


def get_scrape_stats(db_session) -> Dict:
    """
    Get scraping statistics for monitoring.
    """
    scraper = ZOPKContentScraper(db_session)
    return scraper.get_scrape_statistics()