@login_required
def api_zopk_search_news():
    """
    Search for ZOPK news using multiple sources with cross-verification.

    Sources:
    - Brave Search API
    - Google News RSS
    - Local media RSS (trojmiasto.pl, dziennikbaltycki.pl)

    Cross-verification:
    - 1 source → pending (manual review)
    - 3+ sources → auto_approved

    Admin only. Creates a ZOPKNewsFetchJob row tracking the run; the job is
    marked 'completed' on success and 'failed' (with the error message) when
    any exception escapes the search.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    from database import ZOPKNewsFetchJob
    from zopk_news_service import ZOPKNewsService
    import uuid

    db = SessionLocal()
    # Guards the failure path: if we crash before the job row is created,
    # the except block must not hit an unbound name.
    fetch_job = None
    try:
        data = request.get_json() or {}
        query = data.get('query', 'Zielony Okręg Przemysłowy Kaszubia')

        # Create fetch job record
        job_id = str(uuid.uuid4())[:8]
        fetch_job = ZOPKNewsFetchJob(
            job_id=job_id,
            search_query=query,
            search_api='multi_source',  # Brave + RSS
            triggered_by='admin',
            triggered_by_user=current_user.id,
            status='running',
            # NOTE(review): the diff hunk elides unchanged constructor
            # arguments here (likely a started-at timestamp) — confirm
            # against the full file before relying on this reconstruction.
        )
        db.add(fetch_job)
        db.commit()

        # Run the multi-source search (Brave + RSS) with cross-verification.
        service = ZOPKNewsService(db)
        results = service.search_all_sources(query)

        # Record run statistics on the job row.
        fetch_job.results_found = results['total_found']
        fetch_job.results_new = results['saved_new']
        fetch_job.results_approved = results['auto_approved']
        fetch_job.status = 'completed'
        fetch_job.completed_at = datetime.now()
        db.commit()

        return jsonify({
            'success': True,
            'message': f"Znaleziono {results['total_found']} wyników z {len(results['source_stats'])} źródeł. "
                       f"Dodano {results['saved_new']} nowych, zaktualizowano {results['updated_existing']}. "
                       f"Auto-zatwierdzono: {results['auto_approved']}",
            'job_id': job_id,
            'total_found': results['total_found'],
            'unique_items': results['unique_items'],
            'saved_new': results['saved_new'],
            'updated_existing': results['updated_existing'],
            'auto_approved': results['auto_approved'],
            'source_stats': results['source_stats']
        })

    except Exception as e:
        db.rollback()
        logger.error(f"ZOPK news search error: {e}")

        # Best-effort: don't leave the job stuck in 'running'. This was a
        # bare `except: pass`; narrowed to Exception and logged so secondary
        # failures are not silently swallowed.
        if fetch_job is not None:
            try:
                fetch_job.status = 'failed'
                fetch_job.error_message = str(e)
                fetch_job.completed_at = datetime.now()
                db.commit()
            except Exception:
                logger.exception("Failed to mark ZOPK fetch job as failed")

        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # NOTE(review): the diff hunk ends before the finally body;
        # db.close() matches the SessionLocal() pattern used above —
        # confirm against the full file.
        db.close()
-- ============================================================
-- 11. ALTER TABLE - Multi-source cross-verification columns
-- ============================================================
-- These columns support automatic cross-verification of the same story
-- reported by multiple sources (Brave, Google News RSS, local media RSS).
-- ADD COLUMN IF NOT EXISTS (PostgreSQL 9.6+) replaces the previous
-- information_schema DO-block guards; both forms are idempotent, this one
-- is shorter and avoids the per-column PL/pgSQL blocks.

-- Confidence score (1-5, based on source count)
ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS confidence_score INTEGER DEFAULT 1;

-- Number of sources that found this story
ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS source_count INTEGER DEFAULT 1;

-- List of sources (e.g., ['brave', 'google_news', 'rss_trojmiasto'])
ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS sources_list TEXT[];

-- Title hash for fuzzy deduplication (normalized title)
ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS title_hash VARCHAR(64);

-- Auto-verified flag (TRUE if 3+ sources confirmed)
ALTER TABLE zopk_news ADD COLUMN IF NOT EXISTS is_auto_verified BOOLEAN DEFAULT FALSE;

-- Status may now also be 'auto_approved' (in addition to pending, approved,
-- rejected) — no ALTER needed, documented here for reference.

-- Index for title_hash (fuzzy matching)
CREATE INDEX IF NOT EXISTS idx_zopk_news_title_hash ON zopk_news(title_hash);

-- Index for confidence score (filtering high-confidence news)
CREATE INDEX IF NOT EXISTS idx_zopk_news_confidence ON zopk_news(confidence_score);
"""
ZOPK News Service
=================

Multi-source news search and cross-verification for the
Zielony Okręg Przemysłowy Kaszubia (ZOPK) knowledge base.

Sources:
- Brave Search API (web news)
- Google News RSS (aggregated news)
- Local media RSS feeds (trojmiasto.pl, dziennikbaltycki.pl)

Cross-verification:
- 1 source → pending (manual moderation required)
- 2 sources → pending with higher confidence
- 3+ sources → auto_approved (verified automatically)

Third-party dependencies (``requests``, ``feedparser``) are imported lazily
inside the methods that need them, so the pure helpers in this module stay
importable even when those packages are absent.

Author: NordaBiz Development Team
Created: 2026-01-11
"""

import hashlib
import logging
import os
import re
import unicodedata
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


# ============================================================
# RSS FEED SOURCES
# ============================================================

RSS_SOURCES = {
    # Local media
    'trojmiasto': {
        'url': 'https://www.trojmiasto.pl/rss/wiadomosci.xml',
        'name': 'trojmiasto.pl',
        'type': 'local_media',
        'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'pomorze', 'offshore',
                     'energia', 'przemysł', 'samsonowicz', 'kongsberg']
    },
    'dziennik_baltycki': {
        'url': 'https://dziennikbaltycki.pl/rss/najnowsze.xml',
        'name': 'Dziennik Bałtycki',
        'type': 'local_media',
        'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'elektrownia', 'offshore',
                     'samsonowicz', 'kongsberg', 'lubiatowo']
    },
    # Government sources
    'gov_mon': {
        'url': 'https://www.gov.pl/web/obrona-narodowa/rss',
        'name': 'Ministerstwo Obrony Narodowej',
        'type': 'government',
        'keywords': ['kongsberg', 'przemysł obronny', 'kaszubia', 'rumia', 'samsonowicz', 'inwestycje']
    },
    'gov_przemysl': {
        'url': 'https://www.gov.pl/web/rozwoj-technologia/rss',
        'name': 'Ministerstwo Rozwoju i Technologii',
        'type': 'government',
        'keywords': ['offshore', 'elektrownia jądrowa', 'centrum danych', 'wodór', 'transformacja']
    },
    # Google News aggregated searches
    'google_news_zopk': {
        'url': 'https://news.google.com/rss/search?q=Zielony+Okr%C4%99g+Przemys%C5%82owy+Kaszubia&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []  # No filtering, query-based
    },
    'google_news_offshore': {
        'url': 'https://news.google.com/rss/search?q=offshore+Polska+Baltyk&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_nuclear': {
        'url': 'https://news.google.com/rss/search?q=elektrownia+jadrowa+Polska+Lubiatowo&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_samsonowicz': {
        'url': 'https://news.google.com/rss/search?q=Maciej+Samsonowicz+MON&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_kongsberg': {
        'url': 'https://news.google.com/rss/search?q=Kongsberg+Polska+Rumia&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    # Business/local organizations (via Google News)
    'google_news_norda': {
        'url': 'https://news.google.com/rss/search?q=Norda+Biznes+Wejherowo&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_spoko': {
        'url': 'https://news.google.com/rss/search?q=Spoko+Gospodarcze+Pomorze&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    }
}

# ZOPK-related keywords for filtering
ZOPK_KEYWORDS = [
    # Project names
    'zielony okręg przemysłowy',
    'zopk',
    'kaszubia przemysłowa',
    # Energy projects
    'offshore wind polska',
    'offshore bałtyk',
    'farma wiatrowa bałtyk',
    'elektrownia jądrowa lubiatowo',
    'elektrownia jądrowa kopalino',
    'pej lubiatowo',  # Polskie Elektrownie Jądrowe
    # Defense industry
    'kongsberg rumia',
    'kongsberg polska',
    'kongsberg defence',
    'przemysł obronny pomorze',
    'przemysł zbrojeniowy pomorze',
    # Technology
    'centrum danych gdynia',
    'centrum danych pomorze',
    'data center pomorze',
    'wodór pomorze',
    'hydrogen pomorze',
    'laboratoria wodorowe',
    # Key people
    'samsonowicz mon',
    'maciej samsonowicz',
    'kosiniak-kamysz przemysł',
    # Locations
    'transformacja energetyczna pomorze',
    'inwestycje wejherowo',
    'inwestycje rumia',
    'strefa ekonomiczna rumia',
    'rumia invest park',
    # Organizations
    'norda biznes',
    'spoko gospodarcze',
    'izba gospodarcza pomorze'
]


@dataclass
class NewsItem:
    """Represents a news item from any source."""
    title: str
    url: str
    description: str
    source_name: str
    source_type: str  # brave, google_news, rss_local
    source_id: str    # specific source identifier
    published_at: Optional[datetime]
    image_url: Optional[str] = None

    @property
    def url_hash(self) -> str:
        """SHA256 hash of URL for exact deduplication."""
        return hashlib.sha256(self.url.encode()).hexdigest()

    @property
    def title_hash(self) -> str:
        """Normalized title hash for fuzzy matching."""
        return normalize_title_hash(self.title)

    @property
    def domain(self) -> str:
        """Extract domain from URL (without a leading 'www.')."""
        parsed = urlparse(self.url)
        return parsed.netloc.replace('www.', '')


def normalize_title_hash(title: str) -> str:
    """
    Create a normalized hash from title for fuzzy matching.

    Normalization:
    - Lowercase
    - Remove diacritics (ą→a, ę→e, etc.)
    - Remove punctuation
    - Remove common Polish stop words (i, w, z, na, do, etc.)
    - Sort remaining words alphabetically
    - Hash the result (first 32 hex chars of SHA256)

    Returns '' for an empty title.
    NOTE(review): titles whose every word is a stop word or <= 2 chars all
    normalize to the same hash of '' and would group together — confirm
    this is acceptable for the dedup step.
    """
    if not title:
        return ''

    # Lowercase
    text = title.lower()

    # Remove diacritics
    text = unicodedata.normalize('NFKD', text)
    text = ''.join(c for c in text if not unicodedata.combining(c))

    # Remove punctuation
    text = re.sub(r'[^\w\s]', '', text)

    # Remove common Polish stop words and very short tokens
    stop_words = {'i', 'w', 'z', 'na', 'do', 'o', 'od', 'za', 'po', 'przy', 'dla', 'oraz', 'sie', 'to', 'jest', 'ze', 'nie', 'jak', 'czy', 'ale', 'a'}
    words = [w for w in text.split() if w not in stop_words and len(w) > 2]

    # Sort and join so word order does not affect the hash
    text = ' '.join(sorted(words))

    # Hash
    return hashlib.sha256(text.encode()).hexdigest()[:32]


def is_zopk_relevant(title: str, description: str = '') -> bool:
    """Check if content mentions any ZOPK-related keyword (case-insensitive)."""
    text = f"{title} {description}".lower()
    return any(keyword.lower() in text for keyword in ZOPK_KEYWORDS)


class ZOPKNewsService:
    """
    Multi-source news search service with cross-verification.
    """

    def __init__(self, db_session, brave_api_key: Optional[str] = None):
        self.db = db_session
        # Accept both env var names: the admin endpoint historically
        # documented BRAVE_SEARCH_API_KEY, while this service originally read
        # only BRAVE_API_KEY — honoring both keeps existing deployments working.
        self.brave_api_key = (
            brave_api_key
            or os.getenv('BRAVE_SEARCH_API_KEY')
            or os.getenv('BRAVE_API_KEY')
        )

    def search_all_sources(self, query: str = 'Zielony Okręg Przemysłowy Kaszubia') -> Dict:
        """
        Search all sources and return aggregated results with cross-verification.

        Returns:
            Dict with search results and statistics:
            total_found, unique_items, saved_new, updated_existing,
            source_stats (per-source hit counts), auto_approved.
        """
        all_items: List[NewsItem] = []
        source_stats: Dict[str, int] = {}

        # 1. Brave Search API (skipped entirely when no API key is configured)
        if self.brave_api_key:
            brave_items = self._search_brave(query)
            all_items.extend(brave_items)
            source_stats['brave'] = len(brave_items)
            logger.info(f"Brave Search: found {len(brave_items)} items")

        # 2. RSS Feeds
        for source_id, source_config in RSS_SOURCES.items():
            rss_items = self._fetch_rss(source_id, source_config)
            all_items.extend(rss_items)
            source_stats[source_id] = len(rss_items)
            logger.info(f"RSS {source_id}: found {len(rss_items)} items")

        # 3. Cross-verify and deduplicate
        verified_items = self._cross_verify(all_items)

        # 4. Save to database
        saved_count, updated_count = self._save_to_database(verified_items)

        return {
            'total_found': len(all_items),
            'unique_items': len(verified_items),
            'saved_new': saved_count,
            'updated_existing': updated_count,
            'source_stats': source_stats,
            'auto_approved': sum(1 for item in verified_items if item.get('auto_approve', False))
        }

    def _search_brave(self, query: str) -> List[NewsItem]:
        """Query the Brave News Search API; returns [] on error or missing key."""
        if not self.brave_api_key:
            return []

        items: List[NewsItem] = []
        try:
            import requests  # lazy: optional third-party backend

            headers = {
                'Accept': 'application/json',
                'X-Subscription-Token': self.brave_api_key
            }
            params = {
                'q': query,
                'count': 20,
                'freshness': 'pm',  # past month
                'country': 'pl',
                'search_lang': 'pl'
            }

            response = requests.get(
                'https://api.search.brave.com/res/v1/news/search',
                headers=headers,
                params=params,
                timeout=30
            )

            if response.status_code == 200:
                results = response.json().get('results', [])
                for item in results:
                    if item.get('url'):
                        items.append(NewsItem(
                            title=item.get('title', 'Bez tytułu'),
                            url=item['url'],
                            description=item.get('description', ''),
                            source_name=item.get('source', ''),
                            source_type='brave',
                            source_id='brave_search',
                            published_at=datetime.now(),  # Brave doesn't provide exact date
                            image_url=item.get('thumbnail', {}).get('src')
                        ))
            else:
                logger.error(f"Brave API error: {response.status_code}")

        except Exception as e:
            logger.error(f"Brave search error: {e}")

        return items

    def _fetch_rss(self, source_id: str, config: Dict) -> List[NewsItem]:
        """Fetch and parse one RSS feed; returns [] on any error."""
        items: List[NewsItem] = []
        try:
            import feedparser  # lazy: optional third-party dependency

            feed = feedparser.parse(config['url'])

            for entry in feed.entries[:30]:  # Limit to 30 per feed
                title = entry.get('title', '')
                description = entry.get('summary', entry.get('description', ''))

                # Filter by keywords if specified for this source
                keywords = config.get('keywords', [])
                if keywords and not any(kw in f"{title} {description}".lower() for kw in keywords):
                    continue

                # Local media gets an extra ZOPK-relevance check
                if config['type'] == 'local_media' and not is_zopk_relevant(title, description):
                    continue

                # Parse publication date when the feed provides one
                published_at = None
                if hasattr(entry, 'published_parsed') and entry.published_parsed:
                    published_at = datetime(*entry.published_parsed[:6])

                items.append(NewsItem(
                    title=title,
                    url=entry.get('link', ''),
                    description=description[:500],
                    source_name=config['name'],
                    source_type='rss_' + config['type'],
                    source_id=source_id,
                    published_at=published_at,
                    image_url=self._extract_image_from_entry(entry)
                ))

        except Exception as e:
            logger.error(f"RSS fetch error for {source_id}: {e}")

        return items

    def _extract_image_from_entry(self, entry) -> Optional[str]:
        """Extract an image URL from an RSS entry, trying common conventions."""
        # Try media:thumbnail
        if hasattr(entry, 'media_thumbnail') and entry.media_thumbnail:
            return entry.media_thumbnail[0].get('url')

        # Try media:content
        if hasattr(entry, 'media_content') and entry.media_content:
            for media in entry.media_content:
                if media.get('type', '').startswith('image/'):
                    return media.get('url')

        # Try enclosure
        if hasattr(entry, 'enclosures') and entry.enclosures:
            for enc in entry.enclosures:
                if enc.get('type', '').startswith('image/'):
                    return enc.get('href')

        return None

    def _cross_verify(self, items: List[NewsItem]) -> List[Dict]:
        """
        Cross-verify items from multiple sources.

        Groups items by title_hash to find the same story from different
        sources, and derives confidence_score from the number of distinct
        sources that reported it.
        """
        # Group by title_hash (fuzzy match)
        title_groups: Dict[str, List[NewsItem]] = {}
        for item in items:
            title_groups.setdefault(item.title_hash, []).append(item)

        # Also track URL hashes to avoid exact duplicates
        seen_urls = set()
        verified_items: List[Dict] = []

        for title_hash, group in title_groups.items():
            # Distinct sources, sorted for deterministic sources_list ordering
            unique_sources = sorted(set(item.source_id for item in group))
            source_count = len(unique_sources)

            # Use the first item as base (prefer Brave for better metadata)
            base_item = sorted(group, key=lambda x: x.source_type != 'brave')[0]

            if base_item.url_hash in seen_urls:
                continue
            seen_urls.add(base_item.url_hash)

            # Confidence: 1-5 scale, capped; 3+ sources auto-approves
            confidence_score = min(5, source_count + 1)
            auto_approve = source_count >= 3

            verified_items.append({
                'title': base_item.title,
                'url': base_item.url,
                'url_hash': base_item.url_hash,
                'title_hash': title_hash,
                'description': base_item.description,
                'source_name': base_item.source_name,
                'source_domain': base_item.domain,
                'source_type': base_item.source_type,
                'published_at': base_item.published_at,
                'image_url': base_item.image_url,
                'confidence_score': confidence_score,
                'source_count': source_count,
                'sources_list': unique_sources,
                'auto_approve': auto_approve
            })

        return verified_items

    def _save_to_database(self, items: List[Dict]) -> Tuple[int, int]:
        """
        Save verified items to database.

        Existing rows (matched by url_hash) get their source list merged and
        confidence recomputed; rows reaching 3+ sources are auto-approved.

        Returns:
            Tuple of (new_count, updated_count)
        """
        from database import ZOPKNews

        new_count = 0
        updated_count = 0

        for item in items:
            # Check if URL already exists
            existing = self.db.query(ZOPKNews).filter(
                ZOPKNews.url_hash == item['url_hash']
            ).first()

            if existing:
                # Update source count and confidence if new sources found
                existing_sources = existing.sources_list or []
                new_sources = [s for s in item['sources_list'] if s not in existing_sources]

                if new_sources:
                    existing.sources_list = existing_sources + new_sources
                    existing.source_count = len(existing.sources_list)
                    existing.confidence_score = min(5, existing.source_count + 1)

                    # Auto-approve if the 3-source threshold is reached
                    if existing.source_count >= 3 and existing.status == 'pending':
                        existing.status = 'auto_approved'
                        existing.is_auto_verified = True

                    updated_count += 1
            else:
                # Create new entry
                status = 'auto_approved' if item['auto_approve'] else 'pending'

                news = ZOPKNews(
                    title=item['title'],
                    url=item['url'],
                    url_hash=item['url_hash'],
                    title_hash=item['title_hash'],
                    description=item['description'],
                    source_name=item['source_name'],
                    source_domain=item['source_domain'],
                    source_type=item['source_type'],
                    published_at=item['published_at'],
                    image_url=item['image_url'],
                    confidence_score=item['confidence_score'],
                    source_count=item['source_count'],
                    sources_list=item['sources_list'],
                    is_auto_verified=item['auto_approve'],
                    status=status
                )
                self.db.add(news)
                new_count += 1

        self.db.commit()
        return new_count, updated_count


def search_zopk_news(db_session, query: str = None) -> Dict:
    """
    Convenience function to search ZOPK news from all sources.

    Usage:
        from zopk_news_service import search_zopk_news
        results = search_zopk_news(db)
    """
    service = ZOPKNewsService(db_session)
    return service.search_all_sources(query or 'Zielony Okręg Przemysłowy Kaszubia')