""" Google Gemini AI Service ======================== Reusable service for interacting with Google Gemini API. Features: - Multiple model support (Flash, Pro, Gemini 3) - Thinking Mode support for Gemini 3 models - Error handling and retries - Cost tracking - Safety settings configuration Author: NordaBiz Team Updated: 2026-01-29 (Gemini 3 SDK migration) """ import os import logging import hashlib import time from datetime import datetime from typing import Optional, Dict, Any, List # New Gemini SDK (google-genai) with thinking mode support from google import genai from google.genai import types # Configure logging logger = logging.getLogger(__name__) # Database imports for cost tracking try: from database import SessionLocal, AIAPICostLog, AIUsageLog DB_AVAILABLE = True except ImportError: logger.warning("Database not available - cost tracking disabled") DB_AVAILABLE = False # Available Gemini models (2026 - Gemini 3 generation available) GEMINI_MODELS = { 'flash': 'gemini-2.5-flash', # Best for general use - balanced cost/quality 'flash-lite': 'gemini-2.5-flash-lite', # Ultra cheap - $0.10/$0.40 per 1M tokens 'pro': 'gemini-2.5-pro', # High quality - best reasoning/coding 'flash-2.0': 'gemini-2.0-flash', # Second generation - 1M context window (wycofywany 31.03.2026) '3-flash': 'gemini-3-flash-preview', # Gemini 3 Flash - 7x lepszy reasoning, thinking mode '3-pro': 'gemini-3-pro-preview', # Gemini 3 Pro - najlepszy reasoning, 2M context } # Models that support thinking mode THINKING_MODELS = {'gemini-3-flash-preview', 'gemini-3-pro-preview'} # Available thinking levels for Gemini 3 Flash THINKING_LEVELS = { 'minimal': 'MINIMAL', # Lowest latency, minimal reasoning 'low': 'LOW', # Fast, simple tasks 'medium': 'MEDIUM', # Balanced (Gemini 3 Flash only) 'high': 'HIGH', # Maximum reasoning depth (default) } # Pricing per 1M tokens (USD) - updated 2026-01-29 # Note: Flash on Free Tier = $0.00, Pro on Paid Tier = paid pricing GEMINI_PRICING = { 'gemini-2.5-flash': {'input': 0.30, 'output': 2.50, 'thinking': 0}, 'gemini-2.5-flash-lite': {'input': 0.10, 'output': 0.40, 'thinking': 0}, 'gemini-2.5-pro': {'input': 1.25, 'output': 10.00, 'thinking': 0}, 'gemini-2.0-flash': {'input': 0.10, 'output': 0.40, 'thinking': 0}, 'gemini-3-flash-preview': {'input': 0.00, 'output': 0.00, 'thinking': 0.00}, # Free tier! 'gemini-3-pro-preview': {'input': 2.00, 'output': 12.00, 'thinking': 4.00}, # Paid tier } class GeminiService: """Service class for Google Gemini API interactions with Thinking Mode support.""" def __init__( self, api_key: Optional[str] = None, model: str = 'flash', thinking_level: str = 'high', include_thoughts: bool = False ): """ Initialize Gemini service. Args: api_key: Google AI API key (reads from env if not provided) model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro') thinking_level: Reasoning depth ('minimal', 'low', 'medium', 'high') include_thoughts: Whether to include thinking process in response (for debugging) API Keys (auto-selected by model): - GOOGLE_GEMINI_API_KEY_FREE: Free tier for Flash models (no cost) - GOOGLE_GEMINI_API_KEY: Paid tier for Pro models """ # Auto-select API key based on model (Free tier for Flash, Paid for Pro) if api_key: self.api_key = api_key elif model in ('3-pro', 'pro'): # Pro models use paid tier self.api_key = os.getenv('GOOGLE_GEMINI_API_KEY') else: # Flash models prefer free tier, fallback to paid self.api_key = os.getenv('GOOGLE_GEMINI_API_KEY_FREE') or os.getenv('GOOGLE_GEMINI_API_KEY') # Debug: Log API key (masked) if self.api_key: logger.info(f"API key loaded: {self.api_key[:10]}...{self.api_key[-4:]}") else: logger.error("API key is None or empty!") if not self.api_key or self.api_key == 'TWOJ_KLUCZ_API_TUTAJ': raise ValueError( "GOOGLE_GEMINI_API_KEY not configured. " "Please add your API key to .env file." ) # Initialize new Gemini client self.client = genai.Client(api_key=self.api_key) # Set model self.model_name = GEMINI_MODELS.get(model, GEMINI_MODELS['flash']) # Thinking mode configuration self.thinking_level = thinking_level self.include_thoughts = include_thoughts self._thinking_enabled = self.model_name in THINKING_MODELS # Safety settings self.safety_settings = [ types.SafetySetting( category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE" ), types.SafetySetting( category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE" ), types.SafetySetting( category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE" ), types.SafetySetting( category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE" ), ] logger.info( f"Gemini service initialized: model={self.model_name}, " f"thinking={self._thinking_enabled}, level={thinking_level}" ) @property def thinking_enabled(self) -> bool: """Whether thinking mode is enabled for current model.""" return self._thinking_enabled @property def thinking_level_display(self) -> str: """Human-readable thinking level for UI.""" if not self._thinking_enabled: return "Wyłączony" return { 'minimal': 'Minimalny', 'low': 'Niski', 'medium': 'Średni', 'high': 'Wysoki' }.get(self.thinking_level, self.thinking_level) def get_status(self) -> Dict[str, Any]: """Get service status for UI display.""" return { 'model': self.model_name, 'thinking_enabled': self._thinking_enabled, 'thinking_level': self.thinking_level, 'thinking_level_display': self.thinking_level_display, 'include_thoughts': self.include_thoughts } def generate_text( self, prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None, stream: bool = False, thinking_level: Optional[str] = None, feature: str = 'general', user_id: Optional[int] = None, company_id: Optional[int] = None, related_entity_type: Optional[str] = None, related_entity_id: Optional[int] = None ) -> str: """ Generate text using Gemini API with automatic cost tracking and thinking mode. Args: prompt: Text prompt to send to the model temperature: Sampling temperature (0.0-1.0). Higher = more creative max_tokens: Maximum tokens to generate (None = model default) stream: Whether to stream the response thinking_level: Override default thinking level for this call feature: Feature name for cost tracking ('chat', 'news_evaluation', etc.) user_id: Optional user ID for cost tracking company_id: Optional company ID for context related_entity_type: Entity type ('zopk_news', 'chat_message', etc.) related_entity_id: Entity ID for reference Returns: Generated text response Raises: Exception: If API call fails """ start_time = time.time() try: # Build generation config config_params = { 'temperature': temperature, } if max_tokens: config_params['max_output_tokens'] = max_tokens # Add thinking config for Gemini 3 models if self._thinking_enabled: level = thinking_level or self.thinking_level thinking_config = types.ThinkingConfig( thinking_level=THINKING_LEVELS.get(level, 'HIGH'), include_thoughts=self.include_thoughts ) config_params['thinking_config'] = thinking_config # Build full config generation_config = types.GenerateContentConfig( **config_params, safety_settings=self.safety_settings ) # Call API response = self.client.models.generate_content( model=self.model_name, contents=prompt, config=generation_config ) if stream: return response # Extract response text response_text = response.text # Count tokens and log cost latency_ms = int((time.time() - start_time) * 1000) # Get token counts from response metadata input_tokens = self._count_tokens_from_response(response, 'input') output_tokens = self._count_tokens_from_response(response, 'output') thinking_tokens = self._count_tokens_from_response(response, 'thinking') # Log with thinking level info level = thinking_level or self.thinking_level logger.info( f"Gemini API call successful. " f"Tokens: {input_tokens}+{output_tokens}" f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, " f"Latency: {latency_ms}ms, " f"Model: {self.model_name}, " f"Thinking: {level.upper() if self._thinking_enabled else 'OFF'}" ) # Log to database for cost tracking self._log_api_cost( prompt=prompt, response_text=response_text, input_tokens=input_tokens, output_tokens=output_tokens, thinking_tokens=thinking_tokens, latency_ms=latency_ms, success=True, feature=feature, user_id=user_id, company_id=company_id, related_entity_type=related_entity_type, related_entity_id=related_entity_id ) return response_text except Exception as e: latency_ms = int((time.time() - start_time) * 1000) # Log failed request self._log_api_cost( prompt=prompt, response_text='', input_tokens=self._estimate_tokens(prompt), output_tokens=0, thinking_tokens=0, latency_ms=latency_ms, success=False, error_message=str(e), feature=feature, user_id=user_id, company_id=company_id, related_entity_type=related_entity_type, related_entity_id=related_entity_id ) logger.error(f"Gemini API error: {str(e)}") raise Exception(f"Gemini API call failed: {str(e)}") def chat(self, messages: List[Dict[str, str]]) -> str: """ Multi-turn chat conversation. Args: messages: List of message dicts with 'role' and 'content' keys Example: [ {'role': 'user', 'content': 'Hello'}, {'role': 'model', 'content': 'Hi there!'}, {'role': 'user', 'content': 'How are you?'} ] Returns: Model's response to the last message """ try: # Build contents from messages contents = [] for msg in messages: role = 'user' if msg['role'] == 'user' else 'model' contents.append(types.Content( role=role, parts=[types.Part(text=msg['content'])] )) # Build config with thinking if available config_params = {'temperature': 0.7} if self._thinking_enabled: config_params['thinking_config'] = types.ThinkingConfig( thinking_level=THINKING_LEVELS.get(self.thinking_level, 'HIGH'), include_thoughts=self.include_thoughts ) generation_config = types.GenerateContentConfig( **config_params, safety_settings=self.safety_settings ) response = self.client.models.generate_content( model=self.model_name, contents=contents, config=generation_config ) return response.text except Exception as e: logger.error(f"Gemini chat error: {str(e)}") raise Exception(f"Gemini chat failed: {str(e)}") def analyze_image(self, image_path: str, prompt: str) -> str: """ Analyze image with Gemini Vision. Args: image_path: Path to image file prompt: Text prompt describing what to analyze Returns: Analysis result """ try: import PIL.Image img = PIL.Image.open(image_path) # Convert image to bytes import io img_bytes = io.BytesIO() img.save(img_bytes, format=img.format or 'PNG') img_bytes = img_bytes.getvalue() contents = [ types.Part(text=prompt), types.Part( inline_data=types.Blob( mime_type=f"image/{(img.format or 'png').lower()}", data=img_bytes ) ) ] response = self.client.models.generate_content( model=self.model_name, contents=contents ) return response.text except Exception as e: logger.error(f"Gemini image analysis error: {str(e)}") raise Exception(f"Image analysis failed: {str(e)}") def count_tokens(self, text: str) -> int: """ Count tokens in text. Args: text: Text to count tokens for Returns: Number of tokens """ try: result = self.client.models.count_tokens( model=self.model_name, contents=text ) return result.total_tokens except Exception as e: logger.warning(f"Token counting failed: {e}") return self._estimate_tokens(text) def _estimate_tokens(self, text: str) -> int: """Estimate tokens when API counting fails (~4 chars per token).""" return len(text) // 4 def _count_tokens_from_response(self, response, token_type: str) -> int: """Extract token count from API response metadata.""" try: usage = response.usage_metadata if not usage: return 0 if token_type == 'input': return getattr(usage, 'prompt_token_count', 0) or 0 elif token_type == 'output': return getattr(usage, 'candidates_token_count', 0) or 0 elif token_type == 'thinking': # Gemini 3 reports thinking tokens separately return getattr(usage, 'thinking_token_count', 0) or 0 except Exception: return 0 return 0 def _log_api_cost( self, prompt: str, response_text: str, input_tokens: int, output_tokens: int, thinking_tokens: int = 0, latency_ms: int = 0, success: bool = True, error_message: Optional[str] = None, feature: str = 'general', user_id: Optional[int] = None, company_id: Optional[int] = None, related_entity_type: Optional[str] = None, related_entity_id: Optional[int] = None ): """ Log API call costs to database for monitoring. Args: prompt: Input prompt text response_text: Output response text input_tokens: Number of input tokens used output_tokens: Number of output tokens generated thinking_tokens: Number of thinking tokens (Gemini 3) latency_ms: Response time in milliseconds success: Whether API call succeeded error_message: Error details if failed feature: Feature name ('chat', 'news_evaluation', 'user_creation', etc.) user_id: Optional user ID company_id: Optional company ID for context related_entity_type: Entity type ('zopk_news', 'chat_message', etc.) related_entity_id: Entity ID for reference """ if not DB_AVAILABLE: return try: # Calculate costs pricing = GEMINI_PRICING.get(self.model_name, {'input': 0.50, 'output': 3.00, 'thinking': 1.00}) input_cost = (input_tokens / 1_000_000) * pricing['input'] output_cost = (output_tokens / 1_000_000) * pricing['output'] thinking_cost = (thinking_tokens / 1_000_000) * pricing.get('thinking', 0) total_cost = input_cost + output_cost + thinking_cost # Cost in cents for AIUsageLog cost_cents = total_cost * 100 # Create prompt hash (for debugging, not storing full prompt for privacy) prompt_hash = hashlib.sha256(prompt.encode()).hexdigest() # Save to database db = SessionLocal() try: # Log to legacy AIAPICostLog table legacy_log = AIAPICostLog( timestamp=datetime.now(), api_provider='gemini', model_name=self.model_name, feature=feature, user_id=user_id, input_tokens=input_tokens, output_tokens=output_tokens + thinking_tokens, # Combined for legacy total_tokens=input_tokens + output_tokens + thinking_tokens, input_cost=input_cost, output_cost=output_cost + thinking_cost, # Combined for legacy total_cost=total_cost, success=success, error_message=error_message, latency_ms=latency_ms, prompt_hash=prompt_hash ) db.add(legacy_log) # Log to new AIUsageLog table usage_log = AIUsageLog( request_type=feature, model=self.model_name, tokens_input=input_tokens, tokens_output=output_tokens + thinking_tokens, cost_cents=cost_cents, user_id=user_id, company_id=company_id, related_entity_type=related_entity_type, related_entity_id=related_entity_id, prompt_length=len(prompt), response_length=len(response_text), response_time_ms=latency_ms, success=success, error_message=error_message ) db.add(usage_log) db.commit() logger.info( f"API cost logged: {feature} - ${total_cost:.6f} " f"({input_tokens}+{output_tokens}" f"{f'+{thinking_tokens}t' if thinking_tokens else ''} tokens, {latency_ms}ms)" ) finally: db.close() except Exception as e: logger.error(f"Failed to log API cost: {e}") def generate_embedding( self, text: str, task_type: str = 'RETRIEVAL_DOCUMENT', title: Optional[str] = None, user_id: Optional[int] = None, feature: str = 'embedding' ) -> Optional[List[float]]: """ Generate embedding vector for text using Google's text-embedding model. Args: text: Text to embed task_type: One of: - 'RETRIEVAL_DOCUMENT': For documents to be retrieved - 'RETRIEVAL_QUERY': For search queries - 'SEMANTIC_SIMILARITY': For comparing texts - 'CLASSIFICATION': For text classification - 'CLUSTERING': For text clustering title: Optional title for document (improves quality) user_id: User ID for cost tracking feature: Feature name for cost tracking Returns: 768-dimensional embedding vector or None on error """ if not text or not text.strip(): logger.warning("Empty text provided for embedding") return None start_time = time.time() try: # Build content with optional title content_parts = [] if title: content_parts.append(types.Part(text=f"Title: {title}\n\n")) content_parts.append(types.Part(text=text)) result = self.client.models.embed_content( model='text-embedding-004', contents=types.Content(parts=content_parts), config=types.EmbedContentConfig( task_type=task_type ) ) embedding = result.embeddings[0].values if result.embeddings else None if not embedding: logger.error("No embedding returned from API") return None # Log cost latency_ms = int((time.time() - start_time) * 1000) token_count = len(text) // 4 cost_usd = (token_count / 1000) * 0.00001 logger.debug( f"Embedding generated: {len(embedding)} dims, " f"{token_count} tokens, {latency_ms}ms, ${cost_usd:.8f}" ) return list(embedding) except Exception as e: logger.error(f"Embedding generation error: {e}") return None def generate_embeddings_batch( self, texts: List[str], task_type: str = 'RETRIEVAL_DOCUMENT', user_id: Optional[int] = None ) -> List[Optional[List[float]]]: """ Generate embeddings for multiple texts. Args: texts: List of texts to embed task_type: Task type for all embeddings user_id: User ID for cost tracking Returns: List of embedding vectors (None for failed items) """ results = [] for text in texts: embedding = self.generate_embedding( text=text, task_type=task_type, user_id=user_id, feature='embedding_batch' ) results.append(embedding) return results # Global service instance (initialized in app.py) _gemini_service: Optional[GeminiService] = None def init_gemini_service( api_key: Optional[str] = None, model: str = 'flash', thinking_level: str = 'high' ): """ Initialize global Gemini service instance. Call this in app.py during Flask app initialization. Args: api_key: Google AI API key (optional if set in env) model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro') thinking_level: Reasoning depth for Gemini 3 models ('minimal', 'low', 'medium', 'high') """ global _gemini_service try: _gemini_service = GeminiService( api_key=api_key, model=model, thinking_level=thinking_level ) logger.info("Global Gemini service initialized successfully") except Exception as e: logger.error(f"Failed to initialize Gemini service: {e}") _gemini_service = None def get_gemini_service() -> Optional[GeminiService]: """ Get global Gemini service instance. Returns: GeminiService instance or None if not initialized """ return _gemini_service def generate_text(prompt: str, **kwargs) -> Optional[str]: """ Convenience function to generate text using global service. Args: prompt: Text prompt **kwargs: Additional arguments for generate_text() Returns: Generated text or None if service not initialized """ service = get_gemini_service() if service: return service.generate_text(prompt, **kwargs) logger.warning("Gemini service not initialized") return None