"""
Logo Fetch Service - Automatically downloads company logos from their websites.

Candidate sources, ordered by the numeric priority assigned in this module
(lower number = tried first):
  3    <img> whose id/class contains "logo"
  5    apple-touch-icon (largest declared size)
  7    inline CSS background-image on logo/brand elements
  8    <img> whose alt text contains "logo"
  10   og:image meta tag
  11   twitter:image meta tag
  15   link rel="icon" declaring a size of at least 64px
  20   <img> with "logo" only in its src
  100  Google Favicon API fallback

Steps reported to frontend:
  - fetch_website: GET company website
  - meta_tags: Parse og:image, twitter:image, favicon
  - scan_images: Scan img elements for logo candidates
  - download: Download best candidate image
  - convert: Convert to WebP format (SVG is kept as-is)
  - save: Save to static/img/companies/{slug}.{webp|svg}
"""

import logging
import os
import re
from io import BytesIO
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

USER_AGENT = 'Mozilla/5.0 (compatible; NordaBizBot/1.0)'
TIMEOUT = 10
MAX_DOWNLOAD_SIZE = 5 * 1024 * 1024  # 5MB
MIN_LOGO_SIZE = 64  # px
MAX_LOGO_SIZE = 800  # px
WEBP_QUALITY = 85

LOGO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'img', 'companies')


class LogoFetchService:
    """Find, download, convert and store a company logo, recording each step."""

    def fetch_logo(self, website_url: str, slug: str, preview: bool = False) -> dict:
        """
        Fetch logo from company website and save as WebP (or SVG, unconverted).

        Args:
            website_url: Company website; 'https://' is prepended when no
                http/https scheme is present.
            slug: Base file name (without extension) used under LOGO_DIR.
            preview: If True, save as {slug}_preview.{ext} instead of {slug}.{ext}

        Returns:
            {'success': bool, 'message': str, 'source': str | None,
             'file_ext': str | None, 'steps': [...]}
            'source' and 'file_ext' are None on failure.
        """
        steps = []
        candidates = []

        # Ensure URL has protocol. Test the full scheme prefix so that hosts
        # which merely start with "http" (e.g. "httpbin.org") still get one.
        if not website_url.lower().startswith(('http://', 'https://')):
            website_url = 'https://' + website_url

        # Step 1: Fetch website
        html, base_url = self._step_fetch_website(website_url, steps)
        if html is None:
            return self._failure(steps[-1]['message'], steps)

        soup = BeautifulSoup(html, 'html.parser')

        # Step 2: Meta tags
        self._step_meta_tags(soup, base_url, candidates, steps)

        # Step 3: Scan images
        self._step_scan_images(soup, base_url, candidates, steps)

        # Add Google Favicon as last-resort fallback
        domain = urlparse(base_url).netloc
        if domain:
            candidates.append({
                'url': f'https://www.google.com/s2/favicons?domain={domain}&sz=128',
                'source': 'google_favicon',
                'priority': 100
            })

        if not candidates:
            steps.append({'step': 'download', 'status': 'error', 'message': 'Nie znaleziono kandydatów na logo'})
            steps.append({'step': 'convert', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
            steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
            return self._failure('Nie znaleziono logo na stronie firmy', steps)

        # Sort by priority (lower = better); list.sort is stable, so equal
        # priorities keep their discovery order.
        candidates.sort(key=lambda c: c['priority'])

        # Step 4: Download best candidate
        image_data, image_source, content_type = self._step_download(candidates, steps)
        if image_data is None:
            steps.append({'step': 'convert', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
            steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
            return self._failure('Nie udało się pobrać żadnego kandydata', steps)

        # Step 5: Convert
        is_svg = 'svg' in (content_type or '')
        output_data, file_ext = self._step_convert(image_data, is_svg, steps)
        if output_data is None:
            steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — błąd konwersji'})
            return self._failure('Błąd konwersji obrazu', steps)

        # Step 6: Save
        save_slug = f'{slug}_preview' if preview else slug
        saved_path = self._step_save(output_data, save_slug, file_ext, steps)
        if saved_path is None:
            return self._failure('Błąd zapisu pliku', steps)

        return {
            'success': True,
            'message': f'Logo pobrane z {image_source} i zapisane jako {save_slug}.{file_ext}',
            'source': image_source,
            'file_ext': file_ext,
            'steps': steps
        }

    @staticmethod
    def _failure(message, steps):
        """Build a failure result with the same keys as a success result."""
        return {'success': False, 'message': message, 'source': None, 'file_ext': None, 'steps': steps}

    @staticmethod
    def confirm_logo(slug: str, file_ext: str) -> bool:
        """Rename preview file to final.

        Returns True when a preview file existed and was promoted, else False.
        """
        preview = os.path.join(LOGO_DIR, f'{slug}_preview.{file_ext}')
        final = os.path.join(LOGO_DIR, f'{slug}.{file_ext}')
        if not os.path.exists(preview):
            return False

        # Remove old logo in other format if exists
        for ext in ('webp', 'svg'):
            old = os.path.join(LOGO_DIR, f'{slug}.{ext}')
            if old != final and os.path.exists(old):
                os.remove(old)

        # os.replace overwrites an existing destination on every platform;
        # os.rename raises on Windows when the final file is already there.
        os.replace(preview, final)
        return True

    @staticmethod
    def cancel_logo(slug: str) -> bool:
        """Delete preview file(s) for the slug. Always returns True."""
        for ext in ('webp', 'svg'):
            preview = os.path.join(LOGO_DIR, f'{slug}_preview.{ext}')
            if os.path.exists(preview):
                os.remove(preview)
        return True

    @staticmethod
    def has_existing_logo(slug: str) -> str | None:
        """Check if company already has a logo. Returns extension or None."""
        for ext in ('webp', 'svg'):
            if os.path.exists(os.path.join(LOGO_DIR, f'{slug}.{ext}')):
                return ext
        return None

    def _step_fetch_website(self, url, steps):
        """Step 1: Fetch the website HTML.

        Returns (html_text, final_url_after_redirects), or (None, None) after
        appending an 'error' step.
        """
        try:
            response = requests.get(url, timeout=TIMEOUT, headers={
                'User-Agent': USER_AGENT,
                'Accept': 'text/html,application/xhtml+xml'
            }, allow_redirects=True)
            response.raise_for_status()
            steps.append({
                'step': 'fetch_website',
                'status': 'complete',
                'message': f'Strona pobrana ({len(response.text)} znaków)'
            })
            return response.text, response.url
        except requests.exceptions.SSLError:
            # TLS handshake failed: retry the same address over plain HTTP
            # (certificate verification is NOT disabled).
            try:
                http_url = url.replace('https://', 'http://')
                response = requests.get(http_url, timeout=TIMEOUT, headers={
                    'User-Agent': USER_AGENT
                }, allow_redirects=True)
                response.raise_for_status()
                steps.append({
                    'step': 'fetch_website',
                    'status': 'complete',
                    'message': 'Strona pobrana przez HTTP (błąd SSL)'
                })
                return response.text, response.url
            except Exception as e:
                steps.append({
                    'step': 'fetch_website',
                    'status': 'error',
                    'message': f'Błąd SSL i HTTP: {str(e)[:100]}'
                })
                return None, None
        except Exception as e:
            steps.append({
                'step': 'fetch_website',
                'status': 'error',
                'message': f'Nie udało się pobrać strony: {str(e)[:100]}'
            })
            return None, None

    def _step_meta_tags(self, soup, base_url, candidates, steps):
        """Step 2: Search meta tags for logo candidates.

        Appends candidate dicts to ``candidates`` and one summary entry to
        ``steps``.
        """
        found = []

        # og:image
        og_img = soup.find('meta', property='og:image')
        if og_img and og_img.get('content'):
            candidates.append({'url': urljoin(base_url, og_img['content']), 'source': 'og:image', 'priority': 10})
            found.append('og:image')

        # twitter:image
        tw_img = soup.find('meta', attrs={'name': 'twitter:image'})
        if tw_img and tw_img.get('content'):
            candidates.append({'url': urljoin(base_url, tw_img['content']), 'source': 'twitter:image', 'priority': 11})
            found.append('twitter:image')

        # apple-touch-icon (prefer largest). Tags without an href are dropped
        # first: urljoin(base_url, '') returns base_url, which would make the
        # HTML page itself a high-priority "image" candidate.
        touch_icons = [
            tag for tag in soup.find_all('link', rel=lambda r: r and 'apple-touch-icon' in r)
            if tag.get('href')
        ]
        if touch_icons:
            best = max(touch_icons, key=lambda t: self._parse_size(t.get('sizes', '0x0')))
            candidates.append({
                'url': urljoin(base_url, best['href']),
                'source': 'apple-touch-icon',
                'priority': 5
            })
            found.append('apple-touch-icon')

        # link rel="icon" (prefer largest, skip tiny favicons)
        icons = soup.find_all('link', rel=lambda r: r and 'icon' in r and 'apple' not in str(r))
        for icon in icons:
            size = self._parse_size(icon.get('sizes', '0x0'))
            href = icon.get('href', '')
            if href and size >= 64:
                candidates.append({'url': urljoin(base_url, href), 'source': 'favicon', 'priority': 15})
                found.append(f'favicon ({icon.get("sizes", "?")})')

        if found:
            steps.append({
                'step': 'meta_tags',
                'status': 'complete',
                'message': f'Znaleziono: {", ".join(found)}'
            })
        else:
            steps.append({
                'step': 'meta_tags',
                'status': 'missing',
                'message': 'Brak meta tagów z logo'
            })

    def _step_scan_images(self, soup, base_url, candidates, steps):
        """Step 3: Scan img elements for logo candidates.

        Appends candidate dicts to ``candidates`` and one summary entry to
        ``steps``.
        """
        found_count = 0

        for img in soup.find_all('img'):
            # BeautifulSoup returns class as a list, which is empty for
            # class="". Join every class name instead of indexing [0], which
            # raised IndexError on an empty list and ignored later classes.
            classes = img.get('class') or []
            if isinstance(classes, str):
                classes = [classes]
            class_text = ' '.join(classes)
            img_id = img.get('id', '') or ''
            alt_text = img.get('alt', '') or ''
            src_text = img.get('src', '') or ''

            attrs_text = ' '.join([class_text, img_id, alt_text, src_text]).lower()
            if 'logo' not in attrs_text:
                continue

            src = img.get('src') or img.get('data-src') or img.get('data-lazy-src')
            if not src:
                continue

            # Prioritize based on attribute match
            if 'logo' in (img_id + class_text).lower():
                priority = 3  # Class/ID match is very strong signal
            elif 'logo' in alt_text.lower():
                priority = 8
            else:
                priority = 20
            candidates.append({'url': urljoin(base_url, src), 'source': 'img_scan', 'priority': priority})
            found_count += 1

        # Also check CSS background images in header/nav
        bg_pattern = re.compile(r'url\(["\']?([^"\')\s]+)["\']?\)')
        for el in soup.select('header a[class*="logo"], nav a[class*="logo"], .logo, #logo, [class*="brand"]'):
            bg_match = bg_pattern.search(el.get('style', ''))
            if bg_match:
                candidates.append({
                    'url': urljoin(base_url, bg_match.group(1)),
                    'source': 'css_bg',
                    'priority': 7
                })
                found_count += 1

        if found_count > 0:
            steps.append({
                'step': 'scan_images',
                'status': 'complete',
                'message': f'Znaleziono {found_count} kandydatów z elementów img/CSS'
            })
        else:
            steps.append({
                'step': 'scan_images',
                'status': 'missing',
                'message': 'Brak elementów img z "logo" w atrybutach'
            })

    def _step_download(self, candidates, steps):
        """Step 4: Download the best candidate image.

        Tries candidates in the given order and returns
        (data, source, content_type) for the first acceptable one, or
        (None, None, None) after appending an 'error' step.
        """
        for candidate in candidates:
            url = candidate['url']
            try:
                # The context manager releases the streamed connection on
                # every exit path, including the early `continue`s below.
                with requests.get(url, timeout=TIMEOUT, headers={
                    'User-Agent': USER_AGENT
                }, stream=True) as response:
                    if not response.ok:
                        logger.debug("Skipping %s: HTTP %s", url, response.status_code)
                        continue

                    content_length = int(response.headers.get('content-length', 0))
                    if content_length > MAX_DOWNLOAD_SIZE:
                        logger.debug("Skipping %s: too large (%s bytes)", url, content_length)
                        continue

                    content_type = response.headers.get('content-type', '')
                    # Verify it's an image. Could be a redirect to HTML page
                    # (common for og:image on some sites).
                    looks_like_image = any(t in content_type for t in ['image', 'svg', 'octet-stream'])
                    if not looks_like_image and 'html' in content_type:
                        continue

                    # Content-Length may be absent or wrong, so cap the bytes
                    # actually read instead of buffering the whole body.
                    data = self._read_limited(response, MAX_DOWNLOAD_SIZE)
                    if data is None:
                        logger.debug("Skipping %s: body exceeds %s bytes", url, MAX_DOWNLOAD_SIZE)
                        continue

                # For raster images, verify dimensions
                if 'svg' not in content_type:
                    try:
                        from PIL import Image
                        with Image.open(BytesIO(data)) as img:
                            w, h = img.size
                    except Exception as e:
                        logger.debug("Skipping %s: not a readable image (%s)", url, e)
                        continue
                    if w < MIN_LOGO_SIZE or h < MIN_LOGO_SIZE:
                        logger.debug("Skipping %s: too small (%sx%s)", url, w, h)
                        continue
                # NOTE(review): SVG payloads are accepted on content-type alone
                # and stored unmodified — confirm they are sanitized or served
                # safely, since SVG can embed script.

                steps.append({
                    'step': 'download',
                    'status': 'complete',
                    'message': f'Pobrano obraz z {candidate["source"]} ({len(data)} bajtów)'
                })
                return data, candidate['source'], content_type

            except Exception as e:
                logger.debug("Failed to download %s: %s", url, e)
                continue

        steps.append({
            'step': 'download',
            'status': 'error',
            'message': 'Żaden kandydat nie spełnił wymagań (rozmiar, format)'
        })
        return None, None, None

    @staticmethod
    def _read_limited(response, limit):
        """Read a streamed response body; return None once it exceeds ``limit`` bytes."""
        chunks = []
        total = 0
        for chunk in response.iter_content(chunk_size=64 * 1024):
            total += len(chunk)
            if total > limit:
                return None
            chunks.append(chunk)
        return b''.join(chunks)

    def _step_convert(self, image_data, is_svg, steps):
        """Step 5: Convert image to WebP (or keep SVG).

        Returns (output_bytes, extension), or (None, None) after appending an
        'error' step.
        """
        if is_svg:
            steps.append({
                'step': 'convert',
                'status': 'complete',
                'message': 'Format SVG — zapisuję bez konwersji'
            })
            return image_data, 'svg'

        try:
            from PIL import Image
            img = Image.open(BytesIO(image_data))

            # Flatten transparency onto a white background, then drop alpha
            if img.mode in ('RGBA', 'LA', 'P'):
                if img.mode == 'P':
                    img = img.convert('RGBA')
                background = Image.new('RGBA', img.size, (255, 255, 255, 255))
                background.paste(img, mask=img.split()[-1] if 'A' in img.mode else None)
                img = background.convert('RGB')
            elif img.mode != 'RGB':
                img = img.convert('RGB')

            # Resize if too large (thumbnail keeps the aspect ratio)
            w, h = img.size
            if w > MAX_LOGO_SIZE or h > MAX_LOGO_SIZE:
                img.thumbnail((MAX_LOGO_SIZE, MAX_LOGO_SIZE), Image.LANCZOS)
                w, h = img.size

            # Save to WebP
            output = BytesIO()
            img.save(output, format='WEBP', quality=WEBP_QUALITY)
            output_data = output.getvalue()

            steps.append({
                'step': 'convert',
                'status': 'complete',
                'message': f'Konwersja do WebP ({w}x{h}, {len(output_data)} bajtów)'
            })
            return output_data, 'webp'
        except Exception as e:
            steps.append({
                'step': 'convert',
                'status': 'error',
                'message': f'Błąd konwersji: {str(e)[:100]}'
            })
            return None, None

    def _step_save(self, data, slug, ext, steps):
        """Step 6: Save the file to disk.

        Writes LOGO_DIR/{slug}.{ext} (creating LOGO_DIR if needed) and returns
        the path, or None after appending an 'error' step.
        """
        try:
            os.makedirs(LOGO_DIR, exist_ok=True)
            filename = f'{slug}.{ext}'
            filepath = os.path.join(LOGO_DIR, filename)

            with open(filepath, 'wb') as f:
                f.write(data)

            steps.append({
                'step': 'save',
                'status': 'complete',
                'message': f'Zapisano jako {filename}'
            })
            return filepath
        except Exception as e:
            steps.append({
                'step': 'save',
                'status': 'error',
                'message': f'Błąd zapisu: {str(e)[:100]}'
            })
            return None

    @staticmethod
    def _parse_size(sizes_str):
        """Parse '180x180' to max dimension int.

        Takes the largest number found, so a multi-size value such as
        '16x16 180x180' yields 180. Returns 0 when no digits are present
        (e.g. 'any' or a missing attribute).
        """
        return max((int(n) for n in re.findall(r'\d+', str(sizes_str))), default=0)