nordabiz/blueprints/admin/routes_audits.py
Maciej Pienczyn 4181a2e760 refactor: Migrate access control from is_admin to role-based system
Replace ~170 manual `if not current_user.is_admin` checks with:
- @role_required(SystemRole.ADMIN) for user management, security, ZOPK
- @role_required(SystemRole.OFFICE_MANAGER) for content management
- current_user.can_access_admin_panel() for admin UI access
- current_user.can_moderate_forum() for forum moderation
- current_user.can_edit_company(id) for company permissions

Add @office_manager_required decorator shortcut.
Add SQL migration to sync existing users' role field.

Role hierarchy: UNAFFILIATED(10) < MEMBER(20) < EMPLOYEE(30) < MANAGER(40) < OFFICE_MANAGER(50) < ADMIN(100)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:05:22 +01:00

726 lines
28 KiB
Python

"""
Admin Audit Routes
==================
SEO, GBP, digital maturity, KRS and IT audit dashboards for the admin panel.
"""
import logging
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from . import bp
from database import (
SessionLocal, Company, Category, CompanyWebsiteAnalysis, GBPAudit,
CompanyDigitalMaturity, KRSAudit, CompanyPKD, CompanyPerson,
ITAudit, ITCollaborationMatch, SystemRole
)
from utils.decorators import role_required
logger = logging.getLogger(__name__)
# ============================================================
# SEO ADMIN DASHBOARD
# ============================================================
@bp.route('/seo')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_seo():
    """
    Render the SEO overview dashboard.

    Loads every active company (ordered by name) together with its
    category name, its PageSpeed scores and the SEO audit timestamp, and
    hands the following to the template:
    - companies: one attribute-style row object per result row
    - stats: counts of good (>= 90), medium (50-89) and poor (< 50) SEO
      scores, the number of rows without an SEO score, and the rounded
      average SEO score (None when no row has a score)
    - categories: sorted unique category names
    - now: current local time
    - filter_company: the raw ``company`` query parameter

    Query Parameters:
    - company: Slug of company to highlight/filter (optional)
    """
    class CompanyRow:
        """Exposes the items of a dict as attributes for template dot access."""
        def __init__(self, data):
            for key, value in data.items():
                setattr(self, key, value)

    # Optional company slug from the URL; passed through to the template as-is
    highlighted_slug = request.args.get('company', '')

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Outer joins keep companies that have no category or no analysis row
        rows = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                Company.website,
                Category.name.label('category_name'),
                CompanyWebsiteAnalysis.pagespeed_seo_score,
                CompanyWebsiteAnalysis.pagespeed_performance_score,
                CompanyWebsiteAnalysis.pagespeed_accessibility_score,
                CompanyWebsiteAnalysis.pagespeed_best_practices_score,
                CompanyWebsiteAnalysis.seo_audited_at
            )
            .outerjoin(Category, Company.category_id == Category.id)
            .outerjoin(
                CompanyWebsiteAnalysis,
                Company.id == CompanyWebsiteAnalysis.company_id
            )
            .filter(Company.status == 'active')
            .order_by(Company.name)
            .all()
        )

        # Plain dicts with the key names the template expects
        records = [
            {
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'seo_score': row.pagespeed_seo_score,
                'performance_score': row.pagespeed_performance_score,
                'accessibility_score': row.pagespeed_accessibility_score,
                'best_practices_score': row.pagespeed_best_practices_score,
                'seo_audited_at': row.seo_audited_at
            }
            for row in rows
        ]

        # Only rows that actually carry an SEO score take part in the buckets
        scores = [
            record['seo_score']
            for record in records
            if record['seo_score'] is not None
        ]
        if scores:
            average_score = round(sum(scores) / len(scores))
        else:
            average_score = None

        stats = {
            'good_count': sum(1 for score in scores if score >= 90),
            'medium_count': sum(1 for score in scores if 50 <= score < 90),
            'poor_count': sum(1 for score in scores if score < 50),
            'not_audited_count': len(records) - len(scores),
            'avg_score': average_score
        }

        # Unique, non-empty category names for the filter dropdown
        categories = sorted({
            record['category'] for record in records if record['category']
        })

        return render_template(
            'admin_seo_dashboard.html',
            companies=[CompanyRow(record) for record in records],
            stats=stats,
            categories=categories,
            now=datetime.now(),
            filter_company=highlighted_slug
        )
    finally:
        db.close()
# ============================================================
# GBP AUDIT ADMIN DASHBOARD
# ============================================================
@bp.route('/gbp-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_gbp_audit():
    """
    Render the GBP (Google Business Profile) audit overview dashboard.

    For every active company (ordered by name) the audit row whose
    audit_date equals the company's maximum audit_date is joined in;
    companies without any audit are kept with empty audit columns.

    Passed to the template:
    - companies: attribute-style row objects (scores, rating, review and
      photo counts, has_* field flags, audit date)
    - stats: score distribution (excellent >= 90, good 70-89, poor < 70),
      not-audited count, rounded average completeness, average rating
      (one decimal, rated companies only), total reviews and per-field
      coverage as a percentage of audited companies
    - categories: sorted unique category names
    - now: current local time
    """
    class CompanyRow:
        """Exposes the items of a dict as attributes for template dot access."""
        def __init__(self, data):
            for key, value in data.items():
                setattr(self, key, value)

    # Profile fields whose presence is tracked by a has_<field> audit column
    coverage_fields = (
        'name', 'address', 'phone', 'website', 'hours',
        'categories', 'photos', 'description', 'services', 'reviews'
    )

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Newest audit date per company
        newest_audit = (
            db.query(
                GBPAudit.company_id,
                func.max(GBPAudit.audit_date).label('max_date')
            )
            .group_by(GBPAudit.company_id)
            .subquery()
        )

        rows = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                Company.website,
                Category.name.label('category_name'),
                GBPAudit.completeness_score,
                GBPAudit.average_rating,
                GBPAudit.review_count,
                GBPAudit.photo_count,
                GBPAudit.has_name,
                GBPAudit.has_address,
                GBPAudit.has_phone,
                GBPAudit.has_website,
                GBPAudit.has_hours,
                GBPAudit.has_categories,
                GBPAudit.has_photos,
                GBPAudit.has_description,
                GBPAudit.has_services,
                GBPAudit.has_reviews,
                GBPAudit.audit_date
            )
            .outerjoin(Category, Company.category_id == Category.id)
            .outerjoin(newest_audit, Company.id == newest_audit.c.company_id)
            .outerjoin(
                GBPAudit,
                (Company.id == GBPAudit.company_id) &
                (GBPAudit.audit_date == newest_audit.c.max_date)
            )
            .filter(Company.status == 'active')
            .order_by(Company.name)
            .all()
        )

        records = []
        for row in rows:
            # A falsy rating (NULL or 0) is reported as "no rating"
            if row.average_rating:
                rating = float(row.average_rating)
            else:
                rating = None
            records.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'completeness_score': row.completeness_score,
                'average_rating': rating,
                'review_count': row.review_count or 0,
                'photo_count': row.photo_count or 0,
                'has_name': row.has_name,
                'has_address': row.has_address,
                'has_phone': row.has_phone,
                'has_website': row.has_website,
                'has_hours': row.has_hours,
                'has_categories': row.has_categories,
                'has_photos': row.has_photos,
                'has_description': row.has_description,
                'has_services': row.has_services,
                'has_reviews': row.has_reviews,
                'audit_date': row.audit_date
            })

        # A company counts as audited once it has a completeness score
        audited = [r for r in records if r['completeness_score'] is not None]
        audited_total = len(audited)
        completeness = [r['completeness_score'] for r in audited]

        if audited:
            avg_completeness = round(sum(completeness) / audited_total)
        else:
            avg_completeness = None

        # Average rating considers only audited companies that have a rating
        ratings = [r['average_rating'] for r in audited if r['average_rating']]
        if ratings:
            avg_rating = round(sum(ratings) / len(ratings), 1)
        else:
            avg_rating = None

        # Share of audited companies (in percent) that have each field filled
        if audited:
            field_coverage = {
                field: round(
                    sum(1 for r in audited if r['has_' + field])
                    / audited_total * 100
                )
                for field in coverage_fields
            }
        else:
            field_coverage = dict.fromkeys(coverage_fields, 0)

        stats = {
            'total_companies': len(records),
            'audited_count': audited_total,
            'excellent_count': sum(1 for s in completeness if s >= 90),
            'good_count': sum(1 for s in completeness if 70 <= s < 90),
            'poor_count': sum(1 for s in completeness if s < 70),
            'not_audited_count': len(records) - audited_total,
            'avg_completeness': avg_completeness,
            'avg_rating': avg_rating,
            'total_reviews': sum(r['review_count'] for r in records),
            'field_coverage': field_coverage
        }

        # Unique, non-empty category names
        categories = sorted({r['category'] for r in records if r['category']})

        return render_template(
            'admin/gbp_audit_dashboard.html',
            companies=[CompanyRow(r) for r in records],
            stats=stats,
            categories=categories,
            now=datetime.now()
        )
    finally:
        db.close()
# ============================================================
# DIGITAL MATURITY DASHBOARD
# ============================================================
@bp.route('/digital-maturity')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def digital_maturity_dashboard():
    """
    Render the digital maturity dashboard.

    Selects companies that have both a digital maturity record and a
    website analysis record (inner joins) with an overall score above 0,
    ordered by overall score descending.

    Passed to the template: the number of analyzed rows, the average
    overall score (one decimal, 0 when there are no rows), the summed
    opportunity value, counts of 'warm' and 'cold' sales readiness, the
    ten highest and ten lowest overall scores, the ten largest
    opportunity values, and the full result list.
    """
    def opportunity_value(row):
        # A missing opportunity value counts as 0
        return float(row.total_opportunity_value or 0)

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc

        rows = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                Company.website,
                CompanyDigitalMaturity.overall_score,
                CompanyDigitalMaturity.online_presence_score,
                CompanyDigitalMaturity.sales_readiness,
                CompanyDigitalMaturity.total_opportunity_value,
                CompanyWebsiteAnalysis.opportunity_score,
                CompanyWebsiteAnalysis.has_blog,
                CompanyWebsiteAnalysis.has_portfolio,
                CompanyWebsiteAnalysis.has_contact_form,
                CompanyWebsiteAnalysis.content_richness_score,
                CompanyDigitalMaturity.critical_gaps,
                CompanyWebsiteAnalysis.missing_features
            )
            .join(
                CompanyDigitalMaturity,
                Company.id == CompanyDigitalMaturity.company_id
            )
            .join(
                CompanyWebsiteAnalysis,
                Company.id == CompanyWebsiteAnalysis.company_id
            )
            .filter(CompanyDigitalMaturity.overall_score > 0)
            .order_by(desc(CompanyDigitalMaturity.overall_score))
            .all()
        )

        analyzed = len(rows)
        if analyzed:
            mean_score = round(
                sum(row.overall_score for row in rows) / analyzed, 1
            )
        else:
            mean_score = 0

        # Rows arrive sorted by score descending, so the head is the top 10
        best = rows[:10]
        worst = sorted(rows, key=lambda row: row.overall_score)[:10]
        biggest_opportunities = sorted(
            rows, key=opportunity_value, reverse=True
        )[:10]

        return render_template(
            'admin/digital_maturity.html',
            total_analyzed=analyzed,
            avg_score=mean_score,
            total_opportunity=sum(opportunity_value(row) for row in rows),
            warm_leads_count=sum(
                1 for row in rows if row.sales_readiness == 'warm'
            ),
            cold_leads_count=sum(
                1 for row in rows if row.sales_readiness == 'cold'
            ),
            top_performers=best,
            bottom_performers=worst,
            top_opportunities=biggest_opportunities,
            all_companies=rows
        )
    finally:
        db.close()
# ============================================================
# KRS AUDIT DASHBOARD
# ============================================================
@bp.route('/krs-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_krs_audit():
    """
    Admin dashboard for KRS (Krajowy Rejestr Sądowy) audit.

    Lists every active company that has a non-empty KRS number (ordered
    by name) together with its most recent KRSAudit row, all of its PKD
    codes (primary first, then by code) and the number of related
    CompanyPerson rows.

    Related data is fetched with three bulk queries instead of three
    queries per company, so the number of database round trips no longer
    grows with the number of companies.

    Passed to the template:
    - companies: list of dicts (id, name, slug, krs, nip, capital data,
      last audit timestamp, PDF path, latest audit, PKD codes, counts)
    - stats: totals for companies with/without KRS, audited/not audited,
      and companies with capital, people and PKD data
    - krs_audit_available: flag exported by krs_audit_service, or False
      when that module cannot be imported
    - now: current local time
    """
    # The audit service is optional; the dashboard still renders without it
    try:
        from krs_audit_service import KRS_AUDIT_AVAILABLE
    except ImportError:
        KRS_AUDIT_AVAILABLE = False

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # All active companies with KRS numbers
        companies_query = db.query(Company).filter(
            Company.status == 'active',
            Company.krs.isnot(None),
            Company.krs != ''
        ).order_by(Company.name).all()

        company_ids = [company.id for company in companies_query]

        latest_audits = {}    # company_id -> most recent KRSAudit
        pkd_by_company = {}   # company_id -> ordered list of CompanyPKD
        people_counts = {}    # company_id -> number of CompanyPerson rows

        # Skip the bulk queries entirely when there is nothing to look up
        if company_ids:
            # Audits arrive newest first, so the first row seen for a
            # company is the one a per-company ORDER BY ... DESC LIMIT 1
            # would have returned.
            audits = db.query(KRSAudit).filter(
                KRSAudit.company_id.in_(company_ids)
            ).order_by(KRSAudit.audit_date.desc()).all()
            for audit in audits:
                latest_audits.setdefault(audit.company_id, audit)

            # The global ordering (primary first, then code) is preserved
            # inside each per-company list.
            pkd_rows = db.query(CompanyPKD).filter(
                CompanyPKD.company_id.in_(company_ids)
            ).order_by(
                CompanyPKD.is_primary.desc(), CompanyPKD.pkd_code
            ).all()
            for pkd in pkd_rows:
                pkd_by_company.setdefault(pkd.company_id, []).append(pkd)

            # One grouped COUNT(*) instead of one count query per company
            people_rows = db.query(
                CompanyPerson.company_id, func.count()
            ).filter(
                CompanyPerson.company_id.in_(company_ids)
            ).group_by(CompanyPerson.company_id).all()
            people_counts = {
                company_id: total for company_id, total in people_rows
            }

        companies = []
        for company in companies_query:
            pkd_codes = pkd_by_company.get(company.id, [])
            companies.append({
                'id': company.id,
                'name': company.name,
                'slug': company.slug,
                'krs': company.krs,
                'nip': company.nip,
                'capital_amount': company.capital_amount,
                'krs_last_audit_at': company.krs_last_audit_at,
                'krs_pdf_path': company.krs_pdf_path,
                'audit': latest_audits.get(company.id),
                'pkd_count': len(pkd_codes),
                'pkd_codes': [{
                    'code': pkd.pkd_code,
                    'description': pkd.pkd_description,
                    'is_primary': pkd.is_primary
                } for pkd in pkd_codes],
                'people_count': people_counts.get(company.id, 0),
                'capital_shares_count': company.capital_shares_count
            })

        # Summary statistics over the companies that have a KRS number
        total_with_krs = len(companies)
        audited_count = sum(1 for c in companies if c['krs_last_audit_at'])

        # Active companies without a KRS number (NULL or empty string)
        no_krs_count = db.query(Company).filter(
            Company.status == 'active',
            (Company.krs.is_(None)) | (Company.krs == '')
        ).count()

        stats = {
            'total_with_krs': total_with_krs,
            'audited_count': audited_count,
            'not_audited_count': total_with_krs - audited_count,
            'no_krs_count': no_krs_count,
            'with_capital': sum(1 for c in companies if c['capital_amount']),
            'with_people': sum(1 for c in companies if c['people_count'] > 0),
            'with_pkd': sum(1 for c in companies if c['pkd_count'] > 0)
        }

        return render_template(
            'admin/krs_audit_dashboard.html',
            companies=companies,
            stats=stats,
            krs_audit_available=KRS_AUDIT_AVAILABLE,
            now=datetime.now()
        )
    finally:
        db.close()
# ============================================================
# IT AUDIT ADMIN DASHBOARD
# ============================================================
@bp.route('/it-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_it_audit():
    """
    Admin dashboard for IT audit overview.

    For every active company (ordered by name) the ITAudit row whose
    audit_date equals the company's maximum audit_date is joined in;
    companies without an audit are kept with empty audit columns.

    Passed to the template:
    - companies: attribute-style row objects with scores, maturity level
      and label, audit date and technology flags
    - stats: audit counts, rounded average scores, maturity distribution,
      technology adoption counts (Azure AD, M365, PBS, Zabbix, EDR, DR),
      collaboration flag counts and the number of collaboration matches;
      the nested maturity_counts / tech_stats / collab_stats dicts are
      included alongside the flattened keys
    - collaboration_matches: matches ordered by match score descending,
      each carrying the names and slugs of both companies
    - now: current local time

    The companies referenced by collaboration matches are loaded with a
    single query rather than two lookups per match.

    Access: Office Manager and above
    """
    class CompanyRow:
        """Exposes the items of a dict as attributes for template dot access."""
        def __init__(self, data):
            for key, value in data.items():
                setattr(self, key, value)

    class CollabMatchRow:
        """Helper class for template attribute access"""
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Prefer the service's label helper; fall back to a local mapping
        # when the IT audit service module is not importable.
        try:
            from it_audit_service import get_maturity_level_label
        except ImportError:
            def get_maturity_level_label(level):
                labels = {
                    'basic': 'Podstawowy',
                    'developing': 'Rozwijający się',
                    'established': 'Ustalony',
                    'advanced': 'Zaawansowany'
                }
                return labels.get(level, level)

        # Newest audit date per company
        latest_audit_subq = db.query(
            ITAudit.company_id,
            func.max(ITAudit.audit_date).label('max_date')
        ).group_by(ITAudit.company_id).subquery()

        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            ITAudit.id.label('audit_id'),
            ITAudit.overall_score,
            ITAudit.security_score,
            ITAudit.collaboration_score,
            ITAudit.completeness_score,
            ITAudit.maturity_level,
            ITAudit.audit_date,
            ITAudit.has_azure_ad,
            ITAudit.has_m365,
            ITAudit.has_proxmox_pbs,
            ITAudit.monitoring_solution,
            ITAudit.has_edr,
            ITAudit.has_dr_plan
        ).outerjoin(
            latest_audit_subq,
            Company.id == latest_audit_subq.c.company_id
        ).outerjoin(
            ITAudit,
            (Company.id == ITAudit.company_id) &
            (ITAudit.audit_date == latest_audit_subq.c.max_date)
        ).filter(
            Company.status == 'active'
        ).order_by(
            Company.name
        ).all()

        # Build companies list with the key names the template expects
        companies = []
        for row in companies_query:
            # Zabbix has no dedicated column; detect it in monitoring_solution
            has_zabbix = row.monitoring_solution and 'zabbix' in str(row.monitoring_solution).lower()
            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'audit_id': row.audit_id,
                'overall_score': row.overall_score,
                'security_score': row.security_score,
                'collaboration_score': row.collaboration_score,
                'completeness_score': row.completeness_score,
                'maturity_level': row.maturity_level,
                'maturity_label': get_maturity_level_label(row.maturity_level) if row.maturity_level else None,
                'audit_date': row.audit_date,
                'has_azure_ad': row.has_azure_ad,
                'has_m365': row.has_m365,
                'has_proxmox_pbs': row.has_proxmox_pbs,
                'has_zabbix': has_zabbix,
                'has_edr': row.has_edr,
                'has_dr_plan': row.has_dr_plan
            })

        # A company counts as audited once it has an overall score
        audited_companies = [c for c in companies if c['overall_score'] is not None]
        not_audited = [c for c in companies if c['overall_score'] is None]

        # Maturity distribution; levels outside the known set are ignored
        maturity_counts = {
            'basic': 0,
            'developing': 0,
            'established': 0,
            'advanced': 0
        }
        for c in audited_companies:
            level = c['maturity_level']
            if level in maturity_counts:
                maturity_counts[level] += 1

        # Average scores; a missing security/collaboration score counts as 0
        if audited_companies:
            audited_total = len(audited_companies)
            avg_overall = round(sum(c['overall_score'] for c in audited_companies) / audited_total)
            avg_security = round(sum(c['security_score'] or 0 for c in audited_companies) / audited_total)
            avg_collaboration = round(sum(c['collaboration_score'] or 0 for c in audited_companies) / audited_total)
        else:
            avg_overall = None
            avg_security = None
            avg_collaboration = None

        # Technology adoption: number of audited companies with each flag set
        tech_stats = {
            tech: sum(1 for c in audited_companies if c['has_' + tech])
            for tech in ('azure_ad', 'm365', 'proxmox_pbs', 'zabbix', 'edr', 'dr_plan')
        }

        # Collaboration flags, counted over the latest audits only
        collab_stats = {}
        if audited_companies:
            collab_flags = [
                'open_to_shared_licensing',
                'open_to_backup_replication',
                'open_to_teams_federation',
                'open_to_shared_monitoring',
                'open_to_collective_purchasing',
                'open_to_knowledge_sharing'
            ]
            # Build the id list once; it is the same for every flag
            latest_audit_ids = [c['audit_id'] for c in audited_companies if c['audit_id']]
            for flag in collab_flags:
                collab_stats[flag] = db.query(func.count(ITAudit.id)).filter(
                    ITAudit.id.in_(latest_audit_ids),
                    getattr(ITAudit, flag) == True  # noqa: E712 - builds a SQL expression
                ).scalar()

        # Collaboration matches, best score first
        matches = db.query(ITCollaborationMatch).order_by(
            ITCollaborationMatch.match_score.desc()
        ).all()

        # Load every company referenced by a match in one query
        involved_ids = sorted({
            company_id
            for match in matches
            for company_id in (match.company_a_id, match.company_b_id)
            if company_id is not None
        })
        company_lookup = {}
        if involved_ids:
            company_lookup = {
                company.id: company
                for company in db.query(
                    Company.id, Company.name, Company.slug
                ).filter(Company.id.in_(involved_ids)).all()
            }

        # Flat list of matches with everything the template needs
        collaboration_matches = []
        for match in matches:
            company_a = company_lookup.get(match.company_a_id)
            company_b = company_lookup.get(match.company_b_id)
            collaboration_matches.append(CollabMatchRow(
                id=match.id,
                match_type=match.match_type,
                company_a_id=match.company_a_id,
                # Placeholder name/slug when the referenced company is gone
                company_a_name=company_a.name if company_a else 'Nieznana',
                company_a_slug=company_a.slug if company_a else '',
                company_b_id=match.company_b_id,
                company_b_name=company_b.name if company_b else 'Nieznana',
                company_b_slug=company_b.slug if company_b else '',
                match_reason=match.match_reason,
                match_score=match.match_score,
                status=match.status,
                created_at=match.created_at
            ))

        stats = {
            # Main stats
            'total_audits': len(audited_companies),
            'total_companies': len(companies),
            'companies_without_audit': len(not_audited),
            # Score averages
            'avg_overall_score': avg_overall,
            'avg_security_score': avg_security,
            'avg_collaboration_score': avg_collaboration,
            # Maturity distribution (flattened for template)
            'maturity_basic': maturity_counts['basic'],
            'maturity_developing': maturity_counts['developing'],
            'maturity_established': maturity_counts['established'],
            'maturity_advanced': maturity_counts['advanced'],
            # Technology adoption stats (matching template naming with has_* prefix)
            'has_azure_ad': tech_stats['azure_ad'],
            'has_m365': tech_stats['m365'],
            'has_proxmox_pbs': tech_stats['proxmox_pbs'],
            'has_zabbix': tech_stats['zabbix'],
            'has_edr': tech_stats['edr'],
            'has_dr_plan': tech_stats['dr_plan'],
            # Collaboration flags
            'open_to_shared_licensing': collab_stats.get('open_to_shared_licensing', 0),
            'open_to_backup_replication': collab_stats.get('open_to_backup_replication', 0),
            'open_to_teams_federation': collab_stats.get('open_to_teams_federation', 0),
            'open_to_shared_monitoring': collab_stats.get('open_to_shared_monitoring', 0),
            'open_to_collective_purchasing': collab_stats.get('open_to_collective_purchasing', 0),
            'open_to_knowledge_sharing': collab_stats.get('open_to_knowledge_sharing', 0),
            # Legacy nested structures (for any templates that still use them)
            'maturity_counts': maturity_counts,
            'tech_stats': tech_stats,
            'collab_stats': collab_stats,
            'total_matches': len(collaboration_matches)
        }

        return render_template(
            'admin/it_audit_dashboard.html',
            companies=[CompanyRow(c) for c in companies],
            stats=stats,
            collaboration_matches=collaboration_matches,
            now=datetime.now()
        )
    finally:
        db.close()