Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
New "Pobierz logo" button on company detail page that automatically downloads and converts company logos from their websites. Uses 4-strategy approach (DOM scan, meta tags, favicon, Google API fallback) with animated progress overlay showing 6 steps. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
390 lines
15 KiB
Python
390 lines
15 KiB
Python
"""
|
|
Logo Fetch Service - Automatically downloads company logos from their websites.
|
|
|
|
Strategies (in priority order):
|
|
1. og:image / twitter:image meta tags
|
|
2. apple-touch-icon / link rel="icon" (largest size)
|
|
3. <img> elements with "logo" in class/id/alt/src
|
|
4. Google Favicon API fallback
|
|
|
|
Steps reported to frontend:
|
|
- fetch_website: GET company website
|
|
- meta_tags: Parse og:image, twitter:image, favicon
|
|
- scan_images: Scan img elements for logo candidates
|
|
- download: Download best candidate image
|
|
- convert: Convert to WebP format
|
|
- save: Save to static/img/companies/{slug}.webp
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from io import BytesIO
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
USER_AGENT = 'Mozilla/5.0 (compatible; NordaBizBot/1.0)'
|
|
TIMEOUT = 10
|
|
MAX_DOWNLOAD_SIZE = 5 * 1024 * 1024 # 5MB
|
|
MIN_LOGO_SIZE = 64 # px
|
|
MAX_LOGO_SIZE = 800 # px
|
|
WEBP_QUALITY = 85
|
|
|
|
LOGO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'img', 'companies')
|
|
|
|
|
|
class LogoFetchService:
|
|
|
|
def fetch_logo(self, website_url: str, slug: str) -> dict:
|
|
"""
|
|
Fetch logo from company website and save as WebP.
|
|
|
|
Returns: {'success': bool, 'message': str, 'source': str, 'steps': [...]}
|
|
"""
|
|
steps = []
|
|
candidates = []
|
|
|
|
# Ensure URL has protocol
|
|
if not website_url.startswith('http'):
|
|
website_url = 'https://' + website_url
|
|
|
|
# Step 1: Fetch website
|
|
html, base_url = self._step_fetch_website(website_url, steps)
|
|
if html is None:
|
|
return {'success': False, 'message': steps[-1]['message'], 'source': None, 'steps': steps}
|
|
|
|
soup = BeautifulSoup(html, 'html.parser')
|
|
|
|
# Step 2: Meta tags
|
|
self._step_meta_tags(soup, base_url, candidates, steps)
|
|
|
|
# Step 3: Scan images
|
|
self._step_scan_images(soup, base_url, candidates, steps)
|
|
|
|
# Add Google Favicon as last-resort fallback
|
|
domain = urlparse(base_url).netloc
|
|
if domain:
|
|
candidates.append({
|
|
'url': f'https://www.google.com/s2/favicons?domain={domain}&sz=128',
|
|
'source': 'google_favicon',
|
|
'priority': 100
|
|
})
|
|
|
|
if not candidates:
|
|
steps.append({'step': 'download', 'status': 'error', 'message': 'Nie znaleziono kandydatów na logo'})
|
|
steps.append({'step': 'convert', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
|
|
steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
|
|
return {'success': False, 'message': 'Nie znaleziono logo na stronie firmy', 'source': None, 'steps': steps}
|
|
|
|
# Sort by priority (lower = better)
|
|
candidates.sort(key=lambda c: c['priority'])
|
|
|
|
# Step 4: Download best candidate
|
|
image_data, image_source, content_type = self._step_download(candidates, steps)
|
|
if image_data is None:
|
|
steps.append({'step': 'convert', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
|
|
steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — brak obrazu'})
|
|
return {'success': False, 'message': 'Nie udało się pobrać żadnego kandydata', 'source': None, 'steps': steps}
|
|
|
|
# Step 5: Convert
|
|
is_svg = content_type and 'svg' in content_type
|
|
output_data, file_ext = self._step_convert(image_data, is_svg, steps)
|
|
if output_data is None:
|
|
steps.append({'step': 'save', 'status': 'skipped', 'message': 'Pominięto — błąd konwersji'})
|
|
return {'success': False, 'message': 'Błąd konwersji obrazu', 'source': None, 'steps': steps}
|
|
|
|
# Step 6: Save
|
|
saved_path = self._step_save(output_data, slug, file_ext, steps)
|
|
if saved_path is None:
|
|
return {'success': False, 'message': 'Błąd zapisu pliku', 'source': None, 'steps': steps}
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Logo pobrane z {image_source} i zapisane jako {slug}.{file_ext}',
|
|
'source': image_source,
|
|
'steps': steps
|
|
}
|
|
|
|
def _step_fetch_website(self, url, steps):
|
|
"""Step 1: Fetch the website HTML."""
|
|
try:
|
|
response = requests.get(url, timeout=TIMEOUT, headers={
|
|
'User-Agent': USER_AGENT,
|
|
'Accept': 'text/html,application/xhtml+xml'
|
|
}, allow_redirects=True)
|
|
response.raise_for_status()
|
|
steps.append({
|
|
'step': 'fetch_website',
|
|
'status': 'complete',
|
|
'message': f'Strona pobrana ({len(response.text)} znaków)'
|
|
})
|
|
return response.text, response.url
|
|
except requests.exceptions.SSLError:
|
|
# Retry without SSL verification
|
|
try:
|
|
http_url = url.replace('https://', 'http://')
|
|
response = requests.get(http_url, timeout=TIMEOUT, headers={
|
|
'User-Agent': USER_AGENT
|
|
}, allow_redirects=True)
|
|
response.raise_for_status()
|
|
steps.append({
|
|
'step': 'fetch_website',
|
|
'status': 'complete',
|
|
'message': f'Strona pobrana przez HTTP (błąd SSL)'
|
|
})
|
|
return response.text, response.url
|
|
except Exception as e:
|
|
steps.append({
|
|
'step': 'fetch_website',
|
|
'status': 'error',
|
|
'message': f'Błąd SSL i HTTP: {str(e)[:100]}'
|
|
})
|
|
return None, None
|
|
except Exception as e:
|
|
steps.append({
|
|
'step': 'fetch_website',
|
|
'status': 'error',
|
|
'message': f'Nie udało się pobrać strony: {str(e)[:100]}'
|
|
})
|
|
return None, None
|
|
|
|
def _step_meta_tags(self, soup, base_url, candidates, steps):
|
|
"""Step 2: Search meta tags for logo candidates."""
|
|
found = []
|
|
|
|
# og:image
|
|
og_img = soup.find('meta', property='og:image')
|
|
if og_img and og_img.get('content'):
|
|
url = urljoin(base_url, og_img['content'])
|
|
candidates.append({'url': url, 'source': 'og:image', 'priority': 10})
|
|
found.append('og:image')
|
|
|
|
# twitter:image
|
|
tw_img = soup.find('meta', attrs={'name': 'twitter:image'})
|
|
if tw_img and tw_img.get('content'):
|
|
url = urljoin(base_url, tw_img['content'])
|
|
candidates.append({'url': url, 'source': 'twitter:image', 'priority': 11})
|
|
found.append('twitter:image')
|
|
|
|
# apple-touch-icon (prefer largest)
|
|
touch_icons = soup.find_all('link', rel=lambda r: r and 'apple-touch-icon' in r)
|
|
if touch_icons:
|
|
best = max(touch_icons, key=lambda t: self._parse_size(t.get('sizes', '0x0')))
|
|
url = urljoin(base_url, best.get('href', ''))
|
|
if url:
|
|
candidates.append({'url': url, 'source': 'apple-touch-icon', 'priority': 5})
|
|
found.append('apple-touch-icon')
|
|
|
|
# link rel="icon" (prefer largest, skip tiny favicons)
|
|
icons = soup.find_all('link', rel=lambda r: r and 'icon' in r and 'apple' not in str(r))
|
|
for icon in icons:
|
|
size = self._parse_size(icon.get('sizes', '0x0'))
|
|
href = icon.get('href', '')
|
|
if href and size >= 64:
|
|
url = urljoin(base_url, href)
|
|
candidates.append({'url': url, 'source': 'favicon', 'priority': 15})
|
|
found.append(f'favicon ({icon.get("sizes", "?")})')
|
|
|
|
if found:
|
|
steps.append({
|
|
'step': 'meta_tags',
|
|
'status': 'complete',
|
|
'message': f'Znaleziono: {", ".join(found)}'
|
|
})
|
|
else:
|
|
steps.append({
|
|
'step': 'meta_tags',
|
|
'status': 'missing',
|
|
'message': 'Brak meta tagów z logo'
|
|
})
|
|
|
|
def _step_scan_images(self, soup, base_url, candidates, steps):
|
|
"""Step 3: Scan img elements for logo candidates."""
|
|
found_count = 0
|
|
|
|
for img in soup.find_all('img'):
|
|
attrs_text = ' '.join([
|
|
img.get('class', [''])[0] if isinstance(img.get('class'), list) else str(img.get('class', '')),
|
|
img.get('id', ''),
|
|
img.get('alt', ''),
|
|
img.get('src', '')
|
|
]).lower()
|
|
|
|
if 'logo' in attrs_text:
|
|
src = img.get('src') or img.get('data-src') or img.get('data-lazy-src')
|
|
if src:
|
|
url = urljoin(base_url, src)
|
|
# Prioritize based on attribute match
|
|
priority = 20
|
|
if 'logo' in (img.get('id', '') + ' '.join(img.get('class', []))).lower():
|
|
priority = 3 # Class/ID match is very strong signal
|
|
elif 'logo' in img.get('alt', '').lower():
|
|
priority = 8
|
|
candidates.append({'url': url, 'source': 'img_scan', 'priority': priority})
|
|
found_count += 1
|
|
|
|
# Also check CSS background images in header/nav
|
|
for el in soup.select('header a[class*="logo"], nav a[class*="logo"], .logo, #logo, [class*="brand"]'):
|
|
style = el.get('style', '')
|
|
bg_match = re.search(r'url\(["\']?([^"\')\s]+)["\']?\)', style)
|
|
if bg_match:
|
|
url = urljoin(base_url, bg_match.group(1))
|
|
candidates.append({'url': url, 'source': 'css_bg', 'priority': 7})
|
|
found_count += 1
|
|
|
|
if found_count > 0:
|
|
steps.append({
|
|
'step': 'scan_images',
|
|
'status': 'complete',
|
|
'message': f'Znaleziono {found_count} kandydatów z elementów img/CSS'
|
|
})
|
|
else:
|
|
steps.append({
|
|
'step': 'scan_images',
|
|
'status': 'missing',
|
|
'message': 'Brak elementów img z "logo" w atrybutach'
|
|
})
|
|
|
|
def _step_download(self, candidates, steps):
|
|
"""Step 4: Download the best candidate image."""
|
|
for candidate in candidates:
|
|
url = candidate['url']
|
|
try:
|
|
response = requests.get(url, timeout=TIMEOUT, headers={
|
|
'User-Agent': USER_AGENT
|
|
}, stream=True)
|
|
|
|
content_length = int(response.headers.get('content-length', 0))
|
|
if content_length > MAX_DOWNLOAD_SIZE:
|
|
logger.debug(f"Skipping {url}: too large ({content_length} bytes)")
|
|
continue
|
|
|
|
content_type = response.headers.get('content-type', '')
|
|
|
|
# Verify it's an image
|
|
if not any(t in content_type for t in ['image', 'svg', 'octet-stream']):
|
|
# Could be a redirect to HTML page (common for og:image on some sites)
|
|
if 'html' in content_type:
|
|
continue
|
|
|
|
data = response.content
|
|
|
|
if len(data) > MAX_DOWNLOAD_SIZE:
|
|
continue
|
|
|
|
# For raster images, verify dimensions
|
|
if 'svg' not in content_type:
|
|
try:
|
|
from PIL import Image
|
|
img = Image.open(BytesIO(data))
|
|
w, h = img.size
|
|
if w < MIN_LOGO_SIZE or h < MIN_LOGO_SIZE:
|
|
logger.debug(f"Skipping {url}: too small ({w}x{h})")
|
|
continue
|
|
except Exception:
|
|
continue
|
|
|
|
steps.append({
|
|
'step': 'download',
|
|
'status': 'complete',
|
|
'message': f'Pobrano obraz z {candidate["source"]} ({len(data)} bajtów)'
|
|
})
|
|
return data, candidate['source'], content_type
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Failed to download {url}: {e}")
|
|
continue
|
|
|
|
steps.append({
|
|
'step': 'download',
|
|
'status': 'error',
|
|
'message': 'Żaden kandydat nie spełnił wymagań (rozmiar, format)'
|
|
})
|
|
return None, None, None
|
|
|
|
def _step_convert(self, image_data, is_svg, steps):
|
|
"""Step 5: Convert image to WebP (or keep SVG)."""
|
|
if is_svg:
|
|
steps.append({
|
|
'step': 'convert',
|
|
'status': 'complete',
|
|
'message': 'Format SVG — zapisuję bez konwersji'
|
|
})
|
|
return image_data, 'svg'
|
|
|
|
try:
|
|
from PIL import Image
|
|
|
|
img = Image.open(BytesIO(image_data))
|
|
|
|
# Convert RGBA/P to RGB for WebP
|
|
if img.mode in ('RGBA', 'LA', 'P'):
|
|
if img.mode == 'P':
|
|
img = img.convert('RGBA')
|
|
background = Image.new('RGBA', img.size, (255, 255, 255, 255))
|
|
background.paste(img, mask=img.split()[-1] if 'A' in img.mode else None)
|
|
img = background.convert('RGB')
|
|
elif img.mode != 'RGB':
|
|
img = img.convert('RGB')
|
|
|
|
# Resize if too large
|
|
w, h = img.size
|
|
if w > MAX_LOGO_SIZE or h > MAX_LOGO_SIZE:
|
|
img.thumbnail((MAX_LOGO_SIZE, MAX_LOGO_SIZE), Image.LANCZOS)
|
|
w, h = img.size
|
|
|
|
# Save to WebP
|
|
output = BytesIO()
|
|
img.save(output, format='WEBP', quality=WEBP_QUALITY)
|
|
output_data = output.getvalue()
|
|
|
|
steps.append({
|
|
'step': 'convert',
|
|
'status': 'complete',
|
|
'message': f'Konwersja do WebP ({w}x{h}, {len(output_data)} bajtów)'
|
|
})
|
|
return output_data, 'webp'
|
|
|
|
except Exception as e:
|
|
steps.append({
|
|
'step': 'convert',
|
|
'status': 'error',
|
|
'message': f'Błąd konwersji: {str(e)[:100]}'
|
|
})
|
|
return None, None
|
|
|
|
def _step_save(self, data, slug, ext, steps):
|
|
"""Step 6: Save the file to disk."""
|
|
try:
|
|
os.makedirs(LOGO_DIR, exist_ok=True)
|
|
filename = f'{slug}.{ext}'
|
|
filepath = os.path.join(LOGO_DIR, filename)
|
|
|
|
with open(filepath, 'wb') as f:
|
|
f.write(data)
|
|
|
|
steps.append({
|
|
'step': 'save',
|
|
'status': 'complete',
|
|
'message': f'Zapisano jako {filename}'
|
|
})
|
|
return filepath
|
|
|
|
except Exception as e:
|
|
steps.append({
|
|
'step': 'save',
|
|
'status': 'error',
|
|
'message': f'Błąd zapisu: {str(e)[:100]}'
|
|
})
|
|
return None
|
|
|
|
@staticmethod
|
|
def _parse_size(sizes_str):
|
|
"""Parse '180x180' to max dimension int."""
|
|
match = re.search(r'(\d+)', str(sizes_str))
|
|
return int(match.group(1)) if match else 0
|