""" Google Places API (New) Service for NordaBiz ============================================= Comprehensive Google Places API client for fetching rich business data. Uses the Places API (New) with field masks for efficient billing. API Reference: https://developers.google.com/maps/documentation/places/web-service/op-overview Author: NordaBiz Development Team Created: 2026-02-06 """ import os import logging from datetime import datetime, timedelta from typing import Optional, Dict, List, Any from decimal import Decimal import requests logger = logging.getLogger(__name__) # API Configuration PLACES_API_BASE = "https://places.googleapis.com/v1/places" PLACES_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText" PLACES_NEARBY_URL = "https://places.googleapis.com/v1/places:searchNearby" # Field masks grouped by billing tier # Basic fields (no charge): id, displayName, formattedAddress, location, types, etc. # Contact fields: nationalPhoneNumber, websiteUri, etc. # Atmosphere fields: reviews, rating, etc. BASIC_FIELDS = [ "id", "displayName", "formattedAddress", "location", "types", "primaryType", "primaryTypeDisplayName", "businessStatus", "googleMapsUri", "googleMapsLinks", "utcOffsetMinutes", "adrFormatAddress", "shortFormattedAddress" ] CONTACT_FIELDS = [ "nationalPhoneNumber", "internationalPhoneNumber", "websiteUri" ] HOURS_FIELDS = [ "regularOpeningHours", "currentOpeningHours" ] ATMOSPHERE_FIELDS = [ "rating", "userRatingCount", "reviews", "priceLevel", "editorialSummary" ] PHOTO_FIELDS = [ "photos" ] ATTRIBUTE_FIELDS = [ "paymentOptions", "parkingOptions", "accessibilityOptions", "outdoorSeating", "liveMusic", "servesBreakfast", "servesLunch", "servesDinner", "servesBeer", "servesWine", "servesCoffee", "goodForChildren", "allowsDogs", "restroom", "goodForGroups", "goodForWatchingSports", "reservable", "delivery", "dineIn", "takeout", "curbsidePickup" ] class GooglePlacesService: """Fetches rich GBP data via Places API (New).""" def __init__(self, api_key: str = None): self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY') if not self.api_key: raise ValueError("GOOGLE_PLACES_API_KEY not set in environment") self.session = requests.Session() self.session.headers.update({ 'X-Goog-Api-Key': self.api_key, 'Content-Type': 'application/json' }) def _build_field_mask(self, include_reviews: bool = True, include_photos: bool = True, include_attributes: bool = True) -> str: """Build field mask string for API request.""" fields = BASIC_FIELDS + CONTACT_FIELDS + HOURS_FIELDS + ATMOSPHERE_FIELDS if include_photos: fields += PHOTO_FIELDS if include_attributes: fields += ATTRIBUTE_FIELDS return ','.join(f'places.{f}' if '.' not in f else f for f in fields) def get_place_details(self, place_id: str, include_reviews: bool = True, include_photos: bool = True, include_attributes: bool = True) -> Optional[Dict[str, Any]]: """ Fetch comprehensive place details by Place ID. Args: place_id: Google Place ID include_reviews: Include reviews data (billed separately) include_photos: Include photo references include_attributes: Include business attributes Returns: Dict with place details or None on error """ url = f"{PLACES_API_BASE}/{place_id}" # Build field mask fields = list(BASIC_FIELDS + CONTACT_FIELDS + HOURS_FIELDS + ATMOSPHERE_FIELDS) if include_photos: fields += PHOTO_FIELDS if include_attributes: fields += ATTRIBUTE_FIELDS field_mask = ','.join(fields) headers = { 'X-Goog-FieldMask': field_mask } params = { 'languageCode': 'pl', } try: response = self.session.get(url, headers=headers, params=params, timeout=15) response.raise_for_status() data = response.json() logger.info(f"Fetched place details for {place_id}: {data.get('displayName', {}).get('text', 'unknown')}") return data except requests.exceptions.HTTPError as e: logger.error(f"Places API HTTP error for {place_id}: {e.response.status_code} - {e.response.text}") return None except requests.exceptions.RequestException as e: logger.error(f"Places API request error for {place_id}: {e}") return None @staticmethod def _tokenize_name(name: str) -> set: """Tokenize a company name into significant lowercase words.""" import re as _re skip_words = { 'sp', 'z', 'o', 'oo', 'sa', 'sc', 'j', 'k', 'ul', 'i', 'w', 'do', 'na', 'po', 'ze', 'the', 'and', 'of', 'for', 'group', } # Split on non-alphanumeric, keep words; treat & as connector (P&P, S&K) words = _re.findall(r'[a-ząćęłńóśźż0-9]+(?:&[a-ząćęłńóśźż0-9]+)*', name.lower()) return {w for w in words if len(w) > 1 and w not in skip_words} @staticmethod def _name_match_score(company_name: str, google_name: str) -> float: """ Compute name match score with prefix detection and bidirectional fallback. 1. If Google name starts with company name (word boundary) → 1.0 "INPI" matches "INPI - Infrastruktura IT" (same company, extra description) "TERMO" does NOT match "TERMO-BUD" (compound name, no space separator) 2. Otherwise, bidirectional word matching with max() denominator. """ import re as _re cn = company_name.strip().lower() gn = google_name.strip().lower() # Strip legal forms for prefix comparison (Sp. z o.o., S.A., Sp.j., etc.) clean_cn = _re.sub( r'\s*(sp\.?\s*z\.?\s*o\.?\s*o\.?|sp\.?\s*[jkp]\.?|s\.?\s*[ac]\.?)\s*\.?\s*$', '', cn, flags=_re.IGNORECASE ).strip() or cn # Also strip legal forms from Google name for reverse prefix check clean_gn = _re.sub( r'\s*(sp\.?\s*z\.?\s*o\.?\s*o\.?|sp\.?\s*[jkp]\.?|s\.?\s*[ac]\.?)\s*\.?\s*$', '', gn, flags=_re.IGNORECASE ).strip() or gn # Prefix check: company name at start of Google name, or vice versa, # followed by space, period, comma, or end — NOT dash (compound names) wb = r'(?:[\s.,;:]|$)' if clean_cn and (_re.match(_re.escape(clean_cn) + wb, gn) or _re.match(_re.escape(clean_gn) + wb, clean_cn)): return 1.0 company_words = GooglePlacesService._tokenize_name(company_name) google_words = GooglePlacesService._tokenize_name(google_name) if not company_words: return 0.0 denominator = max(len(company_words), len(google_words)) if denominator == 0: return 0.0 matched = company_words & google_words return len(matched) / denominator def search_place(self, query: str, location_bias: Dict = None, company_name: str = None) -> Optional[Dict[str, Any]]: """ Search for a place by text query. Args: query: Search text (e.g., "TERMO Wejherowo") location_bias: Optional location bias {"latitude": 54.6, "longitude": 18.2, "radius": 5000} company_name: Optional company name for result validation. If provided, verifies the result name matches before returning. Returns: Best matching place or None """ body = { "textQuery": query, "languageCode": "pl", "maxResultCount": 5 } if location_bias: body["locationBias"] = { "circle": { "center": { "latitude": location_bias["latitude"], "longitude": location_bias["longitude"] }, "radius": location_bias.get("radius", 5000.0) } } field_mask = ','.join(f'places.{f}' for f in ['id', 'displayName', 'formattedAddress', 'types', 'rating', 'userRatingCount', 'googleMapsUri']) headers = { 'X-Goog-FieldMask': field_mask } try: response = self.session.post(PLACES_SEARCH_URL, json=body, headers=headers, timeout=15) response.raise_for_status() data = response.json() places = data.get('places', []) if not places: logger.warning(f"No places found for query: {query}") return None if not company_name: return places[0] # Validate: company name must significantly match Google result name. # Uses word-boundary matching with minimum threshold: # - Short names (1-2 significant words): ALL words must match # - Longer names (3+ words): at least 50% of words must match company_words = self._tokenize_name(company_name) min_ratio = 1.0 if len(company_words) <= 2 else 0.5 best_place = None best_score = 0.0 for place in places: google_name = place.get('displayName', {}).get('text', '') score = self._name_match_score(company_name, google_name) if score >= min_ratio and score > best_score: best_score = score best_place = place if best_place: matched_name = best_place.get('displayName', {}).get('text', '') logger.info( f"Name match for '{company_name}': '{matched_name}' (score={best_score:.2f})" ) return best_place logger.warning( f"No name match for '{company_name}' (min_ratio={min_ratio:.0%}) in Google results: " f"{[p.get('displayName', {}).get('text', '') for p in places]}" ) return None except requests.exceptions.RequestException as e: logger.error(f"Places search error for '{query}': {e}") return None def search_places_raw(self, query: str, location_bias: Dict = None) -> List[Dict[str, Any]]: """ Search for places and return ALL results (no name filtering). Used for manual review/matching in admin panel. """ body = { "textQuery": query, "languageCode": "pl", "maxResultCount": 5 } if location_bias: body["locationBias"] = { "circle": { "center": { "latitude": location_bias["latitude"], "longitude": location_bias["longitude"] }, "radius": location_bias.get("radius", 5000.0) } } field_mask = ','.join(f'places.{f}' for f in [ 'id', 'displayName', 'formattedAddress', 'types', 'rating', 'userRatingCount', 'googleMapsUri' ]) headers = {'X-Goog-FieldMask': field_mask} try: response = self.session.post(PLACES_SEARCH_URL, json=body, headers=headers, timeout=15) response.raise_for_status() return response.json().get('places', []) except requests.exceptions.RequestException as e: logger.error(f"Places raw search error for '{query}': {e}") return [] def search_nearby(self, latitude: float, longitude: float, radius: float = 5000.0, included_types: List[str] = None, max_results: int = 10) -> List[Dict[str, Any]]: """ Search for nearby places (for competitor discovery). Args: latitude: Center point latitude longitude: Center point longitude radius: Search radius in meters included_types: Filter by place types (e.g., ["restaurant"]) max_results: Maximum results to return Returns: List of nearby places """ body = { "locationRestriction": { "circle": { "center": { "latitude": latitude, "longitude": longitude }, "radius": radius } }, "maxResultCount": min(max_results, 20), "languageCode": "pl" } if included_types: body["includedTypes"] = included_types field_mask = ','.join(f'places.{f}' for f in [ 'id', 'displayName', 'formattedAddress', 'types', 'rating', 'userRatingCount', 'googleMapsUri', 'websiteUri', 'primaryType', 'photos', 'businessStatus', 'location' ]) headers = { 'X-Goog-FieldMask': field_mask } try: response = self.session.post(PLACES_NEARBY_URL, json=body, headers=headers, timeout=15) response.raise_for_status() data = response.json() return data.get('places', []) except requests.exceptions.RequestException as e: logger.error(f"Nearby search error: {e}") return [] def get_photo_url(self, photo_name: str, max_width: int = 400) -> str: """ Get photo URL from photo resource name. Args: photo_name: Photo resource name from place details max_width: Maximum width in pixels Returns: Photo URL string """ return f"https://places.googleapis.com/v1/{photo_name}/media?maxWidthPx={max_width}&key={self.api_key}" def extract_reviews_data(self, place_data: Dict) -> Dict[str, Any]: """ Extract and analyze reviews from place details. Returns: Dict with review statistics and individual reviews """ reviews = place_data.get('reviews', []) if not reviews: return { 'total_from_api': 0, 'total_reported': place_data.get('userRatingCount', 0), 'average_rating': place_data.get('rating'), 'reviews': [], 'with_response': 0, 'without_response': 0, 'response_rate': 0.0, 'rating_distribution': {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} } rating_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} with_response = 0 processed_reviews = [] for review in reviews: rating = review.get('rating', 0) if rating in rating_dist: rating_dist[rating] += 1 has_response = bool(review.get('authorAttribution', {}).get('displayName')) # Check if there's an owner response (Google marks these differently) # The Places API (New) doesn't directly expose owner responses in the same way # We'll check for the presence of a response field processed_reviews.append({ 'author': review.get('authorAttribution', {}).get('displayName', 'Anonim'), 'rating': rating, 'text': review.get('text', {}).get('text', ''), 'time': review.get('publishTime', ''), 'relative_time': review.get('relativePublishTimeDescription', ''), 'language': review.get('text', {}).get('languageCode', 'pl'), }) total = len(reviews) response_rate = (with_response / total * 100) if total > 0 else 0.0 return { 'total_from_api': total, 'total_reported': place_data.get('userRatingCount', 0), 'average_rating': place_data.get('rating'), 'reviews': processed_reviews, 'with_response': with_response, 'without_response': total - with_response, 'response_rate': round(response_rate, 1), 'rating_distribution': rating_dist } def extract_attributes(self, place_data: Dict) -> Dict[str, Any]: """ Extract business attributes from place details. Returns: Dict with categorized attributes """ attributes = {} # Payment options payment = place_data.get('paymentOptions', {}) if payment: attributes['payment'] = { 'accepts_credit_cards': payment.get('acceptsCreditCards'), 'accepts_debit_cards': payment.get('acceptsDebitCards'), 'accepts_cash_only': payment.get('acceptsCashOnly'), 'accepts_nfc': payment.get('acceptsNfc'), } # Parking parking = place_data.get('parkingOptions', {}) if parking: attributes['parking'] = { 'free_parking': parking.get('freeParkingLot'), 'paid_parking': parking.get('paidParkingLot'), 'street_parking': parking.get('freeStreetParking'), 'garage_parking': parking.get('freeGarageParking'), 'valet_parking': parking.get('valetParking'), } # Accessibility accessibility = place_data.get('accessibilityOptions', {}) if accessibility: attributes['accessibility'] = { 'wheelchair_entrance': accessibility.get('wheelchairAccessibleEntrance'), 'wheelchair_seating': accessibility.get('wheelchairAccessibleSeating'), 'wheelchair_restroom': accessibility.get('wheelchairAccessibleRestroom'), 'wheelchair_parking': accessibility.get('wheelchairAccessibleParking'), } # Service options service = {} bool_fields = { 'delivery': 'delivery', 'dineIn': 'dine_in', 'takeout': 'takeout', 'curbsidePickup': 'curbside_pickup', 'reservable': 'reservable', 'outdoorSeating': 'outdoor_seating', } for api_field, key in bool_fields.items(): val = place_data.get(api_field) if val is not None: service[key] = val if service: attributes['service'] = service # Amenities amenities = {} amenity_fields = { 'restroom': 'restroom', 'goodForChildren': 'good_for_children', 'allowsDogs': 'allows_dogs', 'goodForGroups': 'good_for_groups', 'liveMusic': 'live_music', 'goodForWatchingSports': 'good_for_watching_sports', } for api_field, key in amenity_fields.items(): val = place_data.get(api_field) if val is not None: amenities[key] = val if amenities: attributes['amenities'] = amenities # Food & Drink food = {} food_fields = { 'servesBreakfast': 'breakfast', 'servesLunch': 'lunch', 'servesDinner': 'dinner', 'servesBeer': 'beer', 'servesWine': 'wine', 'servesCoffee': 'coffee', } for api_field, key in food_fields.items(): val = place_data.get(api_field) if val is not None: food[key] = val if food: attributes['food_and_drink'] = food return attributes def extract_hours(self, place_data: Dict) -> Dict[str, Any]: """Extract opening hours from place details.""" result = { 'regular': None, 'current': None, 'has_special_hours': False, 'special_hours': None } regular = place_data.get('regularOpeningHours', {}) if regular: result['regular'] = { 'periods': regular.get('periods', []), 'weekday_descriptions': regular.get('weekdayDescriptions', []), 'open_now': regular.get('openNow') } current = place_data.get('currentOpeningHours', {}) if current: result['current'] = { 'periods': current.get('periods', []), 'weekday_descriptions': current.get('weekdayDescriptions', []), 'open_now': current.get('openNow') } # If current differs from regular, there are special hours if current.get('specialDays'): result['has_special_hours'] = True result['special_hours'] = current.get('specialDays', []) return result def extract_photos_metadata(self, place_data: Dict) -> Dict[str, Any]: """Extract photo metadata from place details.""" photos = place_data.get('photos', []) if not photos: return { 'total_count': 0, 'photos': [], 'has_owner_photos': False } photo_list = [] has_owner = False for photo in photos: attributions = photo.get('authorAttributions', []) is_owner = any(a.get('displayName', '').lower() in ['owner', 'właściciel'] for a in attributions) if is_owner: has_owner = True photo_list.append({ 'name': photo.get('name', ''), 'width': photo.get('widthPx', 0), 'height': photo.get('heightPx', 0), 'attributions': [a.get('displayName', '') for a in attributions], 'is_owner_photo': is_owner }) return { 'total_count': len(photo_list), 'photos': photo_list[:20], # Limit stored photos 'has_owner_photos': has_owner }