""" Logo Fetch Service - Downloads multiple logo candidates from company websites. Strategies (in priority order): 1. elements with "logo" in class/id/alt/src 2. apple-touch-icon / link rel="icon" (largest size) 3. og:image / twitter:image meta tags 4. Google Favicon API fallback Flow: 1. Fetch website, find all candidates 2. Download up to MAX_CANDIDATES, save as {slug}_cand_{i}.webp 3. Frontend shows gallery — user picks one 4. confirm_candidate() renames chosen file to {slug}.webp 5. cleanup_candidates() removes temp files """ import glob import logging import os import re from io import BytesIO from urllib.parse import urljoin, urlparse import requests from bs4 import BeautifulSoup logger = logging.getLogger(__name__) USER_AGENT = 'Mozilla/5.0 (compatible; NordaBizBot/1.0)' TIMEOUT = 10 MAX_DOWNLOAD_SIZE = 5 * 1024 * 1024 # 5MB MIN_LOGO_SIZE = 32 # px (lowered to catch more candidates) MAX_LOGO_SIZE = 800 # px WEBP_QUALITY = 85 MAX_CANDIDATES = 6 LOGO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'img', 'companies') SOURCE_LABELS = { 'img_scan': 'Element HTML', 'css_bg': 'Tło CSS', 'apple-touch-icon': 'Apple Touch Icon', 'og:image': 'Open Graph', 'twitter:image': 'Twitter Card', 'favicon': 'Favicon', 'google_favicon': 'Google Favicon', } class LogoFetchService: def fetch_candidates(self, website_url: str, slug: str) -> dict: """ Fetch multiple logo candidates from company website. Returns: { 'success': bool, 'message': str, 'candidates': [{'index': 0, 'source': str, 'label': str, 'ext': str, 'width': int, 'height': int}, ...], 'steps': [...] } """ steps = [] candidates = [] if not website_url.startswith('http'): website_url = 'https://' + website_url # Step 1: Fetch website html, base_url = self._step_fetch_website(website_url, steps) if html is None: return {'success': False, 'message': steps[-1]['message'], 'candidates': [], 'steps': steps} soup = BeautifulSoup(html, 'html.parser') # Step 2: Meta tags self._step_meta_tags(soup, base_url, candidates, steps) # Step 3: Scan images self._step_scan_images(soup, base_url, candidates, steps) # Add Google Favicon as last-resort fallback domain = urlparse(base_url).netloc if domain: candidates.append({ 'url': f'https://www.google.com/s2/favicons?domain={domain}&sz=128', 'source': 'google_favicon', 'priority': 100 }) if not candidates: steps.append({'step': 'download', 'status': 'error', 'message': 'Nie znaleziono kandydatów na logo'}) return {'success': False, 'message': 'Nie znaleziono logo na stronie firmy', 'candidates': [], 'steps': steps} # Deduplicate by URL seen_urls = set() unique = [] for c in candidates: if c['url'] not in seen_urls: seen_urls.add(c['url']) unique.append(c) candidates = unique # Sort by priority candidates.sort(key=lambda c: c['priority']) # Step 4+5+6: Download, convert, save each candidate saved = self._download_and_save_candidates(candidates, slug, steps) if not saved: return {'success': False, 'message': 'Nie udało się pobrać żadnego kandydata', 'candidates': [], 'steps': steps} steps.append({ 'step': 'save', 'status': 'complete', 'message': f'Zapisano {len(saved)} kandydatów do wyboru' }) return { 'success': True, 'message': f'Znaleziono {len(saved)} kandydatów na logo', 'candidates': saved, 'steps': steps } def _download_and_save_candidates(self, candidates, slug, steps): """Download up to MAX_CANDIDATES, convert and save each.""" saved = [] idx = 0 # Clean up old candidates first self.cleanup_candidates(slug) for candidate in candidates: if idx >= MAX_CANDIDATES: break url = candidate['url'] try: response = requests.get(url, timeout=TIMEOUT, headers={ 'User-Agent': USER_AGENT }) content_length = int(response.headers.get('content-length', 0)) if content_length > MAX_DOWNLOAD_SIZE: continue content_type = response.headers.get('content-type', '') if not any(t in content_type for t in ['image', 'svg', 'octet-stream']): if 'html' in content_type: continue data = response.content if len(data) > MAX_DOWNLOAD_SIZE or len(data) < 100: continue is_svg = 'svg' in content_type width, height = 0, 0 if not is_svg: try: from PIL import Image img = Image.open(BytesIO(data)) width, height = img.size if width < MIN_LOGO_SIZE or height < MIN_LOGO_SIZE: continue except Exception: continue # Convert output_data, file_ext = self._convert(data, is_svg) if output_data is None: continue # Get dimensions after conversion if not is_svg and width == 0: try: from PIL import Image img = Image.open(BytesIO(output_data)) width, height = img.size except Exception: pass # Save as candidate file os.makedirs(LOGO_DIR, exist_ok=True) filename = f'{slug}_cand_{idx}.{file_ext}' filepath = os.path.join(LOGO_DIR, filename) with open(filepath, 'wb') as f: f.write(output_data) saved.append({ 'index': idx, 'source': candidate['source'], 'label': SOURCE_LABELS.get(candidate['source'], candidate['source']), 'ext': file_ext, 'width': width, 'height': height, 'size': len(output_data), 'filename': filename }) idx += 1 except Exception as e: logger.debug(f"Failed to download candidate {url}: {e}") continue count = len(saved) if count > 0: steps.append({ 'step': 'download', 'status': 'complete', 'message': f'Pobrano {count} obrazów' }) else: steps.append({ 'step': 'download', 'status': 'error', 'message': 'Żaden kandydat nie spełnił wymagań' }) return saved def _convert(self, image_data, is_svg): """Convert image to WebP (or keep SVG). Returns (data, ext) or (None, None).""" if is_svg: return image_data, 'svg' try: from PIL import Image img = Image.open(BytesIO(image_data)) if img.mode in ('RGBA', 'LA', 'P'): if img.mode == 'P': img = img.convert('RGBA') background = Image.new('RGBA', img.size, (255, 255, 255, 255)) background.paste(img, mask=img.split()[-1] if 'A' in img.mode else None) img = background.convert('RGB') elif img.mode != 'RGB': img = img.convert('RGB') w, h = img.size if w > MAX_LOGO_SIZE or h > MAX_LOGO_SIZE: img.thumbnail((MAX_LOGO_SIZE, MAX_LOGO_SIZE), Image.LANCZOS) output = BytesIO() img.save(output, format='WEBP', quality=WEBP_QUALITY) return output.getvalue(), 'webp' except Exception as e: logger.debug(f"Conversion error: {e}") return None, None @staticmethod def confirm_candidate(slug: str, index: int) -> bool: """Rename chosen candidate to final logo file.""" # Find candidate file for ext in ('webp', 'svg'): cand = os.path.join(LOGO_DIR, f'{slug}_cand_{index}.{ext}') if os.path.exists(cand): final = os.path.join(LOGO_DIR, f'{slug}.{ext}') # Remove old logo in other format for old_ext in ('webp', 'svg'): old = os.path.join(LOGO_DIR, f'{slug}.{old_ext}') if old != final and os.path.exists(old): os.remove(old) os.rename(cand, final) # Cleanup remaining candidates LogoFetchService.cleanup_candidates(slug) return True return False @staticmethod def cleanup_candidates(slug: str): """Delete all candidate files for a slug.""" pattern = os.path.join(LOGO_DIR, f'{slug}_cand_*') for f in glob.glob(pattern): try: os.remove(f) except OSError: pass @staticmethod def has_existing_logo(slug: str) -> str | None: """Check if company already has a logo. Returns extension or None.""" for ext in ('webp', 'svg'): if os.path.exists(os.path.join(LOGO_DIR, f'{slug}.{ext}')): return ext return None def _step_fetch_website(self, url, steps): """Step 1: Fetch the website HTML.""" try: response = requests.get(url, timeout=TIMEOUT, headers={ 'User-Agent': USER_AGENT, 'Accept': 'text/html,application/xhtml+xml' }, allow_redirects=True) response.raise_for_status() steps.append({ 'step': 'fetch_website', 'status': 'complete', 'message': f'Strona pobrana ({len(response.text)} znaków)' }) return response.text, response.url except requests.exceptions.SSLError: try: http_url = url.replace('https://', 'http://') response = requests.get(http_url, timeout=TIMEOUT, headers={ 'User-Agent': USER_AGENT }, allow_redirects=True) response.raise_for_status() steps.append({ 'step': 'fetch_website', 'status': 'complete', 'message': f'Strona pobrana przez HTTP (błąd SSL)' }) return response.text, response.url except Exception as e: steps.append({ 'step': 'fetch_website', 'status': 'error', 'message': f'Błąd SSL i HTTP: {str(e)[:100]}' }) return None, None except Exception as e: steps.append({ 'step': 'fetch_website', 'status': 'error', 'message': f'Nie udało się pobrać strony: {str(e)[:100]}' }) return None, None def _step_meta_tags(self, soup, base_url, candidates, steps): """Step 2: Search meta tags for logo candidates.""" found = [] og_img = soup.find('meta', property='og:image') if og_img and og_img.get('content'): url = urljoin(base_url, og_img['content']) candidates.append({'url': url, 'source': 'og:image', 'priority': 10}) found.append('og:image') tw_img = soup.find('meta', attrs={'name': 'twitter:image'}) if tw_img and tw_img.get('content'): url = urljoin(base_url, tw_img['content']) candidates.append({'url': url, 'source': 'twitter:image', 'priority': 11}) found.append('twitter:image') touch_icons = soup.find_all('link', rel=lambda r: r and 'apple-touch-icon' in r) if touch_icons: best = max(touch_icons, key=lambda t: self._parse_size(t.get('sizes', '0x0'))) url = urljoin(base_url, best.get('href', '')) if url: candidates.append({'url': url, 'source': 'apple-touch-icon', 'priority': 5}) found.append('apple-touch-icon') icons = soup.find_all('link', rel=lambda r: r and 'icon' in r and 'apple' not in str(r)) for icon in icons: size = self._parse_size(icon.get('sizes', '0x0')) href = icon.get('href', '') if href and size >= 32: url = urljoin(base_url, href) candidates.append({'url': url, 'source': 'favicon', 'priority': 15}) found.append(f'favicon ({icon.get("sizes", "?")})') if found: steps.append({'step': 'meta_tags', 'status': 'complete', 'message': f'Znaleziono: {", ".join(found)}'}) else: steps.append({'step': 'meta_tags', 'status': 'missing', 'message': 'Brak meta tagów z logo'}) def _step_scan_images(self, soup, base_url, candidates, steps): """Step 3: Scan img elements for logo candidates.""" found_count = 0 for img in soup.find_all('img'): attrs_text = ' '.join([ img.get('class', [''])[0] if isinstance(img.get('class'), list) else str(img.get('class', '')), img.get('id', ''), img.get('alt', ''), img.get('src', '') ]).lower() if 'logo' in attrs_text: src = img.get('src') or img.get('data-src') or img.get('data-lazy-src') if src: url = urljoin(base_url, src) priority = 20 if 'logo' in (img.get('id', '') + ' '.join(img.get('class', []))).lower(): priority = 3 elif 'logo' in img.get('alt', '').lower(): priority = 8 candidates.append({'url': url, 'source': 'img_scan', 'priority': priority}) found_count += 1 for el in soup.select('header a[class*="logo"], nav a[class*="logo"], .logo, #logo, [class*="brand"]'): style = el.get('style', '') bg_match = re.search(r'url\(["\']?([^"\')\s]+)["\']?\)', style) if bg_match: url = urljoin(base_url, bg_match.group(1)) candidates.append({'url': url, 'source': 'css_bg', 'priority': 7}) found_count += 1 if found_count > 0: steps.append({'step': 'scan_images', 'status': 'complete', 'message': f'Znaleziono {found_count} kandydatów z elementów img/CSS'}) else: steps.append({'step': 'scan_images', 'status': 'missing', 'message': 'Brak elementów img z "logo" w atrybutach'}) @staticmethod def _parse_size(sizes_str): """Parse '180x180' to max dimension int.""" match = re.search(r'(\d+)', str(sizes_str)) return int(match.group(1)) if match else 0