feat(nordagpt): streaming SSE responses — word-by-word output with thinking animation
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions

- Extract _build_full_prompt() from _query_ai() for reuse in streaming
- Add send_message_stream() generator in NordaBizChatEngine using generate_content_stream
- Add /api/chat/<id>/message/stream SSE endpoint in blueprints/chat/routes.py
- Replace sendMessage() with streaming version: thinking dots → token-by-token rendering
- Add thinking animation CSS (.thinking-dots with thinkBounce keyframes)
- Fallback to non-streaming fetch if SSE fails

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-03-28 05:42:38 +01:00
parent 0b2e210da6
commit 0640ffbb9d
3 changed files with 454 additions and 36 deletions

View File

@ -5,10 +5,11 @@ Chat Routes
AI Chat interface, API, and analytics.
"""
import json as json_module
import logging
from datetime import datetime, date, timedelta
from flask import render_template, request, redirect, url_for, flash, jsonify, session
from flask import render_template, request, redirect, url_for, flash, jsonify, session, Response, stream_with_context
from flask_login import login_required, current_user
from sqlalchemy import func, desc
@ -324,6 +325,86 @@ def chat_send_message(conversation_id):
return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/<int:conversation_id>/message/stream', methods=['POST'])
@login_required
@member_required
def chat_send_message_stream(conversation_id):
    """Send message to AI chat — streaming SSE response (word-by-word).

    Validates the payload and conversation ownership up front (before any
    bytes are streamed), then delegates to
    NordaBizChatEngine.send_message_stream() and relays each chunk dict as a
    Server-Sent Events frame ("data: <json>\\n\\n").

    Args:
        conversation_id: ID of the AIChatConversation to post into; must be
            owned by the current user.

    Returns:
        text/event-stream Response on success; JSON error with 400/404/429/500
        when validation fails before streaming starts.
    """
    try:
        # silent=True: malformed or missing JSON yields None instead of
        # raising, so a bad payload becomes a clean 400 rather than a 500.
        data = request.get_json(silent=True) or {}
        message = (data.get('message') or '').strip()
        if not message:
            return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400

        # Verify the conversation belongs to the current user before streaming.
        db = SessionLocal()
        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        # Model choice: explicit request value wins, then session preference.
        model_choice = data.get('model') or session.get('chat_model', 'flash')

        # Enforce per-user usage limits before any model call.
        exceeded, limit_msg = check_user_limits(current_user.id, current_user.email)
        if exceeded:
            return jsonify({'success': False, **limit_msg}), 429

        # Build user context for AI personalization (same as non-streaming).
        user_context = {
            'user_id': current_user.id,
            'user_name': current_user.name,
            'user_email': current_user.email,
            'company_name': current_user.company.name if current_user.company else None,
            'company_id': current_user.company.id if current_user.company else None,
            'company_category': current_user.company.category.name if current_user.company and current_user.company.category else None,
            'company_role': current_user.company_role or 'MEMBER',
            'is_norda_member': current_user.is_norda_member,
            'chamber_role': current_user.chamber_role,
            'member_since': current_user.created_at.strftime('%Y-%m-%d') if current_user.created_at else None,
        }

        # Map UI aliases to engine model keys; unknown values fall back to flash.
        model_map = {
            'flash': '3-flash',
            'pro': '3-pro'
        }
        model_key = model_map.get(model_choice, '3-flash')
        chat_engine = NordaBizChatEngine(model=model_key)

        def generate():
            """Yield SSE frames; never let an exception escape mid-stream."""
            try:
                for chunk in chat_engine.send_message_stream(
                    conversation_id=conversation_id,
                    user_message=message,
                    user_id=current_user.id,
                    user_context=user_context
                ):
                    yield f"data: {json_module.dumps(chunk, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.error(f"SSE generator error: {e}")
                error_chunk = {'type': 'error', 'content': f'Błąd: {str(e)}'}
                # ensure_ascii=False keeps Polish diacritics readable in the
                # error frame, consistent with normal chunk serialization above.
                yield f"data: {json_module.dumps(error_chunk, ensure_ascii=False)}\n\n"

        response = Response(
            stream_with_context(generate()),
            mimetype='text/event-stream'
        )
        response.headers['Cache-Control'] = 'no-cache'
        # Disable reverse-proxy (nginx) buffering so tokens reach the browser
        # as soon as they are yielded.
        response.headers['X-Accel-Buffering'] = 'no'
        return response
    except Exception as e:
        logger.error(f"Error setting up streaming: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/<int:conversation_id>/history', methods=['GET'])
@login_required
@member_required

View File

@ -930,25 +930,25 @@ class NordaBizChatEngine:
return context
def _query_ai(
def _build_full_prompt(
self,
context: Dict[str, Any],
user_message: str,
user_id: Optional[int] = None,
thinking_level: str = 'high',
user_context: Optional[Dict[str, Any]] = None
user_context: Optional[Dict[str, Any]] = None,
thinking_level: str = 'high'
) -> str:
"""
Query Gemini AI with full company database context
Build the complete prompt string for the AI.
Extracted from _query_ai() for reuse in streaming.
Args:
context: Context dict with ALL companies
user_message: User's message
user_id: User ID for cost tracking
user_context: Optional user identity/context dict
thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')
Returns:
AI response text
Full prompt string ready to send to Gemini
"""
import json
@ -1411,6 +1411,30 @@ W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwag
full_prompt += f"\nUżytkownik: {user_message}\nTy: "
return full_prompt
def _query_ai(
self,
context: Dict[str, Any],
user_message: str,
user_id: Optional[int] = None,
thinking_level: str = 'high',
user_context: Optional[Dict[str, Any]] = None
) -> str:
"""
Query Gemini AI with full company database context.
Args:
context: Context dict with ALL companies
user_message: User's message
user_id: User ID for cost tracking
thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')
Returns:
AI response text
"""
full_prompt = self._build_full_prompt(context, user_message, user_context, thinking_level)
# Get response with automatic cost tracking to ai_api_costs table
if self.use_global_service and self.gemini_service:
# Read router decision from context to select model
@ -1437,6 +1461,189 @@ W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwag
# Post-process to ensure links are added even if AI didn't format them
return self._postprocess_links(response.text, context)
def send_message_stream(
    self,
    conversation_id: int,
    user_message: str,
    user_id: int,
    user_context: Optional[Dict[str, Any]] = None
):
    """
    Generator yielding streaming chunks as dicts for SSE.

    Flow: validate ownership -> persist user message -> smart-route / select
    model -> build full prompt -> stream tokens from Gemini -> post-process
    links -> persist the complete assistant message and conversation stats.

    Args:
        conversation_id: Target AIChatConversation ID.
        user_message: Raw user message (sanitized before storage).
        user_id: Requesting user's ID; must match the conversation owner.
        user_context: Optional identity/context dict for personalization.

    Yields dicts:
        {'type': 'token', 'content': text_chunk}
        {'type': 'done', 'message_id': int, 'latency_ms': int, 'model': str,
         'cost_usd': float, 'full_text': str}
        {'type': 'error', 'content': error_message}
    """
    from gemini_service import GEMINI_MODELS

    db = SessionLocal()
    start_time = time.time()
    try:
        # SECURITY: Validate ownership — fetch by id only, then compare the
        # owner explicitly so unauthorized attempts can be logged.
        conversation = db.query(AIChatConversation).filter_by(
            id=conversation_id
        ).first()
        if not conversation:
            yield {'type': 'error', 'content': 'Rozmowa nie istnieje'}
            return
        if conversation.user_id != user_id:
            logger.warning(
                f"SECURITY: User {user_id} attempted to stream conversation {conversation_id} "
                f"owned by user {conversation.user_id}"
            )
            yield {'type': 'error', 'content': 'Brak dostępu do tej rozmowy'}
            return

        # RODO/GDPR: Sanitize user message before storage
        sanitized_message = user_message
        if SENSITIVE_DATA_SERVICE_AVAILABLE:
            sanitized_message, _ = sanitize_message(user_message)

        # Save user message immediately so it is persisted even if the
        # model call below fails mid-stream.
        user_msg = AIChatMessage(
            conversation_id=conversation_id,
            created_at=datetime.now(),
            role='user',
            content=sanitized_message,
            edited=False,
            regenerated=False
        )
        db.add(user_msg)
        db.commit()

        # Smart Router — classify query, select data + model
        thinking_level = 'high'
        effective_model_id = None
        if SMART_ROUTER_AVAILABLE:
            route_decision = route_query(
                message=user_message,
                user_context=user_context,
                gemini_service=self.gemini_service
            )
            logger.info(
                f"NordaGPT Stream Router: complexity={route_decision['complexity']}, "
                f"model={route_decision.get('model')}, thinking={route_decision.get('thinking')}"
            )
            # Build selective context when routing succeeded, else full context.
            if route_decision.get('routed_by') != 'fallback':
                context = build_selective_context(
                    data_needed=route_decision.get('data_needed', []),
                    conversation_id=conversation.id,
                    current_message=user_message,
                    user_context=user_context
                )
            else:
                context = self._build_conversation_context(db, conversation, user_message)
            context['_route_decision'] = route_decision
            thinking_level = route_decision.get('thinking', 'high')
            model_alias = route_decision.get('model')
            if model_alias:
                effective_model_id = GEMINI_MODELS.get(model_alias)
        else:
            context = self._build_conversation_context(db, conversation, user_message)

        # Build full prompt (shared with the non-streaming _query_ai path)
        full_prompt = self._build_full_prompt(context, user_message, user_context, thinking_level)

        # Determine model name for logging
        primary_model = self.gemini_service.model_name if self.gemini_service else 'gemini-3-flash-preview'
        actual_model = effective_model_id or primary_model

        # Stream from Gemini using generate_content_stream
        full_response_text = ""
        try:
            from gemini_service import THINKING_MODELS, THINKING_LEVELS
            from google.genai import types as genai_types
            config_params = {'temperature': 0.7}
            if actual_model in THINKING_MODELS:
                config_params['thinking_config'] = genai_types.ThinkingConfig(
                    thinking_level=THINKING_LEVELS.get(thinking_level, 'HIGH'),
                    include_thoughts=False
                )
            safety_settings = self.gemini_service.safety_settings if self.gemini_service else []
            generation_config = genai_types.GenerateContentConfig(
                **config_params,
                safety_settings=safety_settings
            )
            stream_response = self.gemini_service.client.models.generate_content_stream(
                model=actual_model,
                contents=full_prompt,
                config=generation_config
            )
            for chunk in stream_response:
                # chunk.text may raise for non-text parts; skip those chunks
                # instead of aborting the whole stream.
                chunk_text = None
                try:
                    chunk_text = chunk.text
                except Exception:
                    pass
                if chunk_text:
                    full_response_text += chunk_text
                    yield {'type': 'token', 'content': chunk_text}
        except Exception as e:
            logger.error(f"Streaming error: {e}")
            yield {'type': 'error', 'content': f'Błąd generowania odpowiedzi: {str(e)}'}
            return

        # Post-process links in full response
        full_response_text = self._postprocess_links(full_response_text, context)

        # Calculate metrics — token counts use a ~4 chars/token heuristic
        # since the streaming API response here is not inspected for usage.
        latency_ms = int((time.time() - start_time) * 1000)
        input_tokens = len(full_prompt) // 4
        output_tokens = len(full_response_text) // 4
        cost_usd = self._calculate_cost(input_tokens, output_tokens)

        # Save AI response to DB
        ai_msg = AIChatMessage(
            conversation_id=conversation_id,
            created_at=datetime.now(),
            role='assistant',
            content=full_response_text,
            tokens_input=input_tokens,
            tokens_output=output_tokens,
            cost_usd=cost_usd,
            latency_ms=latency_ms,
            edited=False,
            regenerated=False
        )
        db.add(ai_msg)

        # Update conversation stats (+2: the user message and the reply)
        conversation.message_count = (conversation.message_count or 0) + 2
        conversation.updated_at = datetime.now()
        db.commit()
        db.refresh(ai_msg)

        yield {
            'type': 'done',
            'message_id': ai_msg.id,
            'latency_ms': latency_ms,
            'model': actual_model,
            'cost_usd': round(cost_usd, 6),
            'full_text': full_response_text
        }
    except Exception as e:
        logger.error(f"send_message_stream error: {e}")
        yield {'type': 'error', 'content': f'Błąd: {str(e)}'}
    finally:
        db.close()
def _postprocess_links(self, text: str, context: Dict) -> str:
"""
Post-process AI response to add markdown links for companies and people.

View File

@ -1473,6 +1473,25 @@
max-height: 280px;
}
}
/* Thinking animation for streaming responses */
.thinking-dots {
    display: flex;
    gap: 4px;
    padding: 8px 0;
}
/* Each dot pulses; "both" fill-mode holds the scale(0) end states so the
   dots stay hidden between cycles instead of flashing at full size. */
.thinking-dots span {
    animation: thinkBounce 1.4s infinite ease-in-out both;
    font-size: 1.5rem;
    color: var(--text-secondary);
}
/* Negative delays stagger the three dots into a left-to-right wave. */
.thinking-dots span:nth-child(1) { animation-delay: -0.32s; }
.thinking-dots span:nth-child(2) { animation-delay: -0.16s; }
.thinking-dots span:nth-child(3) { animation-delay: 0s; }
/* A dot grows to full size at 40% of the cycle, then collapses again. */
@keyframes thinkBounce {
    0%, 80%, 100% { transform: scale(0); }
    40% { transform: scale(1); }
}
</style>
{% endblock %}
@ -2388,9 +2407,6 @@ async function sendMessage() {
// Add user message
addMessage('user', message);
// Show typing indicator
document.getElementById('typingIndicator').style.display = 'flex';
scrollToBottom();
try {
@ -2411,34 +2427,148 @@ async function sendMessage() {
}
}
// Send message with model selection
const response = await fetch(`/api/chat/${currentConversationId}/message`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'X-CSRFToken': csrfToken },
body: JSON.stringify({
message: message,
model: currentModel
})
});
const data = await response.json();
// Create assistant message bubble with thinking animation
const messagesDiv = document.getElementById('chatMessages');
const typingIndicator = document.getElementById('typingIndicator');
// Hide typing indicator
document.getElementById('typingIndicator').style.display = 'none';
const streamMsgDiv = document.createElement('div');
streamMsgDiv.className = 'message assistant';
const streamAvatar = document.createElement('div');
streamAvatar.className = 'message-avatar';
streamAvatar.textContent = 'AI';
const streamContent = document.createElement('div');
streamContent.className = 'message-content';
if (data.success) {
addMessage('assistant', data.message, true, data.tech_info);
loadConversations();
// Update cost if available
if (data.tech_info && data.tech_info.cost_usd) {
updateMonthlyCost(data.tech_info.cost_usd);
}
} else if (data.limit_exceeded) {
// Show limit banner and usage info
if (data.usage) updateUsageBars(data.usage);
showLimitBanner(data.usage || {daily_percent: 100});
addMessage('assistant', data.error);
// Thinking dots placeholder
const thinkingDots = document.createElement('div');
thinkingDots.className = 'thinking-dots';
thinkingDots.innerHTML = '<span></span><span></span><span></span>';
streamContent.appendChild(thinkingDots);
streamMsgDiv.appendChild(streamAvatar);
streamMsgDiv.appendChild(streamContent);
if (typingIndicator) {
messagesDiv.insertBefore(streamMsgDiv, typingIndicator);
} else {
addMessage('assistant', 'Przepraszam, wystąpił błąd: ' + (data.error || 'Nieznany błąd'));
messagesDiv.appendChild(streamMsgDiv);
}
scrollToBottom();
// Try streaming endpoint
let streamingSucceeded = false;
try {
const streamResp = await fetch(`/api/chat/${currentConversationId}/message/stream`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'X-CSRFToken': csrfToken },
body: JSON.stringify({ message: message, model: currentModel })
});
if (!streamResp.ok) throw new Error(`HTTP ${streamResp.status}`);
const reader = streamResp.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let firstToken = true;
let accumulatedText = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // keep incomplete line in buffer
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const jsonStr = line.slice(6).trim();
if (!jsonStr) continue;
let chunk;
try { chunk = JSON.parse(jsonStr); } catch (e) { continue; }
if (chunk.type === 'token') {
// Remove thinking dots on first token
if (firstToken) {
thinkingDots.remove();
firstToken = false;
}
accumulatedText += chunk.content;
streamContent.innerHTML = formatMessage(accumulatedText);
scrollToBottom();
} else if (chunk.type === 'done') {
// Render final post-processed text (links corrected server-side)
if (chunk.full_text) {
streamContent.innerHTML = formatMessage(chunk.full_text);
}
// Add tech info badge
const techInfo = {
model: currentModel,
latency_ms: chunk.latency_ms || 0,
cost_usd: chunk.cost_usd || 0
};
const infoBadge = document.createElement('div');
infoBadge.className = 'thinking-info-badge';
const modelLabels = {
'flash': '⚡ Flash', 'pro': '🧠 Pro',
'gemini-3-flash-preview': '⚡ Flash',
'gemini-3.1-pro-preview': '🧠 Pro'
};
const modelLabel = modelLabels[currentModel] || currentModel;
const latencySec = ((chunk.latency_ms || 0) / 1000).toFixed(1);
const costStr = (chunk.cost_usd || 0) > 0 ? `$${(chunk.cost_usd).toFixed(4)}` : '$0.00';
let badgeHTML = `<span class="thinking-badge-level">${modelLabel}</span> · <span class="thinking-badge-time">${latencySec}s</span> · <span class="thinking-badge-cost">${costStr}</span>`;
if (currentModel === 'flash') {
badgeHTML += ` · <a href="#" class="pro-upgrade-hint" onclick="event.preventDefault(); setModel('pro');" title="Przełącz na Gemini 3 Pro dla lepszych odpowiedzi">Lepsze odpowiedzi? <strong>Spróbuj Pro</strong> 🧠</a>`;
}
infoBadge.innerHTML = badgeHTML;
streamContent.appendChild(infoBadge);
if (chunk.cost_usd) updateMonthlyCost(chunk.cost_usd);
loadConversations();
scrollToBottom();
streamingSucceeded = true;
} else if (chunk.type === 'error') {
thinkingDots.remove();
streamContent.innerHTML = formatMessage('Przepraszam, wystąpił błąd: ' + chunk.content);
scrollToBottom();
streamingSucceeded = true; // handled
}
}
}
} catch (streamError) {
console.warn('Streaming failed, falling back to non-streaming:', streamError);
// Remove the partial streaming bubble
streamMsgDiv.remove();
}
// Fallback: non-streaming if SSE failed
if (!streamingSucceeded) {
// Show typing indicator
document.getElementById('typingIndicator').style.display = 'flex';
scrollToBottom();
const response = await fetch(`/api/chat/${currentConversationId}/message`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'X-CSRFToken': csrfToken },
body: JSON.stringify({ message: message, model: currentModel })
});
const data = await response.json();
document.getElementById('typingIndicator').style.display = 'none';
if (data.success) {
addMessage('assistant', data.message, true, data.tech_info);
loadConversations();
if (data.tech_info && data.tech_info.cost_usd) updateMonthlyCost(data.tech_info.cost_usd);
} else if (data.limit_exceeded) {
if (data.usage) updateUsageBars(data.usage);
showLimitBanner(data.usage || {daily_percent: 100});
addMessage('assistant', data.error);
} else {
addMessage('assistant', 'Przepraszam, wystąpił błąd: ' + (data.error || 'Nieznany błąd'));
}
}
} catch (error) {