nordabiz/blueprints/admin/routes_user_insights.py
Maciej Pienczyn cca52301a6
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: filter bots from analytics, use audit_logs for failed logins, logarithmic engagement score
- Add is_bot column to user_sessions with backfill from user_agent patterns
- Update analytics_daily trigger to skip bot sessions
- Recalculate 90 days of analytics_daily without bot contamination
- Replace cumulative failed_login_attempts with time-based audit_logs queries
- Switch engagement score from linear (capped at 100) to log2 scale
- Expand section_map from 9 to 17 categories (~95% traffic coverage)
- Exclude robots.txt, sitemap.xml etc from page view tracking
- Add bot filter to all overview, pages, paths, and engagement queries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 08:14:50 +01:00

1355 lines
51 KiB
Python

"""
Admin User Insights Routes
============================
User Insights Dashboard - problem detection, engagement scoring,
page popularity, user flows, and behavioral profiles.
"""
import csv
import io
import logging
import math
from datetime import date, timedelta, datetime
from flask import render_template, request, redirect, url_for, flash, Response
from flask_login import login_required
from sqlalchemy import func, desc, text, or_
from sqlalchemy.orm import joinedload
from . import bp
from database import (
SessionLocal, User, UserSession, PageView, SearchQuery,
ConversionEvent, JSError, EmailLog, SecurityAlert,
AuditLog, AnalyticsDaily, SystemRole
)
from utils.decorators import role_required
logger = logging.getLogger(__name__)
def _non_bot_sessions(db, start_dt=None):
    """Build a subquery of IDs of human (non-bot) sessions.

    Used as an ``IN`` filter on ``page_views.session_id`` so page-view
    statistics exclude crawler traffic. When *start_dt* is given, only
    sessions started at or after that moment are included.
    """
    session_ids = db.query(UserSession.id).filter(UserSession.is_bot == False)
    return (
        session_ids.filter(UserSession.started_at >= start_dt)
        if start_dt
        else session_ids
    )
def _log_engagement_score(raw):
"""Logarithmic engagement score: better distribution than linear capped at 100."""
if raw <= 0:
return 0
return min(100, int(math.log2(raw + 1) * 6))
def _get_period_dates(period):
"""Return (start_date, days) for given period string."""
today = date.today()
if period == 'day':
return today, 1
elif period == 'month':
return today - timedelta(days=30), 30
else: # week (default)
return today - timedelta(days=7), 7
# ============================================================
# MAIN DASHBOARD
# ============================================================
@bp.route('/user-insights')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights():
    """User Insights Dashboard - 5 tabs.

    Dispatches to one tab-builder based on the ``tab`` query arg;
    an unknown tab renders with empty data. On any error the user is
    redirected back to the analytics page with a flash message.
    """
    tab = request.args.get('tab', 'problems')
    period = request.args.get('period', 'week')
    start_date, days = _get_period_dates(period)
    # Tab name -> data-builder; keeps the route body flat.
    tab_builders = {
        'problems': _tab_problems,
        'engagement': _tab_engagement,
        'pages': _tab_pages,
        'paths': _tab_paths,
        'overview': _tab_overview,
    }
    db = SessionLocal()
    try:
        builder = tab_builders.get(tab)
        data = builder(db, start_date, days) if builder else {}
        return render_template(
            'admin/user_insights.html',
            tab=tab,
            period=period,
            data=data
        )
    except Exception as e:
        logger.error(f"User insights error: {e}", exc_info=True)
        flash('Błąd ładowania danych insights.', 'error')
        return redirect(url_for('admin.admin_analytics'))
    finally:
        db.close()
# ============================================================
# TAB 1: PROBLEMS
# ============================================================
def _tab_problems(db, start_date, days):
    """Problem detection tab - identify users with issues.

    Builds three result groups:
      * stat-card counts (locked accounts, failed logins, resets, JS errors),
      * a weighted, score-ranked list of "problem users" (top 50),
      * priority-sorted proactive alerts (never logged in, locked,
        reset-without-effect, repeated resets).

    NOTE: names ending in ``_7d`` actually cover the selected period
    (``start_date`` .. now), which is 7 days only for the default period.
    """
    now = datetime.now()
    # Period boundaries as datetimes (start_date is a date object).
    start_dt = datetime.combine(start_date, datetime.min.time())
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    # Stat cards
    locked_accounts = db.query(func.count(User.id)).filter(
        User.locked_until > now, User.is_active == True
    ).scalar() or 0
    # Failed logins counted from audit_logs (time-based), not from the
    # cumulative users.failed_login_attempts counter.
    failed_logins_7d = db.query(func.count(AuditLog.id)).filter(
        AuditLog.action == 'login_failed',
        AuditLog.created_at >= start_dt
    ).scalar() or 0
    password_resets_7d = db.query(func.count(EmailLog.id)).filter(
        EmailLog.email_type == 'password_reset',
        EmailLog.created_at >= start_dt
    ).scalar() or 0
    js_errors_7d = db.query(func.count(JSError.id)).filter(
        JSError.occurred_at >= start_dt
    ).scalar() or 0
    # Problem users - raw data per user
    # NOTE(review): this loop issues ~5 queries per active user (N+1);
    # acceptable for a small user base, revisit if the table grows.
    users = db.query(User).filter(User.is_active == True).all()
    problem_users = []
    for user in users:
        # Failed logins (from audit_logs, time-based)
        fl = db.query(func.count(AuditLog.id)).filter(
            AuditLog.user_email == user.email,
            AuditLog.action == 'login_failed',
            AuditLog.created_at >= start_dt
        ).scalar() or 0
        # Security alerts 7d
        sa_7d = db.query(func.count(SecurityAlert.id)).filter(
            SecurityAlert.user_email == user.email,
            SecurityAlert.created_at >= start_dt
        ).scalar() or 0
        # Password resets 30d (email_logs.user_id often NULL, match by recipient_email)
        pr_30d = db.query(func.count(EmailLog.id)).filter(
            EmailLog.recipient_email == user.email,
            EmailLog.email_type == 'password_reset',
            EmailLog.created_at >= start_30d
        ).scalar() or 0
        # JS errors 7d (via sessions)
        je_7d = db.query(func.count(JSError.id)).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user.id,
            JSError.occurred_at >= start_dt
        ).scalar() or 0
        # Slow pages 7d (load time above 3s)
        sp_7d = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_dt,
            PageView.load_time_ms > 3000
        ).scalar() or 0
        is_locked = 1 if user.locked_until and user.locked_until > now else 0
        # Weighted problem score capped at 100; lockout (40) and security
        # alerts (20) dominate, slow pages (2) contribute least.
        score = min(100,
            fl * 10 +
            pr_30d * 15 +
            je_7d * 3 +
            sp_7d * 2 +
            sa_7d * 20 +
            is_locked * 40
        )
        if score > 0:
            problem_users.append({
                'user': user,
                'score': score,
                'failed_logins': fl,
                'password_resets': pr_30d,
                'js_errors': je_7d,
                'slow_pages': sp_7d,
                'security_alerts': sa_7d,
                'is_locked': is_locked,
                'last_login': user.last_login,
            })
    problem_users.sort(key=lambda x: x['score'], reverse=True)
    # Proactive alerts
    alerts = []
    # Alert: Never logged in (account > 7 days old)
    never_logged = db.query(User).filter(
        User.is_active == True,
        User.last_login.is_(None),
        User.created_at < now - timedelta(days=7)
    ).all()
    for u in never_logged:
        # Check whether a welcome email was ever sent to this address.
        has_welcome = db.query(EmailLog.id).filter(
            EmailLog.recipient_email == u.email,
            EmailLog.email_type == 'welcome'
        ).first() is not None
        # Escalate if they tried (and failed) to log in at least 3 times.
        priority = 'critical' if (u.failed_login_attempts or 0) >= 3 else 'high'
        alerts.append({
            'type': 'never_logged_in',
            'priority': priority,
            'user': u,
            'message': f'Nigdy nie zalogowany ({(now - u.created_at).days}d od rejestracji)',
            'detail': f'Prób logowania: {u.failed_login_attempts or 0}. Email powitalny: {"Tak" if has_welcome else "NIE WYSŁANO"}.',
        })
    # Alert: Account locked
    locked_users = db.query(User).filter(
        User.locked_until > now, User.is_active == True
    ).all()
    for u in locked_users:
        alerts.append({
            'type': 'locked',
            'priority': 'critical',
            'user': u,
            'message': f'Konto zablokowane (do {u.locked_until.strftime("%d.%m %H:%M")})',
            'detail': f'Nieudane próby: {u.failed_login_attempts or 0}.',
        })
    # Alert: Reset without effect (reset sent > 24h ago, no login after)
    recent_resets = db.query(
        EmailLog.recipient_email,
        func.max(EmailLog.created_at).label('last_reset')
    ).filter(
        EmailLog.email_type == 'password_reset',
        EmailLog.created_at >= start_30d,
        EmailLog.status == 'sent'
    ).group_by(EmailLog.recipient_email).all()
    for r in recent_resets:
        u = db.query(User).filter(User.email == r.recipient_email, User.is_active == True).first()
        if not u:
            continue
        # Check if user logged in AFTER the reset
        login_after = db.query(AuditLog.id).filter(
            AuditLog.user_email == u.email,
            AuditLog.action == 'login',
            AuditLog.created_at > r.last_reset
        ).first()
        if login_after is None and r.last_reset < now - timedelta(hours=24):
            alerts.append({
                'type': 'reset_no_effect',
                'priority': 'high',
                'user': u,
                'message': f'Reset hasła bez efektu (wysłany {r.last_reset.strftime("%d.%m %H:%M")})',
                'detail': 'Użytkownik nie zalogował się po otrzymaniu emaila z resetem hasła.',
            })
    # Alert: Repeated resets (>= 3 in the selected period)
    repeat_resets = db.query(
        EmailLog.recipient_email,
        func.count(EmailLog.id).label('cnt')
    ).filter(
        EmailLog.email_type == 'password_reset',
        EmailLog.created_at >= start_dt
    ).group_by(EmailLog.recipient_email).having(func.count(EmailLog.id) >= 3).all()
    for r in repeat_resets:
        u = db.query(User).filter(User.email == r.recipient_email, User.is_active == True).first()
        if u:
            # Skip if already flagged by the reset_no_effect alert above.
            if not any(a['user'].id == u.id and a['type'] == 'reset_no_effect' for a in alerts):
                alerts.append({
                    'type': 'repeat_resets',
                    'priority': 'high',
                    'user': u,
                    'message': f'{r.cnt} resetów hasła w {days}d',
                    'detail': 'Wielokrotne resety mogą wskazywać na problem z emailem lub hasłem.',
                })
    # Sort alerts: critical first, then high; unknown priorities sink last.
    priority_order = {'critical': 0, 'high': 1, 'medium': 2}
    alerts.sort(key=lambda a: priority_order.get(a['priority'], 3))
    # Stat: never logged in count
    never_logged_count = len(never_logged)
    return {
        'locked_accounts': locked_accounts,
        'failed_logins': failed_logins_7d,
        'password_resets': password_resets_7d,
        'js_errors': js_errors_7d,
        'never_logged_in': never_logged_count,
        'problem_users': problem_users[:50],
        'alerts': alerts,
    }
# ============================================================
# TAB 2: ENGAGEMENT
# ============================================================
def _tab_engagement(db, start_date, days):
    """Engagement ranking tab.

    Builds stat-card counts (active / at-risk / dormant / new users) and
    a ranked engagement list (top 50) with a log-scaled 30-day score,
    week-over-week page-view change, status, and a 7-day sparkline.

    Fix: the sparkline query now excludes bot sessions like every other
    page-view query in this function (previously bot traffic leaked into
    the per-day counts).
    """
    now = datetime.now()
    # Period boundaries; prev_start opens the preceding window of equal
    # length, used for the week-over-week comparison.
    start_dt = datetime.combine(start_date, datetime.min.time())
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    prev_start = datetime.combine(start_date - timedelta(days=days), datetime.min.time())
    # Stat cards
    active_7d = db.query(func.count(func.distinct(UserSession.user_id))).filter(
        UserSession.user_id.isnot(None),
        UserSession.started_at >= start_dt,
        UserSession.is_bot == False
    ).scalar() or 0
    all_users = db.query(User).filter(User.is_active == True).all()
    at_risk = 0
    dormant = 0
    new_this_month = 0
    first_of_month = date.today().replace(day=1)
    for u in all_users:
        if u.created_at and u.created_at.date() >= first_of_month:
            new_this_month += 1
        if u.last_login:
            days_since = (date.today() - u.last_login.date()).days
            # 8-30 days since login = at risk; > 30 days (or never) = dormant.
            if 8 <= days_since <= 30:
                at_risk += 1
            elif days_since > 30:
                dormant += 1
        elif u.last_login is None:
            dormant += 1
    # Engagement ranking - compute per user
    # NOTE(review): ~9 queries per user (N+1); fine for a small user base.
    registered_users = db.query(User).filter(
        User.is_active == True, User.role != 'UNAFFILIATED'
    ).all()
    engagement_list = []
    for user in registered_users:
        # Current period (exclude bots)
        sessions_cur = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_dt,
            UserSession.is_bot == False
        ).scalar() or 0
        pv_cur = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_dt,
            PageView.session_id.in_(_non_bot_sessions(db, start_dt))
        ).scalar() or 0
        # Previous period for WoW
        sessions_prev = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= prev_start,
            UserSession.started_at < start_dt,
            UserSession.is_bot == False
        ).scalar() or 0
        pv_prev = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= prev_start,
            PageView.viewed_at < start_dt,
            PageView.session_id.in_(_non_bot_sessions(db, prev_start))
        ).scalar() or 0
        # 30d engagement score components (exclude bots)
        s30 = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d,
            UserSession.is_bot == False
        ).scalar() or 0
        pv30 = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_30d,
            PageView.session_id.in_(_non_bot_sessions(db, start_30d))
        ).scalar() or 0
        clicks30 = db.query(func.sum(UserSession.clicks_count)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d,
            UserSession.is_bot == False
        ).scalar() or 0
        dur30 = db.query(func.sum(UserSession.duration_seconds)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d,
            UserSession.is_bot == False
        ).scalar() or 0
        conv30 = db.query(func.count(ConversionEvent.id)).filter(
            ConversionEvent.user_id == user.id,
            ConversionEvent.converted_at >= start_30d
        ).scalar() or 0
        search30 = db.query(func.count(SearchQuery.id)).filter(
            SearchQuery.user_id == user.id,
            SearchQuery.searched_at >= start_30d
        ).scalar() or 0
        # Weighted raw activity; conversions weigh heaviest (x10),
        # session minutes count double.
        raw = (s30 * 3 + pv30 * 1 + int(clicks30) * 0.5 +
               int(dur30) / 60 * 2 + conv30 * 10 + search30 * 2)
        score = _log_engagement_score(raw)
        # WoW change (percent); a fresh start from zero counts as +100%.
        wow = None
        if pv_prev > 0:
            wow = round((pv_cur - pv_prev) / pv_prev * 100)
        elif pv_cur > 0:
            wow = 100
        # Status
        days_since_login = None
        if user.last_login:
            days_since_login = (date.today() - user.last_login.date()).days
        if days_since_login is not None and days_since_login <= 7 and score >= 20:
            status = 'active'
        elif (days_since_login is not None and 8 <= days_since_login <= 30) or (5 <= score < 20):
            status = 'at_risk'
        else:
            status = 'dormant'
        # Daily sparkline (7 days), oldest day first.
        sparkline = []
        for i in range(7):
            d = date.today() - timedelta(days=6 - i)
            d_start = datetime.combine(d, datetime.min.time())
            d_end = datetime.combine(d + timedelta(days=1), datetime.min.time())
            # FIX: exclude bot sessions here too, consistent with pv_cur/pv30.
            cnt = db.query(func.count(PageView.id)).filter(
                PageView.user_id == user.id,
                PageView.viewed_at >= d_start,
                PageView.viewed_at < d_end,
                PageView.session_id.in_(_non_bot_sessions(db))
            ).scalar() or 0
            sparkline.append(cnt)
        if sessions_cur > 0 or score > 0:
            engagement_list.append({
                'user': user,
                'score': score,
                'sessions': sessions_cur,
                'page_views': pv_cur,
                'wow': wow,
                'status': status,
                'sparkline': sparkline,
            })
    engagement_list.sort(key=lambda x: x['score'], reverse=True)
    return {
        'active_7d': active_7d,
        'at_risk': at_risk,
        'dormant': dormant,
        'new_this_month': new_this_month,
        'engagement_list': engagement_list[:50],
    }
# ============================================================
# TAB 3: PAGE MAP
# ============================================================
def _tab_pages(db, start_date, days):
    """Page popularity map.

    Returns section-level traffic (heatmap intensity per site section),
    the top 50 pages with timing/scroll metrics, and pages that drew
    fewer than 5 views in the last 30 days. All queries join
    user_sessions to exclude bot traffic.
    """
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Page sections with grouping (expanded to cover ~95% of traffic).
    # Each value lists path prefixes matched with LIKE 'prefix%'.
    section_map = {
        'Strona główna': ['/'],
        'Profile firm': ['/company/'],
        'Forum': ['/forum'],
        'Chat': ['/chat'],
        'Wyszukiwarka': ['/search', '/szukaj'],
        'Wydarzenia': ['/events', '/wydarzenia', '/kalendarz'],
        'Ogłoszenia': ['/classifieds', '/ogloszenia', '/tablica'],
        'Członkostwo': ['/membership', '/czlonkostwo', '/korzysci'],
        'Logowanie': ['/login', '/register', '/forgot-password', '/reset-password', '/verify-email'],
        'Panel użytkownika': ['/dashboard', '/konto'],
        'Wiadomości': ['/wiadomosci'],
        'Edukacja': ['/edukacja'],
        'Rada': ['/rada'],
        'ZOPK': ['/zopk'],
        'Kontakty': ['/kontakty'],
        'Raporty': ['/raporty'],
        'Admin': ['/admin'],
    }
    sections = []
    for name, prefixes in section_map.items():
        conditions = [PageView.path.like(p + '%') for p in prefixes]
        # The root section must match '/' exactly, otherwise the LIKE
        # prefix '/%' would swallow every page on the site.
        if prefixes == ['/']:
            conditions = [PageView.path == '/']
        q = db.query(
            func.count(PageView.id).label('views'),
            func.count(func.distinct(PageView.user_id)).label('unique_users'),
            func.avg(PageView.time_on_page_seconds).label('avg_time')
        ).join(UserSession, PageView.session_id == UserSession.id).filter(
            or_(*conditions),
            PageView.viewed_at >= start_dt,
            UserSession.is_bot == False
        ).first()
        sections.append({
            'name': name,
            'views': q.views or 0,
            'unique_users': q.unique_users or 0,
            'avg_time': int(q.avg_time or 0),
        })
    # Heatmap intensity 0-100, relative to the busiest section.
    max_views = max((s['views'] for s in sections), default=1) or 1
    for s in sections:
        s['intensity'] = min(100, int(s['views'] / max_views * 100))
    # Top 50 pages (exclude bots)
    top_pages = db.query(
        PageView.path,
        func.count(PageView.id).label('views'),
        func.count(func.distinct(PageView.user_id)).label('unique_users'),
        func.avg(PageView.time_on_page_seconds).label('avg_time'),
        func.avg(PageView.scroll_depth_percent).label('avg_scroll'),
        func.avg(PageView.load_time_ms).label('avg_load'),
    ).join(UserSession, PageView.session_id == UserSession.id).filter(
        PageView.viewed_at >= start_dt,
        UserSession.is_bot == False
    ).group_by(PageView.path).order_by(desc('views')).limit(50).all()
    # Results are ordered by views desc, so the first row is the max.
    max_page_views = top_pages[0].views if top_pages else 1
    pages_list = []
    for p in top_pages:
        pages_list.append({
            'path': p.path,
            'views': p.views,
            'unique_users': p.unique_users,
            'avg_time': int(p.avg_time or 0),
            'avg_scroll': int(p.avg_scroll or 0),
            'avg_load': int(p.avg_load or 0),
            'bar_pct': int(p.views / max_page_views * 100),
        })
    # Ignored pages (< 5 views in 30d, exclude bots)
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    ignored = db.query(
        PageView.path,
        func.count(PageView.id).label('views'),
    ).join(UserSession, PageView.session_id == UserSession.id).filter(
        PageView.viewed_at >= start_30d,
        UserSession.is_bot == False
    ).group_by(PageView.path).having(
        func.count(PageView.id) < 5
    ).order_by('views').limit(30).all()
    return {
        'sections': sections,
        'top_pages': pages_list,
        'ignored_pages': [{'path': p.path, 'views': p.views} for p in ignored],
    }
# ============================================================
# TAB 4: PATHS
# ============================================================
def _tab_paths(db, start_date, days):
    """User flow analysis.

    Computes entry pages, exit pages, top page-to-page transitions,
    drop-off (exit-rate) pages, and a session-length distribution.
    All queries are raw PostgreSQL-specific SQL (DISTINCT ON, window
    functions, ``::`` casts) and exclude bot sessions via a join on
    user_sessions.
    """
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Entry pages - first page in each session (exclude bots).
    # DISTINCT ON (session_id) with ORDER BY viewed_at ASC keeps the
    # earliest page view per session.
    entry_sql = text("""
        WITH first_pages AS (
            SELECT DISTINCT ON (pv.session_id) pv.path
            FROM page_views pv
            JOIN user_sessions us ON pv.session_id = us.id
            WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
            ORDER BY pv.session_id, pv.viewed_at ASC
        )
        SELECT path, COUNT(*) as cnt
        FROM first_pages
        GROUP BY path ORDER BY cnt DESC LIMIT 10
    """)
    entry_pages = db.execute(entry_sql, {'start_dt': start_dt}).fetchall()
    # Exit pages - last page in each session (exclude bots).
    # Same pattern, but ORDER BY viewed_at DESC keeps the latest view.
    exit_sql = text("""
        WITH last_pages AS (
            SELECT DISTINCT ON (pv.session_id) pv.path
            FROM page_views pv
            JOIN user_sessions us ON pv.session_id = us.id
            WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
            ORDER BY pv.session_id, pv.viewed_at DESC
        )
        SELECT path, COUNT(*) as cnt
        FROM last_pages
        GROUP BY path ORDER BY cnt DESC LIMIT 10
    """)
    exit_pages = db.execute(exit_sql, {'start_dt': start_dt}).fetchall()
    max_entry = entry_pages[0].cnt if entry_pages else 1
    max_exit = exit_pages[0].cnt if exit_pages else 1
    # Top transitions (exclude bots). LEAD() pairs each view with the
    # next path in the same session; self-transitions are dropped.
    transitions_sql = text("""
        WITH ordered AS (
            SELECT pv.session_id, pv.path,
                   LEAD(pv.path) OVER (PARTITION BY pv.session_id ORDER BY pv.viewed_at) AS next_path
            FROM page_views pv
            JOIN user_sessions us ON pv.session_id = us.id
            WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
        )
        SELECT path, next_path, COUNT(*) as cnt
        FROM ordered
        WHERE next_path IS NOT NULL AND path != next_path
        GROUP BY path, next_path ORDER BY cnt DESC LIMIT 30
    """)
    transitions = db.execute(transitions_sql, {'start_dt': start_dt}).fetchall()
    # Drop-off pages (high exit rate, exclude bots). Only pages with at
    # least 5 views qualify, to avoid noisy 100% rates on rare pages.
    dropoff_sql = text("""
        WITH page_stats AS (
            SELECT pv.path, COUNT(*) as total_views
            FROM page_views pv
            JOIN user_sessions us ON pv.session_id = us.id
            WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
            GROUP BY pv.path HAVING COUNT(*) >= 5
        ),
        exit_stats AS (
            SELECT path, COUNT(*) as exit_count
            FROM (
                SELECT DISTINCT ON (pv.session_id) pv.path
                FROM page_views pv
                JOIN user_sessions us ON pv.session_id = us.id
                WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
                ORDER BY pv.session_id, pv.viewed_at DESC
            ) lp
            GROUP BY path
        )
        SELECT ps.path, ps.total_views as views,
               COALESCE(es.exit_count, 0) as exits,
               ROUND(COALESCE(es.exit_count, 0)::numeric / ps.total_views * 100, 1) as exit_rate
        FROM page_stats ps
        LEFT JOIN exit_stats es ON ps.path = es.path
        ORDER BY exit_rate DESC LIMIT 20
    """)
    dropoff = db.execute(dropoff_sql, {'start_dt': start_dt}).fetchall()
    # Session length distribution (exclude bots), bucketed by number of
    # page views per session; ORDER BY MIN(pv_count) keeps bucket order.
    session_length_sql = text("""
        SELECT
            CASE
                WHEN pv_count = 1 THEN '1 strona'
                WHEN pv_count = 2 THEN '2 strony'
                WHEN pv_count BETWEEN 3 AND 5 THEN '3-5 stron'
                WHEN pv_count BETWEEN 6 AND 10 THEN '6-10 stron'
                ELSE '10+ stron'
            END as bucket,
            COUNT(*) as cnt
        FROM (
            SELECT pv.session_id, COUNT(*) as pv_count
            FROM page_views pv
            JOIN user_sessions us ON pv.session_id = us.id
            WHERE pv.viewed_at >= :start_dt AND pv.session_id IS NOT NULL AND us.is_bot = false
            GROUP BY pv.session_id
        ) session_counts
        GROUP BY bucket
        ORDER BY MIN(pv_count)
    """)
    session_lengths = db.execute(session_length_sql, {'start_dt': start_dt}).fetchall()
    max_sl = max((r.cnt for r in session_lengths), default=1) or 1
    return {
        'entry_pages': [{'path': r.path, 'count': r.cnt, 'bar_pct': int(r.cnt / max_entry * 100)} for r in entry_pages],
        'exit_pages': [{'path': r.path, 'count': r.cnt, 'bar_pct': int(r.cnt / max_exit * 100)} for r in exit_pages],
        'transitions': [{'from': r.path, 'to': r.next_path, 'count': r.cnt} for r in transitions],
        'dropoff': [{'path': r.path, 'views': r.views, 'exits': r.exits, 'exit_rate': float(r.exit_rate)} for r in dropoff],
        'session_lengths': [{'bucket': r.bucket, 'count': r.cnt, 'bar_pct': int(r.cnt / max_sl * 100)} for r in session_lengths],
    }
# ============================================================
# TAB 5: OVERVIEW
# ============================================================
def _tab_overview(db, start_date, days):
    """Overview charts - sessions, hourly heatmap, devices.

    Reads the ``filter`` query arg (all / logged / anonymous) from the
    Flask request. Daily sessions come from the pre-aggregated
    analytics_daily table; page views, heatmap, and device breakdowns
    are computed from raw tables over the last 30 days with bots
    excluded.
    """
    filter_type = request.args.get('filter', 'all')  # all, logged, anonymous
    start_dt = datetime.combine(start_date, datetime.min.time())
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    # Daily sessions from analytics_daily (already bot-filtered after migration)
    daily_data = db.query(AnalyticsDaily).filter(
        AnalyticsDaily.date >= date.today() - timedelta(days=30)
    ).order_by(AnalyticsDaily.date).all()
    chart_labels = []
    chart_sessions = []
    for d in daily_data:
        chart_labels.append(d.date.strftime('%d.%m'))
        # analytics_daily stores totals + anonymous; logged = difference.
        if filter_type == 'logged':
            chart_sessions.append(d.total_sessions - (d.anonymous_sessions or 0))
        elif filter_type == 'anonymous':
            chart_sessions.append(d.anonymous_sessions or 0)
        else:
            chart_sessions.append(d.total_sessions or 0)
    # Daily page views from raw PageView + JOIN (bot-filtered, supports logged/anon filter)
    pv_filter = [
        PageView.viewed_at >= start_30d,
        UserSession.is_bot == False,
    ]
    if filter_type == 'logged':
        pv_filter.append(UserSession.user_id.isnot(None))
    elif filter_type == 'anonymous':
        pv_filter.append(UserSession.user_id.is_(None))
    pv_daily = db.query(
        func.date(PageView.viewed_at).label('day'),
        func.count(PageView.id).label('cnt')
    ).join(UserSession, PageView.session_id == UserSession.id).filter(
        *pv_filter
    ).group_by(func.date(PageView.viewed_at)).all()
    # Keyed by str(date) so the lookup below matches analytics_daily rows.
    pv_by_date = {str(r.day): r.cnt for r in pv_daily}
    chart_pageviews = []
    for d in daily_data:
        chart_pageviews.append(pv_by_date.get(str(d.date), 0))
    # Hourly heatmap (7 days x 24 hours, exclude bots).
    # NOTE: the :start_dt parameter is bound to start_30d below, so the
    # heatmap always covers 30 days regardless of the selected period.
    heatmap_sql = text("""
        SELECT EXTRACT(DOW FROM started_at)::int as dow,
               EXTRACT(HOUR FROM started_at)::int as hour,
               COUNT(*) as cnt
        FROM user_sessions
        WHERE started_at >= :start_dt AND is_bot = false
        GROUP BY dow, hour
    """)
    heatmap_raw = db.execute(heatmap_sql, {'start_dt': start_30d}).fetchall()
    heatmap = {}
    max_heat = 1
    for r in heatmap_raw:
        key = (r.dow, r.hour)
        heatmap[key] = r.cnt
        if r.cnt > max_heat:
            max_heat = r.cnt
    heatmap_grid = []
    # PostgreSQL DOW: 0 = Sunday .. 6 = Saturday.
    dow_names = ['Nd', 'Pn', 'Wt', 'Śr', 'Cz', 'Pt', 'Sb']
    for dow in range(7):
        row = {'name': dow_names[dow], 'hours': []}
        for h in range(24):
            cnt = heatmap.get((dow, h), 0)
            intensity = int(cnt / max_heat * 100) if max_heat else 0
            row['hours'].append({'count': cnt, 'intensity': intensity})
        heatmap_grid.append(row)
    # Logged vs Anonymous (exclude bots)
    total_logged = db.query(func.count(UserSession.id)).filter(
        UserSession.started_at >= start_30d,
        UserSession.user_id.isnot(None),
        UserSession.is_bot == False
    ).scalar() or 0
    total_anon = db.query(func.count(UserSession.id)).filter(
        UserSession.started_at >= start_30d,
        UserSession.user_id.is_(None),
        UserSession.is_bot == False
    ).scalar() or 0
    # Devices over time (weekly, exclude bots)
    devices_sql = text("""
        SELECT DATE_TRUNC('week', started_at)::date as week,
               device_type,
               COUNT(*) as cnt
        FROM user_sessions
        WHERE started_at >= :start_dt AND is_bot = false
        GROUP BY week, device_type
        ORDER BY week
    """)
    devices_raw = db.execute(devices_sql, {'start_dt': start_30d}).fetchall()
    weeks_set = sorted(set(r.week for r in devices_raw))
    device_map = {}
    for r in devices_raw:
        if r.week not in device_map:
            device_map[r.week] = {}
        device_map[r.week][r.device_type or 'unknown'] = r.cnt
    device_labels = [w.strftime('%d.%m') for w in weeks_set]
    device_desktop = [device_map.get(w, {}).get('desktop', 0) for w in weeks_set]
    device_mobile = [device_map.get(w, {}).get('mobile', 0) for w in weeks_set]
    device_tablet = [device_map.get(w, {}).get('tablet', 0) for w in weeks_set]
    return {
        'filter_type': filter_type,
        'chart_data': {
            'labels': chart_labels,
            'sessions': chart_sessions,
            'pageviews': chart_pageviews,
        },
        'heatmap': heatmap_grid,
        'logged_vs_anon': {'logged': total_logged, 'anonymous': total_anon},
        'devices': {
            'labels': device_labels,
            'desktop': device_desktop,
            'mobile': device_mobile,
            'tablet': device_tablet,
        },
    }
# ============================================================
# USER PROFILE DRILL-DOWN
# ============================================================
@bp.route('/user-insights/user/<int:user_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights_profile(user_id: int):
    """Individual user behavioral profile.

    Renders a drill-down page for a single user combining:
      * a 30-day engagement score (weighted activity mix passed through
        ``_log_engagement_score`` — presumably log-scaled; defined elsewhere),
      * a problem score built from failed logins (7d), password resets (30d),
        JS errors (7d), slow page loads (7d), security alerts (7d) and an
        active lockout,
      * a merged chronological timeline (audit logs, emails, sessions,
        key page views, searches, conversions, security alerts, account
        creation), newest first, capped at 150 entries,
      * a password-problem resolution status block,
      * favorite pages, device/browser breakdowns, hourly activity bars
        and a 30-day daily engagement trend for Chart.js.

    Redirects to the dashboard with a flash message when the user does not
    exist or on any unexpected error.
    """
    db = SessionLocal()
    try:
        user = db.query(User).options(joinedload(User.company)).get(user_id)
        if not user:
            flash('Użytkownik nie znaleziony.', 'error')
            return redirect(url_for('admin.user_insights'))
        now = datetime.now()
        # Anchor windows at local midnight so day-based windows are stable
        # across requests within the same day.
        start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
        start_7d = datetime.combine(date.today() - timedelta(days=7), datetime.min.time())
        # Engagement score (30d): raw inputs for the weighted mix below.
        s30 = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        pv30 = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user_id, PageView.viewed_at >= start_30d
        ).scalar() or 0
        clicks30 = db.query(func.sum(UserSession.clicks_count)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        dur30 = db.query(func.sum(UserSession.duration_seconds)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        conv30 = db.query(func.count(ConversionEvent.id)).filter(
            ConversionEvent.user_id == user_id, ConversionEvent.converted_at >= start_30d
        ).scalar() or 0
        search30 = db.query(func.count(SearchQuery.id)).filter(
            SearchQuery.user_id == user_id, SearchQuery.searched_at >= start_30d
        ).scalar() or 0
        # Weighted activity mix: conversions weigh most (x10), then sessions
        # (x3), minutes of session time (x2), searches (x2), page views (x1),
        # clicks (x0.5).
        raw = (s30 * 3 + pv30 * 1 + int(clicks30) * 0.5 +
               int(dur30) / 60 * 2 + conv30 * 10 + search30 * 2)
        engagement_score = _log_engagement_score(raw)
        # Problem score (failed logins from audit_logs, time-based)
        fl = db.query(func.count(AuditLog.id)).filter(
            AuditLog.user_email == user.email,
            AuditLog.action == 'login_failed',
            AuditLog.created_at >= start_7d
        ).scalar() or 0
        sa_7d = db.query(func.count(SecurityAlert.id)).filter(
            SecurityAlert.user_email == user.email,
            SecurityAlert.created_at >= start_7d
        ).scalar() or 0
        pr_30d = db.query(func.count(EmailLog.id)).filter(
            EmailLog.recipient_email == user.email,
            EmailLog.email_type == 'password_reset',
            EmailLog.created_at >= start_30d
        ).scalar() or 0
        # JS errors are tied to sessions, not users, so join through
        # user_sessions to attribute them to this user.
        je_7d = db.query(func.count(JSError.id)).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user_id,
            JSError.occurred_at >= start_7d
        ).scalar() or 0
        # "Slow" threshold: page load over 3000 ms.
        sp_7d = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user_id,
            PageView.viewed_at >= start_7d,
            PageView.load_time_ms > 3000
        ).scalar() or 0
        is_locked = 1 if user.locked_until and user.locked_until > now else 0
        # Weighted symptom sum capped at 100; an active lockout alone
        # contributes 40 points.
        problem_score = min(100,
            fl * 10 + pr_30d * 15 + je_7d * 3 + sp_7d * 2 + sa_7d * 20 + is_locked * 40
        )
        # ============================================================
        # FULL PROBLEM CHRONOLOGY (audit_logs + email_logs + sessions)
        # ============================================================
        # Each timeline entry carries: type (template grouping), icon,
        # time (sort key), desc, detail, css (severity styling).
        timeline = []
        # Audit logs: login attempts (successful and failed)
        audit_entries = db.query(AuditLog).filter(
            AuditLog.user_email == user.email
        ).order_by(desc(AuditLog.created_at)).limit(50).all()
        for a in audit_entries:
            if a.action == 'login':
                timeline.append({
                    'type': 'login',
                    'icon': 'key',
                    'time': a.created_at,
                    'desc': f'Zalogowano pomyślnie',
                    'detail': f'IP: {a.ip_address or "?"}',
                    'css': 'success',
                })
            elif a.action == 'login_failed':
                timeline.append({
                    'type': 'problem',
                    'icon': 'x',
                    'time': a.created_at,
                    'desc': f'Nieudane logowanie',
                    'detail': f'IP: {a.ip_address or "?"}',
                    'css': 'danger',
                })
            elif a.action == 'email_verified':
                timeline.append({
                    'type': 'login',
                    'icon': 'check',
                    'time': a.created_at,
                    'desc': 'Email zweryfikowany',
                    'detail': '',
                    'css': 'success',
                })
            elif a.action == 'logout':
                timeline.append({
                    'type': 'info',
                    'icon': 'logout',
                    'time': a.created_at,
                    'desc': 'Wylogowanie',
                    'detail': '',
                    'css': 'muted',
                })
            else:
                # Any other audited action: show it verbatim with its entity.
                timeline.append({
                    'type': 'info',
                    'icon': 'info',
                    'time': a.created_at,
                    'desc': f'{a.action}',
                    'detail': f'{a.entity_type or ""} {a.entity_name or ""}',
                    'css': 'muted',
                })
        # All emails sent to this user
        all_emails = db.query(EmailLog).filter(
            EmailLog.recipient_email == user.email
        ).order_by(desc(EmailLog.created_at)).limit(30).all()
        for e in all_emails:
            # Polish display labels; unknown types fall through unchanged.
            email_labels = {
                'password_reset': 'Reset hasła',
                'welcome': 'Email powitalny',
                'notification': 'Powiadomienie',
                'forum_notification': 'Powiadomienie z forum',
                'role_notification': 'Zmiana roli',
                'registration_notification': 'Rejestracja',
            }
            label = email_labels.get(e.email_type, e.email_type)
            status_label = {'sent': 'wysłany', 'failed': 'BŁĄD', 'pending': 'oczekuje'}.get(e.status, e.status)
            # Password resets render as warnings; delivery failure wins as danger.
            css = 'warning' if e.email_type == 'password_reset' else 'info'
            if e.status == 'failed':
                css = 'danger'
            timeline.append({
                'type': 'email',
                'icon': 'mail',
                'time': e.created_at,
                'desc': f'Email: {label} ({status_label})',
                'detail': e.subject or '',
                'css': css,
            })
        # Sessions (browser/device context)
        sessions = db.query(UserSession).filter(
            UserSession.user_id == user_id
        ).order_by(desc(UserSession.started_at)).limit(20).all()
        for s in sessions:
            dur = f', {s.duration_seconds // 60}min' if s.duration_seconds else ''
            timeline.append({
                'type': 'login',
                'icon': 'monitor',
                'time': s.started_at,
                'desc': f'Sesja: {s.device_type or "?"} / {s.browser or "?"} / {s.os or "?"}',
                'detail': f'{s.page_views_count or 0} stron, {s.clicks_count or 0} kliknięć{dur}',
                'css': 'info',
            })
        # Key page views: only high-signal paths (or company pages) make the
        # timeline; everything else would drown it.
        key_paths = ['/', '/forum', '/chat', '/search', '/admin', '/events', '/membership']
        recent_pvs = db.query(PageView).filter(
            PageView.user_id == user_id,
        ).order_by(desc(PageView.viewed_at)).limit(50).all()
        for pv in recent_pvs:
            # Exact match or subtree match (e.g. '/forum/topic/5').
            is_key = any(pv.path == p or pv.path.startswith(p + '/') for p in key_paths)
            if is_key or '/company/' in pv.path:
                load_info = ''
                if pv.load_time_ms and pv.load_time_ms > 3000:
                    load_info = f' (WOLNE: {pv.load_time_ms}ms)'
                timeline.append({
                    'type': 'pageview',
                    'icon': 'eye',
                    'time': pv.viewed_at,
                    'desc': f'Odwiedzono: {pv.path}{load_info}',
                    'detail': '',
                    'css': 'danger' if load_info else 'muted',
                })
        # Searches
        searches = db.query(SearchQuery).filter(
            SearchQuery.user_id == user_id
        ).order_by(desc(SearchQuery.searched_at)).limit(10).all()
        for s in searches:
            timeline.append({
                'type': 'search',
                'icon': 'search',
                'time': s.searched_at,
                'desc': f'Szukano: "{s.query}"',
                'detail': f'{s.results_count} wyników' if s.results_count else 'Brak wyników',
                'css': 'muted' if s.has_results else 'warning',
            })
        # Conversions
        convs = db.query(ConversionEvent).filter(
            ConversionEvent.user_id == user_id
        ).order_by(desc(ConversionEvent.converted_at)).limit(10).all()
        for c in convs:
            timeline.append({
                'type': 'conversion',
                'icon': 'check',
                'time': c.converted_at,
                'desc': f'Konwersja: {c.event_type}',
                'detail': c.target_type or '',
                'css': 'success',
            })
        # Security alerts
        sec_alerts = db.query(SecurityAlert).filter(
            SecurityAlert.user_email == user.email
        ).order_by(desc(SecurityAlert.created_at)).limit(10).all()
        for a in sec_alerts:
            timeline.append({
                'type': 'problem',
                'icon': 'shield',
                'time': a.created_at,
                'desc': f'Alert: {a.alert_type} ({a.severity})',
                'detail': f'IP: {a.ip_address or "?"}',
                'css': 'danger' if a.severity in ('high', 'critical') else 'warning',
            })
        # Account creation event — also flags a missing welcome email.
        if user.created_at:
            has_welcome = db.query(EmailLog.id).filter(
                EmailLog.recipient_email == user.email,
                EmailLog.email_type == 'welcome'
            ).first() is not None
            timeline.append({
                'type': 'info',
                'icon': 'user',
                'time': user.created_at,
                'desc': 'Konto utworzone',
                'detail': f'Email powitalny: {"Tak" if has_welcome else "NIE WYSŁANO"}',
                'css': 'info' if has_welcome else 'danger',
            })
        # Newest first; keep the 150 most recent entries.
        # NOTE(review): sorting assumes all 'time' values are comparable —
        # naive/aware datetime mixing here would raise; confirm all sources
        # store naive timestamps.
        timeline.sort(key=lambda x: x['time'], reverse=True)
        timeline = timeline[:150]
        # ============================================================
        # PROBLEM RESOLUTION STATUS
        # ============================================================
        # Only computed when there are login symptoms or the user never
        # logged in; otherwise the template receives resolution=None.
        resolution = None
        has_problems = (fl > 0 or pr_30d > 0 or is_locked)
        if has_problems or user.last_login is None:
            # Find first symptom (earliest failed login or reset email, all-time)
            first_failed = db.query(func.min(AuditLog.created_at)).filter(
                AuditLog.user_email == user.email,
                AuditLog.action == 'login_failed'
            ).scalar()
            first_reset = db.query(func.min(EmailLog.created_at)).filter(
                EmailLog.recipient_email == user.email,
                EmailLog.email_type == 'password_reset'
            ).scalar()
            first_symptom = None
            if first_failed and first_reset:
                first_symptom = min(first_failed, first_reset)
            else:
                first_symptom = first_failed or first_reset
            # What was sent
            all_resets = db.query(EmailLog).filter(
                EmailLog.recipient_email == user.email,
                EmailLog.email_type == 'password_reset'
            ).order_by(EmailLog.created_at).all()
            # Did user login after last reset?
            last_reset_time = all_resets[-1].created_at if all_resets else None
            login_after_reset = None
            if last_reset_time:
                login_after_reset = db.query(AuditLog).filter(
                    AuditLog.user_email == user.email,
                    AuditLog.action == 'login',
                    AuditLog.created_at > last_reset_time
                ).first()
            # Has active token?
            has_active_token = (
                user.reset_token is not None and
                user.reset_token_expires is not None and
                user.reset_token_expires > now
            )
            # Determine status — first matching branch wins, ordered from
            # strongest evidence of resolution down to "never logged in".
            if user.last_login and (not last_reset_time or user.last_login > last_reset_time):
                status = 'resolved'
                status_label = 'Rozwiązany'
            elif login_after_reset:
                status = 'resolved'
                status_label = 'Rozwiązany'
            elif has_active_token:
                status = 'pending'
                status_label = f'Oczekuje (token ważny do {user.reset_token_expires.strftime("%d.%m %H:%M")})'
            elif is_locked:
                status = 'blocked'
                status_label = f'Zablokowany (do {user.locked_until.strftime("%d.%m %H:%M")})'
            elif all_resets and not login_after_reset:
                status = 'unresolved'
                status_label = 'Nierozwiązany (token wygasł, brak loginu)'
            elif user.last_login is None:
                status = 'unresolved'
                status_label = 'Nigdy nie zalogowany'
            else:
                status = 'unknown'
                status_label = 'Nieznany'
            # Time to resolution: first symptom -> resolving login,
            # rendered as minutes / hours / days.
            duration = None
            if status == 'resolved' and first_symptom:
                resolved_at = login_after_reset.created_at if login_after_reset else user.last_login
                if resolved_at:
                    delta = resolved_at - first_symptom
                    hours = delta.total_seconds() / 3600
                    if hours < 1:
                        duration = f'{int(delta.total_seconds() / 60)} min'
                    elif hours < 24:
                        duration = f'{hours:.1f} godz.'
                    else:
                        duration = f'{delta.days} dni'
            resolution = {
                'status': status,
                'status_label': status_label,
                'first_symptom': first_symptom,
                'resets_sent': len(all_resets),
                'last_reset': last_reset_time,
                'login_after_reset': login_after_reset is not None,
                'has_active_token': has_active_token,
                'duration': duration,
                'has_welcome_email': db.query(EmailLog.id).filter(
                    EmailLog.recipient_email == user.email,
                    EmailLog.email_type == 'welcome'
                ).first() is not None,
            }
        # Favorite pages (top 10)
        fav_pages = db.query(
            PageView.path,
            func.count(PageView.id).label('cnt')
        ).filter(
            PageView.user_id == user_id,
            PageView.viewed_at >= start_30d
        ).group_by(PageView.path).order_by(desc('cnt')).limit(10).all()
        # Bar widths are scaled against the most-visited page.
        max_fav = fav_pages[0].cnt if fav_pages else 1
        # Device/browser breakdown
        devices = db.query(
            UserSession.device_type,
            func.count(UserSession.id).label('cnt')
        ).filter(
            UserSession.user_id == user_id,
            UserSession.started_at >= start_30d
        ).group_by(UserSession.device_type).all()
        browsers = db.query(
            UserSession.browser,
            func.count(UserSession.id).label('cnt')
        ).filter(
            UserSession.user_id == user_id,
            UserSession.started_at >= start_30d
        ).group_by(UserSession.browser).order_by(desc('cnt')).limit(5).all()
        # Hourly activity pattern (24 bars, PostgreSQL-specific EXTRACT/cast)
        hourly_sql = text("""
            SELECT EXTRACT(HOUR FROM started_at)::int as hour, COUNT(*) as cnt
            FROM user_sessions
            WHERE user_id = :uid AND started_at >= :start_dt
            GROUP BY hour ORDER BY hour
        """)
        hourly_raw = db.execute(hourly_sql, {'uid': user_id, 'start_dt': start_30d}).fetchall()
        hourly = {r.hour: r.cnt for r in hourly_raw}
        max_hourly = max(hourly.values(), default=1) or 1
        hourly_bars = []
        for h in range(24):
            cnt = hourly.get(h, 0)
            hourly_bars.append({'hour': h, 'count': cnt, 'pct': int(cnt / max_hourly * 100)})
        # Daily engagement trend (30d for Chart.js)
        # NOTE(review): 2 queries per day = 60 round-trips; acceptable for an
        # admin drill-down but a candidate for a single GROUP BY date query.
        trend_labels = []
        trend_scores = []
        for i in range(30):
            d = date.today() - timedelta(days=29 - i)
            d_start = datetime.combine(d, datetime.min.time())
            d_end = datetime.combine(d + timedelta(days=1), datetime.min.time())
            d_sessions = db.query(func.count(UserSession.id)).filter(
                UserSession.user_id == user_id,
                UserSession.started_at >= d_start,
                UserSession.started_at < d_end
            ).scalar() or 0
            d_pv = db.query(func.count(PageView.id)).filter(
                PageView.user_id == user_id,
                PageView.viewed_at >= d_start,
                PageView.viewed_at < d_end
            ).scalar() or 0
            daily_score = _log_engagement_score(d_sessions * 3 + d_pv)
            trend_labels.append(d.strftime('%d.%m'))
            trend_scores.append(daily_score)
        # Problem history (all-time, latest 10 of each)
        js_errors_list = db.query(JSError).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user_id
        ).order_by(desc(JSError.occurred_at)).limit(10).all()
        slow_pages_list = db.query(PageView).filter(
            PageView.user_id == user_id,
            PageView.load_time_ms > 3000
        ).order_by(desc(PageView.viewed_at)).limit(10).all()
        # Avg sessions per week since account creation (min 1 week to avoid
        # division blow-up for brand-new accounts).
        weeks_active = max(1, (date.today() - (user.created_at.date() if user.created_at else date.today())).days / 7)
        total_sessions_all = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user_id
        ).scalar() or 0
        avg_sessions_week = round(total_sessions_all / weeks_active, 1)
        avg_session_dur = db.query(func.avg(UserSession.duration_seconds)).filter(
            UserSession.user_id == user_id,
            UserSession.duration_seconds.isnot(None)
        ).scalar() or 0
        return render_template(
            'admin/user_insights_profile.html',
            user=user,
            engagement_score=engagement_score,
            problem_score=problem_score,
            timeline=timeline,
            fav_pages=[{'path': p.path, 'count': p.cnt, 'bar_pct': int(p.cnt / max_fav * 100)} for p in fav_pages],
            devices=[{'type': d.device_type or 'unknown', 'count': d.cnt} for d in devices],
            browsers=[{'name': b.browser or 'unknown', 'count': b.cnt} for b in browsers],
            hourly_bars=hourly_bars,
            trend_data={'labels': trend_labels, 'scores': trend_scores},
            js_errors=js_errors_list,
            slow_pages=slow_pages_list,
            password_resets=pr_30d,
            security_alerts_count=sa_7d,
            avg_sessions_week=avg_sessions_week,
            avg_session_duration=int(avg_session_dur),
            search_queries=searches,
            resolution=resolution,
        )
    except Exception as e:
        logger.error(f"User insights profile error: {e}", exc_info=True)
        flash('Błąd ładowania profilu użytkownika.', 'error')
        return redirect(url_for('admin.user_insights'))
    finally:
        db.close()
# ============================================================
# CSV EXPORT
# ============================================================
@bp.route('/user-insights/export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights_export():
    """Export user insights data as a CSV attachment.

    Query params:
        type:   'problems' | 'engagement' | 'pages' (default 'engagement').
        period: period key accepted by _get_period_dates (default 'week').

    Returns:
        A text/csv Response with a Content-Disposition attachment header,
        or a redirect back to the dashboard with a flash message on an
        unknown export type or an unexpected error.
    """
    export_type = request.args.get('type', 'engagement')
    period = request.args.get('period', 'week')
    # Validate up front: an unknown type used to fall through every branch
    # and produce an empty, header-less CSV file.
    if export_type not in ('problems', 'engagement', 'pages'):
        flash('Nieznany typ eksportu.', 'error')
        return redirect(url_for('admin.user_insights'))
    start_date, days = _get_period_dates(period)
    db = SessionLocal()
    try:
        output = io.StringIO()
        writer = csv.writer(output)
        if export_type == 'problems':
            data = _tab_problems(db, start_date, days)
            writer.writerow(['Użytkownik', 'Email', 'Problem Score', 'Nieudane logowania',
                             'Resety hasła', 'Błędy JS', 'Wolne strony', 'Ostatni login'])
            for p in data['problem_users']:
                writer.writerow([
                    p['user'].name, p['user'].email, p['score'],
                    p['failed_logins'], p['password_resets'], p['js_errors'],
                    p['slow_pages'], p['last_login'] or 'Nigdy'
                ])
        elif export_type == 'engagement':
            data = _tab_engagement(db, start_date, days)
            writer.writerow(['Użytkownik', 'Email', 'Score', 'Sesje', 'Odsłony',
                             'Zmiana WoW %', 'Status'])
            for e in data['engagement_list']:
                writer.writerow([
                    e['user'].name, e['user'].email, e['score'],
                    e['sessions'], e['page_views'],
                    f"{e['wow']}%" if e['wow'] is not None else 'N/A',
                    e['status']
                ])
        else:  # 'pages' — guaranteed by the validation above
            data = _tab_pages(db, start_date, days)
            writer.writerow(['Ścieżka', 'Odsłony', 'Unikalni', 'Śr. czas (s)',
                             'Śr. scroll %', 'Śr. ładowanie (ms)'])
            for p in data['top_pages']:
                writer.writerow([
                    p['path'], p['views'], p['unique_users'],
                    p['avg_time'], p['avg_scroll'], p['avg_load']
                ])
        # UTF-8 BOM so Excel detects the encoding and renders the Polish
        # headers correctly; getvalue() does not require seek(0).
        return Response(
            '\ufeff' + output.getvalue(),
            mimetype='text/csv; charset=utf-8',
            headers={'Content-Disposition': f'attachment; filename=user_insights_{export_type}_{period}.csv'}
        )
    except Exception as e:
        # exc_info for parity with the profile route's error handler.
        logger.error(f"User insights export error: {e}", exc_info=True)
        flash('Błąd eksportu danych.', 'error')
        return redirect(url_for('admin.user_insights'))
    finally:
        db.close()