feat(nordagpt): add memory service — fact extraction, summaries, CRUD, prompt injection
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-03-28 05:47:44 +01:00
parent f953648c7d
commit 59f8db8213
3 changed files with 542 additions and 1 deletions

View File

@ -645,3 +645,38 @@ def chat_feedback():
def chat_analytics():
"""Redirect to consolidated analytics chat tab."""
return redirect(url_for('admin.user_insights', tab='chat'))
# ============================================================
# MEMORY CRUD API
# ============================================================
@bp.route('/api/chat/memory', methods=['GET'])
@login_required
@member_required
def get_user_memory_api():
    """API: Return the current user's stored memory facts and conversation summaries."""
    try:
        from memory_service import get_user_memory, get_conversation_summaries
        payload = {
            'facts': get_user_memory(current_user.id, limit=20),
            'summaries': get_conversation_summaries(current_user.id, limit=10),
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Error fetching user memory: {e}")
        return jsonify({'error': str(e)}), 500
@bp.route('/api/chat/memory/<int:fact_id>', methods=['DELETE'])
@login_required
@member_required
def delete_memory_fact(fact_id):
    """API: Soft-delete one of the current user's memory facts."""
    try:
        from memory_service import delete_user_fact
        removed = delete_user_fact(current_user.id, fact_id)
        if not removed:
            return jsonify({'error': 'Nie znaleziono'}), 404
        return jsonify({'status': 'ok'})
    except Exception as e:
        logger.error(f"Error deleting memory fact {fact_id}: {e}")
        return jsonify({'error': str(e)}), 500

452
memory_service.py Normal file
View File

@ -0,0 +1,452 @@
"""
Memory Service
==============
Manages persistent user memory for NordaGPT conversations.
Features:
- Extract key facts from conversations using Flash-Lite
- Generate and update conversation summaries
- Format memory for injection into AI prompts
- CRUD operations for user memory facts
Author: Norda Biznes Development Team
"""
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from database import SessionLocal, AIUserMemory, AIConversationSummary, AIChatMessage
logger = logging.getLogger(__name__)
FLASH_LITE_MODEL = 'gemini-3.1-flash-lite-preview'
def get_user_memory(user_id: int, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Return the active, non-expired memory facts stored for a user.

    Facts are ordered by confidence (highest first), then by creation
    time (newest first).

    Args:
        user_id: User ID
        limit: Maximum number of facts to return

    Returns:
        List of dicts with fact data
    """
    session = SessionLocal()
    try:
        cutoff = datetime.now()
        rows = (
            session.query(AIUserMemory)
            .filter(
                AIUserMemory.user_id == user_id,
                AIUserMemory.is_active == True,
                # A NULL expires_at means the fact never expires.
                (AIUserMemory.expires_at == None) | (AIUserMemory.expires_at > cutoff),
            )
            .order_by(
                AIUserMemory.confidence.desc(),
                AIUserMemory.created_at.desc(),
            )
            .limit(limit)
            .all()
        )
        result = []
        for row in rows:
            result.append({
                'id': row.id,
                'fact': row.fact,
                'category': row.category or 'general',
                'confidence': float(row.confidence) if row.confidence else 1.0,
                'created_at': row.created_at.isoformat() if row.created_at else None,
                'expires_at': row.expires_at.isoformat() if row.expires_at else None,
                'source_conversation_id': row.source_conversation_id,
            })
        return result
    finally:
        session.close()
def get_conversation_summaries(user_id: int, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Return the most recently updated conversation summaries for a user.

    Args:
        user_id: User ID
        limit: Maximum number of summaries to return

    Returns:
        List of dicts with summary data
    """
    session = SessionLocal()
    try:
        rows = (
            session.query(AIConversationSummary)
            .filter(AIConversationSummary.user_id == user_id)
            .order_by(AIConversationSummary.updated_at.desc())
            .limit(limit)
            .all()
        )
        result = []
        for row in rows:
            result.append({
                'id': row.id,
                'conversation_id': row.conversation_id,
                'summary': row.summary,
                'key_topics': row.key_topics or [],
                'created_at': row.created_at.isoformat() if row.created_at else None,
                'updated_at': row.updated_at.isoformat() if row.updated_at else None,
            })
        return result
    finally:
        session.close()
def format_memory_for_prompt(user_id: int) -> str:
    """
    Build the memory block that gets injected into the AI system prompt.

    Combines stored user facts with recent conversation summaries.
    Returns an empty string when the user has no stored memory.

    Args:
        user_id: User ID

    Returns:
        Formatted memory string or empty string
    """
    facts = get_user_memory(user_id, limit=10)
    summaries = get_conversation_summaries(user_id, limit=5)
    if not (facts or summaries):
        return ""

    parts = ["\n# PAMIĘĆ O UŻYTKOWNIKU"]

    if facts:
        parts.append("Znane fakty:")
        parts.extend(
            f"- [{f.get('category', 'general')}] {f.get('fact', '')}"
            for f in facts
        )

    if summaries:
        parts.append("\nOstatnie rozmowy:")
        for entry in summaries:
            updated = entry.get('updated_at')
            date_str = ''
            if updated:
                try:
                    date_str = datetime.fromisoformat(updated).strftime('%Y-%m-%d')
                except Exception:
                    # Fall back to the raw ISO-date prefix when parsing fails.
                    date_str = updated[:10]
            summary_text = entry.get('summary', '')
            topics_str = ', '.join(entry.get('key_topics', []) or [])
            if topics_str:
                parts.append(f"- {date_str}: {summary_text} (tematy: {topics_str})")
            else:
                parts.append(f"- {date_str}: {summary_text}")

    parts.append("\nWykorzystuj tę wiedzę do personalizacji odpowiedzi.")
    return "\n".join(parts) + "\n"
def extract_facts_async(
    conversation_id: int,
    user_id: int,
    user_context: Optional[Dict[str, Any]],
    gemini_service
) -> None:
    """
    Extract key user facts from a conversation via Flash-Lite and persist them.

    Loads the last 10 messages, asks the model for up to 3 new facts,
    deduplicates against existing facts AND against facts saved earlier in
    the same batch, then stores the survivors. Runs best-effort in a
    background thread: every error is caught and logged, never raised.

    Args:
        conversation_id: Conversation ID to extract facts from
        user_id: User ID
        user_context: Optional user context dict (reserved for future prompts)
        gemini_service: GeminiService instance for API calls
    """
    db = SessionLocal()
    try:
        # Load last 10 messages from the conversation
        messages = db.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.desc()).limit(10).all()
        if not messages:
            return
        # Build conversation text (chronological order)
        conversation_text = ""
        for msg in reversed(messages):
            role = "Użytkownik" if msg.role == 'user' else "Asystent"
            conversation_text += f"{role}: {msg.content}\n"
        # Load existing facts for deduplication
        existing_facts = get_user_memory(user_id, limit=20)
        existing_texts = [f['fact'] for f in existing_facts]
        existing_str = "\n".join(f"- {t}" for t in existing_texts) if existing_texts else "(brak)"
        # Build extraction prompt
        prompt = f"""Przeanalizuj poniższy fragment rozmowy i wyodrębnij maksymalnie 3 kluczowe fakty o UŻYTKOWNIKU (nie asystencie).
Fakty powinny dotyczyć: preferencji użytkownika, jego firmy, branży, zainteresowań biznesowych, planów, potrzeb.
ISTNIEJĄCE FAKTY (nie duplikuj):
{existing_str}
ROZMOWA:
{conversation_text}
Odpowiedz TYLKO w formacie JSON (bez markdown, bez komentarzy):
[
{{"fact": "treść faktu po polsku", "category": "kategoria"}},
...
]
Kategorie: business, interest, preference, contact, plan, general
Jeśli nie ma nowych faktów wartych zapamiętania, zwróć pustą tablicę: []"""
        # Call Flash-Lite
        response_text = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_extraction',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )
        if not response_text:
            return
        # Strip markdown code fences if present
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            # Remove all fence lines, keep the JSON payload between them.
            lines = cleaned.split("\n")
            lines = [l for l in lines if not l.strip().startswith("```")]
            cleaned = "\n".join(lines).strip()
        # Parse JSON
        try:
            extracted = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Memory extraction: JSON parse failed for user {user_id}: {cleaned[:200]}")
            return
        if not isinstance(extracted, list):
            return
        # Save new facts (max 3, deduplicate)
        saved_count = 0
        for item in extracted[:3]:
            if not isinstance(item, dict):
                continue
            # FIX: guard against non-string values from the model (e.g. JSON
            # null) — calling .strip() on None would abort the whole batch
            # via the broad except below instead of skipping one bad item.
            fact_text = item.get('fact')
            if not isinstance(fact_text, str):
                continue
            fact_text = fact_text.strip()
            if not fact_text:
                continue
            category = item.get('category', 'general')
            # FIX: validate category; store 'general' for non-string/empty values.
            if not isinstance(category, str) or not category:
                category = 'general'
            # Simple deduplication: skip if very similar to existing
            is_duplicate = any(
                _similarity_check(fact_text, existing)
                for existing in existing_texts
            )
            if is_duplicate:
                continue
            memory = AIUserMemory(
                user_id=user_id,
                fact=fact_text,
                category=category,
                source_conversation_id=conversation_id,
                confidence=0.8,
                is_active=True
            )
            db.add(memory)
            # FIX: also deduplicate later items in this batch against the
            # fact we just accepted, so two near-identical new facts from
            # one extraction cannot both be stored.
            existing_texts.append(fact_text)
            saved_count += 1
        if saved_count > 0:
            db.commit()
            logger.info(f"Memory: saved {saved_count} facts for user {user_id}")
    except Exception as e:
        logger.warning(f"Memory extraction failed for user {user_id}, conv {conversation_id}: {e}")
    finally:
        db.close()
def summarize_conversation_async(
    conversation_id: int,
    user_id: int,
    gemini_service
) -> None:
    """
    Create or refresh the short summary of a conversation via Flash-Lite.

    Best-effort background task: every failure is caught and logged,
    nothing is raised to the caller.

    Args:
        conversation_id: Conversation ID to summarize
        user_id: User ID
        gemini_service: GeminiService instance for API calls
    """
    session = SessionLocal()
    try:
        # All messages of the conversation, oldest first.
        rows = session.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.asc()).all()
        if not rows:
            return

        # Flatten into "role: content" lines, capping each message at 300 chars.
        pieces = []
        for row in rows:
            speaker = "Użytkownik" if row.role == 'user' else "Asystent"
            pieces.append(f"{speaker}: {row.content[:300]}\n")
        conversation_text = "".join(pieces)
        # Keep the prompt bounded in size.
        if len(conversation_text) > 4000:
            conversation_text = conversation_text[:4000] + "..."

        prompt = f"""Napisz krótkie podsumowanie (max 2 zdania) poniższej rozmowy, skupiając się na głównym temacie i potrzebach użytkownika. Podaj też listę 3-5 słów kluczowych opisujących tematykę.
ROZMOWA:
{conversation_text}
Odpowiedz TYLKO w formacie JSON (bez markdown):
{{"summary": "podsumowanie rozmowy", "key_topics": ["temat1", "temat2", "temat3"]}}"""

        raw = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_summary',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )
        if not raw:
            return

        # Drop any markdown code fences around the JSON payload.
        cleaned = raw.strip()
        if cleaned.startswith("```"):
            kept = [ln for ln in cleaned.split("\n") if not ln.strip().startswith("```")]
            cleaned = "\n".join(kept).strip()

        try:
            result = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Summary: JSON parse failed for conv {conversation_id}: {cleaned[:200]}")
            return
        if not isinstance(result, dict):
            return

        summary_text = result.get('summary', '').strip()
        key_topics = result.get('key_topics', [])
        if not summary_text:
            return

        # Upsert: at most one summary row per conversation.
        record = session.query(AIConversationSummary).filter(
            AIConversationSummary.conversation_id == conversation_id
        ).first()
        if record is None:
            record = AIConversationSummary(
                conversation_id=conversation_id,
                user_id=user_id,
                summary=summary_text,
                key_topics=key_topics
            )
            session.add(record)
        else:
            record.summary = summary_text
            record.key_topics = key_topics
            record.updated_at = datetime.now()
        session.commit()
        logger.info(f"Memory: updated summary for conv {conversation_id}")
    except Exception as e:
        logger.warning(f"Summary generation failed for conv {conversation_id}: {e}")
    finally:
        session.close()
def delete_user_fact(user_id: int, fact_id: int) -> bool:
    """
    Soft-delete a memory fact by flipping is_active to False.

    Args:
        user_id: User ID (for ownership check)
        fact_id: Fact ID to delete

    Returns:
        True if deleted, False if not found or not owned by user
    """
    session = SessionLocal()
    try:
        record = (
            session.query(AIUserMemory)
            .filter(
                AIUserMemory.id == fact_id,
                AIUserMemory.user_id == user_id,
                AIUserMemory.is_active == True,
            )
            .first()
        )
        if record is None:
            return False
        record.is_active = False
        session.commit()
        logger.info(f"Memory: soft-deleted fact {fact_id} for user {user_id}")
        return True
    except Exception as e:
        logger.warning(f"Memory delete failed for fact {fact_id}: {e}")
        return False
    finally:
        session.close()
def _similarity_check(new_fact: str, existing_fact: str, threshold: float = 0.6) -> bool:
"""
Simple word-overlap similarity check for deduplication.
Args:
new_fact: New fact text
existing_fact: Existing fact text
threshold: Overlap ratio threshold (0-1)
Returns:
True if facts are considered duplicates
"""
new_words = set(new_fact.lower().split())
existing_words = set(existing_fact.lower().split())
if not new_words or not existing_words:
return False
# Remove short/common words
stop_words = {'w', 'z', 'i', 'a', 'to', 'na', 'do', 'jest', 'się', 'że', 'nie', 'jak', 'co'}
new_words -= stop_words
existing_words -= stop_words
if not new_words or not existing_words:
return False
intersection = new_words & existing_words
overlap = len(intersection) / min(len(new_words), len(existing_words))
return overlap >= threshold

View File

@ -87,6 +87,14 @@ except ImportError:
SMART_ROUTER_AVAILABLE = False
logger.warning("Smart Router or Context Builder not available - using full context fallback")
# Import memory service for user fact storage and prompt injection
try:
from memory_service import format_memory_for_prompt, extract_facts_async, summarize_conversation_async
MEMORY_SERVICE_AVAILABLE = True
except ImportError:
MEMORY_SERVICE_AVAILABLE = False
logger.warning("Memory service not available - user memory will not be injected")
class NordaBizChatEngine:
"""
@ -319,6 +327,25 @@ class NordaBizChatEngine:
db.commit()
db.refresh(ai_msg)
# Async memory extraction in background thread
if MEMORY_SERVICE_AVAILABLE:
import threading
_conv_id = conversation_id
_user_id = user_id
_msg_count = conversation.message_count or 0
_uctx = user_context
_gsvc = self.gemini_service
def _extract_memory():
try:
extract_facts_async(_conv_id, _user_id, _uctx, _gsvc)
if _msg_count % 5 == 0 and _msg_count > 0:
summarize_conversation_async(_conv_id, _user_id, _gsvc)
except Exception as e:
logger.warning(f"Async memory extraction failed: {e}")
threading.Thread(target=_extract_memory, daemon=True).start()
return ai_msg
finally:
@ -983,7 +1010,15 @@ ZASADY PERSONALIZACJI:
- NIE ujawniaj danych technicznych (user_id, company_id, rola systemowa)
"""
system_prompt = user_identity + f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa. # Inject user memory (facts + conversation summaries) into prompt
user_memory_text = ""
if MEMORY_SERVICE_AVAILABLE and user_context and user_context.get('user_id'):
try:
user_memory_text = format_memory_for_prompt(user_context['user_id'])
except Exception as e:
logger.warning(f"Memory load failed: {e}")
system_prompt = user_identity + user_memory_text + f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.
📊 MASZ DOSTĘP DO BAZY WIEDZY:
- Liczba firm: {context['total_companies']}
@ -1629,6 +1664,25 @@ W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwag
db.commit()
db.refresh(ai_msg)
# Async memory extraction in background thread
if MEMORY_SERVICE_AVAILABLE:
import threading
_conv_id = conversation_id
_user_id = user_id
_msg_count = conversation.message_count or 0
_uctx = user_context
_gsvc = self.gemini_service
def _extract_memory_stream():
try:
extract_facts_async(_conv_id, _user_id, _uctx, _gsvc)
if _msg_count % 5 == 0 and _msg_count > 0:
summarize_conversation_async(_conv_id, _user_id, _gsvc)
except Exception as e:
logger.warning(f"Async memory extraction failed: {e}")
threading.Thread(target=_extract_memory_stream, daemon=True).start()
yield {
'type': 'done',
'message_id': ai_msg.id,