feat(ai): Add Gemini model fallback chain for 429 rate limit resilience
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions

Switch primary model to flash-lite (1000 RPD) with automatic fallback
to 3-flash-preview (20 RPD) and flash (20 RPD) on RESOURCE_EXHAUSTED,
giving 1040 req/day on free tier instead of 20.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-02-07 14:35:22 +01:00
parent c99c4ac8dd
commit 5bd1b149c7
2 changed files with 164 additions and 92 deletions

2
app.py
View File

@ -308,7 +308,7 @@ login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.
# Initialize Gemini service
try:
gemini_service.init_gemini_service(model='3-flash') # Gemini 3 Flash Preview - najnowszy model, 7x lepszy reasoning
gemini_service.init_gemini_service(model='flash-lite') # Primary: 1000 RPD, fallback: 3-flash (20 RPD) → flash (20 RPD)
logger.info("Gemini service initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Gemini service: {e}")

View File

@ -49,6 +49,14 @@ GEMINI_MODELS = {
# Models that support thinking mode
THINKING_MODELS = {'gemini-3-flash-preview', 'gemini-3-pro-preview'}
# Fallback chain for rate limit (429) resilience.
# Order matters: highest free-tier RPD first → best reasoning → backup;
# the chain is walked top-down when a call fails with RESOURCE_EXHAUSTED / 429.
# NOTE(review): RPD figures reflect free-tier quotas at commit time — verify current limits.
MODEL_FALLBACK_CHAIN = [
'gemini-2.5-flash-lite', # 1000 RPD free tier - primary
'gemini-3-flash-preview', # 20 RPD free tier - best reasoning
'gemini-2.5-flash', # 20 RPD free tier - backup
]
# Available thinking levels for Gemini 3 Flash
THINKING_LEVELS = {
'minimal': 'MINIMAL', # Lowest latency, minimal reasoning
@ -77,7 +85,8 @@ class GeminiService:
api_key: Optional[str] = None,
model: str = 'flash',
thinking_level: str = 'high',
include_thoughts: bool = False
include_thoughts: bool = False,
fallback_models: Optional[List[str]] = None
):
"""
Initialize Gemini service.
@ -87,6 +96,7 @@ class GeminiService:
model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro')
thinking_level: Reasoning depth ('minimal', 'low', 'medium', 'high')
include_thoughts: Whether to include thinking process in response (for debugging)
fallback_models: List of full model names for 429 fallback (default: MODEL_FALLBACK_CHAIN)
API Keys (auto-selected by model):
- GOOGLE_GEMINI_API_KEY_FREE: Free tier for Flash models (no cost)
@ -120,6 +130,9 @@ class GeminiService:
# Set model
self.model_name = GEMINI_MODELS.get(model, GEMINI_MODELS['flash'])
# Fallback chain for 429 rate limit resilience
self.fallback_models = fallback_models if fallback_models is not None else MODEL_FALLBACK_CHAIN
# Thinking mode configuration
self.thinking_level = thinking_level
self.include_thoughts = include_thoughts
@ -145,9 +158,11 @@ class GeminiService:
),
]
# Join with a separator: ''.join() would concatenate the model names into one
# unreadable token (e.g. "gemini-2.5-flash-litegemini-3-flash-preview...").
chain_str = ', '.join(self.fallback_models)
logger.info(
    f"Gemini service initialized: model={self.model_name}, "
    f"thinking={self._thinking_enabled}, level={thinking_level}, "
    f"fallback_chain=[{chain_str}]"
)
@property
@ -177,6 +192,36 @@ class GeminiService:
'include_thoughts': self.include_thoughts
}
@staticmethod
def _is_rate_limited(error: Exception) -> bool:
"""Check if error is a 429 / RESOURCE_EXHAUSTED rate limit error."""
error_str = str(error)
return '429' in error_str or 'RESOURCE_EXHAUSTED' in error_str
def _build_generation_config(self, model: str, temperature: float,
                             max_tokens: Optional[int],
                             thinking_level: Optional[str]) -> types.GenerateContentConfig:
    """Assemble a GenerateContentConfig for *model*.

    Thinking configuration is attached only for models listed in
    THINKING_MODELS — other models reject a thinking_config.

    Args:
        model: Full model name the request will target.
        max_tokens: Optional output cap; omitted from the config when falsy.
        thinking_level: Per-call override; falls back to self.thinking_level.
    """
    params = {'temperature': temperature}
    if max_tokens:
        params['max_output_tokens'] = max_tokens
    if model in THINKING_MODELS:
        effective_level = thinking_level if thinking_level else self.thinking_level
        params['thinking_config'] = types.ThinkingConfig(
            # Unknown level names fall back to the most thorough setting.
            thinking_level=THINKING_LEVELS.get(effective_level, 'HIGH'),
            include_thoughts=self.include_thoughts,
        )
    return types.GenerateContentConfig(
        **params,
        safety_settings=self.safety_settings
    )
def generate_text(
self,
prompt: str,
@ -191,7 +236,9 @@ class GeminiService:
related_entity_id: Optional[int] = None
) -> str:
"""
Generate text using Gemini API with automatic cost tracking and thinking mode.
Generate text using Gemini API with automatic fallback, cost tracking and thinking mode.
On 429 RESOURCE_EXHAUSTED, automatically retries with the next model in fallback chain.
Args:
prompt: Text prompt to send to the model
@ -209,105 +256,126 @@ class GeminiService:
Generated text response
Raises:
Exception: If API call fails
Exception: If API call fails on all models
"""
# Build ordered list of models to try: primary first, then fallbacks
models_to_try = [self.model_name]
for m in self.fallback_models:
if m not in models_to_try:
models_to_try.append(m)
start_time = time.time()
last_error = None
try:
# Build generation config
config_params = {
'temperature': temperature,
}
if max_tokens:
config_params['max_output_tokens'] = max_tokens
# Add thinking config for Gemini 3 models
if self._thinking_enabled:
level = thinking_level or self.thinking_level
thinking_config = types.ThinkingConfig(
thinking_level=THINKING_LEVELS.get(level, 'HIGH'),
include_thoughts=self.include_thoughts
for model in models_to_try:
try:
generation_config = self._build_generation_config(
model=model,
temperature=temperature,
max_tokens=max_tokens,
thinking_level=thinking_level
)
config_params['thinking_config'] = thinking_config
# Build full config
generation_config = types.GenerateContentConfig(
**config_params,
safety_settings=self.safety_settings
)
# Call API
response = self.client.models.generate_content(
model=model,
contents=prompt,
config=generation_config
)
# Call API
response = self.client.models.generate_content(
model=self.model_name,
contents=prompt,
config=generation_config
)
if stream:
return response
if stream:
return response
# Extract response text
response_text = response.text
# Extract response text
response_text = response.text
# Count tokens and log cost
latency_ms = int((time.time() - start_time) * 1000)
# Count tokens and log cost
latency_ms = int((time.time() - start_time) * 1000)
input_tokens = self._count_tokens_from_response(response, 'input')
output_tokens = self._count_tokens_from_response(response, 'output')
thinking_tokens = self._count_tokens_from_response(response, 'thinking')
# Get token counts from response metadata
input_tokens = self._count_tokens_from_response(response, 'input')
output_tokens = self._count_tokens_from_response(response, 'output')
thinking_tokens = self._count_tokens_from_response(response, 'thinking')
# Log with model & thinking info
level = thinking_level or self.thinking_level
is_thinking = model in THINKING_MODELS
is_fallback = model != self.model_name
logger.info(
f"Gemini API call successful. "
f"Tokens: {input_tokens}+{output_tokens}"
f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, "
f"Latency: {latency_ms}ms, "
f"Model: {model}{'(fallback)' if is_fallback else ''}, "
f"Thinking: {level.upper() if is_thinking else 'OFF'}"
)
# Log with thinking level info
level = thinking_level or self.thinking_level
logger.info(
f"Gemini API call successful. "
f"Tokens: {input_tokens}+{output_tokens}"
f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, "
f"Latency: {latency_ms}ms, "
f"Model: {self.model_name}, "
f"Thinking: {level.upper() if self._thinking_enabled else 'OFF'}"
)
# Log to database for cost tracking (use actual model used)
self._log_api_cost(
prompt=prompt,
response_text=response_text,
input_tokens=input_tokens,
output_tokens=output_tokens,
thinking_tokens=thinking_tokens,
latency_ms=latency_ms,
success=True,
feature=feature,
user_id=user_id,
company_id=company_id,
related_entity_type=related_entity_type,
related_entity_id=related_entity_id,
model_override=model if is_fallback else None
)
# Log to database for cost tracking
self._log_api_cost(
prompt=prompt,
response_text=response_text,
input_tokens=input_tokens,
output_tokens=output_tokens,
thinking_tokens=thinking_tokens,
latency_ms=latency_ms,
success=True,
feature=feature,
user_id=user_id,
company_id=company_id,
related_entity_type=related_entity_type,
related_entity_id=related_entity_id
)
return response_text
return response_text
except Exception as e:
if self._is_rate_limited(e) and model != models_to_try[-1]:
logger.warning(f"Rate limited on {model}, trying next fallback...")
last_error = e
continue
except Exception as e:
latency_ms = int((time.time() - start_time) * 1000)
# Non-429 error or last model in chain — fail
latency_ms = int((time.time() - start_time) * 1000)
# Log failed request
self._log_api_cost(
prompt=prompt,
response_text='',
input_tokens=self._estimate_tokens(prompt),
output_tokens=0,
thinking_tokens=0,
latency_ms=latency_ms,
success=False,
error_message=str(e),
feature=feature,
user_id=user_id,
company_id=company_id,
related_entity_type=related_entity_type,
related_entity_id=related_entity_id
)
self._log_api_cost(
prompt=prompt,
response_text='',
input_tokens=self._estimate_tokens(prompt),
output_tokens=0,
thinking_tokens=0,
latency_ms=latency_ms,
success=False,
error_message=str(e),
feature=feature,
user_id=user_id,
company_id=company_id,
related_entity_type=related_entity_type,
related_entity_id=related_entity_id,
model_override=model if model != self.model_name else None
)
logger.error(f"Gemini API error: {str(e)}")
raise Exception(f"Gemini API call failed: {str(e)}")
logger.error(f"Gemini API error on {model}: {str(e)}")
raise Exception(f"Gemini API call failed: {str(e)}")
# All models exhausted (all returned 429)
latency_ms = int((time.time() - start_time) * 1000)
logger.error(f"All fallback models exhausted. Last error: {last_error}")
self._log_api_cost(
prompt=prompt,
response_text='',
input_tokens=self._estimate_tokens(prompt),
output_tokens=0,
thinking_tokens=0,
latency_ms=latency_ms,
success=False,
error_message=f"All models rate limited: {last_error}",
feature=feature,
user_id=user_id,
company_id=company_id,
related_entity_type=related_entity_type,
related_entity_id=related_entity_id
)
raise Exception(f"All Gemini models rate limited. Last error: {last_error}")
def chat(self, messages: List[Dict[str, str]]) -> str:
"""
@ -457,7 +525,8 @@ class GeminiService:
user_id: Optional[int] = None,
company_id: Optional[int] = None,
related_entity_type: Optional[str] = None,
related_entity_id: Optional[int] = None
related_entity_id: Optional[int] = None,
model_override: Optional[str] = None
):
"""
Log API call costs to database for monitoring.
@ -476,13 +545,16 @@ class GeminiService:
company_id: Optional company ID for context
related_entity_type: Entity type ('zopk_news', 'chat_message', etc.)
related_entity_id: Entity ID for reference
model_override: Actual model used (if different from self.model_name due to fallback)
"""
if not DB_AVAILABLE:
return
actual_model = model_override or self.model_name
try:
# Calculate costs
pricing = GEMINI_PRICING.get(self.model_name, {'input': 0.50, 'output': 3.00, 'thinking': 1.00})
# Calculate costs using actual model pricing
pricing = GEMINI_PRICING.get(actual_model, {'input': 0.50, 'output': 3.00, 'thinking': 1.00})
input_cost = (input_tokens / 1_000_000) * pricing['input']
output_cost = (output_tokens / 1_000_000) * pricing['output']
thinking_cost = (thinking_tokens / 1_000_000) * pricing.get('thinking', 0)
@ -501,7 +573,7 @@ class GeminiService:
legacy_log = AIAPICostLog(
timestamp=datetime.now(),
api_provider='gemini',
model_name=self.model_name,
model_name=actual_model,
feature=feature,
user_id=user_id,
input_tokens=input_tokens,
@ -520,7 +592,7 @@ class GeminiService:
# Log to new AIUsageLog table
usage_log = AIUsageLog(
request_type=feature,
model=self.model_name,
model=actual_model,
tokens_input=input_tokens,
tokens_output=output_tokens + thinking_tokens,
cost_cents=cost_cents,