nordabiz/blueprints/admin/routes_social.py
Maciej Pienczyn fe288f0441
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: YouTube Data API v3 integration for social media enrichment
- YouTubeService now fetches: subscribers, views, video count, description,
  avatar, banner, country, creation date, recent 5 videos
- Enricher uses API first, falls back to scraping
- Extra YouTube data stored in content_types JSONB
- Audit detail shows view count, country, creation date, recent videos
- Requires enabling YouTube Data API v3 in Google Cloud Console

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 14:01:15 +01:00

1327 lines
55 KiB
Python

"""
Admin Social Media Routes
==========================
Social media analytics and audit dashboards.
"""
import logging
from datetime import datetime, timedelta
import threading
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from sqlalchemy import func, distinct, or_
from . import bp
from database import (
SessionLocal, Company, Category, CompanySocialMedia, SystemRole,
OAuthToken, SocialMediaConfig
)
from utils.decorators import role_required, is_audit_owner
# Data source hierarchy: higher priority sources should not be overwritten by lower
SOURCE_PRIORITY = {
'facebook_api': 3,
'manual_edit': 2,
'manual': 2,
'website_scrape': 1,
'brave_search': 0,
None: 0,
}
SOURCE_LABELS = {
'facebook_api': {'label': 'Facebook API (OAuth)', 'color': '#22c55e', 'icon': 'api', 'description': 'Dane pobrane bezpośrednio z Facebook Graph API przez autoryzowane połączenie OAuth. Najwyższa wiarygodność danych.'},
'manual_edit': {'label': 'Ręczna edycja', 'color': '#6b7280', 'icon': 'manual', 'description': 'URL dodany ręcznie przez menedżera firmy lub administratora w edycji profilu.'},
'manual': {'label': 'Ręczna edycja', 'color': '#6b7280', 'icon': 'manual', 'description': 'URL dodany ręcznie przez administratora.'},
'website_scrape': {'label': 'Scraping strony www', 'color': '#f59e0b', 'icon': 'scrape', 'description': 'Dane zebrane automatycznie ze strony internetowej firmy. Metryki (followers, engagement) są szacunkowe — mogą różnić się od rzeczywistych.'},
'brave_search': {'label': 'Wyszukiwarka', 'color': '#f59e0b', 'icon': 'search', 'description': 'Profil znaleziony przez wyszukiwarkę. Metryki niedostępne.'},
None: {'label': 'Nieznane', 'color': '#ef4444', 'icon': 'unknown', 'description': 'Brak informacji o źródle danych.'},
}
# Which platforms support OAuth
OAUTH_PLATFORMS = {
'facebook': {'provider': 'meta', 'service': 'facebook'},
'instagram': {'provider': 'meta', 'service': 'instagram'},
}
logger = logging.getLogger(__name__)
def _compute_followers_growth(followers_history):
"""Compute followers growth rate (%) from JSONB history.
Returns: (growth_rate_pct, trend_direction)
- growth_rate_pct: float, percentage change over last 30 days
- trend_direction: 'up', 'down', 'stable', 'unknown'
"""
if not followers_history or not isinstance(followers_history, list) or len(followers_history) < 2:
return 0, 'unknown'
# Sort by date descending
try:
sorted_history = sorted(followers_history, key=lambda x: x.get('date', ''), reverse=True)
except (TypeError, AttributeError):
return 0, 'unknown'
latest = sorted_history[0].get('count', 0)
# Find entry ~30 days ago
previous = None
for entry in sorted_history[1:]:
previous = entry.get('count', 0)
break # Just take the previous entry
if not previous or previous == 0:
return 0, 'unknown'
rate = round((latest - previous) / previous * 100, 1)
if rate > 1:
return rate, 'up'
elif rate < -1:
return rate, 'down'
return rate, 'stable'
def _compute_activity_status(last_post_date):
"""Classify account activity based on last post date.
Returns: (status, label, color)
"""
if not last_post_date:
return 'unknown', 'Brak danych', '#9ca3af'
days = (datetime.now() - last_post_date).days
if days <= 14:
return 'active', 'Aktywne', '#22c55e'
elif days <= 30:
return 'moderate', 'Umiarkowane', '#84cc16'
elif days <= 90:
return 'slow', 'Sporadyczne', '#f59e0b'
elif days <= 365:
return 'dormant', 'Uśpione', '#ef4444'
return 'abandoned', 'Porzucone', '#991b1b'
def _compute_social_health_score(platform_details):
"""Compute composite social health score (0-100) for a company.
Weights:
- Activity (30%): based on last_post_date freshness
- Engagement (25%): based on engagement_rate
- Completeness (20%): based on profile_completeness_score
- Growth (15%): based on followers_history trend
- Cross-platform (10%): based on number of platforms
"""
if not platform_details:
return 0
# Activity score (0-100)
activity_scores = []
for p in platform_details:
if p.get('last_post_date'):
days = (datetime.now() - p['last_post_date']).days
if days <= 7:
activity_scores.append(100)
elif days <= 14:
activity_scores.append(85)
elif days <= 30:
activity_scores.append(65)
elif days <= 90:
activity_scores.append(35)
elif days <= 365:
activity_scores.append(10)
else:
activity_scores.append(0)
else:
activity_scores.append(0)
activity = sum(activity_scores) / len(activity_scores) if activity_scores else 0
# Engagement score (0-100): 5%+ = excellent, 1% = good, 0 = no data
engagement_rates = [float(p.get('engagement_rate') or 0) for p in platform_details]
avg_engagement = sum(engagement_rates) / len(engagement_rates) if engagement_rates else 0
engagement = min(avg_engagement * 20, 100) # 5% engagement = 100 score
# Completeness score (0-100)
completeness_scores = [p.get('profile_completeness_score', 0) for p in platform_details]
completeness = sum(completeness_scores) / len(completeness_scores) if completeness_scores else 0
# Growth score (0-100)
growth_scores = []
for p in platform_details:
rate, _ = _compute_followers_growth(p.get('followers_history', []))
if rate > 10:
growth_scores.append(100)
elif rate > 5:
growth_scores.append(80)
elif rate > 0:
growth_scores.append(60)
elif rate == 0:
growth_scores.append(30)
else:
growth_scores.append(max(0, 30 + rate)) # Negative growth penalized
growth = sum(growth_scores) / len(growth_scores) if growth_scores else 30
# Cross-platform score (0-100)
num_platforms = len(platform_details)
cross_platform_map = {1: 20, 2: 45, 3: 70, 4: 85, 5: 95}
cross_platform = cross_platform_map.get(num_platforms, 100 if num_platforms >= 6 else 0)
# Weighted composite
score = (
activity * 0.30 +
engagement * 0.25 +
completeness * 0.20 +
growth * 0.15 +
cross_platform * 0.10
)
return round(score)
# ============================================================
# SOCIAL MEDIA ANALYTICS DASHBOARD
# ============================================================
@bp.route('/social-media')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_social_media():
"""Admin dashboard for social media analytics"""
db = SessionLocal()
try:
# Total counts per platform
platform_stats = db.query(
CompanySocialMedia.platform,
func.count(CompanySocialMedia.id).label('count'),
func.count(distinct(CompanySocialMedia.company_id)).label('companies')
).filter(
CompanySocialMedia.is_valid == True
).group_by(CompanySocialMedia.platform).all()
# Companies with each platform combination
company_platforms = db.query(
Company.id,
Company.name,
Company.slug,
func.array_agg(distinct(CompanySocialMedia.platform)).label('platforms')
).outerjoin(
CompanySocialMedia,
(Company.id == CompanySocialMedia.company_id) & (CompanySocialMedia.is_valid == True)
).group_by(Company.id, Company.name, Company.slug).all()
# Analysis
total_companies = len(company_platforms)
companies_with_sm = [c for c in company_platforms if c.platforms and c.platforms[0] is not None]
companies_without_sm = [c for c in company_platforms if not c.platforms or c.platforms[0] is None]
# Platform combinations
platform_combos_raw = {}
for c in companies_with_sm:
platforms = sorted([p for p in c.platforms if p]) if c.platforms else []
key = ', '.join(platforms) if platforms else 'Brak'
if key not in platform_combos_raw:
platform_combos_raw[key] = []
platform_combos_raw[key].append({'id': c.id, 'name': c.name, 'slug': c.slug})
# Sort by number of companies (descending)
platform_combos = dict(sorted(platform_combos_raw.items(), key=lambda x: len(x[1]), reverse=True))
# Only Facebook
only_facebook = [c for c in companies_with_sm if set(c.platforms) == {'facebook'}]
# Only LinkedIn
only_linkedin = [c for c in companies_with_sm if set(c.platforms) == {'linkedin'}]
# Only Instagram
only_instagram = [c for c in companies_with_sm if set(c.platforms) == {'instagram'}]
# Has all major (FB + LI + IG)
has_all_major = [c for c in companies_with_sm if {'facebook', 'linkedin', 'instagram'}.issubset(set(c.platforms or []))]
# Get all social media entries with company info for detailed view
all_entries = db.query(
CompanySocialMedia,
Company.name.label('company_name'),
Company.slug.label('company_slug')
).join(Company).order_by(
Company.name, CompanySocialMedia.platform
).all()
# Freshness analysis
now = datetime.now()
fresh_30d = db.query(func.count(CompanySocialMedia.id)).filter(
CompanySocialMedia.verified_at >= now - timedelta(days=30)
).scalar()
stale_90d = db.query(func.count(CompanySocialMedia.id)).filter(
CompanySocialMedia.verified_at < now - timedelta(days=90)
).scalar()
return render_template('admin/social_media.html',
platform_stats=platform_stats,
total_companies=total_companies,
companies_with_sm=len(companies_with_sm),
companies_without_sm=companies_without_sm,
platform_combos=platform_combos,
only_facebook=only_facebook,
only_linkedin=only_linkedin,
only_instagram=only_instagram,
has_all_major=has_all_major,
all_entries=all_entries,
fresh_30d=fresh_30d,
stale_90d=stale_90d,
now=now
)
finally:
db.close()
# ============================================================
# SOCIAL MEDIA AUDIT DASHBOARD
# ============================================================
@bp.route('/social-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_social_audit():
"""
Admin dashboard for Social Media audit overview.
Displays:
- Summary stats (coverage per platform, total profiles)
- Platform coverage with progress bars
- Sortable table with platform icons per company
- Followers aggregate statistics
"""
if not is_audit_owner():
from flask import abort
abort(404)
db = SessionLocal()
try:
# Platform definitions
platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
# Total companies count
total_companies = db.query(func.count(Company.id)).filter(Company.status == 'active').scalar()
# Get all companies with their social media profiles
companies_query = db.query(
Company.id,
Company.name,
Company.slug,
Company.website,
Category.name.label('category_name')
).outerjoin(
Category,
Company.category_id == Category.id
).filter(
Company.status == 'active'
).order_by(Company.name).all()
# Get social media data per company (valid profiles)
social_data = db.query(
CompanySocialMedia.company_id,
CompanySocialMedia.platform,
CompanySocialMedia.url,
CompanySocialMedia.followers_count,
CompanySocialMedia.verified_at,
CompanySocialMedia.is_valid,
CompanySocialMedia.check_status,
CompanySocialMedia.profile_completeness_score,
CompanySocialMedia.engagement_rate,
CompanySocialMedia.last_post_date,
CompanySocialMedia.posts_count_30d,
CompanySocialMedia.posting_frequency_score
).filter(
or_(
CompanySocialMedia.is_valid == True,
CompanySocialMedia.check_status == 'needs_verification'
)
).all()
# Group social media by company + collect needs_verification items
company_social = {}
needs_verification_items = []
for sm in social_data:
if sm.company_id not in company_social:
company_social[sm.company_id] = {}
company_social[sm.company_id][sm.platform] = {
'url': sm.url,
'followers': sm.followers_count or 0,
'verified_at': sm.verified_at,
'needs_verification': sm.check_status == 'needs_verification',
'completeness': sm.profile_completeness_score or 0,
'engagement_rate': sm.engagement_rate or 0,
'last_post_date': sm.last_post_date,
'posts_30d': sm.posts_count_30d or 0,
'posting_freq': sm.posting_frequency_score or 0
}
if sm.check_status == 'needs_verification':
needs_verification_items.append({
'company_id': sm.company_id,
'platform': sm.platform,
'url': sm.url
})
# Build companies list with social media info
companies = []
for row in companies_query:
sm_data = company_social.get(row.id, {})
total_followers = sum(p.get('followers', 0) for p in sm_data.values())
platform_count = len(sm_data)
# Get last verified date across all platforms
verified_dates = [p.get('verified_at') for p in sm_data.values() if p.get('verified_at')]
last_verified = max(verified_dates) if verified_dates else None
# Generate recommendations
recommendations = []
needs_verify_platforms = [p for p, d in sm_data.items() if d.get('needs_verification')]
if not sm_data:
recommendations.append('Brak profili social media')
else:
for nv_platform in needs_verify_platforms:
recommendations.append(f'{nv_platform.capitalize()}: do weryfikacji')
fb = sm_data.get('facebook')
if not fb:
recommendations.append('Brak Facebook')
elif fb.get('url') and 'profile.php?id=' in fb['url']:
recommendations.append('Facebook: zmien adres na nazwe firmy')
if 'instagram' not in sm_data:
recommendations.append('Brak Instagram')
if 'linkedin' not in sm_data:
recommendations.append('Brak LinkedIn')
if 'youtube' not in sm_data:
recommendations.append('Brak YouTube')
# Aggregate engagement & completeness across platforms
completeness_scores = [p.get('completeness', 0) for p in sm_data.values() if p.get('completeness')]
avg_completeness = round(sum(completeness_scores) / len(completeness_scores)) if completeness_scores else 0
engagement_rates = [p.get('engagement_rate', 0) for p in sm_data.values() if p.get('engagement_rate')]
avg_engagement = round(sum(engagement_rates) / len(engagement_rates), 2) if engagement_rates else 0
total_posts_30d = sum(p.get('posts_30d', 0) for p in sm_data.values())
# Last post date across all platforms
post_dates = [p.get('last_post_date') for p in sm_data.values() if p.get('last_post_date')]
last_post = max(post_dates) if post_dates else None
# Compute health score from platform data
platform_dicts = [
{
'last_post_date': p.get('last_post_date'),
'engagement_rate': p.get('engagement_rate', 0),
'profile_completeness_score': p.get('completeness', 0),
'followers_history': [],
}
for p in sm_data.values()
]
health_score = _compute_social_health_score(platform_dicts) if platform_dicts else 0
companies.append({
'id': row.id,
'name': row.name,
'slug': row.slug,
'website': row.website,
'category': row.category_name,
'platforms': sm_data,
'platform_count': platform_count,
'total_followers': total_followers,
'last_verified': last_verified,
'has_facebook': 'facebook' in sm_data,
'has_instagram': 'instagram' in sm_data,
'has_linkedin': 'linkedin' in sm_data,
'has_youtube': 'youtube' in sm_data,
'has_twitter': 'twitter' in sm_data,
'has_tiktok': 'tiktok' in sm_data,
'has_needs_verification': len(needs_verify_platforms) > 0,
'recommendations': recommendations,
'avg_completeness': avg_completeness,
'avg_engagement': avg_engagement,
'total_posts_30d': total_posts_30d,
'last_post': last_post,
'health_score': health_score
})
# Platform statistics
platform_stats = {}
for platform in platforms:
count = db.query(func.count(distinct(CompanySocialMedia.company_id))).filter(
CompanySocialMedia.platform == platform,
CompanySocialMedia.is_valid == True
).scalar() or 0
platform_stats[platform] = {
'count': count,
'percent': round(count / total_companies * 100) if total_companies > 0 else 0
}
# Summary stats
companies_with_sm = len([c for c in companies if c['platform_count'] > 0])
companies_without_sm = total_companies - companies_with_sm
total_profiles = sum(c['platform_count'] for c in companies)
total_followers = sum(c['total_followers'] for c in companies)
# Top followers (top 10 companies by total followers)
top_followers = sorted([c for c in companies if c['total_followers'] > 0],
key=lambda x: x['total_followers'], reverse=True)[:10]
# Resolve company names for needs_verification items
company_names = {row.id: row.name for row in companies_query}
for item in needs_verification_items:
item['company_name'] = company_names.get(item['company_id'], 'Nieznana')
# Aggregate engagement & activity stats
all_completeness = [c['avg_completeness'] for c in companies if c['avg_completeness'] > 0]
avg_completeness_global = round(sum(all_completeness) / len(all_completeness)) if all_completeness else 0
all_engagement = [c['avg_engagement'] for c in companies if c['avg_engagement'] > 0]
avg_engagement_global = round(sum(all_engagement) / len(all_engagement), 2) if all_engagement else 0
now = datetime.now()
inactive_30d = len([c for c in companies if c['platform_count'] > 0 and (
not c['last_post'] or (now - c['last_post']).days > 30
)])
stats = {
'total_companies': total_companies,
'companies_with_sm': companies_with_sm,
'companies_without_sm': companies_without_sm,
'total_profiles': total_profiles,
'total_followers': total_followers,
'needs_verification_count': len(needs_verification_items),
'platform_stats': platform_stats,
'avg_completeness': avg_completeness_global,
'avg_engagement': avg_engagement_global,
'inactive_30d': inactive_30d
}
# Get unique categories
categories = sorted(set(c['category'] for c in companies if c['category']))
# Convert to objects for template
class CompanyRow:
def __init__(self, data):
for key, value in data.items():
setattr(self, key, value)
companies_objects = [CompanyRow(c) for c in companies]
top_followers_objects = [CompanyRow(c) for c in top_followers]
return render_template('admin/social_audit_dashboard.html',
companies=companies_objects,
stats=stats,
needs_verification=needs_verification_items,
categories=categories,
platforms=platforms,
top_followers=top_followers_objects,
now=datetime.now()
)
finally:
db.close()
# ============================================================
# SOCIAL MEDIA AUDIT DETAIL (per company)
# ============================================================
@bp.route('/social-audit/<int:company_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_social_audit_detail(company_id):
"""Detailed social media audit view for a single company."""
if not is_audit_owner():
from flask import abort
abort(404)
db = SessionLocal()
try:
company = db.query(Company).filter_by(id=company_id).first()
if not company:
flash('Firma nie istnieje.', 'error')
return redirect(url_for('admin.admin_social_audit'))
# Get all social media profiles for this company
profiles = db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company_id
).order_by(CompanySocialMedia.platform).all()
# Separate valid and invalid
valid_profiles = [p for p in profiles if p.is_valid]
invalid_profiles = [p for p in profiles if not p.is_valid]
# Check OAuth status for this company
oauth_tokens = db.query(OAuthToken).filter(
OAuthToken.company_id == company_id,
OAuthToken.is_active == True
).all()
oauth_status = {}
for token in oauth_tokens:
key = token.service # 'facebook', 'instagram'
oauth_status[key] = {
'connected': True,
'provider': token.provider,
'account_name': token.account_name,
'expires_at': token.expires_at,
'expired': token.expires_at < datetime.now() if token.expires_at else False,
}
# Check SocialMediaConfig (for Facebook page selection)
sm_configs = db.query(SocialMediaConfig).filter(
SocialMediaConfig.company_id == company_id,
SocialMediaConfig.is_active == True
).all()
for cfg in sm_configs:
platform_key = cfg.platform
if platform_key in oauth_status:
oauth_status[platform_key]['page_configured'] = bool(cfg.page_id)
oauth_status[platform_key]['page_name'] = cfg.page_name
oauth_status[platform_key]['last_sync'] = cfg.updated_at
else:
oauth_status[platform_key] = {
'connected': False,
'page_configured': bool(cfg.page_id),
'page_name': cfg.page_name,
}
# Build per-platform detail
platform_details = []
for p in valid_profiles:
source_info = SOURCE_LABELS.get(p.source, SOURCE_LABELS[None])
oauth_for_platform = oauth_status.get(p.platform, {})
oauth_available = p.platform in OAUTH_PLATFORMS
# Determine which fields come from API vs scraping
fields_from_api = []
fields_from_scraping = []
fields_empty = []
if p.source == 'facebook_api':
if p.followers_count: fields_from_api.append('followers_count')
if p.engagement_rate: fields_from_api.append('engagement_rate')
if p.profile_completeness_score: fields_from_api.append('profile_completeness_score')
if p.has_bio is not None: fields_from_api.append('has_bio')
if p.page_name: fields_from_api.append('page_name')
# These are typically from scraping even with API source
if p.posts_count_30d: fields_from_scraping.append('posts_count_30d')
if p.last_post_date: fields_from_scraping.append('last_post_date')
if p.has_profile_photo is not None: fields_from_scraping.append('has_profile_photo')
elif p.source in ('website_scrape', 'brave_search'):
for field_name, field_val in [
('followers_count', p.followers_count),
('engagement_rate', p.engagement_rate),
('posts_count_30d', p.posts_count_30d),
('last_post_date', p.last_post_date),
('profile_completeness_score', p.profile_completeness_score),
]:
if field_val:
fields_from_scraping.append(field_name)
else:
fields_empty.append(field_name)
detail = {
'platform': p.platform,
'url': p.url,
'page_name': p.page_name,
'followers_count': p.followers_count or 0,
'has_profile_photo': p.has_profile_photo,
'has_cover_photo': p.has_cover_photo,
'has_bio': p.has_bio,
'profile_description': p.profile_description,
'posts_count_30d': p.posts_count_30d or 0,
'posts_count_365d': p.posts_count_365d or 0,
'last_post_date': p.last_post_date,
'posting_frequency_score': p.posting_frequency_score or 0,
'engagement_rate': float(p.engagement_rate) if p.engagement_rate else 0,
'content_types': p.content_types or {},
'profile_completeness_score': p.profile_completeness_score or 0,
'followers_history': p.followers_history or [],
'verified_at': p.verified_at,
'source': p.source,
'check_status': p.check_status,
'is_valid': p.is_valid,
'last_checked_at': p.last_checked_at,
# Data provenance
'source_label': source_info['label'],
'source_color': source_info['color'],
'source_icon': source_info['icon'],
'source_description': source_info['description'],
'source_priority': SOURCE_PRIORITY.get(p.source, 0),
'fields_from_api': fields_from_api,
'fields_from_scraping': fields_from_scraping,
'fields_empty': fields_empty,
# OAuth info
'oauth_available': oauth_available,
'oauth_connected': oauth_for_platform.get('connected', False),
'oauth_expired': oauth_for_platform.get('expired', False),
'oauth_page_configured': oauth_for_platform.get('page_configured', False),
'oauth_account_name': oauth_for_platform.get('account_name'),
'oauth_last_sync': oauth_for_platform.get('last_sync'),
}
# Computed Tier 1 metrics
growth_rate, growth_trend = _compute_followers_growth(p.followers_history or [])
detail['followers_growth_rate'] = growth_rate
detail['followers_growth_trend'] = growth_trend
activity_status, activity_label, activity_color = _compute_activity_status(p.last_post_date)
detail['activity_status'] = activity_status
detail['activity_label'] = activity_label
detail['activity_color'] = activity_color
platform_details.append(detail)
# Company-level computed scores
health_score = _compute_social_health_score(platform_details)
# Cross-platform score
num_platforms = len(platform_details)
cross_platform_map = {0: 0, 1: 20, 2: 45, 3: 70, 4: 85, 5: 95}
cross_platform_score = cross_platform_map.get(num_platforms, 100 if num_platforms >= 6 else 0)
# Overall activity status (best of all platforms)
best_activity = 'unknown'
best_activity_label = 'Brak danych'
best_activity_color = '#9ca3af'
activity_priority = ['active', 'moderate', 'slow', 'dormant', 'abandoned', 'unknown']
for p in platform_details:
if activity_priority.index(p['activity_status']) < activity_priority.index(best_activity):
best_activity = p['activity_status']
best_activity_label = p['activity_label']
best_activity_color = p['activity_color']
company_scores = {
'health_score': health_score,
'cross_platform_score': cross_platform_score,
'activity_status': best_activity,
'activity_label': best_activity_label,
'activity_color': best_activity_color,
}
# Recommendations
recommendations = []
platform_names = {p['platform'] for p in platform_details}
if not platform_details:
recommendations.append({'severity': 'critical', 'text': 'Firma nie ma żadnych profili social media.'})
else:
if 'facebook' not in platform_names:
recommendations.append({'severity': 'warning', 'text': 'Brak profilu Facebook — najpopularniejsza platforma wśród firm Izby.'})
if 'instagram' not in platform_names:
recommendations.append({'severity': 'info', 'text': 'Brak profilu Instagram.'})
if 'linkedin' not in platform_names:
recommendations.append({'severity': 'warning', 'text': 'Brak profilu LinkedIn — kluczowy dla kontaktów B2B.'})
for p in platform_details:
if p['profile_completeness_score'] > 0 and p['profile_completeness_score'] < 50:
recommendations.append({
'severity': 'warning',
'text': f'{p["platform"].capitalize()}: niska kompletność profilu ({p["profile_completeness_score"]}%). Uzupełnij zdjęcie, opis i dane kontaktowe.'
})
if p['url'] and 'profile.php?id=' in p['url']:
recommendations.append({
'severity': 'warning',
'text': 'Facebook: adres profilu zawiera profile.php — warto zmienić na niestandardowy URL z nazwą firmy.'
})
if p['last_post_date']:
days_since = (datetime.now() - p['last_post_date']).days
if days_since > 90:
recommendations.append({
'severity': 'warning',
'text': f'{p["platform"].capitalize()}: ostatni post {days_since} dni temu. Konto może wyglądać na porzucone.'
})
return render_template('admin/social_audit_detail.html',
company=company,
platform_details=platform_details,
invalid_profiles=invalid_profiles,
recommendations=recommendations,
company_scores=company_scores,
oauth_status=oauth_status,
oauth_platforms=OAUTH_PLATFORMS,
now=datetime.now()
)
finally:
db.close()
# ============================================================
# SOCIAL MEDIA ENRICHMENT (collect → review → approve/reject)
# ============================================================
# Field labels for display
_FIELD_LABELS = {
'page_name': 'Nazwa strony',
'followers_count': 'Obserwujący',
'has_profile_photo': 'Zdjęcie profilowe',
'has_cover_photo': 'Zdjęcie w tle',
'has_bio': 'Bio/opis',
# profile_description excluded — contains dynamic follower count from og:description,
# changes on every scrape (e.g. "119 followers" → "123 obserwujących"). Still saved to DB.
'posts_count_30d': 'Postów (30 dni)',
'posts_count_365d': 'Postów (rok)',
'last_post_date': 'Ostatni post',
'engagement_rate': 'Engagement rate',
'posting_frequency_score': 'Regularność',
'profile_completeness_score': 'Kompletność profilu',
}
# File-based shared state for enrichment jobs (works with multi-worker gunicorn)
import json
import tempfile
import os
import fcntl
_ENRICHMENT_STATE_FILE = os.path.join(tempfile.gettempdir(), 'nordabiz_enrichment_state.json')
_ENRICHMENT_DEFAULT = {
'running': False,
'progress': 0,
'total': 0,
'completed': 0,
'errors': 0,
'last_run': None,
'results': [],
'pending_changes': [],
'approved': False,
}
def _read_enrichment_state():
"""Read enrichment state from shared file."""
try:
with open(_ENRICHMENT_STATE_FILE, 'r') as f:
fcntl.flock(f, fcntl.LOCK_SH)
data = json.load(f)
fcntl.flock(f, fcntl.LOCK_UN)
return data
except (FileNotFoundError, json.JSONDecodeError, IOError):
return dict(_ENRICHMENT_DEFAULT)
def _write_enrichment_state(state):
"""Write enrichment state to shared file (atomic)."""
try:
tmp_path = _ENRICHMENT_STATE_FILE + '.tmp'
with open(tmp_path, 'w') as f:
fcntl.flock(f, fcntl.LOCK_EX)
json.dump(state, f, default=str)
fcntl.flock(f, fcntl.LOCK_UN)
os.replace(tmp_path, _ENRICHMENT_STATE_FILE)
except IOError as e:
logger.error(f"Failed to write enrichment state: {e}")
def _update_enrichment_state(**kwargs):
"""Read-modify-write enrichment state."""
state = _read_enrichment_state()
state.update(kwargs)
_write_enrichment_state(state)
return state
def _format_value(key, val):
"""Format a field value for display."""
if val is None:
return ''
if isinstance(val, bool):
return 'Tak' if val else 'Nie'
if key == 'last_post_date' and hasattr(val, 'strftime'):
return val.strftime('%d.%m.%Y')
if key == 'engagement_rate':
return f'{val}%'
if key == 'posting_frequency_score':
return f'{val}/10'
if key == 'profile_completeness_score':
return f'{val}%'
if key == 'followers_count' and isinstance(val, (int, float)):
return f'{int(val):,}'.replace(',', ' ')
if key == 'profile_description' and isinstance(val, str) and len(val) > 80:
return val[:80] + '...'
return str(val)
def _run_enrichment_background(company_ids, platforms_filter=None):
"""Collect enrichment data into staging area (NO database writes).
All state is persisted to shared file for multi-worker gunicorn compatibility.
platforms_filter: optional list of platform names to scan (e.g. ['facebook', 'linkedin']).
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / 'scripts'))
try:
from social_media_audit import SocialProfileEnricher
except ImportError as e:
logger.error(f"Could not import SocialProfileEnricher: {e}")
_update_enrichment_state(running=False, errors=1)
return
enricher = SocialProfileEnricher()
db = SessionLocal()
total = len(company_ids)
completed = 0
errors = 0
results = []
pending_changes = []
_update_enrichment_state(total=total, completed=0, errors=0,
results=[], pending_changes=[], approved=False)
tracked_fields = list(_FIELD_LABELS.keys())
try:
for company_id in company_ids:
try:
company = db.query(Company).filter_by(id=company_id).first()
if not company:
completed += 1
continue
profiles_query = db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company_id,
CompanySocialMedia.is_valid == True
)
if platforms_filter:
profiles_query = profiles_query.filter(CompanySocialMedia.platform.in_(platforms_filter))
profiles = profiles_query.all()
company_result = {
'company_id': company_id,
'company_name': company.name,
'profiles': [],
'has_changes': False,
}
# Check if company has Facebook OAuth config — sync via Graph API
fb_config = None
if not platforms_filter or 'facebook' in platforms_filter:
fb_config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.company_id == company_id,
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.page_id.isnot(None)
).first()
fb_synced = False
if fb_config:
try:
from facebook_graph_service import sync_facebook_to_social_media
sync_result = sync_facebook_to_social_media(db, company_id)
if sync_result.get('success'):
data = sync_result.get('data', {})
fb_synced = True
# Add a virtual profile result for the API sync
company_result['profiles'].append({
'profile_id': None,
'platform': 'facebook',
'url': f"Facebook Page: {fb_config.page_name or fb_config.page_id}",
'source': 'facebook_api',
'status': 'synced_api',
'reason': f"Graph API: {data.get('followers_count') or 0} obserwujących, engagement: {data.get('engagement_rate') or 0:.1f}%",
})
company_result['has_changes'] = True
else:
company_result['profiles'].append({
'profile_id': None,
'platform': 'facebook',
'url': f"Facebook Page: {fb_config.page_name or fb_config.page_id}",
'source': 'facebook_api',
'status': 'error',
'reason': f"Graph API: {sync_result.get('message', sync_result.get('error', 'nieznany błąd'))}",
})
except Exception as e:
logger.warning(f"Facebook API sync failed for {company.name}: {e}")
company_result['profiles'].append({
'profile_id': None,
'platform': 'facebook',
'url': f"Facebook Page: {fb_config.page_name or fb_config.page_id}",
'source': 'facebook_api',
'status': 'error',
'reason': f"Graph API: {str(e)[:100]}",
})
for profile in profiles:
profile_result = {
'profile_id': profile.id,
'platform': profile.platform,
'url': profile.url,
'source': profile.source,
}
# Skip scraping Facebook if we already synced via API
if fb_synced and profile.platform.lower() == 'facebook':
profile_result['status'] = 'no_changes'
profile_result['reason'] = 'Zsynchronizowano przez Graph API'
company_result['profiles'].append(profile_result)
continue
try:
enriched = enricher.enrich_profile(profile.platform, profile.url)
if enriched:
changes = []
for field in tracked_fields:
new_val = enriched.get(field)
if new_val is None:
continue
old_val = getattr(profile, field, None)
if old_val != new_val:
changes.append({
'field': field,
'label': _FIELD_LABELS.get(field, field),
'old': _format_value(field, old_val),
'new': _format_value(field, new_val),
'old_raw': str(old_val) if old_val is not None else None,
'new_raw': str(new_val) if new_val is not None else None,
})
profile_result['status'] = 'changes' if changes else 'no_changes'
profile_result['changes'] = changes
profile_result['enriched_data'] = {
k: (str(v) if hasattr(v, 'strftime') else v)
for k, v in enriched.items()
if (k in tracked_fields or k.startswith('_')) and v is not None
}
if changes:
company_result['has_changes'] = True
pending_changes.append({
'profile_id': profile.id,
'company_id': company_id,
'company_name': company.name,
'platform': profile.platform,
'enriched_data': profile_result['enriched_data'],
'changes': changes,
})
else:
profile_result['status'] = 'no_data'
platform_name = profile.platform.lower()
if platform_name == 'facebook':
if 'profile.php' in (profile.url or ''):
profile_result['reason'] = 'Profil osobisty — niedostępny publicznie. Podłącz Graph API dla stron firmowych.'
else:
profile_result['reason'] = 'Facebook blokuje dostęp publiczny. Podłącz Graph API (OAuth), aby pobierać dane.'
elif platform_name == 'instagram':
profile_result['reason'] = 'Instagram wymaga logowania. Podłącz Meta API (OAuth), aby pobierać dane.'
elif platform_name == 'linkedin':
profile_result['reason'] = 'LinkedIn blokuje boty (3 próby z opóźnieniem). Wyniki mogą się różnić między skanami.'
else:
profile_result['reason'] = f'{profile.platform} — brak danych publicznych do pobrania.'
except Exception as e:
logger.warning(f"Enrichment failed for {company.name}/{profile.platform}: {e}")
profile_result['status'] = 'error'
profile_result['reason'] = str(e)[:150]
errors += 1
company_result['profiles'].append(profile_result)
results.append(company_result)
except Exception as e:
logger.error(f"Enrichment error for company {company_id}: {e}")
errors += 1
completed += 1
progress = round(completed / total * 100) if total > 0 else 0
# Write state to file after each company (visible to all workers)
_write_enrichment_state({
'running': True,
'total': total,
'completed': completed,
'progress': progress,
'errors': errors,
'results': results,
'pending_changes': pending_changes,
'approved': False,
'last_run': None,
})
except Exception as e:
logger.error(f"Enrichment background thread crashed: {e}")
errors += 1
finally:
db.close()
_write_enrichment_state({
'running': False,
'total': total,
'completed': completed,
'progress': 100,
'errors': errors,
'results': results,
'pending_changes': pending_changes,
'approved': False,
'last_run': datetime.now().strftime('%d.%m.%Y %H:%M'),
})
logger.info(f"Enrichment scan completed: {completed}/{total}, "
f"{len(pending_changes)} pending changes, {errors} errors")
@bp.route('/social-audit/run-enrichment', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_social_audit_run_enrichment():
"""Start social media profile enrichment scan (collect only, no DB writes)."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_enrichment_state()
if state.get('running'):
return jsonify({
'error': 'Audyt już działa',
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
}), 409
company_ids_param = request.form.get('company_ids', '')
platforms_param = request.form.get('platforms', '')
platforms_filter = [p.strip().lower() for p in platforms_param.split(',') if p.strip()] if platforms_param else None
if company_ids_param:
company_ids = [int(x) for x in company_ids_param.split(',') if x.strip().isdigit()]
else:
db = SessionLocal()
try:
query = db.query(distinct(CompanySocialMedia.company_id)).filter(CompanySocialMedia.is_valid == True)
if platforms_filter:
query = query.filter(CompanySocialMedia.platform.in_(platforms_filter))
company_ids = [row[0] for row in query.all()]
finally:
db.close()
if not company_ids:
return jsonify({'error': 'Brak firm do audytu'}), 400
# Initialize state in shared file before starting thread
_write_enrichment_state({
'running': True,
'progress': 0,
'total': len(company_ids),
'completed': 0,
'errors': 0,
'results': [],
'pending_changes': [],
'approved': False,
'last_run': None,
'platforms_filter': platforms_filter,
})
thread = threading.Thread(
target=_run_enrichment_background,
args=(company_ids, platforms_filter),
daemon=True
)
thread.start()
return jsonify({
'status': 'started',
'total': len(company_ids),
'message': f'Rozpoczęto skanowanie {len(company_ids)} firm. Dane NIE zostaną zapisane bez Twojej zgody.',
})
@bp.route('/social-audit/enrichment-status')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_social_audit_enrichment_status():
"""Get current enrichment job status with live results feed."""
state = _read_enrichment_state()
pending = state.get('pending_changes', [])
results = state.get('results', [])
# Return last N results for live feed (since_index param for incremental updates)
since = request.args.get('since', 0, type=int)
new_results = results[since:]
# Build compact live feed entries
feed = []
for r in new_results:
profiles_summary = []
for p in r.get('profiles', []):
status = p.get('status', 'unknown')
icon = {'changes': '+', 'no_changes': '=', 'skipped': '~', 'error': '!', 'no_data': '-', 'synced_api': ''}.get(status, '?')
platform = p.get('platform', '?')
change_count = len(p.get('changes', []))
desc = ''
if status == 'changes':
desc = f'{change_count} zmian'
elif status == 'skipped':
desc = 'API'
elif status == 'error':
desc = p.get('reason', 'błąd')[:40]
elif status == 'no_data':
desc = 'brak dostępu'
elif status == 'synced_api':
desc = p.get('reason', 'zsynchronizowano')[:40]
elif status == 'no_changes':
desc = 'aktualne'
profiles_summary.append({
'platform': platform,
'icon': icon,
'status': status,
'desc': desc,
})
feed.append({
'company_name': r.get('company_name', '?'),
'company_id': r.get('company_id'),
'has_changes': r.get('has_changes', False),
'profiles': profiles_summary,
})
last_run = state.get('last_run')
api_synced = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') == 'synced_api')
return jsonify({
'running': state.get('running', False),
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
'errors': state.get('errors', 0),
'last_run': last_run,
'pending_count': len(pending),
'api_synced_count': api_synced,
'approved': state.get('approved', False),
'feed': feed,
'results_count': len(results),
})
@bp.route('/social-audit/enrichment-review')
@login_required
@role_required(SystemRole.ADMIN)
def admin_social_audit_enrichment_review():
"""Review page showing all collected changes before applying."""
if not is_audit_owner():
from flask import abort
abort(404)
state = _read_enrichment_state()
if state.get('running'):
flash('Audyt wciąż trwa. Poczekaj na zakończenie.', 'warning')
return redirect(url_for('admin.admin_social_audit'))
results = state.get('results', [])
pending = state.get('pending_changes', [])
# Summary stats
total_profiles_scanned = sum(len(r.get('profiles', [])) for r in results)
profiles_with_changes = len(pending)
profiles_skipped = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') in ('skipped', 'synced_api'))
profiles_no_data = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') in ('no_data', 'no_changes'))
profiles_errors = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') == 'error')
companies_with_changes = len(set(c['company_id'] for c in pending))
summary = {
'total_companies': state.get('total', 0),
'total_profiles_scanned': total_profiles_scanned,
'profiles_with_changes': profiles_with_changes,
'profiles_skipped': profiles_skipped,
'profiles_no_data': profiles_no_data,
'profiles_errors': profiles_errors,
'companies_with_changes': companies_with_changes,
'last_run': state.get('last_run'),
}
# Only show companies that have changes or errors
results_to_show = [r for r in results if r.get('has_changes') or
any(p.get('status') == 'error' for p in r.get('profiles', []))]
return render_template('admin/social_audit_enrichment_review.html',
results=results_to_show,
all_results=results,
summary=summary,
approved=state.get('approved', False),
)
@bp.route('/social-audit/enrichment-approve', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_social_audit_enrichment_approve():
"""Apply all pending enrichment changes to database."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_enrichment_state()
if state.get('running'):
return jsonify({'error': 'Audyt wciąż trwa'}), 409
pending = state.get('pending_changes', [])
if not pending:
return jsonify({'error': 'Brak oczekujących zmian do zatwierdzenia'}), 400
if state.get('approved'):
return jsonify({'error': 'Zmiany zostały już zatwierdzone'}), 409
db = SessionLocal()
applied = 0
errors = 0
try:
for change in pending:
try:
profile = db.query(CompanySocialMedia).filter_by(id=change['profile_id']).first()
if not profile:
errors += 1
continue
enriched = change['enriched_data']
for field, value in enriched.items():
if field.startswith('_') and field.endswith('_extra'):
# Store platform-specific extra data in content_types JSONB
ct = dict(profile.content_types or {})
ct.update(value if isinstance(value, dict) else {})
profile.content_types = ct
continue
if field == 'last_post_date' and isinstance(value, str):
try:
from dateutil.parser import parse as parse_date
value = parse_date(value)
except (ImportError, ValueError):
continue
if field == 'engagement_rate' and isinstance(value, str):
value = float(value)
setattr(profile, field, value)
profile.last_checked_at = datetime.now()
applied += 1
except Exception as e:
logger.error(f"Failed to apply enrichment for profile {change.get('profile_id')}: {e}")
errors += 1
db.commit()
_update_enrichment_state(approved=True)
logger.info(f"Enrichment approved: {applied} profiles updated, {errors} errors")
flash(f'Zatwierdzone: zaktualizowano {applied} profili.', 'success')
return jsonify({
'status': 'approved',
'applied': applied,
'errors': errors,
'message': f'Zaktualizowano {applied} profili.',
})
except Exception as e:
db.rollback()
logger.error(f"Enrichment approval failed: {e}")
return jsonify({'error': f'Błąd zapisu: {str(e)[:200]}'}), 500
finally:
db.close()
@bp.route('/social-audit/enrichment-discard', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_social_audit_enrichment_discard():
"""Discard all pending enrichment changes."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_enrichment_state()
count = len(state.get('pending_changes', []))
_update_enrichment_state(pending_changes=[], results=[], approved=False)
flash(f'Odrzucono {count} oczekujących zmian. Baza danych nie została zmieniona.', 'info')
return jsonify({
'status': 'discarded',
'count': count,
})