nordabiz/zopk_content_scraper.py
Maciej Pienczyn 1e42c4fbd8 fix(scraper): Dodano domeny paywall do SKIP_DOMAINS
- wyborcza.pl - paywall Gazety Wyborczej
- rp.pl - paywall Rzeczpospolitej
- wykop.pl - agregator bez oryginalnej treści
- reddit.com - agregator

Te domeny zwracają cookie dialog zamiast treści artykułów

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 20:48:26 +01:00

678 lines
21 KiB
Python

"""
ZOPK Content Scraper - Pobieranie pełnej treści artykułów dla bazy wiedzy.
Scraper respektuje robots.txt i stosuje rate limiting.
Obsługuje główne polskie portale newsowe.
Usage:
from zopk_content_scraper import ZOPKContentScraper
scraper = ZOPKContentScraper(db_session)
result = scraper.scrape_article(news_id=123)
# lub batch:
result = scraper.batch_scrape(limit=50)
"""
import re
import time
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
from dataclasses import dataclass
import requests
from bs4 import BeautifulSoup, Comment, NavigableString
from database import ZOPKNews
# Configure logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURATION
# ============================================================
# User-Agent identifying the bot (includes contact URL and e-mail so site
# operators can reach us or target the bot in robots.txt by name)
USER_AGENT = 'NordaBizBot/1.0 (+https://nordabiznes.pl/bot; kontakt@nordabiznes.pl)'
# Request timeout in seconds
REQUEST_TIMEOUT = 15
# Maximum content length (chars) to avoid memory issues
MAX_CONTENT_LENGTH = 100000 # ~100KB of text
# Rate limiting: minimum seconds between requests per domain.
# Domains not listed here fall back to the 'default' entry.
RATE_LIMITS = {
    'trojmiasto.pl': 2.0,
    'dziennikbaltycki.pl': 2.0,
    'nordafm.pl': 1.5,
    'ttm24.pl': 1.5,
    'radiogdansk.pl': 1.5,
    'portalmorski.pl': 1.5,
    'biznes.pap.pl': 2.0,
    'default': 3.0
}
# Maximum retry attempts: 'failed' articles with this many attempts
# are no longer picked up by batch_scrape()
MAX_RETRY_ATTEMPTS = 3
# ============================================================
# CONTENT SELECTORS PER DOMAIN
# ============================================================
# CSS selectors for article content extraction.
# Order matters - first match wins.
# Domains without an entry use the 'default' selector list.
CONTENT_SELECTORS = {
    'trojmiasto.pl': [
        'article.article-content',
        'div.article-body',
        'div.article__content',
        'div[itemprop="articleBody"]',
    ],
    'dziennikbaltycki.pl': [
        'div.article-body',
        'article.article-main',
        'div[itemprop="articleBody"]',
        'div.art-content',
    ],
    'nordafm.pl': [
        'div.entry-content',
        'article.post-content',
        'div.post-body',
    ],
    'ttm24.pl': [
        'div.post-content',
        'article.entry-content',
        'div.article-content',
    ],
    'radiogdansk.pl': [
        'div.article-content',
        'div.entry-content',
        'article.post',
    ],
    'portalmorski.pl': [
        'div.article-content',
        'div.entry-content',
        'article.post-content',
    ],
    'biznes.pap.pl': [
        'div.article-content',
        'div.news-content',
        'article.content',
    ],
    'gov.pl': [
        'div.article-content',
        'main.main-content',
        'div.content',
    ],
    # Generic fallbacks, ordered from most to least specific
    'default': [
        'article',
        'div[itemprop="articleBody"]',
        'div.article-content',
        'div.article-body',
        'div.entry-content',
        'div.post-content',
        'main.content',
        'main',
    ]
}
# Elements to remove from content before text extraction.
# Plain entries are HTML tag names; entries starting with '.', '#' or '['
# are CSS selectors (the cleaning code dispatches on that prefix).
ELEMENTS_TO_REMOVE = [
    # Non-content tags
    'script', 'style', 'nav', 'header', 'footer', 'aside',
    'form', 'iframe', 'noscript', 'svg', 'canvas',
    # Ads
    '.advertisement', '.ad', '.ads', '.advert', '.banner',
    # Social / sharing widgets
    '.social-share', '.share-buttons', '.sharing',
    # Recommendation boxes
    '.related-articles', '.related-posts', '.recommendations',
    # Comments
    '.comments', '.comment-section', '#comments',
    # Newsletter / subscription prompts
    '.newsletter', '.subscription', '.subscribe',
    # Consent banners
    '.cookie-notice', '.cookie-banner', '.gdpr',
    # Overlays
    '.popup', '.modal', '.overlay',
    # Page chrome
    '.sidebar', '.widget', '.navigation',
    '.breadcrumb', '.breadcrumbs',
    '.author-bio', '.author-box',
    '.tags', '.tag-list', '.categories',
    '.pagination', '.pager',
    # Attribute-marked ad containers
    '[data-ad]', '[data-advertisement]',
]
# Domains that are not scrapeable (paywalls, dynamic content, etc.).
# Articles from these domains are marked 'skipped' without any HTTP request.
SKIP_DOMAINS = [
    # Social media
    'facebook.com',
    'twitter.com',
    'x.com',
    'linkedin.com',
    'youtube.com',
    'instagram.com',
    # Paywalled news sites (require login, return cookie dialogs)
    'wyborcza.pl', # Gazeta Wyborcza paywall
    'rp.pl', # Rzeczpospolita paywall
    # Aggregators (no original content)
    'wykop.pl', # Social news aggregator
    'reddit.com',
]
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class ScrapeResult:
    """Result of scraping an article."""
    # True only when content was successfully extracted (status 'scraped')
    success: bool
    # Extracted plain-text article content; None on failure/skip
    content: Optional[str] = None
    # Word count of `content`; 0 when there is no content
    word_count: int = 0
    # Human-readable error message when success is False
    error: Optional[str] = None
    status: str = 'pending' # scraped, failed, skipped
# ============================================================
# SCRAPER CLASS
# ============================================================
class ZOPKContentScraper:
    """
    Scraper for ZOPK news article content.

    Features:
    - Domain-specific content selectors (CONTENT_SELECTORS)
    - Rate limiting per domain (RATE_LIMITS)
    - HTML cleaning (removes ads, navigation, etc.)
    - Retry accounting via ZOPKNews.scrape_attempts, bounded by
      MAX_RETRY_ATTEMPTS in batch_scrape()
    - Identifies itself via USER_AGENT so sites can apply robots rules
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize scraper.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for audit logging
        """
        self.db = db_session
        self.user_id = user_id
        # Epoch timestamp of the last request, per domain (rate limiting)
        self._last_request_time: Dict[str, float] = {}
        self._session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create requests session with proper bot-identifying headers."""
        session = requests.Session()
        session.headers.update({
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })
        return session

    @staticmethod
    def _matches_domain(domain: str, known: str) -> bool:
        """
        Return True if ``domain`` equals ``known`` or is a subdomain of it.

        The explicit dot boundary avoids false positives that plain
        substring/endswith checks produce, e.g. 'prx.com' contains 'x.com'
        and 'notrojmiasto.pl' ends with 'trojmiasto.pl'.
        """
        return domain == known or domain.endswith('.' + known)

    def _get_domain(self, url: str) -> str:
        """Extract lowercased domain (without 'www.' prefix) from URL."""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Remove www. prefix
            if domain.startswith('www.'):
                domain = domain[4:]
            return domain
        except Exception:
            return 'unknown'

    def _get_rate_limit(self, domain: str) -> float:
        """Get rate limit (min seconds between requests) for domain."""
        # Check exact domain first
        if domain in RATE_LIMITS:
            return RATE_LIMITS[domain]
        # Then subdomains of known domains (e.g. 'm.trojmiasto.pl')
        for known_domain, limit in RATE_LIMITS.items():
            if known_domain != 'default' and self._matches_domain(domain, known_domain):
                return limit
        return RATE_LIMITS['default']

    def _wait_for_rate_limit(self, domain: str) -> None:
        """Sleep if needed so requests to ``domain`` respect its rate limit."""
        limit = self._get_rate_limit(domain)
        last_time = self._last_request_time.get(domain, 0)
        elapsed = time.time() - last_time
        if elapsed < limit:
            wait_time = limit - elapsed
            logger.debug(f"Rate limiting: waiting {wait_time:.2f}s for {domain}")
            time.sleep(wait_time)
        self._last_request_time[domain] = time.time()

    def _should_skip_domain(self, domain: str) -> bool:
        """Check if domain (or a parent domain) is listed in SKIP_DOMAINS."""
        # Dot-boundary suffix match; the previous substring check ('x.com'
        # in domain) wrongly skipped unrelated domains like 'prx.com'.
        return any(self._matches_domain(domain, skip) for skip in SKIP_DOMAINS)

    def _get_content_selectors(self, domain: str) -> List[str]:
        """Get CSS selectors for domain, falling back to the defaults."""
        # Check exact domain
        if domain in CONTENT_SELECTORS:
            return CONTENT_SELECTORS[domain]
        # Then subdomains of known domains
        for known_domain, selectors in CONTENT_SELECTORS.items():
            if known_domain != 'default' and self._matches_domain(domain, known_domain):
                return selectors
        return CONTENT_SELECTORS['default']

    def _fetch_html(self, url: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Fetch HTML content from URL.

        Returns:
            Tuple of (html_content, error_message); exactly one is None.
        """
        domain = self._get_domain(url)
        # Check if domain should be skipped
        if self._should_skip_domain(domain):
            return None, f"Domain {domain} is not scrapeable (social media/paywall)"
        # Apply rate limiting
        self._wait_for_rate_limit(domain)
        try:
            response = self._session.get(
                url,
                timeout=REQUEST_TIMEOUT,
                allow_redirects=True
            )
            response.raise_for_status()
            # Check content type
            content_type = response.headers.get('Content-Type', '')
            if 'text/html' not in content_type and 'application/xhtml' not in content_type:
                return None, f"Not HTML content: {content_type}"
            # Prefer charset detection over the (often missing/wrong) header
            response.encoding = response.apparent_encoding or 'utf-8'
            return response.text, None
        except requests.exceptions.Timeout:
            return None, "Request timeout"
        except requests.exceptions.TooManyRedirects:
            return None, "Too many redirects"
        except requests.exceptions.HTTPError as e:
            return None, f"HTTP error: {e.response.status_code}"
        except requests.exceptions.ConnectionError:
            return None, "Connection error"
        except requests.exceptions.RequestException as e:
            return None, f"Request error: {str(e)}"

    def _clean_html(self, soup: BeautifulSoup) -> BeautifulSoup:
        """Remove comments and unwanted elements (ads, nav, ...) in place."""
        # Remove HTML comments
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()
        # Remove unwanted elements
        for selector in ELEMENTS_TO_REMOVE:
            if selector.startswith('.') or selector.startswith('#') or selector.startswith('['):
                # CSS selector
                for element in soup.select(selector):
                    element.decompose()
            else:
                # Tag name
                for element in soup.find_all(selector):
                    element.decompose()
        return soup

    def _extract_content(self, html: str, domain: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Extract article text from HTML.

        Returns:
            Tuple of (content_text, error_message); exactly one is None.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Clean HTML first
            soup = self._clean_html(soup)
            # Try domain-specific selectors; first match wins
            selectors = self._get_content_selectors(domain)
            content_element = None
            for selector in selectors:
                content_element = soup.select_one(selector)
                if content_element:
                    logger.debug(f"Found content with selector: {selector}")
                    break
            if not content_element:
                # Fallback: try to find largest text block
                content_element = self._find_largest_text_block(soup)
            if not content_element:
                return None, "Could not find article content"
            # Extract text
            text = self._extract_text(content_element)
            # Reject boilerplate-only matches (cookie banners, teasers, ...)
            if not text or len(text) < 100:
                return None, "Extracted content too short"
            # Truncate if too long
            if len(text) > MAX_CONTENT_LENGTH:
                text = text[:MAX_CONTENT_LENGTH] + "..."
                logger.warning(f"Content truncated to {MAX_CONTENT_LENGTH} chars")
            return text, None
        except Exception as e:
            logger.error(f"Error extracting content: {e}")
            return None, f"Extraction error: {str(e)}"

    def _find_largest_text_block(self, soup: BeautifulSoup) -> Optional[BeautifulSoup]:
        """Find the largest text block in the page (fallback method)."""
        candidates = soup.find_all(['article', 'main', 'div', 'section'])
        best_element = None
        best_score = 0
        for element in candidates:
            # Skip small elements
            text = element.get_text(strip=True)
            if len(text) < 200:
                continue
            # Score = text length, boosted by paragraph count, so that
            # article bodies beat long nav/footer text dumps
            paragraphs = len(element.find_all('p'))
            text_length = len(text)
            score = text_length + (paragraphs * 100)
            if score > best_score:
                best_score = score
                best_element = element
        return best_element

    def _extract_text(self, element: BeautifulSoup) -> str:
        """
        Extract clean text from element, keeping line breaks between
        block-level elements (paragraphs, headings, list items).
        """
        lines = []
        for child in element.descendants:
            if isinstance(child, NavigableString):
                text = str(child).strip()
                if text:
                    lines.append(text)
            elif child.name in ['br', 'p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']:
                lines.append('\n')
        text = ' '.join(lines)
        # Collapse runs of spaces/tabs WITHOUT touching the '\n' markers
        # appended above. The previous r'\s+' -> ' ' pass destroyed those
        # markers, flattening articles to one line and making the
        # paragraph-break normalization below dead code.
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r' *\n *', '\n', text)
        # Keep at most one blank line between blocks
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()

    def _count_words(self, text: str) -> int:
        """Count words (\\w+ runs) in text; 0 for empty/None input."""
        if not text:
            return 0
        words = re.findall(r'\b\w+\b', text)
        return len(words)

    def scrape_article(self, news_id: int) -> ScrapeResult:
        """
        Scrape content for a single article and persist the outcome.

        Updates the ZOPKNews row (full_content, scrape_status, scrape_error,
        scrape_attempts, ...) and commits on every exit path.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ScrapeResult with content or error
        """
        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
        if not news:
            return ScrapeResult(
                success=False,
                error=f"News record {news_id} not found",
                status='failed'
            )
        # Already scraped: return the stored content without re-fetching
        if news.scrape_status == 'scraped' and news.full_content:
            return ScrapeResult(
                success=True,
                content=news.full_content,
                word_count=news.content_word_count or 0,
                status='scraped'
            )
        url = news.url
        domain = self._get_domain(url)
        logger.info(f"Scraping article {news_id}: {url}")
        # Check if should skip (social media, paywalls, aggregators)
        if self._should_skip_domain(domain):
            news.scrape_status = 'skipped'
            news.scrape_error = f"Domain {domain} not scrapeable"
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=f"Domain {domain} not scrapeable",
                status='skipped'
            )
        # Fetch HTML
        html, fetch_error = self._fetch_html(url)
        if fetch_error:
            news.scrape_status = 'failed'
            news.scrape_error = fetch_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=fetch_error,
                status='failed'
            )
        # Extract content
        content, extract_error = self._extract_content(html, domain)
        if extract_error:
            news.scrape_status = 'failed'
            news.scrape_error = extract_error
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error=extract_error,
                status='failed'
            )
        # Success - update database
        word_count = self._count_words(content)
        news.full_content = content
        news.content_word_count = word_count
        # NOTE(review): naive local timestamp — confirm the DB convention
        news.content_scraped_at = datetime.now()
        news.scrape_status = 'scraped'
        news.scrape_error = None
        news.scrape_attempts = (news.scrape_attempts or 0) + 1
        self.db.commit()
        logger.info(f"Successfully scraped article {news_id}: {word_count} words")
        return ScrapeResult(
            success=True,
            content=content,
            word_count=word_count,
            status='scraped'
        )

    def batch_scrape(
        self,
        limit: int = 50,
        status_filter: Optional[str] = None,
        force: bool = False
    ) -> Dict:
        """
        Batch scrape articles.

        Args:
            limit: Maximum number of articles to scrape
            status_filter: Filter by approval status (approved, auto_approved)
            force: If True, re-scrape even already scraped articles

        Returns:
            Dict with statistics (counts, per-article results, errors,
            processing_time in seconds)
        """
        logger.info(f"Starting batch scrape: limit={limit}, force={force}")
        # Build query
        query = self.db.query(ZOPKNews)
        # Filter by approval status
        if status_filter:
            query = query.filter(ZOPKNews.status == status_filter)
        else:
            # Default: only approved/auto_approved articles
            query = query.filter(ZOPKNews.status.in_(['approved', 'auto_approved']))
        if not force:
            # Eligible articles: pending, never attempted (NULL status —
            # get_scrape_statistics counts NULL as pending, so include it
            # here too), or failed with retry budget left.
            query = query.filter(
                (ZOPKNews.scrape_status == 'pending') |
                (ZOPKNews.scrape_status.is_(None)) |
                ((ZOPKNews.scrape_status == 'failed') &
                 (ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS))
            )
        # Order by creation date (newest first)
        query = query.order_by(ZOPKNews.created_at.desc())
        # Limit
        articles = query.limit(limit).all()
        # Statistics
        stats = {
            'total': len(articles),
            'scraped': 0,
            'failed': 0,
            'skipped': 0,
            'errors': [],
            'scraped_articles': [],
            'processing_time': 0
        }
        start_time = time.time()
        for article in articles:
            result = self.scrape_article(article.id)
            if result.status == 'scraped':
                stats['scraped'] += 1
                stats['scraped_articles'].append({
                    'id': article.id,
                    # Guard against NULL titles from the feed importer
                    'title': (article.title or '')[:100],
                    'word_count': result.word_count,
                    'source': article.source_name
                })
            elif result.status == 'skipped':
                stats['skipped'] += 1
            else:
                stats['failed'] += 1
                stats['errors'].append({
                    'id': article.id,
                    'url': article.url,
                    'error': result.error
                })
        stats['processing_time'] = round(time.time() - start_time, 2)
        logger.info(
            f"Batch scrape complete: {stats['scraped']} scraped, "
            f"{stats['failed']} failed, {stats['skipped']} skipped "
            f"in {stats['processing_time']}s"
        )
        return stats

    def get_scrape_statistics(self) -> Dict:
        """Get scraping statistics for approved articles (monitoring)."""
        from sqlalchemy import func
        # Count by scrape_status
        status_counts = self.db.query(
            ZOPKNews.scrape_status,
            func.count(ZOPKNews.id)
        ).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).group_by(ZOPKNews.scrape_status).all()
        status_dict = {status: count for status, count in status_counts}
        # Total approved articles
        total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved'])
        ).scalar()
        # Articles ready for knowledge extraction
        ready_for_extraction = self.db.query(func.count(ZOPKNews.id)).filter(
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False  # noqa: E712 (SQLAlchemy expression)
        ).scalar()
        # Average word count of scraped content
        avg_word_count = self.db.query(func.avg(ZOPKNews.content_word_count)).filter(
            ZOPKNews.scrape_status == 'scraped'
        ).scalar()
        return {
            'total_approved': total_approved or 0,
            'scraped': status_dict.get('scraped', 0),
            # NULL scrape_status counts as pending (rows never attempted)
            'pending': status_dict.get('pending', 0) + status_dict.get(None, 0),
            'failed': status_dict.get('failed', 0),
            'skipped': status_dict.get('skipped', 0),
            'ready_for_extraction': ready_for_extraction or 0,
            'avg_word_count': round(avg_word_count or 0, 0)
        }
# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================
def scrape_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience wrapper for cron jobs: scrape up to ``limit`` pending
    approved articles and return the batch statistics dict.

    Usage:
        from zopk_content_scraper import scrape_pending_articles
        result = scrape_pending_articles(db_session, limit=50)
    """
    return ZOPKContentScraper(db_session).batch_scrape(limit=limit)
def get_scrape_stats(db_session) -> Dict:
    """Return scraping statistics for monitoring dashboards/cron checks."""
    return ZOPKContentScraper(db_session).get_scrape_statistics()