# Fallback chain for rate limit (429) resilience.
# Order: highest RPD first → best reasoning → backup.
#
# Deliberately a tuple rather than a list: GeminiService stores this very
# object as its default chain when no explicit `fallback_models` is passed,
# so an in-place mutation of a list would silently change the chain for
# every service instance relying on the default.
MODEL_FALLBACK_CHAIN = (
    'gemini-2.5-flash-lite',   # 1000 RPD free tier - primary
    'gemini-3-flash-preview',  # 20 RPD free tier - best reasoning
    'gemini-2.5-flash',        # 20 RPD free tier - backup
)
@staticmethod
def _is_rate_limited(error: Exception) -> bool:
    """Return True when ``error`` is a 429 / RESOURCE_EXHAUSTED rate-limit error.

    Detection order:
      1. Structured fields on the exception, when present: ``code`` equal to
         429 or ``status`` equal to ``RESOURCE_EXHAUSTED``.
         NOTE(review): the google-genai SDK's APIError is expected to expose
         these attributes - confirm against the SDK version in use.
      2. The exception text: the literal ``RESOURCE_EXHAUSTED``, or ``429``
         appearing as a standalone number.

    Args:
        error: Exception raised by the API call.

    Returns:
        True if the error looks like a rate-limit response, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import re

    # Prefer structured data over message parsing when the exception has it.
    if getattr(error, 'code', None) in (429, '429'):
        return True
    status = getattr(error, 'status', None)
    if isinstance(status, str) and status.upper() == 'RESOURCE_EXHAUSTED':
        return True

    error_str = str(error)
    if 'RESOURCE_EXHAUSTED' in error_str:
        return True

    # Require 429 to stand alone: a bare substring test would also match
    # unrelated numbers such as "14290" or "4295" and trigger a needless
    # model fallback on a non-rate-limit failure.
    return re.search(r'(?<!\d)429(?!\d)', error_str) is not None
def _build_generation_config(self, model: str, temperature: float,
                             max_tokens: Optional[int],
                             thinking_level: Optional[str]) -> types.GenerateContentConfig:
    """Build the GenerateContentConfig for one call to ``model``.

    The thinking configuration is attached only when ``model`` is listed in
    THINKING_MODELS, so the same call can be retried on a fallback model
    that does not support thinking mode.

    Args:
        model: Full model name the request will be sent to.
        temperature: Sampling temperature passed through to the API.
        max_tokens: Output token cap; a falsy value (None or 0) leaves the
            cap unset.
        thinking_level: Per-call thinking level; falls back to
            ``self.thinking_level`` when falsy.

    Returns:
        A GenerateContentConfig carrying the temperature, the optional
        output cap, the optional thinking config and ``self.safety_settings``.
    """
    config_params = {'temperature': temperature}
    if max_tokens:
        config_params['max_output_tokens'] = max_tokens

    # Only add thinking config for models that support it
    if model in THINKING_MODELS:
        level = thinking_level or self.thinking_level
        # THINKING_LEVELS is keyed by lowercase names; normalise so that
        # e.g. 'LOW' is not silently upgraded to the 'HIGH' default.
        level_key = level.strip().lower() if isinstance(level, str) else level
        api_level = THINKING_LEVELS.get(level_key)
        if api_level is None:
            # Keep the historical default, but make the fallback visible.
            logger.warning(
                "Unknown thinking level %r for model %s; defaulting to HIGH",
                level, model
            )
            api_level = 'HIGH'
        config_params['thinking_config'] = types.ThinkingConfig(
            thinking_level=api_level,
            include_thoughts=self.include_thoughts
        )

    return types.GenerateContentConfig(
        **config_params,
        safety_settings=self.safety_settings
    )
contents=prompt, + config=generation_config + ) - # Call API - response = self.client.models.generate_content( - model=self.model_name, - contents=prompt, - config=generation_config - ) + if stream: + return response - if stream: - return response + # Extract response text + response_text = response.text - # Extract response text - response_text = response.text + # Count tokens and log cost + latency_ms = int((time.time() - start_time) * 1000) - # Count tokens and log cost - latency_ms = int((time.time() - start_time) * 1000) + input_tokens = self._count_tokens_from_response(response, 'input') + output_tokens = self._count_tokens_from_response(response, 'output') + thinking_tokens = self._count_tokens_from_response(response, 'thinking') - # Get token counts from response metadata - input_tokens = self._count_tokens_from_response(response, 'input') - output_tokens = self._count_tokens_from_response(response, 'output') - thinking_tokens = self._count_tokens_from_response(response, 'thinking') + # Log with model & thinking info + level = thinking_level or self.thinking_level + is_thinking = model in THINKING_MODELS + is_fallback = model != self.model_name + logger.info( + f"Gemini API call successful. " + f"Tokens: {input_tokens}+{output_tokens}" + f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, " + f"Latency: {latency_ms}ms, " + f"Model: {model}{'(fallback)' if is_fallback else ''}, " + f"Thinking: {level.upper() if is_thinking else 'OFF'}" + ) - # Log with thinking level info - level = thinking_level or self.thinking_level - logger.info( - f"Gemini API call successful. 
" - f"Tokens: {input_tokens}+{output_tokens}" - f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, " - f"Latency: {latency_ms}ms, " - f"Model: {self.model_name}, " - f"Thinking: {level.upper() if self._thinking_enabled else 'OFF'}" - ) + # Log to database for cost tracking (use actual model used) + self._log_api_cost( + prompt=prompt, + response_text=response_text, + input_tokens=input_tokens, + output_tokens=output_tokens, + thinking_tokens=thinking_tokens, + latency_ms=latency_ms, + success=True, + feature=feature, + user_id=user_id, + company_id=company_id, + related_entity_type=related_entity_type, + related_entity_id=related_entity_id, + model_override=model if is_fallback else None + ) - # Log to database for cost tracking - self._log_api_cost( - prompt=prompt, - response_text=response_text, - input_tokens=input_tokens, - output_tokens=output_tokens, - thinking_tokens=thinking_tokens, - latency_ms=latency_ms, - success=True, - feature=feature, - user_id=user_id, - company_id=company_id, - related_entity_type=related_entity_type, - related_entity_id=related_entity_id - ) + return response_text - return response_text + except Exception as e: + if self._is_rate_limited(e) and model != models_to_try[-1]: + logger.warning(f"Rate limited on {model}, trying next fallback...") + last_error = e + continue - except Exception as e: - latency_ms = int((time.time() - start_time) * 1000) + # Non-429 error or last model in chain — fail + latency_ms = int((time.time() - start_time) * 1000) - # Log failed request - self._log_api_cost( - prompt=prompt, - response_text='', - input_tokens=self._estimate_tokens(prompt), - output_tokens=0, - thinking_tokens=0, - latency_ms=latency_ms, - success=False, - error_message=str(e), - feature=feature, - user_id=user_id, - company_id=company_id, - related_entity_type=related_entity_type, - related_entity_id=related_entity_id - ) + self._log_api_cost( + prompt=prompt, + response_text='', + input_tokens=self._estimate_tokens(prompt), + 
output_tokens=0, + thinking_tokens=0, + latency_ms=latency_ms, + success=False, + error_message=str(e), + feature=feature, + user_id=user_id, + company_id=company_id, + related_entity_type=related_entity_type, + related_entity_id=related_entity_id, + model_override=model if model != self.model_name else None + ) - logger.error(f"Gemini API error: {str(e)}") - raise Exception(f"Gemini API call failed: {str(e)}") + logger.error(f"Gemini API error on {model}: {str(e)}") + raise Exception(f"Gemini API call failed: {str(e)}") + + # All models exhausted (all returned 429) + latency_ms = int((time.time() - start_time) * 1000) + logger.error(f"All fallback models exhausted. Last error: {last_error}") + self._log_api_cost( + prompt=prompt, + response_text='', + input_tokens=self._estimate_tokens(prompt), + output_tokens=0, + thinking_tokens=0, + latency_ms=latency_ms, + success=False, + error_message=f"All models rate limited: {last_error}", + feature=feature, + user_id=user_id, + company_id=company_id, + related_entity_type=related_entity_type, + related_entity_id=related_entity_id + ) + raise Exception(f"All Gemini models rate limited. Last error: {last_error}") def chat(self, messages: List[Dict[str, str]]) -> str: """ @@ -457,7 +525,8 @@ class GeminiService: user_id: Optional[int] = None, company_id: Optional[int] = None, related_entity_type: Optional[str] = None, - related_entity_id: Optional[int] = None + related_entity_id: Optional[int] = None, + model_override: Optional[str] = None ): """ Log API call costs to database for monitoring. @@ -476,13 +545,16 @@ class GeminiService: company_id: Optional company ID for context related_entity_type: Entity type ('zopk_news', 'chat_message', etc.) 
related_entity_id: Entity ID for reference + model_override: Actual model used (if different from self.model_name due to fallback) """ if not DB_AVAILABLE: return + actual_model = model_override or self.model_name + try: - # Calculate costs - pricing = GEMINI_PRICING.get(self.model_name, {'input': 0.50, 'output': 3.00, 'thinking': 1.00}) + # Calculate costs using actual model pricing + pricing = GEMINI_PRICING.get(actual_model, {'input': 0.50, 'output': 3.00, 'thinking': 1.00}) input_cost = (input_tokens / 1_000_000) * pricing['input'] output_cost = (output_tokens / 1_000_000) * pricing['output'] thinking_cost = (thinking_tokens / 1_000_000) * pricing.get('thinking', 0) @@ -501,7 +573,7 @@ class GeminiService: legacy_log = AIAPICostLog( timestamp=datetime.now(), api_provider='gemini', - model_name=self.model_name, + model_name=actual_model, feature=feature, user_id=user_id, input_tokens=input_tokens, @@ -520,7 +592,7 @@ class GeminiService: # Log to new AIUsageLog table usage_log = AIUsageLog( request_type=feature, - model=self.model_name, + model=actual_model, tokens_input=input_tokens, tokens_output=output_tokens + thinking_tokens, cost_cents=cost_cents,