nordabiz/services/social_publisher_service.py
Maciej Pienczyn 63ee509e1e
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: persist Facebook posts in DB for instant page load
- Migration 071: Add cached_posts (JSONB) and posts_cached_at to social_media_config
- Service: get_cached_posts() and save_all_posts_to_cache() methods
- Route: New POST endpoint to save posts cache, pass cached data to template
- Template: Render cached posts+charts instantly on page load from DB,
  save to DB after "Load all" or "Refresh", remove AJAX auto-load

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 08:35:25 +01:00

1096 lines
42 KiB
Python

"""
Social Media Publisher Service
==============================
Business logic for creating, editing, scheduling and publishing social media posts.
Supports per-company Facebook configuration via OAuth tokens.
"""
import logging
import os
import re
import time
from datetime import datetime
from typing import Optional, Dict, List, Tuple
from database import SessionLocal, SocialPost, SocialMediaConfig, Company, NordaEvent, OAuthToken, CompanySocialMedia
logger = logging.getLogger(__name__)
# Cache for Facebook page posts (in-memory, per-process)
_posts_cache = {} # (company_id, page_id) -> {'data': [...], 'ts': float}
_CACHE_TTL = 300 # 5 minutes
# Post types supported
POST_TYPES = {
'member_spotlight': 'Poznaj Członka NORDA',
'event_invitation': 'Zaproszenie na Wydarzenie',
'event_recap': 'Relacja z Wydarzenia',
'regional_news': 'Aktualności Regionalne',
'chamber_news': 'Aktualności Izby',
}
# AI Prompt templates for NORDA chamber posts (Polish)
AI_PROMPTS = {
'member_spotlight': """Napisz post na Facebook dla Izby Gospodarczej NORDA Biznes, przedstawiający firmę członkowską:
Firma: {company_name}
Branża: {category}
Miasto: {city}
Opis: {description}
Strona WWW: {website}
Media społecznościowe: {social_media_links}
Post powinien:
- Zaczynać się od ciepłego nagłówka typu "Poznajcie naszego członka!" lub podobnego
- Opisać czym zajmuje się firma (2-3 zdania)
- Podkreślić wartość tej firmy dla społeczności biznesowej
- Zachęcić do odwiedzenia strony firmy
- Jeśli firma ma media społecznościowe, dodaj linki na końcu posta (np. "Znajdziesz ich na: 👉 Facebook: ... 📸 Instagram: ...")
- Jeśli firma nie ma mediów społecznościowych, pomiń tę sekcję
- Być napisany ciepło, z dumą i wspierająco
- Mieć 100-200 słów
- Być po polsku
- NIE dodawaj hashtagów — zostaną wygenerowane osobno
Odpowiedz WYŁĄCZNIE tekstem postu, BEZ hashtagów.""",
'event_invitation': """Napisz post na Facebook zapraszający na wydarzenie Izby Gospodarczej NORDA Biznes:
Wydarzenie: {event_title}
Data: {event_date}
Miejsce: {event_location}
Opis: {event_description}
Post powinien:
- Być entuzjastyczny i zachęcający do udziału
- Zawierać najważniejsze informacje (co, kiedy, gdzie)
- Wspomnieć korzyści z udziału
- Zawierać CTA (zachęta do rejestracji/kontaktu)
- Mieć 100-150 słów
- Być po polsku
- NIE dodawaj hashtagów — zostaną wygenerowane osobno
Odpowiedz WYŁĄCZNIE tekstem postu, BEZ hashtagów.""",
'event_recap': """Napisz post na Facebook będący relacją z wydarzenia Izby Gospodarczej NORDA Biznes:
Wydarzenie: {event_title}
Data: {event_date}
Miejsce: {event_location}
Tematy: {event_topics}
Liczba uczestników: {attendees_count}
Post powinien:
- Dziękować uczestnikom za obecność
- Podsumować najważniejsze tematy/wnioski
- Wspomnieć atmosferę i wartość spotkania
- Zachęcić do udziału w kolejnych wydarzeniach
- Mieć 100-200 słów
- Być po polsku, relacyjny i dziękujący
- NIE dodawaj hashtagów — zostaną wygenerowane osobno
Odpowiedz WYŁĄCZNIE tekstem postu, BEZ hashtagów.""",
'regional_news': """Napisz post na Facebook dla Izby Gospodarczej NORDA Biznes komentujący aktualność regionalną:
Temat: {topic}
Źródło: {source}
Fakty: {facts}
Post powinien:
- Informować o temacie w kontekście biznesowym regionu
- Być informacyjny i ekspercki
- Dodać perspektywę Izby (jak to wpływa na lokalny biznes)
- Mieć 100-200 słów
- Być po polsku
- NIE dodawaj hashtagów — zostaną wygenerowane osobno
Odpowiedz WYŁĄCZNIE tekstem postu, BEZ hashtagów.""",
'chamber_news': """Napisz post na Facebook z aktualnościami Izby Gospodarczej NORDA Biznes:
Temat: {topic}
Szczegóły: {details}
Post powinien:
- Być oficjalny ale przystępny
- Przekazać najważniejsze informacje
- Zachęcić do interakcji (pytania, komentarze)
- Mieć 80-150 słów
- Być po polsku
- NIE dodawaj hashtagów — zostaną wygenerowane osobno
Odpowiedz WYŁĄCZNIE tekstem postu, BEZ hashtagów.""",
}
AI_MODELS = {
'3-flash': {
'label': 'Gemini 3 Flash',
'description': 'Szybki, tani — dobry do większości postów',
},
'3-pro': {
'label': 'Gemini 3 Pro',
'description': 'Najwyższa jakość — lepszy styl i kreatywność (4x droższy)',
},
'flash': {
'label': 'Gemini 2.5 Flash',
'description': 'Poprzednia generacja — stabilny, sprawdzony',
},
}
DEFAULT_AI_MODEL = '3-flash'
POST_TONES = {
'storytelling': {
'label': 'Opowieść',
'instruction': (
'Opowiedz historię — zacznij od wciągającego otwarcia, prowadź narrację z bohaterem (firmą/osobą/wydarzeniem). '
'Buduj emocje, pokaż kulisy i ludzką stronę. Pisz tak, jakbyś opowiadał znajomemu przy kawie ciekawą historię. '
'Używaj krótkich zdań, pauz (wielokropek, myślnik) i konkretu. Zakończ puentą lub refleksją.'
),
},
'proud_community': {
'label': 'Duma i wspólnota',
'instruction': (
'Pisz z autentyczną dumą ze społeczności biznesowej Wejherowa i Kaszub. '
'Podkreślaj siłę lokalnej współpracy, wzajemnego wsparcia i wspólnych wartości. '
'Ton ciepły, serdeczny, pełen szacunku. Używaj "nasz", "razem", "wspólnie". '
'Celebruj sukcesy członków jak własne. Pisz z sercem, nie korporacyjnym żargonem.'
),
},
'conversational': {
'label': 'Rozmowa',
'instruction': (
'Pisz lekko i naturalnie, jakby to był wpis osoby, nie instytucji. '
'Używaj bezpośredniego zwrotu: "Wiecie co?", "Zobaczcie!", "A Wy jak myślicie?". '
'Krótkie zdania, luźny styl, emotikony OK ale z umiarem. '
'Zachęcaj do interakcji — zadawaj pytanie na końcu. Zero sztywności.'
),
},
'professional': {
'label': 'Profesjonalny',
'instruction': (
'Pisz w tonie profesjonalnym i rzeczowym, ale przystępnym. '
'Unikaj suchego, korporacyjnego języka — bądź konkretny, kompetentny i wiarygodny. '
'Dobry dla komunikatów biznesowych, partnerstw i branżowych treści.'
),
},
'celebratory': {
'label': 'Świętowanie',
'instruction': (
'Pisz radośnie i celebracyjnie! To jest moment na gratulacje, brawa i wyrazy uznania. '
'Używaj entuzjastycznego języka: "Gratulujemy!", "Brawo!", "To ogromny sukces!". '
'Podkreślaj osiągnięcia i wkład — niech bohater poczuje się wyjątkowo. '
'Emotikony mile widziane (🎉👏🏆). Zachęcaj społeczność do gratulowania w komentarzach.'
),
},
'behind_scenes': {
'label': 'Za kulisami',
'instruction': (
'Pokaż to, czego ludzie normalnie nie widzą — kulisy, przygotowania, backstage. '
'Pisz autentycznie i nieformalnie, jakby to była relacja na żywo. '
'"Właśnie przygotowujemy...", "Zerknijcie, co się u nas dzieje...". '
'Buduj ciekawość i bliskość. Ludzki, surowy, prawdziwy ton.'
),
},
'inspiring': {
'label': 'Inspirujący',
'instruction': (
'Pisz motywująco — pokaż, co jest możliwe dzięki odwadze, pracy i współpracy. '
'Używaj storytellingu: od wyzwania do sukcesu. Podkreślaj wartości: determinację, '
'innowację, odwagę. Zakończ myślą, która zostaje w głowie. '
'Ton podnoszący na duchu, ale bez pustych frazesów — opieraj się na konkrecie.'
),
},
'informative': {
'label': 'Informacyjny',
'instruction': (
'Pisz rzeczowo — fakty, dane, konkrety. Struktura: co, kiedy, gdzie, dlaczego to ważne. '
'Unikaj emocjonalnego języka, skup się na wartości informacji. '
'Dobry dla aktualności, zmian w przepisach, raportów. '
'Na końcu dodaj kontekst: co to oznacza dla lokalnego biznesu.'
),
},
'formal': {
'label': 'Oficjalny',
'instruction': (
'Pisz formalnie, jak oficjalny komunikat instytucji. Zachowaj powagę i dystans. '
'Pełne zdania, bez skrótów, bez emotikonów. '
'Dobry dla oficjalnych stanowisk, komunikatów zarządu, współpracy z urzędami.'
),
},
'humorous': {
'label': 'Z przymrużeniem oka',
'instruction': (
'Pisz lekko, z humorem i dystansem do siebie. Używaj żartobliwych porównań, '
'lekkich przerysowań i ciepłego humoru (nie sarkazmu!). '
'Sprawdza się przy luźniejszych tematach, piątkowych postach, networkingowych anegdotach. '
'Cel: uśmiech czytelnika i chęć polubienia/skomentowania.'
),
},
}
DEFAULT_TONE = 'storytelling'
class SocialPublisherService:
"""Service for managing social media posts with per-company FB configuration."""
# ---- CRUD Operations ----
def get_posts(self, status: str = None, post_type: str = None,
publishing_company_id: int = None,
limit: int = 50, offset: int = 0) -> List[SocialPost]:
"""Get posts with optional filters."""
db = SessionLocal()
try:
query = db.query(SocialPost).order_by(SocialPost.created_at.desc())
if status:
query = query.filter(SocialPost.status == status)
if post_type:
query = query.filter(SocialPost.post_type == post_type)
if publishing_company_id:
query = query.filter(SocialPost.publishing_company_id == publishing_company_id)
return query.offset(offset).limit(limit).all()
finally:
db.close()
def get_post(self, post_id: int) -> Optional[SocialPost]:
"""Get single post by ID."""
db = SessionLocal()
try:
return db.query(SocialPost).filter(SocialPost.id == post_id).first()
finally:
db.close()
def create_post(self, post_type: str, content: str, user_id: int,
platform: str = 'facebook', hashtags: str = None,
company_id: int = None, event_id: int = None,
publishing_company_id: int = None,
image_path: str = None, ai_model: str = None,
ai_prompt_template: str = None) -> SocialPost:
"""Create a new post draft."""
db = SessionLocal()
try:
post = SocialPost(
post_type=post_type,
platform=platform,
content=content,
hashtags=hashtags,
image_path=image_path,
company_id=company_id,
event_id=event_id,
publishing_company_id=publishing_company_id,
status='draft',
ai_model=ai_model,
ai_prompt_template=ai_prompt_template,
created_by=user_id,
)
db.add(post)
db.commit()
db.refresh(post)
logger.info(f"Created social post #{post.id} type={post_type} pub_company={publishing_company_id}")
return post
except Exception as e:
db.rollback()
logger.error(f"Failed to create social post: {e}")
raise
finally:
db.close()
def update_post(self, post_id: int, **kwargs) -> Optional[SocialPost]:
"""Update post fields. Only draft/approved posts can be edited."""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post:
return None
if post.status == 'published':
logger.warning(f"Cannot edit post #{post_id} with status={post.status}")
return None
allowed_fields = {'content', 'hashtags', 'image_path', 'post_type',
'company_id', 'event_id', 'publishing_company_id', 'scheduled_at'}
for key, value in kwargs.items():
if key in allowed_fields:
setattr(post, key, value)
post.updated_at = datetime.now()
db.commit()
db.refresh(post)
return post
except Exception as e:
db.rollback()
logger.error(f"Failed to update post #{post_id}: {e}")
raise
finally:
db.close()
def delete_post(self, post_id: int) -> bool:
"""Delete a post (only draft/approved)."""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post:
return False
if post.status == 'published':
logger.warning(f"Cannot delete published post #{post_id}")
return False
db.delete(post)
db.commit()
logger.info(f"Deleted social post #{post_id}")
return True
except Exception as e:
db.rollback()
logger.error(f"Failed to delete post #{post_id}: {e}")
return False
finally:
db.close()
# ---- Workflow ----
def approve_post(self, post_id: int, user_id: int) -> Optional[SocialPost]:
"""Approve a draft post."""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post or post.status != 'draft':
return None
post.status = 'approved'
post.approved_by = user_id
post.updated_at = datetime.now()
db.commit()
db.refresh(post)
logger.info(f"Approved post #{post_id} by user #{user_id}")
return post
finally:
db.close()
def schedule_post(self, post_id: int, scheduled_at: datetime) -> Optional[SocialPost]:
"""Schedule an approved post for future publishing."""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post or post.status not in ('draft', 'approved'):
return None
post.status = 'scheduled'
post.scheduled_at = scheduled_at
post.updated_at = datetime.now()
db.commit()
db.refresh(post)
logger.info(f"Scheduled post #{post_id} for {scheduled_at}")
return post
finally:
db.close()
def _get_publish_token(self, db, publishing_company_id: int) -> Tuple[Optional[str], Optional[SocialMediaConfig]]:
"""Get access token and config for publishing.
Uses page token from social_media_config (set by select-page).
Falls back to OAuth token if config token is missing.
"""
config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.company_id == publishing_company_id,
SocialMediaConfig.is_active == True
).first()
if not config or not config.page_id:
return None, None
# Prefer page token from config (guaranteed PAGE type, set by select-page)
if config.access_token:
return config.access_token, config
# Fallback to OAuth token (may be USER token — works for reading, not always for publishing)
from oauth_service import OAuthService
oauth = OAuthService()
oauth_token = oauth.get_valid_token(db, publishing_company_id, 'meta', 'facebook')
if oauth_token:
return oauth_token, config
return None, None
def publish_post(self, post_id: int, force_live: bool = False) -> Tuple[bool, str]:
"""Publish a post to Facebook immediately.
Args:
post_id: ID of the post to publish
force_live: If True, publish publicly even if config has debug_mode=True
Returns:
(success: bool, message: str)
"""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post:
return False, "Post nie znaleziony"
allowed = ('draft', 'approved', 'scheduled', 'failed')
if force_live:
allowed = ('draft', 'approved', 'scheduled', 'published', 'failed')
if post.status not in allowed:
return False, f"Nie można opublikować posta ze statusem: {post.status}"
# Determine which company's FB page to publish on
pub_company_id = post.publishing_company_id
if not pub_company_id:
# Fallback: try legacy global config (no company_id)
config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.company_id == None,
SocialMediaConfig.is_active == True
).first()
if config and config.access_token and config.page_id:
access_token = config.access_token
else:
return False, "Nie wybrano firmy publikującej. Ustaw 'Publikuj jako' lub skonfiguruj Facebook."
else:
access_token, config = self._get_publish_token(db, pub_company_id)
if not access_token or not config:
return False, "Facebook nie jest skonfigurowany dla wybranej firmy."
# Build message with hashtags
message = post.content
if post.hashtags:
message += f"\n\n{post.hashtags}"
# Determine if draft mode (force_live overrides debug)
published = True if force_live else (not config.debug_mode)
# Publish via Facebook Graph API
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
image_path = None
if post.image_path:
base_dir = os.path.dirname(os.path.abspath(__file__))
full_path = os.path.join(os.path.dirname(base_dir), post.image_path)
if os.path.exists(full_path):
image_path = full_path
result = fb.create_post(
page_id=config.page_id,
message=message,
image_path=image_path,
published=published,
)
if result and 'id' in result:
post.status = 'published'
post.published_at = datetime.now()
post.meta_post_id = result['id']
post.meta_response = result
post.is_live = published
post.updated_at = datetime.now()
db.commit()
# Invalidate posts cache for this company
if config.page_id:
_posts_cache.pop((pub_company_id, config.page_id), None)
mode = "LIVE (force)" if force_live else ("DRAFT (debug)" if config.debug_mode else "LIVE")
logger.info(f"Published post #{post_id} -> FB {result['id']} ({mode})")
return True, f"Post opublikowany ({mode}): {result['id']}"
else:
post.status = 'failed'
post.meta_response = result
post.updated_at = datetime.now()
db.commit()
return False, "Publikacja nie powiodła się. Sprawdź konfigurację Facebook."
except Exception as e:
db.rollback()
logger.error(f"Failed to publish post #{post_id}: {e}")
return False, f"Blad publikacji: {str(e)}"
finally:
db.close()
def toggle_visibility(self, post_id: int) -> Tuple[bool, str]:
"""Publish a debug/draft post live on Facebook (debug -> live only).
Note: Facebook API does not support unpublishing already-published posts.
Use withdraw_from_fb() to delete and reset to draft instead.
"""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post:
return False, "Post nie znaleziony"
if post.status != 'published' or not post.meta_post_id:
return False, "Post musi być opublikowany na Facebook."
if post.is_live:
return False, "Post jest już publiczny. Facebook nie pozwala na cofnięcie publikacji — użyj opcji 'Usuń z Facebooka'."
pub_company_id = post.publishing_company_id
if not pub_company_id:
return False, "Brak firmy publikującej."
access_token, config = self._get_publish_token(db, pub_company_id)
if not access_token:
return False, "Brak tokena Facebook dla wybranej firmy."
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
result = fb.publish_draft(post.meta_post_id)
if result:
post.is_live = True
post.updated_at = datetime.now()
db.commit()
logger.info(f"Post #{post_id} toggled to LIVE (published)")
return True, "Post opublikowany publicznie — widoczny dla wszystkich."
return False, "Nie udało się upublicznić posta na Facebook."
except Exception as e:
db.rollback()
logger.error(f"Failed to toggle visibility for post #{post_id}: {e}")
return False, f"Błąd zmiany widoczności: {str(e)}"
finally:
db.close()
def withdraw_from_fb(self, post_id: int) -> Tuple[bool, str]:
"""Delete post from Facebook and reset status to draft.
Facebook API doesn't support unpublishing, so we delete and let
the user re-publish later if needed.
"""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post:
return False, "Post nie znaleziony"
if post.status != 'published' or not post.meta_post_id:
return False, "Post nie jest opublikowany na Facebook."
pub_company_id = post.publishing_company_id
if not pub_company_id:
return False, "Brak firmy publikującej."
access_token, config = self._get_publish_token(db, pub_company_id)
if not access_token:
return False, "Brak tokena Facebook dla wybranej firmy."
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
if fb.delete_post(post.meta_post_id):
old_fb_id = post.meta_post_id
post.status = 'draft'
post.meta_post_id = None
post.is_live = False
post.published_at = None
post.updated_at = datetime.now()
db.commit()
logger.info(f"Post #{post_id} withdrawn from FB (deleted {old_fb_id}), reset to draft")
return True, "Post usunięty z Facebooka i przywrócony do szkicu. Możesz go ponownie opublikować."
return False, "Nie udało się usunąć posta z Facebooka."
except Exception as e:
db.rollback()
logger.error(f"Failed to withdraw post #{post_id} from FB: {e}")
return False, f"Błąd usuwania z Facebooka: {str(e)}"
finally:
db.close()
# ---- Facebook Page Posts (read from API) ----
def get_cached_posts(self, company_id: int) -> Dict:
"""Return DB-cached posts for instant page load. No API call."""
db = SessionLocal()
try:
config = db.query(SocialMediaConfig).filter_by(
company_id=company_id, platform='facebook'
).first()
if not config or not config.cached_posts:
return None
return {
'posts': config.cached_posts.get('posts', []),
'cached_at': config.posts_cached_at,
'total_count': config.cached_posts.get('total_count', 0),
}
finally:
db.close()
def _save_posts_to_db(self, company_id: int, posts: list):
"""Save posts to DB cache for instant page load."""
db = SessionLocal()
try:
config = db.query(SocialMediaConfig).filter_by(
company_id=company_id, platform='facebook'
).first()
if config:
config.cached_posts = {'posts': posts, 'total_count': len(posts)}
config.posts_cached_at = datetime.now()
db.commit()
except Exception as e:
db.rollback()
logger.error(f"Failed to save posts cache for company {company_id}: {e}")
finally:
db.close()
def get_page_recent_posts(self, company_id: int, limit: int = 10,
after: str = None) -> Dict:
"""Fetch recent posts from company's Facebook page with engagement metrics.
Uses in-memory cache with 5-minute TTL (first page only).
Saves first page to DB for instant page load.
"""
db = SessionLocal()
try:
access_token, config = self._get_publish_token(db, company_id)
if not access_token or not config or not config.page_id:
return {'success': False, 'error': 'Brak konfiguracji Facebook dla tej firmy.'}
page_id = config.page_id
# In-memory cache only for first page (no cursor)
if not after:
cache_key = (company_id, page_id)
cached = _posts_cache.get(cache_key)
if cached and (time.time() - cached['ts']) < _CACHE_TTL:
return {
'success': True,
'posts': cached['data'],
'next_cursor': cached.get('next_cursor'),
'page_name': config.page_name or '',
'cached': True,
}
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
result = fb.get_page_posts(page_id, limit, after=after)
if result is None:
return {'success': False, 'error': 'Nie udało się pobrać postów z Facebook API.'}
posts = result['posts']
next_cursor = result.get('next_cursor')
# Update in-memory cache for first page only
if not after:
cache_key = (company_id, page_id)
_posts_cache[cache_key] = {
'data': posts, 'next_cursor': next_cursor, 'ts': time.time()
}
return {
'success': True,
'posts': posts,
'next_cursor': next_cursor,
'page_name': config.page_name or '',
'cached': False,
}
finally:
db.close()
def save_all_posts_to_cache(self, company_id: int, posts: list):
"""Public method to save all loaded posts to DB cache."""
self._save_posts_to_db(company_id, posts)
def get_post_insights_detail(self, company_id: int, post_id: str) -> Dict:
"""Fetch detailed insights for a specific post (impressions, reach, clicks)."""
db = SessionLocal()
try:
access_token, config = self._get_publish_token(db, company_id)
if not access_token:
return {'success': False, 'error': 'Brak tokena Facebook.'}
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
insights = fb.get_post_insights_metrics(post_id)
if insights is None:
return {'success': False, 'error': 'Brak danych insights (strona może mieć <100 fanów).'}
return {'success': True, 'insights': insights}
finally:
db.close()
# ---- AI Content Generation ----
@staticmethod
def _split_hashtags(text: str) -> Tuple[str, str]:
"""Split content and hashtags. Returns (clean_content, hashtags)."""
lines = text.strip().split('\n')
content_lines = []
hashtag_words = []
for line in lines:
stripped = line.strip()
# Line is purely hashtags (all words start with #)
if stripped and all(w.startswith('#') for w in stripped.split()):
hashtag_words.extend(stripped.split())
else:
content_lines.append(line)
# Also extract inline hashtags from the last content line
if content_lines:
last = content_lines[-1].strip()
inline_tags = re.findall(r'#\w+', last)
if inline_tags and len(inline_tags) >= 2:
# Remove inline hashtags from last line
cleaned_last = re.sub(r'\s*#\w+', '', last).strip()
if cleaned_last:
content_lines[-1] = cleaned_last
else:
content_lines.pop()
hashtag_words.extend(inline_tags)
content = '\n'.join(content_lines).strip()
# Deduplicate hashtags preserving order
seen = set()
unique_tags = []
for tag in hashtag_words:
low = tag.lower()
if low not in seen:
seen.add(low)
unique_tags.append(tag)
return content, ' '.join(unique_tags)
def generate_content(self, post_type: str, context: dict, tone: str = None,
ai_model: str = None,
user_id: int = None, company_id: int = None) -> Tuple[str, str, str]:
"""Generate post content using AI.
Args:
post_type: One of POST_TYPES keys
context: Dict with template variables
tone: One of POST_TONES keys (default: DEFAULT_TONE)
ai_model: One of AI_MODELS keys (default: DEFAULT_AI_MODEL)
user_id: User ID for cost tracking
company_id: Company ID for cost tracking
Returns:
(content: str, hashtags: str, ai_model: str)
"""
template = AI_PROMPTS.get(post_type)
if not template:
raise ValueError(f"Unknown post type: {post_type}")
# Fill template with context
try:
prompt = template.format(**context)
except KeyError as e:
raise ValueError(f"Missing context field: {e}")
# Add tone instruction
tone_key = tone if tone in POST_TONES else DEFAULT_TONE
tone_info = POST_TONES[tone_key]
prompt += f"\n\nTONACJA: {tone_info['instruction']}"
# Select model
model_key = ai_model if ai_model in AI_MODELS else DEFAULT_AI_MODEL
# Generate with Gemini — with cost tracking
from gemini_service import generate_text
result = generate_text(
prompt, model=model_key,
feature='social_publisher_content',
user_id=user_id,
company_id=company_id,
related_entity_type='social_post',
)
if not result:
raise RuntimeError("AI nie wygenerował treści. Spróbuj ponownie.")
# Split out any hashtags AI may have included despite instructions
content, hashtags = self._split_hashtags(result)
model_label = AI_MODELS[model_key]['label']
return content, hashtags, model_label
def generate_hashtags(self, content: str, post_type: str = '',
ai_model: str = None,
user_id: int = None, company_id: int = None) -> Tuple[str, str]:
"""Generate hashtags for given post content using AI.
Returns:
(hashtags: str, ai_model_label: str)
"""
model_key = ai_model if ai_model in AI_MODELS else DEFAULT_AI_MODEL
prompt = f"""Na podstawie poniższej treści posta na Facebook Izby Gospodarczej NORDA Biznes,
wygeneruj 5-8 trafnych hashtagów.
Treść posta:
{content}
Zasady:
- Zawsze uwzględnij #NordaBiznes i #IzbaGospodarcza
- Dodaj hashtagi branżowe i lokalne (#Wejherowo, #Pomorze, #Kaszuby)
- Hashtagi powinny zwiększać zasięg i widoczność posta
- Każdy hashtag zaczynaj od #, oddzielaj spacjami
- Odpowiedz WYŁĄCZNIE hashtagami, nic więcej"""
from gemini_service import generate_text
result = generate_text(
prompt, model=model_key,
feature='social_publisher_hashtags',
user_id=user_id,
company_id=company_id,
related_entity_type='social_post',
)
if not result:
raise RuntimeError("AI nie wygenerował hashtagów. Spróbuj ponownie.")
# Clean up - ensure only hashtags
tags = ' '.join(w for w in result.strip().split() if w.startswith('#'))
return tags, AI_MODELS[model_key]['label']
def get_company_context(self, company_id: int) -> dict:
"""Get company data for AI prompt context including social media links."""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return {}
category_name = ''
if company.category_id and company.category:
category_name = company.category.name
# Get social media profiles
profiles = db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company_id,
CompanySocialMedia.is_valid == True,
).all()
social_links = {}
for p in profiles:
social_links[p.platform] = p.url
# Build formatted string for AI prompt
social_text = ''
platform_labels = {
'facebook': 'Facebook', 'instagram': 'Instagram',
'linkedin': 'LinkedIn', 'youtube': 'YouTube',
'twitter': 'X/Twitter', 'tiktok': 'TikTok',
}
parts = []
for platform, url in social_links.items():
label = platform_labels.get(platform, platform)
parts.append(f"{label}: {url}")
if parts:
social_text = ', '.join(parts)
return {
'company_name': company.name or '',
'category': category_name,
'city': company.address_city or 'Wejherowo',
'description': company.description_full or company.description_short or '',
'website': company.website or '',
'social_media_links': social_text,
}
finally:
db.close()
def get_event_context(self, event_id: int) -> dict:
"""Get event data for AI prompt context."""
db = SessionLocal()
try:
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
if not event:
return {}
return {
'event_title': event.title or '',
'event_date': event.event_date.strftime('%d.%m.%Y %H:%M') if event.event_date else '',
'event_location': event.location or '',
'event_description': event.description or '',
}
finally:
db.close()
# ---- Facebook Config ----
def get_fb_config(self, company_id: int = None) -> Optional[SocialMediaConfig]:
"""Get Facebook configuration for a specific company.
If company_id is None, returns legacy global config (backward compat).
"""
db = SessionLocal()
try:
query = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook'
)
if company_id is not None:
query = query.filter(SocialMediaConfig.company_id == company_id)
else:
query = query.filter(SocialMediaConfig.company_id == None)
return query.first()
finally:
db.close()
def get_all_fb_configs(self) -> List[SocialMediaConfig]:
"""Get all active Facebook configurations across companies."""
db = SessionLocal()
try:
return db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.is_active == True,
SocialMediaConfig.company_id != None,
).all()
finally:
db.close()
def get_configured_companies(self, company_ids: list = None) -> List[Dict]:
"""Get companies that have an active FB configuration.
Args:
company_ids: If provided, filter to only these companies.
None = all companies (admin).
"""
db = SessionLocal()
try:
query = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.is_active == True,
SocialMediaConfig.company_id != None,
)
if company_ids is not None:
query = query.filter(SocialMediaConfig.company_id.in_(company_ids))
configs = query.all()
return [
{
'company_id': c.company_id,
'company_name': c.company.name if c.company else f'Firma #{c.company_id}',
'page_name': c.page_name,
'page_id': c.page_id,
'debug_mode': c.debug_mode,
}
for c in configs
]
finally:
db.close()
def save_fb_config(self, company_id: int, page_id: str, page_name: str,
debug_mode: bool, user_id: int,
access_token: str = None) -> SocialMediaConfig:
"""Save/update Facebook configuration for a company."""
db = SessionLocal()
try:
config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.company_id == company_id,
).first()
if not config:
config = SocialMediaConfig(platform='facebook', company_id=company_id)
db.add(config)
config.page_id = page_id
config.page_name = page_name
if access_token is not None:
config.access_token = access_token
config.debug_mode = debug_mode
config.is_active = True
config.updated_by = user_id
config.updated_at = datetime.now()
db.commit()
db.refresh(config)
logger.info(f"Saved FB config: company={company_id} page={page_name} debug={debug_mode}")
return config
except Exception as e:
db.rollback()
logger.error(f"Failed to save FB config: {e}")
raise
finally:
db.close()
# ---- Engagement Tracking ----
def refresh_engagement(self, post_id: int) -> Optional[Dict]:
"""Refresh engagement metrics from Facebook for a published post."""
db = SessionLocal()
try:
post = db.query(SocialPost).filter(SocialPost.id == post_id).first()
if not post or not post.meta_post_id:
return None
# Get token from publishing company
access_token = None
if post.publishing_company_id:
access_token, _ = self._get_publish_token(db, post.publishing_company_id)
# Fallback to legacy global config
if not access_token:
config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.is_active == True
).first()
if config:
access_token = config.access_token
if not access_token:
return None
from facebook_graph_service import FacebookGraphService
fb = FacebookGraphService(access_token)
engagement = fb.get_post_engagement(post.meta_post_id)
if engagement:
post.engagement_likes = engagement.get('likes', 0)
post.engagement_comments = engagement.get('comments', 0)
post.engagement_shares = engagement.get('shares', 0)
post.engagement_reach = engagement.get('reactions_total', 0)
post.engagement_updated_at = datetime.now()
post.updated_at = datetime.now()
db.commit()
return engagement
return None
finally:
db.close()
# ---- Stats ----
def get_stats(self) -> Dict:
"""Get summary statistics for social publisher dashboard."""
db = SessionLocal()
try:
from sqlalchemy import func
total = db.query(func.count(SocialPost.id)).scalar() or 0
by_status = dict(
db.query(SocialPost.status, func.count(SocialPost.id))
.group_by(SocialPost.status).all()
)
by_type = dict(
db.query(SocialPost.post_type, func.count(SocialPost.id))
.group_by(SocialPost.post_type).all()
)
# Avg engagement for published posts
avg_likes = db.query(func.avg(SocialPost.engagement_likes)).filter(
SocialPost.status == 'published'
).scalar() or 0
avg_comments = db.query(func.avg(SocialPost.engagement_comments)).filter(
SocialPost.status == 'published'
).scalar() or 0
return {
'total': total,
'by_status': by_status,
'by_type': by_type,
'avg_engagement': {
'likes': round(float(avg_likes), 1),
'comments': round(float(avg_comments), 1),
},
}
finally:
db.close()
# Singleton instance
social_publisher = SocialPublisherService()